CDC
CDC is short for Change Data Capture. The core idea: monitor and capture changes in a database (inserts, updates, and deletes of rows and tables), record those changes completely in the order they occur, and write them to a message broker for other services to subscribe to and consume.
Types of CDC
CDC implementations fall into two camps, query-based and Binlog-based. The main differences between them:
| | Query-based CDC | Binlog-based CDC |
|---|---|---|
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load on the database | Yes | No |
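To make the table concrete, here is a minimal sketch of what query-based CDC boils down to: a JDBC polling loop. It reuses the sys_user demo table from later in this post, while the update_time column, credentials, and poll interval are assumptions for illustration. Any states a row passes through between two polls are collapsed into one, and deleted rows simply vanish from the result set.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class PollingCdcSketch {
    public static void main(String[] args) throws Exception {
        Timestamp lastSeen = new Timestamp(0L); //high-water mark left by the previous poll
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/springboot", "root", "110")) {
            while (true) {
                //rows are found only if something bumps update_time: intermediate
                //states between two polls are lost, and DELETEs never show up here
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, username, update_time FROM sys_user WHERE update_time > ?")) {
                    ps.setTimestamp(1, lastSeen);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            lastSeen = rs.getTimestamp("update_time");
                            System.out.println("changed row id=" + rs.getLong("id"));
                        }
                    }
                }
                Thread.sleep(60_000L); //batch cadence = built-in latency, plus query load on the DB
            }
        }
    }
}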
FlinkCDC
The Flink community has developed the flink-cdc-connectors component, a source connector that can read both full snapshot data and incremental change data directly from databases such as MySQL and PostgreSQL, without routing the data through middleware like Kafka.
It is open source: https://github.com/ververica/flink-cdc-connectors
| Connector | Database | Driver |
|---|---|---|
| mongodb-cdc | MongoDB: 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 |
| mysql-cdc | MySQL: 5.6, 5.7, 8.0.x; RDS MySQL: 5.6, 5.7, 8.0.x; PolarDB MySQL: 5.6, 5.7, 8.0.x; Aurora MySQL: 5.6, 5.7, 8.0.x; MariaDB: 10.x; PolarDB X: 2.0.1 | JDBC Driver: 8.0.27 |
| oceanbase-cdc | OceanBase CE: 3.1.x; OceanBase EE (MySQL mode): 2.x, 3.x | JDBC Driver: 5.1.4x |
| oracle-cdc | Oracle: 11, 12, 19 | Oracle Driver: 19.3.0.0 |
| postgres-cdc | PostgreSQL: 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
| sqlserver-cdc | SQL Server: 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |
| tidb-cdc | TiDB: 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 |
| db2-cdc | Db2: 11.5 | DB2 Driver: 11.5.0.0 |
DataStream:
- Pros: multiple databases and multiple tables in one job
- Cons: requires a custom deserializer (which also makes it flexible)

FlinkSQL:
- Pros: no custom deserializer needed
- Cons: single table only
Demo
Note: binlog_format must be set to ROW.
my.ini
log-bin=mysql-bin
#binlog_format="STATEMENT"
binlog_format="ROW"
#binlog_format="MIXED"
#server-id=1
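After restarting MySQL you can confirm the setting took effect. A minimal JDBC check, reusing the connection details from the demo below:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogFormatCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/springboot", "root", "110");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SHOW VARIABLES LIKE 'binlog_format'")) {
            while (rs.next()) {
                //expected output: binlog_format = ROW
                System.out.println(rs.getString(1) + " = " + rs.getString(2));
            }
        }
    }
}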
POM
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.75</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.0</version>
</dependency>
</dependencies>
Based on the DataStream API
CustomerDeserialization.java
package com.vipsoft;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
/**
* Target output format:
* {
* "database":"",
* "tableName":"",
* "before":{"id":"","tm_name":""....},
* "after":{"id":"","tm_name":""....},
* "type":"c u d",
* //"ts":156456135615
* }
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1. Create the JSON object that will hold the final record
JSONObject result = new JSONObject();
//2. Extract the database and table name from the topic (format: server.db.table)
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct value = (Struct) sourceRecord.value();
//3. Extract the "before" row image (null for inserts)
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4. Extract the "after" row image (null for deletes)
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5. Determine the operation type: CREATE / UPDATE / DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
//6. Assemble the fields into the result JSON
result.put("database", database);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);
//7. Emit the record downstream
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
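For an UPDATE on sys_user, this deserializer emits a single line shaped roughly like the following. The keys match the code above; the row values are invented for illustration, and fastjson does not guarantee key order:
{"database":"springboot","tableName":"sys_user","before":{"id":1,"username":"zhang3"},"after":{"id":1,"username":"wang5"},"type":"update"}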
FlinkCDC.java
package com.vipsoft;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkCDC {
public static void main(String[] args) throws Exception {
//1. Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//2. Build the CDC SourceFunction and read the data
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
.hostname("localhost")
.serverTimeZone("GMT+8") //add this setting if you run into a time-zone error
.port(3306)
.username("root")
.password("110")
.databaseList("springboot")
.tableList("springboot.sys_user") //optional; if omitted, every table in the listed databases is consumed. Tables must be qualified as db.table
//.deserializer(new StringDebeziumDeserializationSchema())
.deserializer(new CustomerDeserialization()) //use the custom deserializer defined above
.startupOptions(StartupOptions.initial())
.build();
DataStreamSource<String> streamSource = env.addSource(sourceFunction);
//3. Print the records
streamSource.print();
//4. Start the job
env.execute("FlinkCDC");
}
}
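StartupOptions.initial() first takes a full snapshot of the table and then switches to reading the binlog, which is what gives FlinkCDC its initialization feature. The builder also accepts other startup modes; the names below follow the StartupOptions class of this 1.x connector line, so verify them against the release you use:
//read only changes that happen after the job starts, skipping the snapshot
.startupOptions(StartupOptions.latest())
//start from the earliest binlog offset the server still retains
.startupOptions(StartupOptions.earliest())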
Run results
- With the default StringDebeziumDeserializationSchema
- With the custom deserializer
FlinkSQL
package com.vipsoft;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkCDCWithSQL {
public static void main(String[] args) throws Exception {
//1. Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//2. Create the source table with DDL
tableEnv.executeSql("CREATE TABLE mysql_binlog ( " +
" id STRING NOT NULL, " +
" username STRING, " +
" nick_name STRING " +
") WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = 'localhost', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = '110', " +
" 'database-name' = 'springboot', " +
" 'table-name' = 'sys_user' " +
")");
//3. Query the data
Table table = tableEnv.sqlQuery("select * from mysql_binlog");
//4. Convert the dynamic table into a retract stream
DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
retractStream.print();
//5. Start the job
env.execute("FlinkCDCWithSQL");
}
}
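In the retract stream, each element is a Tuple2 whose Boolean flag is true for accumulate messages and false for retract messages; an UPDATE therefore arrives as a retraction of the old row followed by the new row. A sketch of a variant of step 4 that keeps only the current rows:
//4b. (variant) drop the retraction that precedes every updated row by
//    filtering on the Boolean flag of the Tuple2
tableEnv.toRetractStream(table, Row.class)
        .filter(change -> change.f0)
        .print();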
Run result
Comparison
All things compared, FlinkCDC is the most comfortable to work with:
| | FlinkCDC | Maxwell | Canal |
|---|---|---|---|
| Resume from breakpoint (state stored in) | Checkpoint | MySQL | Local disk |
| SQL -> data | N/A | N/A | One-to-one (needs flattening) |
| Initial snapshot | Yes (multiple DBs/tables) | Yes (single table) | No (history must be queried separately) |
| Message format | Custom | JSON | JSON (client/server customizable) |
| High availability | Yes (runs on the Flink cluster) | No | Cluster (ZooKeeper) |
Insert comparison
Insert two rows:
INSERT INTO z_user_info VALUES(30,'zhang3','13800000000'),(31,'li4','13999999999');
FlinkCDC: every changed row produces its own JSON record.
Maxwell: every changed row produces its own JSON record.
Canal: each executed SQL statement produces one JSON record, so the two rows arrive bundled together, which is inconvenient: the message has to be flattened before parsing (see the sketch below).
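A hedged sketch of that flattening step, using the fastjson dependency already in the POM above. The field names data, database, table, and type follow Canal's usual JSON output; verify them against your deployment:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

public class CanalFlatten {
    public static void main(String[] args) {
        //one Canal message covering both inserted rows
        String canalJson = "{\"data\":[{\"id\":\"30\",\"user_name\":\"zhang3\"},"
                + "{\"id\":\"31\",\"user_name\":\"li4\"}],"
                + "\"database\":\"test\",\"table\":\"z_user_info\",\"type\":\"INSERT\"}";
        JSONObject msg = JSON.parseObject(canalJson);
        JSONArray rows = msg.getJSONArray("data");
        for (int i = 0; i < rows.size(); i++) {
            //re-wrap each row as its own event, mirroring the one-JSON-per-change
            //shape that FlinkCDC and Maxwell emit natively
            JSONObject event = new JSONObject();
            event.put("database", msg.getString("database"));
            event.put("table", msg.getString("table"));
            event.put("type", msg.getString("type"));
            event.put("data", rows.getJSONObject(i));
            System.out.println(event.toJSONString());
        }
    }
}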
Update comparison
UPDATE z_user_info SET user_name='wang5' WHERE id IN(30,31);
FlinkCDC: includes the pre-update row in before.
Maxwell: does not include the pre-update data.
Canal: still a single JSON record.
Delete comparison
DELETE FROM z_user_info WHERE id IN(30,31);
FlinkCDC: two delete JSON records, one per deleted row.
Maxwell
Canal