Elasticsearch Java API 很全的整理

来源:https://www.cnblogs.com/laoqing/archive/2019/10/17/11693144.html
-Advertisement-
Play Games

Elasticsearch 的API 分為 REST Client API(http請求形式)以及 transportClient API兩種。相比來說transportClient API效率更高,transportClient 是通過Elasticsearch內部RPC的形式進行請求的,連接可以 ...


Elasticsearch 的API 分為 REST Client API(http請求形式)以及 transportClient API兩種。相比來說transportClient API效率更高,transportClient 是通過Elasticsearch內部RPC的形式進行請求的,連接可以是一個長連接,相當於是把客戶端的請求當成

Elasticsearch 集群的一個節點。但是從Elasticsearch 7 後就會移除transportClient 。主要原因是transportClient 難以向下相容版本。

本文中所有的講解和操作都是基於jdk 1.8 和elasticsearch 6.2.4版本。

備註:本文參考了很多Elasticsearch 的官方文檔以及部l網路資料做的綜合整理。

一、High REST Client

High Client 基於 Low Client, 主要目的是暴露一些 API,這些 API 可以接受請求對象為參數,返迴響應對象,而對請求和響應細節的處理都是由 client 自動完成的。

API 在調用時都可以是同步或者非同步兩種形式
同步 API 會導致阻塞,一直等待數據返回
非同步 API 在命名上會加上 async 尾碼,需要有一個 listener 作為參數,等這個請求返回結果或者發生錯誤時,這個 listener 就會被調用,listener主要是解決自動回調的問題,有點像安卓 開發裡面的listener監聽回調。

Elasticsearch REST APi 官方 地址:https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html

Maven 依賴

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>

client初始化:

RestHighLevelClient 實例依賴 REST low-level client builder

public class ElasticSearchClient {
private String[] hostsAndPorts;

public ElasticSearchClient(String[] hostsAndPorts) {
this.hostsAndPorts = hostsAndPorts;
}
public RestHighLevelClient getClient() {
        RestHighLevelClient client = null;
        List<HttpHost> httpHosts = new ArrayList<HttpHost>();
        if (hostsAndPorts.length > 0) {
            for (String hostsAndPort : hostsAndPorts) {
                String[] hp = hostsAndPort.split(":");
                httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http"));
            }
            client = new RestHighLevelClient(
                    RestClient.builder(httpHosts.toArray(new HttpHost[0])));
        } else {
            client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
        }
        return client;
    }
}

 

文檔 API(High level rest 客戶端支持下麵的 文檔(Document) API):

  • 單文檔 API:
  • index API
  • Get API
  • Delete API
  • Update API
  • 多文檔 API:
  • Bulk API
  • Multi-Get API

1、Index API:
IndexRequest:
封裝好的參考方法:

private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) {
        IndexRequest indexRequest = null;
        if (null == index || null == indexType) {
            throw new ElasticsearchException("index or indexType must not be null");
        }
        if (null == docId) {
            indexRequest = new IndexRequest(index, indexType);
        } else {
            indexRequest = new IndexRequest(index, indexType, docId);
        }
        return indexRequest;
    }

    /**
     * 同步執行索引
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @throws IOException
     */
    public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap));
    }

    /**
     * 非同步執行
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param indexResponseActionListener
     * @throws IOException
     */
    public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException {
        getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener);
    }

API解釋:  

 

IndexRequest request = new IndexRequest(
        "posts",  // 索引 Index
        "doc",  // Type 
        "1");  // 文檔 Document Id 
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON); // 文檔源格式為 json string

Document Source
document source 可以是下麵的格式

Map類型的輸入:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(jsonMap);  // 會自動將 Map 轉換為 JSON 格式

XContentBuilder : 這是 Document Source 提供的幫助類,專門用來產生 json 格式的數據:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.timeField("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(builder);

Object 鍵對:

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
                "postDate", new Date(),
                "message", "trying out Elasticsearch"); 

同步索引:

IndexResponse indexResponse = client.index(request);

非同步索引:非同步執行函數需要添加 listener, 而對於 index 而言,這個 listener 的類型就是 ActionListener

client.indexAsync(request, listener); 

非同步方法執行後會立刻返回,在索引操作執行完成後,ActionListener 就會被回調:

執行成功,調用 onResponse 函數
執行失敗,調用 onFailure 函數

ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

IndexResponse:
不管是同步回調還是非同步回調,如果調用成功,都會返回 IndexRespose 對象。 

String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
   // 文檔第一次創建 
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
   // 文檔之前已存在,當前是重寫
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功的分片數量少於總分片數量 
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason();  // 處理潛在的失敗信息
    }
}

在索引時有版本衝突的話,會拋出 ElasticsearchException

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1); // 這裡是文檔版本號
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
       // 衝突了 
    }
}

如果將 opType 設置為 create, 而且如果索引的文檔與已存在的文檔在 index, type 和 id 上均相同,也會拋出衝突異常。

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        
    }
}

2、GET API
GET 請求
每個 GET 請求都必須需傳入下麵 3 個參數:

  • Index
  • Type
  • Document id
GetRequest getRequest = new GetRequest(
        "posts", 
        "doc",  
        "1");  

可選參數
下麵的參數都是可選的, 裡面的選項並不完整,如要獲取完整的屬性,請參考 官方文檔

不獲取源數據,預設是獲取的

request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE); 

配置返回數據中包含指定欄位

String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext); 

配置返回數據中排除指定欄位

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext); 

實時 預設為 true

request.realtime(false);

版本

request.version(2); 

版本類型

request.versionType(VersionType.EXTERNAL);

同步執行

GetResponse getResponse = client.get(getRequest);

非同步執行
此部分與 index 相似, 只有一點不同, 返回類型為 GetResponse

Get Response
返回的 GetResponse 對象包含要請求的文檔數據(包含元數據和欄位)

 

String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
    long version = getResponse.getVersion();
    String sourceAsString = getResponse.getSourceAsString(); // string 形式   
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // map 
    byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 位元組形式 
} else {
   // 沒有發現請求的文檔 
}

在請求中如果包含特定的文檔版本,如果與已存在的文檔版本不匹配, 就會出現衝突

try {
    GetRequest request = new GetRequest("posts", "doc", "1").version(2);
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本衝突        
    }
}
封裝好的參考方法:
  /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes  返回需要包含的欄位,可以傳入空
     * @param excludes  返回需要不包含的欄位,可以傳入為空
     * @param excludes  version
     * @param excludes  versionType
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException {
        if (null == includes || includes.length == 0) {
            includes = Strings.EMPTY_ARRAY;
        }
        if (null == excludes || excludes.length == 0) {
            excludes = Strings.EMPTY_ARRAY;
        }
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        getRequest.realtime(true);
        if (null != version) {
            getRequest.version(version);
        }
        if (null != versionType) {
            getRequest.versionType(versionType);
        }
        return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException {
        return getRequest(index, indexType, docId, includes, excludes, null, null);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public GetResponse getRequest(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        return getClient().get(getRequest);
    }

3、Exists API

如果文檔存在 Exists API 返回 true, 否則返回 fasle。

Exists Request

GetRequest 用法和 Get API 差不多,兩個對象的可選參數是相同的。由於 exists() 方法只返回 true 或者 false, 建議將獲取 _source 以及任何存儲欄位的值關閉,儘量使請求輕量級。

GetRequest getRequest = new GetRequest(
    "posts",  // Index
    "doc",    // Type
    "1");     // Document id
getRequest.fetchSourceContext(new FetchSourceContext(false));  // 禁用 _source 欄位
getRequest.storedFields("_none_"); // 禁止存儲任何欄位   

同步請求

boolean exists = client.exists(getRequest);

非同步請求
非同步請求與 Index API 相似,此處不贅述,只粘貼代碼。如需詳細瞭解,請參閱官方地址

ActionListener<Boolean> listener = new ActionListener<Boolean>() {
    @Override
    public void onResponse(Boolean exists) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.existsAsync(getRequest, listener); 

封裝的參考方法:

   /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public Boolean existDoc(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        return getClient().exists(getRequest);
    }

4、Delete API

Delete Request
DeleteRequest 必須傳入下麵參數

DeleteRequest request = new DeleteRequest(
        "posts",   // index 
        "doc",     // doc
        "1");      // document id

可選參數
超時時間

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");    

版本

request.version(2); 

版本類型

request.versionType(VersionType.EXTERNAL); 
同步執行
DeleteResponse deleteResponse = client.delete(request);

非同步執行

ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};


client.deleteAsync(request, listener);
Delete Response

 

DeleteResponse 可以檢索執行操作的信息

String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功分片數目小於總分片
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // 處理潛在失敗
    }
}

也可以來檢查文檔是否存在

DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // 文檔不存在
}
版本衝突時也會拋出 `ElasticsearchException

try {
    DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
    DeleteResponse deleteResponse = client.delete(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本衝突
    }
}

封裝好的參考方法:

  /**
     * @param index
     * @param indexType
     * @param docId
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId);
        if (null != timeValue) {
            deleteRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            deleteRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            deleteRequest.version(version);
        }
        if (null != versionType) {
            deleteRequest.versionType(versionType);
        }
        return getClient().delete(deleteRequest);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException {
        return deleteDoc(index, indexType, docId, null, null, null, null);
    }

5、Update API

Update Request
UpdateRequest 的必需參數如下

UpdateRequest request = new UpdateRequest(
        "posts",  // Index
        "doc",  // 類型
        "1");   // 文檔 Id

使用腳本更新

部分文檔更新:
在更新部分文檔時,已存在文檔與部分文檔會合併。

部分文檔可以有以下形式:

JSON 格式:

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON); 

Map 格式:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap); 

XContentBuilder 對象:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder);  
Object key-pairs

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); 

Upserts:如果文檔不存在,可以使用 upserts 方法將文檔以新文檔的方式創建。

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); 

upserts 方法支持的文檔格式與 update 方法相同。

可選參數:
超時時間

request.timeout(TimeValue.timeValueSeconds(1)); 
request.timeout("1s"); 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");  

衝突後重試次數

request.retryOnConflict(3);

獲取數據源,預設是開啟的

request.fetchSource(true); 

包括特定欄位

String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 

排除特定欄位

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 

指定版本

request.version(2); 

禁用 noop detection

request.scriptedUpsert(true); 

 

設置如果更新的文檔不存在,就必須要創建一個

request.docAsUpsert(true); 

同步執行

UpdateResponse updateResponse = client.update(request);

非同步執行

ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.updateAsync(request, listener); 

Update Response

String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 文檔已創建
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 文檔已更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    // 文檔已刪除
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // 文檔不受更新的影響
}

如果在 UpdateRequest 中使能了獲取源數據,響應中則包含了更新後的源文檔信息。

GetResult result = updateResponse.getGetResult(); 
if (result.isExists()) {
    String sourceAsString = result.sourceAsString();  // 將獲取的文檔以 string 格式輸出
    Map<String, Object> sourceAsMap = result.sourceAsMap(); // 以 Map 格式輸出
    byte[] sourceAsBytes = result.source();  // 位元組形式
} else {
    // 預設情況下,不會返迴文檔源數據
}

也可以檢測是否分片失敗

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功的分片數量小於總分片數量
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // 得到分片失敗的原因
    }
}

如果在執行 UpdateRequest 時,文檔不存在,響應中會包含 404 狀態碼,而且會拋出 ElasticsearchException 。

UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist")
        .doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 處理文檔不存在的情況
    }
}

如果版本衝突,也會拋出 ElasticsearchException

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 處理版本衝突的情況
    }
}

封裝好的參考方法:

   /**
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @param docAsUpsert
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType, Boolean docAsUpsert, String[] includes, String[] excludes) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId);
        updateRequest.doc(dataMap);
        if (null != timeValue) {
            updateRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            updateRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            updateRequest.version(version);
        }
        if (null != versionType) {
            updateRequest.versionType(versionType);
        }
        updateRequest.docAsUpsert(docAsUpsert);
        //衝突時重試的次數
        updateRequest.retryOnConflict(3);
        if (null == includes && null == excludes) {
            return getClient().update(updateRequest);
        } else {
            if (null == includes || includes.length == 0) {
                includes = Strings.EMPTY_ARRAY;
            }
            if (null == excludes || excludes.length == 0) {
                excludes = Strings.EMPTY_ARRAY;
            }
            return getClient().update(updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes)));
        }
    }

    /**
     * 更新時不存在就插入
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse upDdateocAsUpsert(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, true, null, null);
    }

    /**
     * 存在才更新
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, false, null, null);
    }

 

6、Bulk API 批量處理

批量請求
使用 BulkRequest 可以在一次請求中執行多個索引,更新和刪除的操作。

BulkRequest request = new BulkRequest();  
request.add(new IndexRequest("posts", "doc", "1")  
        .source(XContentType.JSON,"field", "foo")); // 將第一個 IndexRequest 添加到批量請求中
request.add(new IndexRequest("posts", "doc", "2")  
        .source(XContentType.JSON,"field", "bar")); // 第二個
request.add(new IndexRequest("posts", "doc", "3")  
        .source(XContentType.JSON,"field", "baz")); // 第三個

在同一個 BulkRequest 也可以添加不同的操作類型

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3")); 
request.add(new UpdateRequest("posts", "doc", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")  
        .source(XContentType.JSON,"field", "baz"));

可選參數
超時時間

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for"); 

設置在批量操作前必須有幾個分片處於激活狀態

 

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);  // 全部分片都處於激活狀態
request.waitForActiveShards(ActiveShardCount.DEFAULT);  // 預設
request.waitForActiveShards(ActiveShardCount.ONE);  // 一個

同步請求

BulkResponse bulkResponse = client.bulk(request);

非同步請求

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.bulkAsync(request, listener); 

Bulk Response
BulkResponse 中包含執行操作後的信息,並允許對每個操作結果迭代。

for (BulkItemResponse bulkItemResponse : bulkResponse) { // 遍歷所有的操作結果
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); // 獲取操作結果的響應,可以是  IndexResponse, UpdateResponse or DeleteResponse, 它們都可以慚怍是 DocWriteResponse 實例

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
        IndexResponse indexResponse = (IndexResponse) itemResponse; // index 操作後的響應結果

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
        UpdateResponse updateResponse = (UpdateResponse) itemResponse; // update 操作後的響應結果

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse; // delete 操作後的響應結果
    }
}

此外,批量響應還有一個非常便捷的方法來檢測是否有一個或多個操作失敗

if (bulkResponse.hasFailures()) { 
    // 表示至少有一個操作失敗
}

在這種情況下,我們要遍歷所有的操作結果,檢查是否是失敗的操作,並獲取對應的失敗信息

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { // 檢測給定的操作是否失敗
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); // 獲取失敗信息

    }
}

Bulk Processor
BulkProcessor 是為了簡化 Bulk API 的操作提供的一個工具類,要執行操作,就需要下麵組件

RestHighLevelClient 用來執行 BulkRequest 並獲取 BulkResponse`
BulkProcessor.Listener 對 BulkRequest 執行前後以及失敗時監聽
BulkProcessor.builder 方法用來構建一個新的BulkProcessor

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // 在每個 BulkRequest 執行前調用
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // 在每個 BulkRequest 執行後調用
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        // 失敗時調用
    }
};

BulkProcessor.Builder 提供了多個方法來配置 BulkProcessor
如何來處理請求的執行。

BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); // 指定多少操作時,就會刷新一次
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0);  // 指定多大容量,就會刷新一次
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // 允許併發執行的數量 
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); 
BulkProcessor 創建後,各種請求就可以添加進去:

IndexRequest one = new IndexRequest("posts", "doc", "1").
        source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?"	   

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1. 使用IE瀏覽器登陸VPN2. 遠程登陸3. 在伺服器安裝最新的node.js,git等4. 下載源碼> git clone ****.git5. npm安裝依賴> cd you-project> npm i6. 使用egg單進程啟動// 安裝最新的egg包// 在項目根目錄下新建run.jsc... ...
  • 查看sudo lsof -i:port (埠號) 殺死進程 ...
  • Git 子模塊操作相關的一些命令備忘: git 添加子模塊: 保持更新,更多內容請關註 cnblogs.com/xuyaowen; ...
  • 常用命令: 查看塊設備分區信息cat /proc/mtd 查看塊設備信息mtdinfo /dev/mtd0 格式化mtd分區ubiformat /dev/mtd0 將mtd分區與ubi關聯ubiattach /dev/ubi_ctrl -m 0 取消關聯ubidetach /dev/ubi_ctrl ...
  • 下載:https://www.mongodb.com/ 安裝:略 註意: 使用前修改bin目錄下配置文件mongodb.cfg,刪除最後一行的'mp'欄位 1. 啟動服務與終止服務 2.創建管理員用戶 3.使用賬戶密碼連接mongodb 4.資料庫 查看資料庫 切換資料庫 增加資料庫 刪除資料庫 5 ...
  • kylin從入門到實戰:實際案例:https://www.cnblogs.com/bigdataer/p/6709783.html (不知道是不是這樣理解:Measures添加中必須有COUNT,然後再添加其他的SUM,MIN等等這樣不會報錯) ...
  • ``` mysql> show variables like '%increment%'; + + + | Variable_name | Value | + + + | auto_increment_increment | 2 | | auto_increment_offset | 1 | | d... ...
  • 一、linux中MySQL出現中文亂碼問題如下操作 1.編輯vi /etc/my.cnf文件,添加圖中標記三行 [client] default-character-set=utf8 [mysqld] character-set-server=utf8 [mysql] default-charact ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...