跨機房ES同步實戰

来源:https://www.cnblogs.com/Jcloud/archive/2022/12/08/16965243.html
-Advertisement-
Play Games

作者:謝澤華 背景 眾所周知單個機房在出現不可抗拒的問題(如斷電、斷網等因素)時,會導致無法正常提供服務,會對業務造成潛在的損失。所以在協同辦公領域,一種可以基於同城或異地多活機制的高可用設計,在保障數據一致性的同時,能夠最大程度降低由於機房的僅單點可用所導致的潛在高可用問題,最大程度上保障業務的用 ...


作者:謝澤華

背景

眾所周知單個機房在出現不可抗拒的問題(如斷電、斷網等因素)時,會導致無法正常提供服務,會對業務造成潛在的損失。所以在協同辦公領域,一種可以基於同城或異地多活機制的高可用設計,在保障數據一致性的同時,能夠最大程度降低由於機房的僅單點可用所導致的潛在高可用問題,最大程度上保障業務的用戶體驗,降低單點問題對業務造成的潛在損失顯得尤為重要。

同城雙活,對於生產的高可用保障,重大的意義和價值是不可言喻的。錶面上同城雙活只是簡單的部署了一套生產環境而已,但是在架構上,這個改變的影響是巨大的,無狀態應用的高可用管理、請求流量的管理、版本發佈的管理、網路架構的管理等,其提升的架構複雜度巨大。

結合真實的協同辦公產品:京辦(為北京市政府提供協同辦公服務的綜合性平臺)生產環境面對的複雜的政務網路以及京辦同城雙活架構演進的案例,給大家介紹下京辦持續改進、分階段演進過程中的一些思考和實踐經驗的總結。本文僅針對ES集群在跨機房同步過程中的方案和經驗進行介紹和總結。

架構

1.部署Logstash在金山雲機房上,Logstash啟動多個實例(按不同的類型分類,提高同步效率),並且和金山雲機房的ES集群在相同的VPC

2.Logstash需要配置大網訪問許可權,保證Logstash和ES原集群和目標集群互通。

3.數據遷移可以全量遷移和增量遷移,首次遷移都是全量遷移後續的增加數據選擇增量遷移。

4.增量遷移需要改造增加識別的增量數據的標識,具體方法後續進行介紹。

原理

Logstash工作原理

Logstash分為三個部分input 、filter、ouput:

1.input處理接收數據,數據可以來源ES,日誌文件,kafka等通道.

2.filter對數據進行過濾,清洗。

3.ouput輸出數據到目標設備,可以輸出到ES,kafka,文件等。

增量同步原理

  1. 對於T時刻的數據,先使用Logstash將T以前的所有數據遷移到有孚機房京東雲ES,假設用時∆T

  2. 對於T到T+∆T的增量數據,再次使用logstash將數據導入到有孚機房京東雲的ES集群

  3. 重覆上述步驟2,直到∆T足夠小,此時將業務切換到華為雲,最後完成新增數據的遷移

適用範圍:ES的數據中帶有時間戳或者其他能夠區分新舊數據的標簽

流程

準備工作

1.創建ECS和安裝JDK忽略,自行安裝即可

2.下載對應版本的Logstash,儘量選擇與Elasticsearch版本一致,或接近的版本安裝即可

https://www.elastic.co/cn/downloads/logstash

1) 源碼下載直接解壓安裝包,開箱即用

2)修改對記憶體使用,logstash預設的堆記憶體是1G,根據ECS集群選擇合適的記憶體,可以加快集群數據的遷移效率。

  1. 遷移索引

Logstash會幫助用戶自動創建索引,但是自動創建的索引和用戶本身的索引會有些許差異,導致最終數據的搜索格式不一致,一般索引需要手動創建,保證索引的數據完全一致。

以下提供創建索引的python腳本,用戶可以使用該腳本創建需要的索引。

create_mapping.py文件是同步索引的python腳本,config.yaml是集群地址配置文件。

註:使用該腳本需要安裝相關依賴

yum install -y PyYAML
yum install -y python-requests

拷貝以下代碼保存為 create_mapping.py:

import yaml
import requests
import json
import getopt
import sys

def help():
    print
    """
    usage:
    -h/--help print this help.
    -c/--config config file path, default is config.yaml
    
    example:  
    python create_mapping.py -c config.yaml 
    """
def process_mapping(index_mapping, dest_index):
    print(index_mapping)
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]

    # check alias
    aliases = index_mapping["aliases"]
    for alias in list(aliases.keys()):
        if alias == dest_index:
            print(
                "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    if index_mapping["settings"]["index"].has_key("lifecycle"):
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro = {"opendistro": {"index_state_management":
                                         {"policy_id": lifecycle["name"],
                                          "rollover_alias": lifecycle["rollover_alias"]}}}
        index_mapping["settings"].update(opendistro)
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    print(index_mapping)
    return index_mapping
def put_mapping_to_target(url, mapping, source_index, dest_auth=None):
    headers = {'Content-Type': 'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
    if create_resp.status_code != 200:
        print(
            "create index " + url + " failed with response: " + str(create_resp) + ", source index is " + source_index)
        print(create_resp.text)
        with open(source_index + ".json", "w") as f:
            json.dump(mapping, f)
def main():
    config_yaml = "config.yaml"
    opts, args = getopt.getopt(sys.argv[1:], '-h-c:', ['help', 'config='])
    for opt_name, opt_value in opts:
        if opt_name in ('-h', '--help'):
            help()
            exit()
        if opt_name in ('-c', '--config'):
            config_yaml = opt_value

    config_file = open(config_yaml)
    config = yaml.load(config_file)
    source = config["source"]
    source_user = config["source_user"]
    source_passwd = config["source_passwd"]
    source_auth = None
    if source_user != "":
        source_auth = (source_user, source_passwd)
    dest = config["destination"]
    dest_user = config["destination_user"]
    dest_passwd = config["destination_passwd"]
    dest_auth = None
    if dest_user != "":
        dest_auth = (dest_user, dest_passwd)
    print(source_auth)
    print(dest_auth)

    # only deal with mapping list
    if config["only_mapping"]:
        for source_index, dest_index in config["mapping"].iteritems():
            print("start to process source index" + source_index + ", target index: " + dest_index)
            source_url = source + "/" + source_index
            response = requests.get(source_url, auth=source_auth)
            if response.status_code != 200:
                print("*** get ElasticSearch message failed. resp statusCode:" + str(
                    response.status_code) + " response is " + response.text)
                continue
            mapping = response.json()
            index_mapping = process_mapping(mapping[source_index], dest_index)

            dest_url = dest + "/" + dest_index
            put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
            print("process source index " + source_index + " to target index " + dest_index + " successed.")
    else:
        # get all indices
        response = requests.get(source + "/_alias", auth=source_auth)
        if response.status_code != 200:
            print("*** get all index failed. resp statusCode:" + str(
                response.status_code) + " response is " + response.text)
            exit()
        all_index = response.json()
        for index in list(all_index.keys()):
            if "." in index:
                continue
            print("start to process source index" + index)
            source_url = source + "/" + index
            index_response = requests.get(source_url, auth=source_auth)
            if index_response.status_code != 200:
                print("*** get ElasticSearch message failed. resp statusCode:" + str(
                    index_response.status_code) + " response is " + index_response.text)
                continue
            mapping = index_response.json()

            dest_index = index
            if index in config["mapping"].keys():
                dest_index = config["mapping"][index]
            index_mapping = process_mapping(mapping[index], dest_index)

            dest_url = dest + "/" + dest_index
            put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
            print("process source index " + index + " to target index " + dest_index + " successed.")

if __name__ == '__main__':
    main()

配置文件保存為config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"

# 是否只處理這個文件中mapping地址的索引
# 如果設置成true,則只會將下麵的mapping中的索引獲取到併在目的端創建
# 如果設置成false,則會取源端集群的所有索引,除去(.kibana)
# 並且將索引名稱與下麵的mapping匹配,如果匹配到使用mapping的value作為目的端的索引名稱
# 如果匹配不到,則使用源端原始的索引名稱
only_mapping: true

# 要遷移的索引,key為源端的索引名字,value為目的端的索引名字
mapping:
    source_index: dest_index

以上代碼和配置文件準備完成,直接執行 python create_mapping.py 即可完成索引同步。

索引同步完成可以取目標集群的kibana上查看或者執行curl查看索引遷移情況:

GET _cat/indices?v

全量遷移

Logstash配置位於config目錄下。

用戶可以參考配置修改Logstash配置文件,為了保證遷移數據的準確性,一般建議建立多組Logstash,分批次遷移數據,每個Logstash遷移部分數據。

配置集群間遷移配置參考:

input{
    elasticsearch{
        # 源端地址
        hosts =>  ["ip1:port1","ip2:port2"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
        # 需要遷移的索引列表,以逗號分隔,支持通配符
        index => "a_*,b_*"
        # 以下三項保持預設即可,包含線程數和遷移數據大小和logstash jvm配置相關
        docinfo=>true
        slices => 10
        size => 2000
        scroll => "60m"
    }
}

filter {
  # 去掉一些logstash自己加的欄位
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output{
    elasticsearch{
        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
 # 目的端索引名稱,以下配置為和源端保持一致
        index => "%{[@metadata][_index]}"
        # 目的端索引type,以下配置為和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 目標端數據的_id,如果不需要保留原_id,可以刪除以下這行,刪除後性能會更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }

    # 調試信息,正式遷移去掉
    stdout { codec => rubydebug { metadata => true }}
}

增量遷移

預處理:

  1. @timestamp 在elasticsearch2.0.0beta版本後棄用

https://www.elastic.co/guide/en/elasticsearch/reference/2.4/mapping-timestamp-field.html

  1. 本次對於京辦從金山雲機房遷移到京東有孚機房,所涉及到的業務領域多,各個業務線中所代表新增記錄的時間戳欄位不統一,所涉及到的相容工作量大,於是考慮通過elasticsearch中預處理功能pipeline進行預處理添加統一增量標記欄位:gmt_created_at,以減少遷移工作的複雜度(各自業務線可自行評估是否需要此步驟)。
PUT _ingest/pipeline/gmt_created_at
{
  "description": "Adds gmt_created_at timestamp to documents",
  "processors": [
    {
      "set": {
        "field": "_source.gmt_created_at",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}
  1. 檢查pipeline是否生效
GET _ingest/pipeline/*
  1. 各個index設置對應settings增加pipeline為預設預處理
PUT index_xxxx/_settings
{
  "settings": {
    "index.default_pipeline": "gmt_created_at"
  }
}
  1. 檢查新增settings是否生效
GET index_xxxx/_settings

增量遷移腳本

schedule-migrate.conf

index:可以使用通配符的方式

query: 增量同步的DSL,統一gmt_create_at為增量同步的特殊標記

schedule: 每分鐘同步一把,"* * * * *"

input {
elasticsearch {
        hosts =>  ["ip:port"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
        index => "index_*"
        query => '{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
        size => 5000
        scroll => "5m"
        docinfo => true
        schedule => "* * * * *"
      }
}
filter {
     mutate {
      remove_field => ["source", "@version"]
   }
}
output {
    elasticsearch {
        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登錄用戶名密碼
        user => "username"
        password => "password"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }

# 調試信息,正式遷移去掉
stdout { codec => rubydebug { metadata => true }}
}

問題:

mapping中存在join父子類型的欄位,直接遷移報400異常

[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, 
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #<LogStash::Event:0x3b3df773>], 
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400, 
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", 
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}

解決方法:

https://discuss.elastic.co/t/an-routing-missing-exception-is-obtained-when-reindex-sets-the-routing-value/155140 https://github.com/elastic/elasticsearch/issues/26183

結合業務特征,通過在filter中加入小量的ruby代碼,將_routing的值取出來,放回logstah event中,由此問題得以解決。

示例:


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ​ 摘要:本文主要歸納總結藍牙模塊的不同工作模式,通過藍牙模塊不同的工作模式瞭解其扮演不同角色時工作的一個基本原理,為更深入的研究藍牙模塊底層的工作機制和技術方案進行鋪墊。 1、主設備工作模式 主設備是能夠搜索別人並主動建立連接的一方,從掃描狀態轉化而來的。其可以和一個或多個從設備進行連接通信,它會 ...
  • Valheim伺服器 Mod修改安裝 註意! **不建議在為通關游戲的情況下對游戲進行任何修改,這會極大的縮短游戲的新鮮度,不建議安裝任何mod及修改器** mod的安裝將直接影響你的游戲體驗 伺服器安裝mod還需要參與伺服器的玩家一併安裝mod文件 本mod非往游戲中添加游戲元素,只是在原有的基礎 ...
  • 一、SSL認證 也就是我們常說的伺服器認證,為的是啟動加密傳輸協議https,步驟如下: 1、生成證書請求 進入IIS,選擇伺服器的伺服器證書設置選項, 創建證書申請,填值如圖所示 選擇加密服務提供程式,並設置證書密鑰長度,EV證書需選擇位長2048 完成之後,會保留一條請求記錄,如圖 生成的證書請 ...
  • 在 Linux 中一切皆文件。文件管理主要是涉及文件/目錄的創建、刪除、移動、複製和查詢,有mkdir/rm/mv/cp/find 等命令。其中 find 文件查詢命令較為複雜,參數豐富,功能十分強大;查看文件內容是一個比較大的話題,文本處理也有很多工具供我們使用,本文涉及到這兩部分的內容只是點到為... ...
  • Redis項目總結--緩存更新策略 1.更新策略 | | 記憶體淘汰 | 超時剔除 | 主動更新 | | : : | : : | : : | : : | | 說明 | 不用自己維護,利用Redis記憶體淘汰機制,記憶體不足時自動淘汰部分數據,下次查詢時更新緩存 | 給緩存數據添加過期時間,到期刪除,下次查 ...
  • 全網最全的linux上docker安裝oracle的詳細文檔,遇到了n個問題,查了幾十篇文章,最終彙總版,再有解決不了的,私聊我,我幫你解決 1. 拉取阿裡鏡像oracle docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11 ...
  • 2020年1月,時間跨度長達14年的,微軟2.5億條客戶服務和支持記錄在網上泄露; 同年4月,微盟發生史上最貴“刪庫跑路”事件,造成微盟市值一夜之間縮水約24億港幣; 今年7月,網信辦依據《數據安全法》等法律法規,對滴滴公司開出人民幣80.26億元的巨額罰款,對互聯網企業敲響數據安全警鐘。 數據作為 ...
  • 本文分享自華為雲社區《GaussDB(DWS)字元串、二進位、十六進位互轉》,作者:你是猴子請來的救兵嗎 。 概述 現網中遇到很多小伙伴不清楚字元串與進位之間的轉換方法,其實在GaussDB(DWS)中,進位轉換是非常方便的。這次就來對不同的場景一一進行解析,整理出來供大家翻閱參考。 字元串&二進位 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...