6、Update By Query API 介面可以在不改變 source 的情況下對 index 中的每個文檔進行更新。這對於獲取新屬性或其他聯機映射更改很有用。以下是 API: 這將返回如下內容: _update_by_query 在開始執行的時候獲得一個快照,並使用內部版本控制對找到的內容進行 ...
6、Update By Query API
_update_by_query
介面可以在不改變 source 的情況下對 index 中的每個文檔進行更新。這對於獲取新屬性或其他聯機映射更改很有用。以下是 API:
POST twitter/_update_by_query?conflicts=proceed
這將返回如下內容:
{
"took" : 147,
"timed_out": false,
"updated": 120,
"deleted": 0,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": -1.0,
"throttled_until_millis": 0,
"total": 120,
"failures" : [ ]
}
_update_by_query 在開始執行的時候獲得一個快照,並使用內部版本控制對找到的內容進行索引。這意味著,如果文檔在獲取快照和處理索引請求之間發生更改,則會出現版本衝突。當版本匹配時,文檔會更新,對應的版本號也會遞增。
由於內部版本控制不支持將0作為有效版本號,因此無法使用_update_by_query更新版本號為零的文檔。
所有的更新和查詢失敗都將導致_update_by_query 中止,並且在 響應中返回 failures。已經更新的內容仍然存在。也就是說這個操作不會回滾,只會中止。當第一個錯誤引起中止後,所有的錯誤都會返回到 failures 元素中,因此有可能有相當多的失敗實例。
如何您想在遇到版本衝突時繼續執行 _updata_by_query,那麼可以在 url 中設置 conflicts=proceed
或是在請求中設置 conflicts":"proceed
。第一個例子是這樣設置的,因為它想要獲得發生改變的線上映射,版本衝突僅僅意味著衝突文檔在update_by_query 的開始時間和試圖更新文檔的時間之間被更新。這沒關係,因為該更新將獲取聯機映射更新。
下麵的例子將會 update twitter
索引中的 tweets:
POST twitter/_doc/_update_by_query?conflicts=proceed
您還可以使用 Query DSL 限制 _update_by_query,下麵的例子將會更新 twitter 索引中所有 user 欄位為 kimchy 的文檔:
POST twitter/_update_by_query?conflicts=proceed
{
"query": {
"term": {
"user": "kimchy"
}
}
}
目前,我們在更新文檔的時候沒有更改 source。這對諸如獲取新屬性很有用,但只是其中一般的樂趣。_update_by_query 支持用腳本來更新文檔。下麵的例子將會把所有user為 kimchy 的 likes 欄位加 1:
POST twitter/_update_by_query
{
"script": {
"source": "ctx._source.likes++",
"lang": "painless"
},
"query": {
"term": {
"user": "kimchy"
}
}
}
正如在 update API 中一樣,您可以設置 ctx.op
來更改執行的操作:
- noop:如果腳本確定不需要進行任何更改,請設置
ctx.op=noop
。這將導致_update_by_query 從其更新中忽略該文檔。此無操作將在響應體的 noop 計數器中報告。 - delete:如果你決定刪除該文檔,請設置
ctx.op=delete
。刪除操作會在響應體的 deleted 計數器中報告。
將 ctx.op 設置為其他值都是錯誤的,在 ctx 中設置其他欄位也都是錯誤的。
請註意,如果我們取消設置 conflicts=proceed
。在這種情況下,我們希望版本衝突能夠中止該操作以便於我們處理失敗的原因。
此API不允許您移動涉及到的文檔,只能修改它的 source。這是有意為之的。因為我們不想將文件移動位置。
此API也可以一次修改多個索引和多個類型,如:
POST twitter,blog/_doc,post/_update_by_query
如何設置了routing
,那麼操作的分片還應該滿足路由的條件,如:
POST twitter/_update_by_query?routing=1
_update_by_query 預設使用的scroll 批次大小是 1000,您可以通過設置 scroll_size
來 修改此值:
POST twitter/_update_by_query?scroll_size=100
_update_by_query 也可以指定 Ingest Node介面的 pipeline,如:
PUT _ingest/pipeline/set-foo
{
"description" : "sets foo",
"processors" : [ {
"set" : {
"field": "foo",
"value": "bar"
}
} ]
}
POST twitter/_update_by_query?pipeline=set-foo
6.1 URL 參數(URL Parameters)
_update_by_query 介面支持的參數有 pretty
、 refresh
、 wait_for_completion
、wait_for_active_shards
、timeout
、scroll
- refresh
- 執行完更新操作後刷新update_by_query 中查詢涉及的分片。這和 update API 的 refresh 不同,update API 的 refresh 僅刷新接收到更新請求的分片。_update_by_query 的 refresh 參數不支持 wait_for。
- wait_for_completion
- 如果請求中 wait_for_completion=false,那麼 elasticsearch 將會執行預檢查、執行請求,然後返回可與任務 API 一起使用的任務,以取消或獲取任務的狀態。elasticsearch 還將在.task/task/${taskId} 處創建此任務的記錄文檔。您可以根據需要保留或刪除該文檔。用完刪除後,elasticsearch 可以回收其占用的空間。
- wait_for_active_shards
- 控制在執行請求前需要有幾個可用的分片數。
- timeout
- 控制每個請求等待分片從不可用狀態到可用狀態的時間。因為_update_by_query 使用滾動搜索,所以可以指定 scroll 參數來控制它使 “搜索上下文” 保持活動的時間,例如:scroll=10m。預設情況下,它是 5 分鐘。
- request_per_second
- 可以設置為任意正數(如 1.4、6、1000 等)。通過設置等待的時間來控制_update_by_query 批量執行更新操作的速率。當 requests_per_second 設置為 - 1 時禁用該控制。
速率限制是通過在批量處理之間等待來完成的,這樣就可以為_update_by_query內部使用的回滾指定一個考慮填充的超時時間。填充時間是批處理大小除以 requests_per_second 與寫入時間只差。預設情況下,批處理大小為 1000,因此如 request_per_second 設置為 500:
target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
由於該批處理是作為單個 _bulk 請求發出的,因此大批量的請求將導致ElasticSearch創建多個請求,然後在啟動下一個集合之前等待一段時間。這是“突發”而不是“平穩”。預設值為-1。
6.2 響應體(Response body)
{
"took" : 147,
"timed_out": false,
"total": 5,
"updated": 5,
"deleted": 0,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": -1.0,
"throttled_until_millis": 0,
"failures" : [ ]
}
- took
- 整個操作耗費的毫秒數
- timed_out
- 如果在執行 _update_by_query 操作時出現超時,那麼這個標識將會返回 true
- total
- 成功執行操作的文檔的數量
- updated
- 成功的更新了多少個文檔
- deleted
- 成功的刪除了多少個文檔
- batches
- 回滾數
- verison_conflicts
- 操作過程中出現版本衝突的數量
- noops
- 由於 ctx.op=noop 設置造成的忽略的文檔數
- retries
- 重覆嘗試的次數,bulk 是批量更新操作重覆嘗試的次數,search 是查詢的重覆嘗試次數
- throthled_millis
- requests_per_second 參數引起的請求等待時間
- requests_per_second
- 在操作過程中,每秒執行的請求數
- throttled_until_millis
- 執行
_update_by_query
時這個值始終0,只在在調用Task API時該值才有意義,它表示下一次(自紀元以來)為了符合requests_per_second
將再次執行請求的毫秒數。
- 執行
- failures
- 執行失敗的數組,包含在執行過程中任何不可恢復的錯誤。如果這個數組不是空的,那麼請求會因為這些失敗而中止。_delete_by_query是使用批處理實現的,任何失敗都會導致整個執行被中止。可以使用conflicts參數來防止reindex在版本衝突時造成操作中止。
6.3 結合 taskAPi 使用(Works with the Task API)
您可以使用 Task API 獲取任何正在進行 update_by_query 請求的狀態:
GET _tasks?detailed=true&actions=*byquery
返回值:
{
"nodes" : {
"r1A2WoRbTwKZ516z6NEs5A" : {
"name" : "r1A2WoR",
"transport_address" : "127.0.0.1:9300",
"host" : "127.0.0.1",
"ip" : "127.0.0.1:9300",
"attributes" : {
"testattr" : "test",
"portsfile" : "true"
},
"tasks" : {
"r1A2WoRbTwKZ516z6NEs5A:36619" : {
"node" : "r1A2WoRbTwKZ516z6NEs5A",
"id" : 36619,
"type" : "transport",
"action" : "indices:data/write/update/byquery",
"status" : {
"total" : 6154,
"updated" : 3500,
"created" : 0,
"deleted" : 0,
"batches" : 4,
"version_conflicts" : 0,
"noops" : 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0
},
"description" : ""
}
}
}
}
}
status:這個對象包含了當前任務的實際狀態。total
欄位是本次操作需要重新索引的文檔數。你可以通過 updated
, created
, and deleted
欄位估計處理進度。當以上幾個欄位的和等於 total
欄位時,請求就執行完畢了。
你可以使用 task id 查看某個任務。下例查看task id為 r1A2WoRbTwKZ516z6NEs5A:36619
的任務信息:
GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619
該 API 可以與wait_for_comletion=false
集成使用,可以清晰的查看已完成任務的狀態。如果任務已經完成,並且在其上設置了wait_for_completion=false
,那麼請求將會返回結果或是錯誤欄位。此功能的代價是當wait_for_completion=false
時會在.tasks/task/${taskId}
目錄下會創建文檔。您可以根據需要刪除該文檔。
6.4 取消任務(Works with the Cancel Task API)
任何_update_by_query
操作都可以通過task cancel
API來取消,如:
POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel
取消應該執行很快,但可能需要幾秒鐘。在此期間上面的 task status API將繼續列出該任務,直到它完全被取消了。
6.5 閾值(Rethrottling)
在正在執行的請求中,requests_per_second
的值可以在運行時通過_rethrotted
API進行修改:
POST _update_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1
可以使用tasks API找到任務ID。
和 requests_per_seconds 參數設置一樣,rethrottling 參數可以是 - 1 (禁用限制)或是其他十進位數(如 1.7 或 12 )。rethrottling 參數能提高查詢速度且會立即生效,但是降低速度必須等到當前操作執行完後才起作用。這可以防止滾動超時
6.6 切片 (slicing)
update_by_query支持 sliced scroll 來使更新操作並行進行。這能提高效率並且提供了一種將請求分解為較小的部分的便捷方式。
6.6.1 手動切片 (Manually slicing)
通過為每個請求提供切片 ID 和切片總數,手動將 update_by_query操作進行分解:
POST twitter/_update_by_query
{
"slice": {
"id": 0,
"max": 2
},
"script": {
"source": "ctx._source['extra'] = 'test'"
}
}
POST twitter/_update_by_query
{
"slice": {
"id": 1,
"max": 2
},
"script": {
"source": "ctx._source['extra'] = 'test'"
}
}
你可以這樣驗證上述 api 的結果:
GET _refresh
POST twitter/_search?size=0&q=extra:test&filter_path=hits.total
返回如下的的結果:
{
"hits": {
"total": 120
}
}
6.6.2 自動切片 (Automatic slicing)
也可以讓 update_by_query 自動並行地滾動切片。使用 slices
指定要使用的切片數:
POST twitter/_update_by_query?refresh&slices=5
{
"script": {
"source": "ctx._source['extra'] = 'test'"
}
}
您可以通過下列語句驗證運行結果:
POST twitter/_search?size=0&q=extra:test&filter_path=hits.total
返回如下的的結果:
{
"hits": {
"total": 120
}
}
slices 設置為 auto 將允許 ElasticSearch 選擇要使用的切片數。此設置將使用一個分片一個切片,直至達到某個限制。如果存在多個源索引,它將根據具有最少分片的那個索引所擁有的分片數來作為切片數。
向_delete_by_query 添加 slices
只會自動執行上一節中使用的手動過程,這意味著它有一些怪癖:
- 您可以在 Tasks API 中查看這些請求。這些子請求是具有
slices
請求的任務的 “子 " 任務。 - 僅使用
slices
獲取請求的任務狀態 (包含已完成切片的狀態)。 - 這些子請求可單獨定址,例如取消和重新限制。
- 使用
slices
重新處理請求將按比例重新調整未完成的子請求。 - 使用
slices
取消請求將取消每個子請求。 - 由於
slices
的性質,每個子請求都不會獲得完全均勻的文檔部分。這些切片文檔都會被處理,但某些切片可能比其他切片分到更大的文檔。 - 像
requests_per_second
這樣的參數和帶有size
的slices
請求 (按指定比例分配給每個子請求)。將其與上述關於分佈不均勻的點相結合,您應該得出結論,使用slices
的size
可能不會刪除指定大小的文檔。 - 每個子請求都會獲得和源索引略有不同的快照,儘管這些快照幾乎同時進行。
6.6.3 選擇 slices 的數量
如果 slices 設置為 auto,elasticsearch 將會自動為大多數索引選擇一個合理的數量。如果您設置手動切片或以其他方式來調整自動切片,請遵循以下準則:
當切片的數量等於索引的分片數時,查詢性能最好。如果這個數量太大(如 500), 請選擇一個較小的數字,因為太多的切片會影響性能。設置高於分片數的切片通常不會提高效率反而會增加開銷。
update 的性能與可用的切片數量呈正相關。
查詢或更新是否是影響此時運行時性能的主要原因,這取決於reindexed時的文檔和集群的資源。
6.7 獲取新屬性(pick up a new property)
假設您創建了一個沒有動態映射的索引,然後向裡面添加了數據,現在添加後一個 mapping value 可以從數據中獲取更多的欄位,如:
PUT test
{
"mappings": {
"_doc": {
"dynamic": false, 【1】
"properties": {
"text": {"type": "text"}
}
}
}
}
POST test/_doc?refresh
{
"text": "words words",
"flag": "bar"
}
POST test/_doc?refresh
{
"text": "words words",
"flag": "foo"
}
PUT test/_mapping/_doc 【2】
{
"properties": {
"text": {"type": "text"},
"flag": {"type": "text", "analyzer": "keyword"}
}
}
【1】"dynamic":false。表示這個新欄位不會被索引,只是存儲到了_source。
【2】為了獲取這個新的欄位您必須reindex所有文檔。
搜索數據將找不到任何內容:
POST test/_search?filter_path=hits.total
{
"query": {
"match": {
"flag": "foo"
}
}
}
{
"hits" : {
"total" : 0
}
}
但是您可以用_update_by_query 請求獲得新的 mapping:
POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
"query": {
"match": {
"flag": "foo"
}
}
}
{
"hits" : {
"total" : 1
}
}
添加多個欄位時,也可以進行相同的操作。