Normally we talk to Elasticsearch through its REST API, which feels as direct as working with a database. In practice, though, Elasticsearch mostly serves business data: records are aggregated out of a database, written into Elasticsearch, and then searched by users. That write path is almost always a batch operation, so here we use RestHighLevelClient together with BulkProcessor to implement bulk indexing.
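For context, the REST `_bulk` endpoint that BulkProcessor ultimately calls consumes newline-delimited JSON: one action-metadata line followed by one source line per document. A sketch of such a payload (the index name `test` and the document fields mirror the example below; the host in the comment is an assumption):

```shell
# Build an NDJSON _bulk payload: an action line, then the document source line.
# Note the trailing newline, which the _bulk endpoint requires.
payload='{"index":{"_index":"test","_id":"1"}}
{"id":"1","name":"1name"}
'
printf '%s' "$payload"
# To actually send it (assuming a local cluster):
# curl -H 'Content-Type: application/x-ndjson' -XPOST localhost:9200/_bulk --data-binary "$payload"
```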
First, create the BulkProcessor:
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
@Slf4j
@Component
public class ElasticBulk {

    private final RestHighLevelClient client;

    public BulkProcessor bulkProcessor() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                int numberOfActions = request.numberOfActions();
                log.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    log.warn("Bulk [{}] executed with failures", executionId);
                } else {
                    log.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("Failed to execute bulk", failure);
            }
        };
        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener);
        // Flush a new bulk request once this many actions have been added (default 1000, -1 to disable)
        bulkProcessorBuilder.setBulkActions(500);
        // Flush a new bulk request once the buffered actions reach this size (default 5 MB, -1 to disable)
        bulkProcessorBuilder.setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB));
        // Number of concurrent bulk requests allowed (default 1; 0 means only a single request at a time)
        bulkProcessorBuilder.setConcurrentRequests(1);
        // Flush any pending bulk request after this interval elapses (not set by default)
        bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueSeconds(30L));
        // Constant backoff policy: wait 1 second between retries, at most 3 retries
        bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        return bulkProcessorBuilder.build();
    }
}
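The component above assumes a `RestHighLevelClient` bean is defined somewhere in the application context. A minimal configuration sketch, in case yours isn't (the host and port are assumptions; adjust them to your cluster):

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticConfig {

    // destroyMethod = "close" lets Spring shut the client down with the context
    @Bean(destroyMethod = "close")
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")));
    }
}
```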
I have seen many articles that register the BulkProcessor itself as a Spring bean. I don't recommend this: a BulkProcessor must be closed when you are done with it, otherwise buffered requests may never be flushed and problems will surface over time.
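The close-on-exit guarantee is exactly what try-with-resources gives us. A self-contained sketch of the pattern (the `FlushingSink` class is a made-up stand-in for BulkProcessor, which likewise flushes pending work in `close()`):

```java
// Stand-in for BulkProcessor: buffers items and flushes them on close().
class FlushingSink implements AutoCloseable {
    final java.util.List<String> buffer = new java.util.ArrayList<>();
    final java.util.List<String> flushed = new java.util.ArrayList<>();

    void add(String item) { buffer.add(item); }

    @Override
    public void close() { // invoked automatically when the try block exits
        flushed.addAll(buffer);
        buffer.clear();
    }
}

public class CloseDemo {
    public static void main(String[] args) {
        FlushingSink sink = new FlushingSink();
        // try-with-resources calls close() on exit, even if an exception is thrown inside
        try (FlushingSink s = sink) {
            s.add("doc-1");
            s.add("doc-2");
        }
        System.out.println(sink.flushed.size()); // prints 2
    }
}
```

If the processor were a singleton bean instead, nothing would call `close()` until the whole context shut down, and anything still buffered below the flush thresholds would just sit there.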
Next, write a method that performs the bulk insert. Since BulkProcessor implements Closeable, we can hand it to try-with-resources directly; otherwise you would have to call close() yourself.
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Slf4j
@RequiredArgsConstructor
@Component
public class BulkService {

    private final ElasticBulk elasticBulk;

    public void add() {
        try (BulkProcessor bulkProcessor = elasticBulk.bulkProcessor()) {
            for (int i = 0; i < 1000; i++) {
                // UUID gives a genuinely unique id; new Random().toString() would only
                // return the object's identity string, not a random value
                String id = UUID.randomUUID().toString();
                Map<String, String> user = new HashMap<>();
                user.put("id", id);
                user.put("name", id + "name");
                IndexRequest request = new IndexRequest().index("test").id(id).source(user);
                bulkProcessor.add(request);
            }
        }
    }
}
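A note on generating document IDs: `new Random().toString()` is a tempting one-liner, but `Random` never overrides `toString()`, so it returns the object's identity string (something like `java.util.Random@1b6d3586`), not a random number. `UUID.randomUUID()` from the stdlib is the safer unique-ID source. A quick check of the difference:

```java
import java.util.Random;
import java.util.UUID;

public class IdDemo {
    public static void main(String[] args) {
        // Random has no toString() override: you get "java.util.Random@<identityHash>"
        String notRandom = new Random().toString();
        System.out.println(notRandom.startsWith("java.util.Random@")); // prints true

        // UUID.randomUUID() yields a collision-resistant 36-character identifier
        String id = UUID.randomUUID().toString();
        System.out.println(id.length()); // prints 36
    }
}
```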
Test:
public class BulkServiceTests extends BaseTests {

    @Autowired
    private BulkService bulkService;

    @Test
    public void add() {
        bulkService.add();
        bulkService.add();
    }
}
The test runs successfully.