Java源码示例:io.searchbox.core.Bulk
示例1
@Override
public JestResult deleteBulk(String clustName, String indexName, String Type, List<SearchResultDetailVO> results) {
JestResult result = null ;
try {
Bulk.Builder bulkBulder = new Bulk.Builder().defaultIndex(indexName).defaultType(Type);
if(CollectionUtils.isNotEmpty(results)){
for (SearchResultDetailVO resultDetailVO:results){
bulkBulder.addAction(new Delete.Builder(resultDetailVO.getId()).index(indexName).type(Type).build());
}
}
result = JestManager.getJestClient(clustName).execute(bulkBulder.build());
} catch (Exception e) {
e.printStackTrace();
LOGGER.error("deleteBulk失败:",e);
}
return result ;
}
示例2
private long doSync( List<DataMap> list,String table,String pk) throws IOException {
long posi=0;
//List<Map> list2=new ArrayList<>();
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(Const.ES_TYPE);
for(DataMap dataMap:list){
long id=dataMap.getLong(pk);
if(id>posi) posi=id;
Map map=(convertMysql2Es(dataMap));
logger.info("[full] {}={}",table,map);
Index index = new Index.Builder(map).id(""+id).build();
bulk.addAction(index);
}
BulkResult br = jest.getJestClient().execute(bulk.build());
if(!br.isSucceeded()){
logger.error("error={}, failItems={}",br.getErrorMessage(), JSON.toJSONString(br.getFailedItems()));
// br.getFailedItems().get(0).
throw new RuntimeException("bulk error");
}
return posi;
}
示例3
@Override
public BatchBuilder<Bulk> createBatchBuilder() {
return new BatchBuilder<Bulk>() {
private final BufferedBulk.Builder builder = new BufferedBulk.Builder()
.withBuffer(pooledItemSourceFactory.createEmptySource())
.withObjectWriter(objectWriter)
.withObjectReader(objectReader);
@Override
public void add(Object item) {
builder.addAction((BulkableAction) item);
}
@Override
public Bulk build() {
return builder.build();
}
};
}
示例4
@Override
public Function<Bulk, Boolean> createFailureHandler(FailoverPolicy failover) {
return new Function<Bulk, Boolean>() {
private final JestBatchIntrospector introspector = new JestBatchIntrospector();
@Override
public Boolean apply(Bulk bulk) {
Collection items = introspector.items(bulk);
LOG.warn(String.format("Batch of %s items failed. Redirecting to %s",
items.size(),
failover.getClass().getName()));
items.forEach(item -> {
Index failedAction = (Index) item;
failover.deliver(failedItemOps.createItem(failedAction));
});
return true;
}
};
}
示例5
protected JestResultHandler<JestResult> createResultHandler(Bulk bulk, Function<Bulk, Boolean> failureHandler) {
return new JestResultHandler<JestResult>() {
@Override
public void completed(JestResult result) {
backoffPolicy.deregister(bulk);
if (!result.isSucceeded()) {
LOG.warn(result.getErrorMessage());
failureHandler.apply(bulk);
}
}
@Override
public void failed(Exception ex) {
LOG.warn(ex.getMessage(), ex);
backoffPolicy.deregister(bulk);
failureHandler.apply(bulk);
}
};
}
示例6
@Override
public Function<Bulk, Boolean> createFailureHandler(FailoverPolicy failover) {
return bulk -> {
BufferedBulk bufferedBulk = (BufferedBulk)bulk;
LOG.warn(String.format("Batch of %s items failed. Redirecting to %s", bufferedBulk.getActions().size(), failover.getClass().getName()));
try {
bufferedBulk.getActions().stream()
.map(item -> failedItemOps.createItem(((BufferedIndex) item)))
.forEach(failover::deliver);
return true;
} catch (Exception e) {
LOG.error("Unable to execute failover", e);
return false;
}
};
}
示例7
@Override
public BatchBuilder<Bulk> createBatchBuilder() {
return new BatchBuilder<Bulk>() {
private final Bulk.Builder builder = new ExtendedBulk.Builder();
@Override
public void add(Object item) {
builder.addAction((BulkableAction) item);
}
@Override
public Bulk build() {
return builder.build();
}
};
}
示例8
@Test
public void builderAddAddsSingleElement() {
// given
ExtendedBulk.Builder builder = new ExtendedBulk.Builder();
BatchIntrospector<Bulk> introspector = new JestBatchIntrospector();
String source = UUID.randomUUID().toString();
Index action = new Index.Builder(source).build();
// when
builder.addAction(action);
// then
ExtendedBulk bulk = builder.build();
assertEquals(1, introspector.items(bulk).size());
}
示例9
@Test
public void builderAddCollectionAddsAllElements() {
// given
ExtendedBulk.Builder builder = new ExtendedBulk.Builder();
BatchIntrospector<Bulk> introspector = new JestBatchIntrospector();
String source = UUID.randomUUID().toString();
Index action = new Index.Builder(source).build();
int randomSize = new Random().nextInt(1000) + 10;
Collection<BulkableAction> actions = new ArrayList<>(randomSize);
for (int ii = 0; ii < randomSize; ii++) {
actions.add(action);
}
// when
builder.addAction(actions);
// then
ExtendedBulk bulk = builder.build();
assertEquals(randomSize, introspector.items(bulk).size());
}
示例10
@Test
public void configReturnsACopyOfServerUrisList() {
// given
Builder builder = createTestObjectFactoryBuilder();
builder.withServerUris("http://localhost:9200;http://localhost:9201;http://localhost:9202");
ClientObjectFactory<JestClient, Bulk> config = builder.build();
// when
Collection<String> serverUrisList = config.getServerList();
serverUrisList.add("test");
// then
assertNotEquals(serverUrisList, config.getServerList());
}
示例11
@Test
public void failureHandlerExecutesFailoverForEachBatchItemSeparately() {
// given
Builder builder = createTestObjectFactoryBuilder();
ClientObjectFactory<JestClient, Bulk> config = builder.build();
FailoverPolicy failoverPolicy = Mockito.spy(new NoopFailoverPolicy());
String payload1 = "test1";
String payload2 = "test2";
Bulk bulk = new Bulk.Builder()
.addAction(spy(new Index.Builder(payload1)).build())
.addAction(spy(new Index.Builder(payload2)).build())
.build();
// when
config.createFailureHandler(failoverPolicy).apply(bulk);
// then
ArgumentCaptor<FailedItemSource> captor = ArgumentCaptor.forClass(FailedItemSource.class);
verify(failoverPolicy, times(2)).deliver(captor.capture());
assertTrue(captor.getAllValues().get(0).getSource().equals(payload1));
assertTrue(captor.getAllValues().get(1).getSource().equals(payload2));
}
示例12
@Test
public void clientIsCalledWhenListenerIsNotified() {
// given
Builder builder = createTestObjectFactoryBuilder();
ClientObjectFactory<JestClient, Bulk> config = spy(builder.build());
JestClient mockedJestClient = mock(JestClient.class);
when(config.createClient()).thenReturn(mockedJestClient);
FailoverPolicy failoverPolicy = spy(new NoopFailoverPolicy());
Function<Bulk, Boolean> listener = config.createBatchListener(failoverPolicy);
String payload1 = "test1";
String payload2 = "test2";
Bulk bulk = createTestBatch(payload1, payload2);
// when
listener.apply(bulk);
// then
ArgumentCaptor<Bulk> captor = ArgumentCaptor.forClass(Bulk.class);
verify(mockedJestClient, times(1)).executeAsync((Bulk) captor.capture(), Mockito.any());
assertEquals(bulk, captor.getValue());
}
示例13
@Test
public void failoverIsExecutedAfterNonSuccessfulRequest() {
// given
Builder builder = createTestObjectFactoryBuilder();
JestHttpObjectFactory config = spy(builder.build());
String payload1 = "test1";
String payload2 = "test2";
Bulk bulk = createTestBatch(payload1, payload2);
Function<Bulk, Boolean> failoverHandler = mock(Function.class);
JestResultHandler<JestResult> resultHandler = config.createResultHandler(bulk, failoverHandler);
JestResult result = mock(JestResult.class);
when(result.isSucceeded()).thenReturn(false);
// when
resultHandler.completed(result);
// then
ArgumentCaptor<Bulk> captor = ArgumentCaptor.forClass(Bulk.class);
verify(failoverHandler, times(1)).apply(captor.capture());
assertEquals(bulk, captor.getValue());
}
示例14
@Test
public void failoverIsNotExecutedAfterSuccessfulRequest() {
// given
Builder builder = createTestObjectFactoryBuilder();
JestHttpObjectFactory config = spy(builder.build());
String payload1 = "test1";
String payload2 = "test2";
Bulk bulk = createTestBatch(payload1, payload2);
Function<Bulk, Boolean> failoverHandler = mock(Function.class);
JestResultHandler<JestResult> resultHandler = config.createResultHandler(bulk, failoverHandler);
JestResult result = mock(JestResult.class);
when(result.isSucceeded()).thenReturn(true);
// when
resultHandler.completed(result);
// then
verify(failoverHandler, never()).apply(Mockito.any(Bulk.class));
}
示例15
@Test
public void failoverIsExecutedAfterFailedRequest() {
// given
Builder builder = createTestObjectFactoryBuilder();
JestHttpObjectFactory config = spy(builder.build());
String payload1 = "test1";
String payload2 = "test2";
Bulk bulk = createTestBatch(payload1, payload2);
Function<Bulk, Boolean> failoverHandler = mock(Function.class);
JestResultHandler<JestResult> resultHandler = config.createResultHandler(bulk, failoverHandler);
// when
resultHandler.failed(new IOException());
// then
ArgumentCaptor<Bulk> captor = ArgumentCaptor.forClass(Bulk.class);
verify(failoverHandler, times(1)).apply(captor.capture());
assertEquals(bulk, captor.getValue());
}
示例16
@Test
public void failoverHandlerIsNotExecutedImmediatelyIfBackoffPolicyShouldNotApply() {
// given
BackoffPolicy<AbstractAction<BulkResult>> backoffPolicy = mock(BackoffPolicy.class);
when(backoffPolicy.shouldApply(any())).thenReturn(false);
Builder builder = createTestObjectFactoryBuilder();
builder.withBackoffPolicy(backoffPolicy);
JestHttpObjectFactory config = spy(builder.build());
String payload1 = "test1";
Bulk bulk = createTestBatch(payload1);
FailoverPolicy failoverPolicy = mock(FailoverPolicy.class);
Function<Bulk, Boolean> listener = config.createBatchListener(failoverPolicy);
// when
listener.apply(bulk);
// then
ArgumentCaptor<FailedItemSource> captor = ArgumentCaptor.forClass(FailedItemSource.class);
verify(failoverPolicy, never()).deliver(captor.capture());
}
示例17
@Test
public void configReturnsACopyOfServerUrisList() {
// given
BufferedJestHttpObjectFactory.Builder builder = createTestObjectFactoryBuilder();
builder.withServerUris("http://localhost:9200;http://localhost:9201;http://localhost:9202");
ClientObjectFactory<JestClient, Bulk> config = builder.build();
// when
Collection<String> serverUrisList = config.getServerList();
serverUrisList.add("test");
// then
assertNotEquals(serverUrisList, config.getServerList());
}
示例18
@Test
public void clientIsCalledWhenListenerIsNotified() {
// given
BufferedJestHttpObjectFactory.Builder builder = createTestObjectFactoryBuilder();
ClientObjectFactory<JestClient, Bulk> config = spy(builder.build());
JestClient mockedJestClient = mock(JestClient.class);
when(config.createClient()).thenReturn(mockedJestClient);
FailoverPolicy failoverPolicy = spy(new NoopFailoverPolicy());
Function<Bulk, Boolean> listener = config.createBatchListener(failoverPolicy);
ItemSource<ByteBuf> payload1 = createDefaultTestBuffereItemSource("test1");
ItemSource<ByteBuf> payload2 = createDefaultTestBuffereItemSource("test2");
Bulk bulk = createTestBatch(payload1, payload2);
// when
listener.apply(bulk);
// then
ArgumentCaptor<Bulk> captor = ArgumentCaptor.forClass(Bulk.class);
verify(mockedJestClient, times(1)).executeAsync((Bulk) captor.capture(), Mockito.any());
assertEquals(bulk, captor.getValue());
}
示例19
@Test
public void failoverIsNotExecutedAfterSuccessfulRequest() {
// given
BufferedJestHttpObjectFactory.Builder builder = createTestObjectFactoryBuilder();
BufferedJestHttpObjectFactory config = spy(builder.build());
ItemSource<ByteBuf> payload1 = createDefaultTestBuffereItemSource("test1");
ItemSource<ByteBuf> payload2 = createDefaultTestBuffereItemSource("test2");
Bulk bulk = createTestBatch(payload1, payload2);
Function<Bulk, Boolean> failoverHandler = mock(Function.class);
JestResultHandler<JestResult> resultHandler = config.createResultHandler(bulk, failoverHandler);
JestResult result = mock(JestResult.class);
when(result.isSucceeded()).thenReturn(true);
// when
resultHandler.completed(result);
// then
verify(failoverHandler, never()).apply(Mockito.any(Bulk.class));
}
示例20
@Test
public void failoverIsExecutedAfterFailedRequest() {
// given
BufferedJestHttpObjectFactory.Builder builder = createTestObjectFactoryBuilder();
BufferedJestHttpObjectFactory config = spy(builder.build());
ItemSource<ByteBuf> payload1 = createDefaultTestBuffereItemSource("test1");
ItemSource<ByteBuf> payload2 = createDefaultTestBuffereItemSource("test2");
Bulk bulk = createTestBatch(payload1, payload2);
Function<Bulk, Boolean> failoverHandler = mock(Function.class);
JestResultHandler<JestResult> resultHandler = config.createResultHandler(bulk, failoverHandler);
// when
resultHandler.failed(new IOException());
// then
ArgumentCaptor<Bulk> captor = ArgumentCaptor.forClass(Bulk.class);
verify(failoverHandler, times(1)).apply(captor.capture());
verify((BufferedBulk)bulk, times(1)).completed();
assertEquals(bulk, captor.getValue());
}
示例21
@Test
public void bulkContainsAddedStringItem() {
// given
BatchOperations<Bulk> bulkOperations = JestHttpObjectFactoryTest.createTestObjectFactoryBuilder().build().createBatchOperations();
BatchBuilder<Bulk> batchBuilder = bulkOperations.createBatchBuilder();
String testPayload = "{ \"testfield\": \"testvalue\" }";
Index item = (Index) bulkOperations.createBatchItem("testIndex", testPayload);
// when
batchBuilder.add(item);
Bulk bulk = batchBuilder.build();
// then
JestBatchIntrospector introspector = new JestBatchIntrospector();
AbstractAction action = (AbstractAction) introspector.items(bulk).iterator().next();
assertEquals(testPayload, introspector.itemIntrospector().getPayload(action));
}
示例22
@Test
public void bulkContainsAddedSourceItem() {
// given
BatchOperations<Bulk> bulkOperations = JestHttpObjectFactoryTest.createTestObjectFactoryBuilder().build().createBatchOperations();
BatchBuilder<Bulk> batchBuilder = bulkOperations.createBatchBuilder();
String testPayload = "{ \"testfield\": \"testvalue\" }";
StringItemSource itemSource = spy(new StringItemSource(testPayload));
Index item = (Index) bulkOperations.createBatchItem("testIndex", itemSource);
// when
batchBuilder.add(item);
Bulk bulk = batchBuilder.build();
// then
verify(itemSource, times(2)).getSource();
JestBatchIntrospector introspector = new JestBatchIntrospector();
AbstractAction action = (AbstractAction) introspector.items(bulk).iterator().next();
assertEquals(testPayload, introspector.itemIntrospector().getPayload(action));
}
示例23
@Test
public void defaultJestBulkOperationsSetsDefaultMappingType() {
// given
BatchOperations<Bulk> bulkOperations = new JestBulkOperations();
String testPayload = "{ \"testfield\": \"testvalue\" }";
StringItemSource itemSource = spy(new StringItemSource(testPayload));
Index item = (Index) bulkOperations.createBatchItem("testIndex", itemSource);
// when
String type = item.getType();
// then
assertEquals("index", type);
}
示例24
@Test
public void mappingTypeCanBeSet() {
// given
String expectedMappingType = UUID.randomUUID().toString();
BatchOperations<Bulk> bulkOperations = new JestBulkOperations(expectedMappingType);
String testPayload = "{ \"testfield\": \"testvalue\" }";
StringItemSource itemSource = spy(new StringItemSource(testPayload));
Index item = (Index) bulkOperations.createBatchItem("testIndex", itemSource);
// when
String type = item.getType();
// then
assertEquals(expectedMappingType, type);
}
示例25
@Test
public void executeAsyncDelegatesToConfiguredAsyncClient() {
// given
BufferedJestHttpClient client = spy(createDefaultTestHttpClient());
CloseableHttpAsyncClient asyncClient = mockAsyncClient(client);
Bulk bulk = createDefaultTestBufferedBulk();
// when
client.executeAsync(bulk, createMockTestResultHandler());
// then
verify(client).getAsyncClient();
verify(asyncClient).execute(any(HttpUriRequest.class), any());
}
示例26
public void indexLocations(List<Location> locations) {
if (locations != null && !locations.isEmpty()) {
log.info("Indexing locations with Elasticsearch Jest....");
Bulk.Builder builder = new Bulk.Builder();
for (Location location : locations) {
builder.addAction(new Index.Builder(location).index(LOCATION_INDEX_NAME).type(LOCATION_INDEX_TYPE).build());
}
Bulk bulk = builder.build();
try {
JestResult result = jestClient.execute(bulk);
log.info("Bulk search success: " + result.isSucceeded());
} catch (Exception e) {
log.error(e.getMessage(), e);
}
log.info(String.format("Indexed %d documents", locations.size()));
}
}
示例27
/**
* Process the next item in the queue.
*/
protected void processQueue() {
try {
Collection<RequestMetric> batch = new ArrayList<>(this.batchSize);
RequestMetric rm = queue.take();
batch.add(rm);
queue.drainTo(batch, this.batchSize - 1);
Builder builder = new Bulk.Builder();
for (RequestMetric metric : batch) {
Index index = new Index.Builder(metric).refresh(false)
.index(getIndexName())
.type("request").build(); //$NON-NLS-1$
builder.addAction(index);
}
BulkResult result = getClient().execute(builder.build());
if (!result.isSucceeded()) {
logger.warn("Failed to add metric(s) to ES"); //$NON-NLS-1$
}
} catch (Exception e) {
logger.warn("Error adding metric to ES"); //$NON-NLS-1$
return;
}
}
示例28
/**
* 批量新增数据
* @param indexName
* @param typeName
* @param objs
* @return
* @throws Exception
*/
public static boolean insertBatch(JestClient jestClient,String indexName, String typeName, List<Object> objs) throws Exception {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
for (Object obj : objs) {
Index index = new Index.Builder(obj).build();
bulk.addAction(index);
}
BulkResult br = jestClient.execute(bulk.build());
return br.isSucceeded();
}
示例29
/**
* 批量新增数据
* @param indexName
* @param typeName
* @param objs
* @return
* @throws Exception
*/
public static boolean insertBatch(JestClient jestClient,String indexName, String typeName, List<Object> objs) throws Exception {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(typeName);
for (Object obj : objs) {
Index index = new Index.Builder(obj).build();
bulk.addAction(index);
}
BulkResult br = jestClient.execute(bulk.build());
return br.isSucceeded();
}
示例30
private void doSync(List<ConsumerRecord> records) throws Exception {
Bulk.Builder bulk = new Bulk.Builder().defaultIndex(indexName).defaultType(Const.ES_TYPE);
for (ConsumerRecord record : records) {
logger.info("[incr] {}={}",indexName,record.value());
Row row = JSON.parseObject(record.value(), Row.class);
if(columnMap==null){
columnMap=databaseService.getColumnMap(row.getDatabase(),row.getTable());
}
String id = record.key();
if (row.getType().equalsIgnoreCase("insert") || (row.getType().equalsIgnoreCase("update"))) {
LinkedHashMap<String, Object> data = row.getData();
Map map = (convertKafka2Es(data));
Index index = new Index.Builder(map).id(id).build();
bulk.addAction(index);
} else if (row.getType().equalsIgnoreCase("delete")) {
Delete delete = new Delete.Builder(id).build();
bulk.addAction(delete);
} else {
//
}
}
BulkResult br = jest.getJestClient().execute(bulk.build());
if (!br.isSucceeded()) {
logger.error("error={}, failItems={}", br.getErrorMessage(), JSON.toJSONString(br.getFailedItems()));
// br.getFailedItems().get(0).
throw new RuntimeException("bulk error");
}
// buffer.add(record);
}