8、Bulk API 可以把多個 或`delete bulk API`中執行。這樣可以極大地提高索引速度。 API使用如下的JSON結構: 註意,最後一行數據必須要以 結尾。發送請求時,Content Type 標頭應設置為 application /x ndjson。 可以是 ,`create d ...
8、Bulk API
可以把多個index
或delete
操作放在單個bulk API
中執行。這樣可以極大地提高索引速度。
/_bulk
API使用如下的JSON結構:
action_and_meta_data\n
optional_source\n
action_and_meta_data\n
optional_source\n
....
action_and_meta_data\n
optional_source\n
註意,最後一行數據必須要以\n
結尾。發送請求時,Content-Type 標頭應設置為 application /x-ndjson。
action
可以是index
,create
,delete
和update
操作。index
和create
操作需要在下一行指定一個文檔數據。index
操作相當於 index API
,create
操作相當於op_type=create
的index
操作。delete
操作不要指定文檔參數,參考delete API
。update
操作僅需文檔的部分參數,doc
、upsert
,script
,或其他一些可選項。
如果使用curl
導入文本作為參數,你必須使用--data-binary
標記而不是-d
參數。使用--data-binary
的json文本數據不能換行:
$ cat requests
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
$ curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"_doc","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}
因為使用\n
作為分割符,所以請確保json數據不是格式化的(需要在同一行)。例如:
POST _bulk
{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }
返回結果:
{
"took": 30,
"errors": false,
"items": [
{
"index": {
"_index": "test",
"_type": "_doc",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201,
"_seq_no" : 0,
"_primary_term": 1
}
},
{
"delete": {
"_index": "test",
"_type": "_doc",
"_id": "2",
"_version": 1,
"result": "not_found",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 404,
"_seq_no" : 1,
"_primary_term" : 2
}
},
{
"create": {
"_index": "test",
"_type": "_doc",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 201,
"_seq_no" : 2,
"_primary_term" : 3
}
},
{
"update": {
"_index": "test",
"_type": "_doc",
"_id": "1",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200,
"_seq_no" : 3,
"_primary_term" : 4
}
}
]
}
可以使用 /_bulk
, /{index}/_bulk
, {index}/{type}/_bulk
,在url中指定index
和type
參數。
每個操作都會分配到對應的分片執行,僅僅action_meta_data
數據在所請求的分片中解析(因為需要預先找出該操作在那個分片上執行)。
客戶端應儘量使用{index}/{type}/_bulk
減少數據傳輸。
返回結果會保存每個操作的返回結果,並且即使某個操作失敗了也不會影響剩下的操作。
bulk
api 沒有規定每個bulk
中應該包含多少個操作,具體操作數量應該和你操作的工作量相關。(例如工作量少可以包含多點,工作量大的包含少點)
如果使用HTTP API
,確保客戶端不要發送 HTTP chunks
,因為這樣會降低速度。
8.1 樂觀鎖( Optimistic Concurrency Control)
批量API調用中的每個索引和刪除操作可以在其各自的操作和元數據行中包括if_seq_no和if_primary_term參數。if_seq_no和if_primary_term參數根據對現有文檔的最後一次修改來設置。有關更多詳細信息,請參閱樂觀併發控制。
8.2 版本號(Versioning)
每個 bulk
項 都可指定 _version
或version
欄位。它的行為和index
或delete
操作一樣,具體還要基於映射中_version
設置。它同樣也支持 version_type
查閱versioning
8.3 路由(Routing)
bulk
中的每一項都可以指定_routing
或routing
欄位。它會根據_routing
自動跟蹤索引 / 刪除操作的行為。
8.4 等待活躍分片(Wait For Active Shards)
在執行bulk
之前,可以指定最小活躍分片數。有關更多詳細信息和用法示例,查閱這裡
8.5 刷新(refresh)
設置bulk
操作所做的更改到搜索可見的時間。查閱 refresh
只有收到批量請求的分片才會受到刷新的影響。想象一下這樣的請求
_bulk?refresh = wait_for
,其中包含三個文檔,這些文檔恰好被路由到具有五個分片的索引,且請求打落在其中3個不同分片上。那麼請求將會等待這三個分片刷新。其他兩個分片不參與_bulk請求。
8.6 更新
使用更新操作時,retry_on_conflict
可用作操作本身的欄位(不在額外的有效負載行中),以指定在版本衝突的情況下應重試更新的次數。
更新操作支持的參數有:doc(部分文檔)
、upsert
, doc_as_upsert
,script
, params (用於腳本)
, lang (用於腳本)
和_source
。
更新操作的示例:
POST _bulk
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_type" : "_doc", "_index" : "index1", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_type" : "_doc", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "_source": true}