5分鐘搞定 關係型資料庫 到 Flink 數據同步

来源:https://www.cnblogs.com/clougence/archive/2022/08/30/16639531.html
-Advertisement-
Play Games

簡述 實時數據處理領域中,使用 Flink 方式,除了從日誌服務訂閱埋點數據外,總離不開從關係型資料庫訂閱並處理相關業務數據,這時就需要監測並捕獲資料庫增量數據,將變更按發生的順序寫入到消息中間件以供計算(或消費)。 本文主要介紹如何通過 CloudCanal 快速構建一條高效穩定運行的 MySQL ...


簡述

實時數據處理領域中,使用 Flink 方式,除了從日誌服務訂閱埋點數據外,總離不開從關係型資料庫訂閱並處理相關業務數據,這時就需要監測並捕獲資料庫增量數據,將變更按發生的順序寫入到消息中間件以供計算(或消費)。
本文主要介紹如何通過 CloudCanal 快速構建一條高效穩定運行的 MySQL -> Kafka -> Flink 數據同步鏈路。

技術點

相容多種常見消息結構

CloudCanal 目前支持 Debezium Envelope (新增)CanalAliyun DTS Avro 等多種流行消息結構,對數據下游消費比較友好。
本次對 Debezium Envelope 消息格式的支持,我們採用了一種輕量的方式做到完全相容,充分利用 CloudCanal 增量組件,擴展數據序列化器 (EnvelopDeserialize),得到 Envelop 消息併發送到 Kafka 中。
其中 Envelop 的消息結構分為 PayloadSchema 兩部分

  • Payload:存儲具體數據
  • Schema:定義 Payload 的解析格式 (預設關閉)
{
  "payload":{
    "after":{
      "column_1":"3",
      ...
    },
    "before":null,
    "op":"c",
    "source":{
      "db":"kafka_test",
      "table":"new_table"
      "pos":110341861,
      "ts_ms":1659614884026,
      ...
    },
    "ts_ms":1659614884026
  },
  "schema":{
    "fields":[
      {
        "field":"after",
        "fields":[
          {
            "field":"column_1",
            "isPK":true,
            "jdbType":4,
            "type":"int(11)"
          },
          ...
        ],
        "type":"struct"
      },
      ...
    ],
    "type":"struct"
  }
}

高度可視化的CDC

CDC 工具如 FlinkCDCMaxwellDebezium ... 各有特色,CloudCanal 相對這些產品,最大的特點是高度可視化,自動化,下表針對目標端為Kafka 的 CDC 簡要做了一些對比。

CloudCanal FlinkCDC Maxwell
產品化 完備 基礎
同步對象配置 可視化 代碼 配置文件
封裝格式 多種常用格式 自定義 JSON
高可用
數據初始化(snapshot) 實例級 實例級 單表
源端支持 ORACLE,MySQL,SQLServer,MongoDB,PostgreSQL... ORACLE,MySQL,SQLServer,MongoDB,PostgreSQL... MySQL

CloudCanal 在平衡性能的基礎上,提供多種關係型數據源的同步,以及反向同步;提供便捷的可視化操作、輕巧的數據源添加、輕便的參數配置;
提供多種常見的消息格式,僅僅通過滑鼠點擊,就可以使用其他 CDC 的消息格式的傳輸,讓數據處理變的異常的快捷、方便。
其中經過我們在相同環境的測試下, CloudCanal 在高寫入的 MySQL 場景中,處理數據的效率表現的很出色,後續我們會繼續對 CloudCanal 進行優化,提升整體的性能。
綜上,相比與類似的 CDC 產品來說,CloudCanal 簡單輕巧並集成一體化的操作占據了很大的優勢。

Flink 流式計算中不僅要訂閱日誌伺服器的日誌埋點信息,同樣需要業務資料庫中的信息,通過 CDC 工具訂閱數據,能減少查詢對業務資料庫產生的壓力還能以流的形式傳輸,方便與日誌伺服器中的數據進行關聯處理。
實際開發中,可以將業務資料庫中的信息提取過濾之後動態的放入 Hbase 中作為維度數據,方便相關聯的寬表進行關聯查詢;
也可以對數據進行開窗、分組、聚合,同樣也可以下沉到其他的 Kafka 消費者組中,實現數據的分層。
image.png

操作示例

前置條件

  • 本例使用 Envelop 消息格式,關係型資料庫 MySQL 為示例,展示 MySQL 對接 Flink 的 Demo
  • 登陸 CloudCanal SaaS版,使用參見快速上手文檔
  • 準備好 1 個 MySQL 實例,1 個 Kafka 實例(本例使用自己搭建的 MySQL 5.6,阿裡雲 Kafka 2.2)
  • 準備好 Flink 消費端程式,配置好相關信息:flink-demo 下載
  • 登錄 CloudCanal 平臺,添加 Kafka,MySQL

截屏2022-08-17 17.12.13.png

  • Kafka 自定義一個主題 topic_1,並創建一條 MySQL -> Kafka 鏈路作為增量數據來源

任務創建

  • 首先配置 **FlinkDemo 程式的 **阿裡雲 Kafka 相關信息

截屏2022-08-17 17.09.12.png

  • 運行 FlinkDemo 程式,等待消費 MySQL 同步 Kafka 的數據(程式不要關閉)

截屏2022-08-17 17.08.50.png

  • **任務管理 **-> **任務創建 **
  • 測試鏈接並選擇 目標 資料庫,**並選擇 DebeziumEnvelope 消息格式,和 topic_1 主題 **(在阿裡雲里提前創建)

截屏2022-08-17 17.08.18.png

  • 選擇 數據同步,不勾選 全量數據初始化,其他選項預設

截屏2022-08-17 17.07.46.png

  • 選擇需要遷移同步的表 **table1 **和對應的 Kafka 主題 topic_1

截屏2022-08-17 17.07.19.png

持續點擊下一步,並創建出數據同步任務。

  • 向 **MySQL 生成數據,MySQL **-> Kafka(topic_1) -> Flink
  • FlinkDemo 接收到 Kafka(topic_1) 數據,下沉到 topic_2 主題,列印並輸出;這裡 Flink 程式可以做更多的流式計算的操作,FlinkDemo 只是演示了最基本的數據傳輸案例

截屏2022-08-17 17.10.05.png

常見問題

還支持哪些源端數據源呢?

目前開放 MySQL、Oracle,SQLServer,Postgres,MongoDB 到 Kafka,如果各位有需求,可以在社區反饋給我們。

支持 DDL 消息同步嗎?

目前 關係型數據到 kafka 是支持 DDL 消息的同步的,可以將 關係型資料庫 DDL 的變化同步到 Kafka 當中。

總結

本文簡單介紹瞭如何使用 CloudCanal  進行 MySQL -> Kafka -> Flink 數據遷移同步。各位讀者朋友,如果你覺得還不錯,請點贊、評論加轉發吧。

加入CloudCanal粉絲群掌握一手消息和獲取更多福利,請添加我們小助手微信:suhuayue001

CloudCanal-免費好用的企業級數據同步工具,歡迎品鑒。
瞭解更多產品可以查看官方網站http://www.clougence.com
CloudCanal社區https://www.askcug.com/


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 一、項目命名 SCADA:數據採集與監控系統 SCADA(Supervisory Control And Data Acquisition),即數據採集與監視控制系統。 SCADA.Common SCADA.Main SCADA.UI SCADA.Communication SCADA.HMI SC ...
  • Dockerfile 基本結構 Dockerfile 是一個文本格式的配置文件,用戶可以使用 Dockerfile 快速創建自定義鏡像。 Dockerfile 由一行行命令語句組成,並且支持以 # 開頭的註釋行。 Docker分為四部分: 基礎鏡像信息 維護者信息 鏡像操作指令 容器啟動時預設要執行 ...
  • 快速代碼 # nfs的Server配置文件和配置方法 echo '/newnfs 192.168.3.*(rw,sync,no_root_squash)' >> /etc/exports # 根目錄新建文件夾,許可權777 mkdir /newnfs && chmod 777 /newnfs# 重新啟 ...
  • ping ping命令用於測試兩台主機之間是否可以通信,一般情況下會使用ping www.baidu.com來測試網路連通性,如果不指定發送包的個數預設是一直發送數據包,可以使用Ctrl+C停止。網路聯通時就會收到回覆,只要出現的不是以下信息就要根據具體的情況進行排錯。 ping原理:ping命令使 ...
  • top top命令相當於任務管理器。在top命令中,可以使用M,將進程列表按記憶體使用排序,使用P將進程列表按照CPU的使用情況排序,輸入q退出。 (1)第一行是任務隊列信息,顯示系統時間、運行時間、當前有幾個登錄用戶、負載均衡,load average後面的三個參數分別表示1分鐘、5分鐘、15分鐘的 ...
  • root用戶和普通用戶 雖然root用戶的的許可權很大,但一般情況下,我們都不會直接使用root用戶而是創建一個普通用戶,這樣可以避免因為許可權過大帶來的一些誤操作,當使用一些需要許可權的操作時,可以使用sudo命令以管理員身份執行該操作。 su和sudo su命令不加參數時,預設是切換到root用戶,但 ...
  • ​ 點亮 ⭐️ Star · 照亮開源之路 GitHub:https://github.com/apache/dolphinscheduler Apache DolphinScheduler是一款非常不錯的調度工具,可單機可集群可容 器,可調度sql、存儲過程、http、大數據等,也可使用shell ...
  • 今天我會進行StoneDB資料庫在Debian系統下的安裝。 官方文檔中沒有說明在Debian系統的安裝步驟,我來試試能否順利安裝。 準備Debian系統 我是在本地使用虛擬機安裝的Debian 11.2系統,安裝過程比較順利,安裝完成後。先為Debian系統裝上SSH,然後通過SSH連接虛擬機命令 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...