> 本文首發於公眾號:Hunter後端 > 原文鏈接:[Python連接es筆記三之es更新操作](https://mp.weixin.qq.com/s/1cTaVfjLFrmbXajNcayhEA) 這一篇筆記介紹如何使用 Python 對數據進行更新操作。 對於 es 的更新的操作,不用到 Se ...
本文首發於公眾號:Hunter後端
原文鏈接:Python連接es筆記三之es更新操作
這一篇筆記介紹如何使用 Python 對數據進行更新操作。
對於 es 的更新的操作,不用到 Search() 方法,而是直接使用 es 的連接加上相應的函數來操作,本篇筆記目錄如下:
- 獲取連接
- update()
- update_by_query()
- 批量更新
- UpdateByQuery()
1、獲取連接
如果使用的是之前的全局創建連接的方式:
from elasticsearch_dsl import connections
connections.configure(
default={"hosts": "localhost:9200"},
)
我們可以根據別名獲取相應的連接:
conn = connections.connections.get_connection("default")
或者我們直接使用 elasticsearch.Elasticsearch 模塊來重新建立一個連接:
from elasticsearch import Elasticsearch
conn = Elasticsearch(hosts="localhost:9200")
前面介紹過,我們安裝 elasticsearch_dsl 依賴的時候,會自動為我們安裝上相應的 elasticsearch 模塊,我們這裡直接使用即可。
然後通過 conn 連接可以直接對數據進行更新,可用的方法有 update(),update_by_query() 以及一個批量的 bulk() 方法。
2、update()
update() 函數一般只用於指定 id 的更新操作,如果我們知道一條數據的 id,我們可以直接使用 update()。
比如對於 exam 這個 index 下 id=18 的數據,我們想要更新它的 name 欄位和 address 欄位分別為 王五和湖南省,我們可以如下操作:
conn.update(
index="exam",
id=18,
body={
"doc": {
"name": "王五2",
"address": "湖南省",
}
}
)
在上面的操作中,index 為指定的索引,id 參數為我們需要更新的 id,body 內 doc 下的欄位即為我們要更新的數據。
3、update_by_query()
update_by_query() 函數不局限於 id 的查詢更新,我們可以更新任意符合條件的數據,以下是一個簡單的示例:
conn.update_by_query(
index="exam",
body={
"query": {
"term": {"name": "張三豐"}
},
"script": {
"source": "ctx._source.address = params.address",
"params": {
"address": "新地址",
}
}
}
)
在這裡,index 參數還是指向對應的索引,body 內包含了需要更新查詢的條件,這裡都在 query 參數內,需要更新的數據在 script 下,通過腳本的形式來操作更新。
這裡註意下,我這裡用到的是 7.6.0 版本,所以 script 下使用的 source,更低一點版本用的欄位可能是 inline,這裡使用對應版本的參數即可。
在 script.source 中,內容為 ctx._source.address = params.address
,意思是將符合條件數據的 address 欄位內容更新為 params 的 address 的數據。
如果想要更改其他欄位內容,註意前面 ctx._source 為固定寫法,只需要更改後面的欄位名即可。
在 script.params 中,我們則可以定義各種對應的欄位及其內容。
更新多個欄位
如果我們想同時更新多個欄位,比如說符合條件的數據將 address 改為 新地址
,將 age 欄位改為 28,我們則需要將多個條件在 script.source 中使用分號 ;
連接起來,示例如下:
conn.update_by_query(
index="exam",
body={
"query": {
"term": {"name": "新張三豐2"}
},
"script": {
"source": "ctx._source.address = params.address; ctx._source.age = params.age",
"params": {
"address": "新地址3",
"age": "28"
}
}
}
)
雖然這裡更新多個欄位需要使用分號連接,但是在實際的代碼中我們不用這麼寫死,比如說我們需要更改三個欄位,為 ["address", "name", "age"]
,我們如下操作:
field_list = ["address", "name", "age"]
source_list = [f"ctx._source.{key}=params.{key}" for key in field_list]
params = {
"address": "新地址3",
"age": "28",
"name": "new name"
}
conn.update_by_query(
index="exam",
body={
"query": {
"term": {"name": "新張三豐3"}
},
"script": {
"source": ";".join(source_list),
"params": params
}
}
)
4、批量更新
如果我們想批量更新一批數據,這批數據各個欄位的值都不一致,自定義的程度很大,使用 update_by_query() 函數已經不現實了,怎麼辦?
好解決,我們可以使用 helpers.bulk() 批量更新方法。
首先引入這個模塊:
from elasticsearch import helpers
假設我們系統里現在有 id 為 21,23,24 的幾條數據,還是在 exam 這個索引下,我們來構造幾條需要更新的數據來操作:
action_1 = {
"_op_type": "update",
"_index": "exam",
"_id": 21,
"doc": {"age": 19, "name": "令狐沖", "address": "華山派"},
}
action_2 = {
"_op_type": "update",
"_index": "exam",
"_id": 23,
"doc": {"age": 20, "name": "楊過", "address": "終南山"},
}
action_3 = {
"_op_type": "update",
"_index": "exam",
"_id": 24,
"doc": {"age": 21, "name": "張無忌", "address": "武當"},
}
action_list = [action_1, action_2, action_3]
helpers.bulk(conn, actions=action_list)
對於每一條需要更新的數據,有這幾個參數:
_op_type:如果是更新操作,其值則是 update
_index:表示需要更新的數據所在的索引,這裡是 exam
_id:表示這條需要更新的數據的 id
doc:是一個 dict 數據,其下包含了需要更新的欄位及其對應的值
至此,一條需要更新的數據的結構就構造完畢了。
然後對於 helpers.bulk() 函數,接收的第一個參數為 es 連接,actions 參數是一個列表,其內容就是我們前面構造的數據的集合。
然後執行這個操作就可以發現 es 中對應的值已經更改了。
5、UpdateByQuery()
UpdateByQuery() 函數來源於 elasticsearch_dsl 模塊,它的使用和 Search() 方法差不多,都是通過 using 和 index 參數來獲取 es 連接和索引:
from elasticsearch_dsl import connections
from elasticsearch_dsl import UpdateByQuery
from elasticsearch_dsl import Q as ES_Q
connections.configure(
default={"hosts": "localhost:9200"},
)
ubq = UpdateByQuery(using="default", index="exam")
使用這個方法更新數據的具體語法和 update_by_query 差不多,都是通過 script 的方式來操作,以下是一個簡單示例:
ubq = UpdateByQuery(using="default", index="exam")
q1 = ES_Q("term", name="郭靖")
ubq = ubq.query(q1)
ubq = ubq.script(
source="ctx._source.address=params.address",
params={
"address": "襄陽城"
}
)
ubq.execute()
與 Search() 函數一樣,都需要通過 execute() 函數來向 es 提交數據。
如果想獲取更多後端相關文章,可掃碼關註閱讀: