大數據-業務數據採集-FlinkCDC

来源:https://www.cnblogs.com/vipsoft/archive/2022/12/08/16962051.html
-Advertisement-
Play Games

CDC CDC 是 Change Data Capture(變更數據獲取)的簡稱。核心思想是,監測並捕獲資料庫的變動(包括數據或數據表的插入、更新以及刪除等),將這些變更按發生的順序完整記錄下來,寫入到消息中間件中以供其他服務進行訂閱及消費。 CDC 的種類 CDC 主要分為基於查詢和基於 Binl ...


CDC

CDC 是 Change Data Capture(變更數據獲取)的簡稱。核心思想是,監測並捕獲資料庫的變動(包括數據或數據表的插入、更新以及刪除等),將這些變更按發生的順序完整記錄下來,寫入到消息中間件中以供其他服務進行訂閱及消費。

CDC 的種類

CDC 主要分為基於查詢和基於 Binlog 兩種方式,我們主要瞭解一下這兩種之間的區別:

基於查詢的 CDC 基於 Binlog 的 CDC
開源產品 Sqoop、Kafka JDBC Source Canal、Maxwell、Debezium
執行模式 Batch Streaming
是否可以捕獲所有數據變化
延遲性 高延遲 低延遲
是否增加資料庫壓力

FlinkCDC

Flink 社區開發了 flink-cdc-connectors 組件,這是一個可以直接從 MySQL、PostgreSQL 等資料庫直接讀取【全量數據】和【增量變更數據】的 source 組件。而不需要使用類似 Kafka 之類的中間件中轉數據
目前也已開源,開源地址:https://github.com/ververica/flink-cdc-connectors
image
image

Connector Database Driver
mongodb-cdc MongoDB: 3.6, 4.x, 5.0 MongoDB Driver: 4.3.1
mysql-cdc MySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
JDBC Driver: 8.0.27
oceanbase-cdc OceanBase CE: 3.1.x
OceanBase EE (MySQL mode): 2.x, 3.x
JDBC Driver: 5.1.4x
oracle-cdc Oracle: 11, 12, 19 Oracle Driver: 19.3.0.0
postgres-cdc PostgreSQL: 9.6, 10, 11, 12 JDBC Driver: 42.2.12
sqlserver-cdc Sqlserver: 2012, 2014, 2016, 2017, 2019 JDBC Driver: 7.2.2.jre8
tidb-cdc TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 JDBC Driver: 8.0.27
db2-cdc Db2: 11.5 DB2 Driver: 11.5.0.0

DataStream:

  • 優點: 多庫多表
  • 缺點: 需要自定義反序列化器(但靈活)
    FlinkSQL:
  • 優點: 不需要自定義反序列化器
  • 缺點: 單表

Demo

註意開啟 binlog_format=ROW
my.ini

log-bin=mysql-bin
#binlog_format="STATEMENT"
binlog_format="ROW"
#binlog_format="MIXED"
#service-id=1

image
POM

  <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.0</version>
        </dependency>
    </dependencies>

基於 DataStream

CustomerDeserialization.java

package com.vipsoft;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    /**
     * 封裝的數據格式
     * {
     * "database":"",
     * "tableName":"",
     * "before":{"id":"","tm_name":""....},
     * "after":{"id":"","tm_name":""....},
     * "type":"c u d",
     * //"ts":156456135615
     * }
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //1.創建JSON對象用於存儲最終數據
        JSONObject result = new JSONObject();

        //2.獲取庫名&表名
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        Struct value = (Struct) sourceRecord.value();
        //3.獲取"before"數據
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                Object beforeValue = before.get(field);
                beforeJson.put(field.name(), beforeValue);
            }
        }

        //4.獲取"after"數據
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                Object afterValue = after.get(field);
                afterJson.put(field.name(), afterValue);
            }
        }

        //5.獲取操作類型  CREATE UPDATE DELETE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //6.將欄位寫入JSON對象
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        //7.輸出數據
        collector.collect(result.toJSONString());

    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

FlinkCDC.java

package com.vipsoft;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        //1.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //2.通過FlinkCDC構建SourceFunction並讀取數據
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("localhost")
                .serverTimeZone("GMT+8")  //時區報錯增加這個設置
                .port(3306)
                .username("root")
                .password("110")
                .databaseList("springboot")
                .tableList("springboot.sys_user")   //如果不添加該參數,則消費指定資料庫中所有表的數據.如果指定,指定方式為db.table
                //.deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new CustomerDeserialization()) //使用自定義反序列化器
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        //3.列印數據
        streamSource.print();

        //4.啟動任務
        env.execute("FlinkCDC");

    }
}

運行效果

  • 預設 StringDebeziumDeserializationSchema
    image
  • 自定義反序列化器
    image

FlinkSQL

package com.vipsoft;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkCDCWithSQL {

    public static void main(String[] args) throws Exception {

        //1.獲取執行環境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.DDL方式建表
        tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
                " id STRING NOT NULL, " +
                " username STRING, " +
                " nick_name STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'hostname' = 'localhost', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = '110', " +
                " 'database-name' = 'springboot', " +
                " 'table-name' = 'sys_user' " +
                ")");

        //3.查詢數據
        Table table = tableEnv.sqlQuery("select * from mysql_binlog");

        //4.將動態表轉換為流
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //5.啟動任務
        env.execute("FlinkCDCWithSQL");
    }
}

運行效果

image

對比

通過對比,FlinkCDC 最舒服

FlinkCDC Maxwell Canal
斷點續傳 CK MySQL 本地磁碟
SQL -> 數據 一對一(炸開處理)
初始化功能 有(多庫多表) 有(單表) 無(單獨查詢歷史數據)
封裝格式 自定義 JSON JSON(c/s自定義)
高可用 運行集群高可用 集群(ZK)

插入對比

插入兩條數據

INSER INTO z_user_info VALUES(30,'zhang3','13800000000'),(31,'li4','13999999999')

image
FlinkCDC 每條變化都會產生一條 json
image
Maxwell 每條變化都會產生一條 json
image
Canal 一次性執行的SQL,會產生一條JSON(兩條數據組合在一起)【不方便,需要炸開解析】
image

更新對比

UPDATE z_user_info SET user_name='wang5' WHERE id IN(30,31)

FlinkCDC 包括了修改前的 before 數據
image

Maxwell 不包括修改前的數據
image

Canal 仍然是一條json
image

刪除對比

DELETE FROM z_user_info WHERE id IN(30,31)

FlinkCDC 兩條刪除的 json 數據
image

Maxwell
image

Canal
image

【尚矽谷】Flink數據倉庫視頻教程


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • Valheim伺服器 Mod修改安裝 註意! **不建議在為通關游戲的情況下對游戲進行任何修改,這會極大的縮短游戲的新鮮度,不建議安裝任何mod及修改器** mod的安裝將直接影響你的游戲體驗 伺服器安裝mod還需要參與伺服器的玩家一併安裝mod文件 本mod非往游戲中添加游戲元素,只是在原有的基礎 ...
  • 一、SSL認證 也就是我們常說的伺服器認證,為的是啟動加密傳輸協議https,步驟如下: 1、生成證書請求 進入IIS,選擇伺服器的伺服器證書設置選項, 創建證書申請,填值如圖所示 選擇加密服務提供程式,並設置證書密鑰長度,EV證書需選擇位長2048 完成之後,會保留一條請求記錄,如圖 生成的證書請 ...
  • 在 Linux 中一切皆文件。文件管理主要是涉及文件/目錄的創建、刪除、移動、複製和查詢,有mkdir/rm/mv/cp/find 等命令。其中 find 文件查詢命令較為複雜,參數豐富,功能十分強大;查看文件內容是一個比較大的話題,文本處理也有很多工具供我們使用,本文涉及到這兩部分的內容只是點到為... ...
  • Redis項目總結--緩存更新策略 1.更新策略 | | 記憶體淘汰 | 超時剔除 | 主動更新 | | : : | : : | : : | : : | | 說明 | 不用自己維護,利用Redis記憶體淘汰機制,記憶體不足時自動淘汰部分數據,下次查詢時更新緩存 | 給緩存數據添加過期時間,到期刪除,下次查 ...
  • 全網最全的linux上docker安裝oracle的詳細文檔,遇到了n個問題,查了幾十篇文章,最終彙總版,再有解決不了的,私聊我,我幫你解決 1. 拉取阿裡鏡像oracle docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11 ...
  • 2020年1月,時間跨度長達14年的,微軟2.5億條客戶服務和支持記錄在網上泄露; 同年4月,微盟發生史上最貴“刪庫跑路”事件,造成微盟市值一夜之間縮水約24億港幣; 今年7月,網信辦依據《數據安全法》等法律法規,對滴滴公司開出人民幣80.26億元的巨額罰款,對互聯網企業敲響數據安全警鐘。 數據作為 ...
  • 本文分享自華為雲社區《GaussDB(DWS)字元串、二進位、十六進位互轉》,作者:你是猴子請來的救兵嗎 。 概述 現網中遇到很多小伙伴不清楚字元串與進位之間的轉換方法,其實在GaussDB(DWS)中,進位轉換是非常方便的。這次就來對不同的場景一一進行解析,整理出來供大家翻閱參考。 字元串&二進位 ...
  • 作者:謝澤華 背景 眾所周知單個機房在出現不可抗拒的問題(如斷電、斷網等因素)時,會導致無法正常提供服務,會對業務造成潛在的損失。所以在協同辦公領域,一種可以基於同城或異地多活機制的高可用設計,在保障數據一致性的同時,能夠最大程度降低由於機房的僅單點可用所導致的潛在高可用問題,最大程度上保障業務的用 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...