文章
问答
冒泡
使用BulkProcessor对elasticsearch批量操作

一般情况下,我们在使用elasticsearch的时候,会用rest api去操作。这样的操作相对比较直观,就好像我们操作数据库一样。但是,由于es大部分情况下,是用来做业务数据的,比如从数据库中把数据聚合之后存到es中,用户再到es里进行检索。这个场景很多时候,其实是批量操作的。这里我们使用RestHighLevelClient 和 BulkProcessor来实现批量操作。
首先,创建BulkProcessor

@RequiredArgsConstructor
@Slf4j
@Component
public class ElasticBulk {

    private final RestHighLevelClient client;

    public BulkProcessor bulkProcessor() {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                int numberOfActions = request.numberOfActions();
                log.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    log.warn("Bulk [{}] executed with failures", executionId);
                } else {
                    log.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("Failed to execute bulk", failure);
            }
        };

        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder(
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
                listener);
        //根据当前添加的操作数量设置刷新新bulk请求的时间(默认为1000,使用-1禁用它)
        bulkProcessorBuilder.setBulkActions(500);
        //根据当前添加的操作大小设置刷新新bulk请求的时间(默认为5Mb,使用-1禁用)
        bulkProcessorBuilder.setBulkSize(new ByteSizeValue(10, ByteSizeUnit.MB));
        //设置允许执行的并发请求数量(默认为1,使用0只允许执行单个请求)
        bulkProcessorBuilder.setConcurrentRequests(1);
        //设置刷新间隔,如果间隔通过,则刷新任何挂起的BulkRequest(默认为未设置)
        bulkProcessorBuilder.setFlushInterval(TimeValue.timeValueSeconds(30L));
        //设置一个常量后退策略,该策略最初等待1秒并重试最多3次
        bulkProcessorBuilder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        return bulkProcessorBuilder.build();
    }

}





我看到很多文章是直接把BulkProcessor注册成了bean,这种做法是不推荐的,因为操作完成之后,需要进行close,要不然时间久了要出问题。

写一个方法来执行批量插入。由于BulkProcessor实现了Closeable所以直接用try来做,否则的话 就自己来执行close方法。

@Slf4j
@RequiredArgsConstructor
@Component
public class BulkService {

    private final ElasticBulk elasticBulk;

    public void add(){
        try (BulkProcessor bulkProcessor = elasticBulk.bulkProcessor()){
            for (int i=0;i<1000;i++){
                String id = new Random().toString()+"i";
                Map<String,String> user = new HashMap<>();
                user.put("id",id);
                user.put("name",id+"name");
                IndexRequest request = new IndexRequest().index("test").id(id).source(user);
                bulkProcessor.add(request);
            }
        };
    }
}





测试

public class BulkServiceTests extends BaseTests {

    @Autowired
    private BulkService bulkService;

    @Test
    public void add(){
        bulkService.add();
        bulkService.add();
    }

}





执行成功

elasticsearch

关于作者

落雁沙
非典型码农
获得点赞
文章被阅读