Bulk API

注意: Java高级REST客户端提供批量处理器来协助大量请求

Bulk 请求

BulkRequest 可以被用在使用单个请求执行多个 索引,更新 和/或 删除 操作的情况下。

它要求至少要一个操作被添加到 Bulk 请求上:

BulkRequest request = new BulkRequest();  // 创建 BulkRequest
request.add(new IndexRequest("posts", "doc", "1")  // 添加第一个 IndexRequest 到 Bulk 请求上, 参看 [Index API](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-index.html) 获取更多关于如何构建 IndexRequest 的信息.
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")   // 添加第二个 IndexRequest
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")  // 添加第三个 IndexRequest
        .source(XContentType.JSON,"field", "baz"));

警告: Bulk API仅支持以JSON或SMILE编码的文档。 以任何其他格式提供文件将导致错误。

不同的操作类型也可以添加到同一个BulkRequest中:

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3")); //
Adds a DeleteRequest to the BulkRequest. See Delete API for more information on how to build DeleteRequest.
request.add(new UpdateRequest("posts", "doc", "2")
        .doc(XContentType.JSON,"other", "test")); //
Adds an UpdateRequest to the BulkRequest. See Update API for more information on how to build UpdateRequest.
request.add(new IndexRequest("posts", "doc", "4")   //Adds an IndexRequest using the SMILE format
        .source(XContentType.JSON,"field", "baz"));

可选参数

提供下列可选参数:

request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");
Timeout to wait for the bulk request to be performed as a TimeValue
Timeout to wait for the bulk request to be performed as a String
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");                            
Refresh policy as a WriteRequest.RefreshPolicy instance
Refresh policy as a String
request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL);
Sets the number of shard copies that must be active before proceeding with the index/update/delete operations.
Number of shard copies provided as a ActiveShardCount: can be ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (default)

同步执行

BulkResponse bulkResponse = client.bulk(request);

异步执行

client.bulkAsync(request, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        //Called when the execution is successfully completed. The response is provided as an argument and contains a list of individual results for each operation that was executed. Note that one or more operations might have failed while the others have been successfully executed.
    }
    @Override
    public void onFailure(Exception e) {
        //Called when the whole BulkRequest fails. In this case the raised exception is provided as an argument and no operation has been executed.
    }
});

Bulk 响应

返回的BulkResponse包含有关执行操作的信息,并允许对每个结果进行迭代,如下所示:

for (BulkItemResponse bulkItemResponse : bulkResponse) { //迭代所有操作的结果
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); //Retrieve the response of the operation (successful or not), can be IndexResponse, UpdateResponse or DeleteResponse which can all be seen as DocWriteResponse instances
    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                //    Handle the response of an index operation
        IndexResponse indexResponse = (IndexResponse) itemResponse;
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
        //Handle the response of a update operation
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
        //    Handle the response of a delete operation
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

批量响应提供了一种快速检查一个或多个操作是否失败的方法:

if (bulkResponse.hasFailures()) { // 只要有一个操作失败了,这个方法就返回 true
}

在这种情况下,有必要迭代所有运算结果,以检查操作是否失败,如果是,则检索相应的故障:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { //Indicate if a given operation failed
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); //Retrieve the failure of the failed operation
    }
}

Bulk 处理器

BulkProcessor通过提供允许索引/更新/删除操作在添加到处理器时透明执行的实用程序类来简化Bulk API的使用。

为了执行请求,BulkProcessor需要3个组件:

  • RestHighLevelClient

这个客户端用来执行 BulkRequest 并接收 BulkResponse 。

  • BulkProcessor.Listener

这个监听器会在每个 BulkRequest 执行之前和之后被调用,或者当 BulkRequest 失败时调用。

  • ThreadPool

BulkRequest执行是使用这个池的线程完成的,允许BulkProcessor以非阻塞的方式工作,并允许在批量请求执行的同时接受新的索引/更新/删除请求。

然后 BulkProcessor.Builder 类可以被用来构建新的 BulkProcessor :

ThreadPool threadPool = new ThreadPool(settings); // 使用已有的 Settings 对象创建 ThreadPool。
BulkProcessor.Listener listener = new BulkProcessor.Listener() { // 创建 BulkProcessor.Listener
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        //This method is called before each execution of a BulkRequest
    }
    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        //    This method is called after each execution of a BulkRequest
    }
    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        //This method is called when a BulkRequest failed
    }
};
BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
        .build(); // 通过调用 BulkProcessor.Builder 的 build() 方法创建 BulkProcessor。 RestHighLevelClient.bulkAsync() 将被用来执行 BulkRequest。

BulkProcessor.Builder 提供了方法来配置 BulkProcessor 应该如何处理请求的执行:

BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool);
builder.setBulkActions(500);  //Set when to flush a new bulk request based on the number of actions currently added (defaults to 1000, use -1 to disable it)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));  //    Set when to flush a new bulk request based on the size of actions currently added (defaults to 5Mb, use -1 to disable it)
builder.setConcurrentRequests(0); //Set the number of concurrent requests allowed to be executed (default to 1, use 0 to only allow the execution of a single request)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); //    Set a flush interval flushing any BulkRequest pending if the interval passes (defaults to not set)
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); //Set a constant back off policy that initially waits for 1 second and retries up to 3 times. See BackoffPolicy.noBackoff(), BackoffPolicy.constantBackoff() and BackoffPolicy.exponentialBackoff() for more options.

一旦创建了BulkProcessor,可以向其添加请求:

IndexRequest one = new IndexRequest("posts", "doc", "1").
        source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");
bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

这些请求将由 BulkProcessor 执行,它负责为每个批量请求调用 BulkProcessor.Listener 。 监听器提供方法接收 BulkResponse 和 BulkResponse :

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); //Called before each execution of a BulkRequest, this method allows to know the number of operations that are going to be executed within the BulkRequest
        logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
    }
    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {
            //在每个 BulkRequest 执行之后调用,此方法允许获知 BulkResponse 是否包含错误
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
        }
    }
    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("Failed to execute bulk", failure); //如果 BulkRequest 执行失败则调用,此方法可获知失败情况。
    }
};

一旦将所有请求都添加到BulkProcessor,其实例需要使用两种可用的关闭方法之一关闭。

一旦所有请求都被添加到了 BulkProcessor, 它的实例就需要使用两种可用的关闭方法之一进行关闭。

awaitClose() 可以被用来等待到所有请求都被处理,或者到指定的等待时间:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

如果所有批量请求完成,则该方法返回 true ,如果在完成所有批量请求之前等待时间过长,则返回 false 。

close() 方法可以被用来立即关闭 BulkProcessor :

bulkProcessor.close();

在关闭处理器之前,两个方法都会刷新已经被天教导处理器的请求,并禁止添加任何新的请求。

results matching ""

    No results matching ""