Update API

更新请求

UpdateRequest 要求下列参数:

UpdateRequest request = new UpdateRequest(
        "posts", // Index
        "doc",  // Type
        "1"); // Document id

Update API允许通过使用脚本或传递部分文档来更新现有文档。

使用脚本更新

该脚本可以是一个内联脚本:

Map<String, Object> parameters = singletonMap("count", 4); // 脚本参数以一个 Map 对象提供。
Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.field += params.count", parameters);  // 使用 painless 语言创建内联脚本,并传入参数值。
request.script(inline);  //  将脚本传递给更新请求对象

或者是一个存储脚本:

// 引用一个名为 increment-field 的painless 的存储脚本
Script stored =
        new Script(ScriptType.STORED, "painless", "increment-field", parameters);   
request.script(stored); // 给请求设置脚本

使用局部文档更新

使用局部文档更新时,局部文档将与现有文档合并。

局部文档可以以不同的方式提供:

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON); // 以一个 JSON 格式的字符串提供的局部文档源
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap); // 以一个可自动转换为 JSON 格式的 Map 提供局部文档源
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder); // 以 XContentBuilder 对象提供局部文档源, Elasticsearch 构建辅助器将生成 JSON 格式。
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); // 以 Object 键值对提供局部文档源,它将被转换为 JSON 格式。

更新或插入

如果文档不存在,可以使用upsert方法定义一些将作为新文档插入的内容:

String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);  // 以字符串提供更新插入的文档源
与局部文档更新类似,可以使用接受 String, Map , XContentBuilder 或 Object 键值对的方式使用upsert 方法更新或插入文档的内容。

可选参数

提供下列可选参数:

request.routing("routing");  // 路由值
request.parent("parent");  //Parent 值
request.timeout(TimeValue.timeValueMinutes(2)); // TimeValue 类型的等待主分片可用的超时时间
request.timeout("2m");  // 字符串类型的等待主分片可用的超时时间
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);  // Refresh policy as a WriteRequest.RefreshPolicy instance
request.setRefreshPolicy("wait_for");  // Refresh policy as a String
request.retryOnConflict(3);  //    How many times to retry the update operation if the document to update has been changed by another operation between the get and indexing phases of the update operation
request.fetchSource(true); //Enable source retrieval, disabled by default
request.version(2); // 版本号
request.detectNoop(false);  // Disable the noop detection
request.scriptedUpsert(true); // Indicate that the script must run regardless of whether the document exists or not, ie the script takes care of creating the document if it does not already exist.
request.docAsUpsert(true); // Indicate that the partial document must be used as the upsert document if it does not exist yet.
request.waitForActiveShards(2);  //Sets the number of shard copies that must be active before proceeding with the update operation.
request.waitForActiveShards(ActiveShardCount.ALL); //Number of shard copies provided as a ActiveShardCount: can be ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (default)
String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes)); // Configure source inclusion for specific fields
String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes)); //Configure source exclusion for specific fields

同步执行

UpdateResponse updateResponse = client.update(request);

异步执行

client.updateAsync(request, new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        //    Called when the execution is successfully completed. The response is provided as an argument.
    }
    @Override
    public void onFailure(Exception e) {
        // Called in case of failure. The raised exception is provided as an argument.
    }
});

更新响应

返回的UpdateResponse允许获取执行操作的相关信息,如下所示:

String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
//Handle the case where the document was created for the first time (upsert)
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
//    Handle the case where the document was updated
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
//Handle the case where the document was deleted
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
//Handle the case where the document was not impacted by the update, ie no operation (noop) was executed on the document
}

当通过 fetchSource 方法在UpdateRequest 里设置了启用接收源,相应对象将包含被更新的文档源:

GetResult result = updateResponse.getGetResult(); //Retrieve the updated document as a GetResult
if (result.isExists()) {
    String sourceAsString = result.sourceAsString(); //Retrieve the source of the updated document as a String
    Map<String, Object> sourceAsMap = result.sourceAsMap(); //Retrieve the source of the updated document as a Map<String, Object>
    byte[] sourceAsBytes = result.source(); //Retrieve the source of the updated document as a byte[]
} else {
    //Handle the scenario where the source of the document is not present in the response (this is the case by default)
}

这也可以用了检查分片故障:

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    //Handle the situation where number of successful shards is less than total shards
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); //Handle the potential failures
    }
}

当对一个不存在的文档执行 UpdateRequest 时,响应将包含 404 状态码,并抛出一个需要如下所示处理的 ElasticsearchException 异常:

UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist").doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 处理由于文档不存在导致的异常。
    }
}

如果有文档版本冲突,也会抛出 ElasticsearchException:

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 表明异常是由于返回来了版本冲突错误导致的
    }
}

results matching ""

    No results matching ""