系列內容 elasticsearch 概述 elasticsearch 安裝 elasticsearch 查詢 客戶端api使用 1. elasticsearch 概述 1.1 簡介 官網: https://www.elastic.co/ Elasticsearch (簡稱ES)是一個分散式、RES ...
前言
最近看文章說如何把Postgresql的數據同步給別的數據源,可以利用它的WAL,具體怎麼操作沒有說,我自己找到一篇文章https://www.cnblogs.com/xiongmozhou/p/14817641.html 可以利用Flink CDC。 我自己正好前段時間也看過Flink,把這個知識串起來也很有意義,於是開始動手試了一下,期間也遇到些困難,也嘗試解決了,有些原理不是很清晰,記錄下來,後面看能不能解決。
Postgresql配置
我們使用上篇文章搭建的Postgresql資料庫,要讓Postgresql支持同步給其它數據源,一個最關鍵的配置是更改wal日誌方式為logical, 這個配置在postgresql.conf, 而我們docker裡面的postgresql.conf這個配置又在哪個目錄呢? 網上找到了答案:https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
進入psql後,使用如下命令
SHOW config_file;
得到如下的結果
/var/lib/postgresql/data/postgresql.conf
得到路徑後, 我打算像平時一樣用vi去修改,發現不行,這個postgresql的Image並沒有安裝vim。
如何修改呢,繼續網上找答案 https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
方法很多,我們用個簡單的,使用sed命令來修改
sed -i -e"s/^#wal_level = replica.*$/wal_level = logical/" /var/lib/postgresql/data/postgresql.conf
就是查找到“#wal_level = replica“,把它替換為“wal_level = logical”
修改後需要重啟postgresql,執行如下命令
su - postgres -c "PGDATA=$PGDATA /usr/lib/postgresql/15/bin/pg_ctl -w restart"
執行後會退出docker,需要重新進入
新建用戶和授予許可權參考https://www.cnblogs.com/xiongmozhou/p/14817641.html
註意文檔中使用CREATE USER user它建的用戶是user,我用的這個用戶名是不成功的,提示語法錯誤
感覺是把user當作保留命令參數了,用戶名改為user1可以成功。
使用flink-connector-postgres-cdc
我們參考官方文檔https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#connector-options
首先在已有的Flink項目中加入如下的pom
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
<version>2.3.0</version>
<scope>provided</scope>
</dependency>
這裡代碼參考文檔
SourceFunction postgreSQLSource = PostgreSQLSource.<String>builder()
.hostname("localhost")
.port(5432)
.database("postgres") // set captured database
.tableList("postgres.market_price") // set captured table
.username("user1")
.password("pwd")
.decodingPluginName("pgoutput")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.addSource(postgreSQLSource)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print Postgres Snapshot + WAL");
有一點需要註意,官方文檔中沒有.decodingPluginName("pgoutput"),使用預設的decoderbufs,運行程式會提示
“PSQLException: ERROR: could not access file "decoderbufs": No such file or directory”, 修改成pgoutput,才能成功。 這裡應該是要安裝插件decoderbufs在Postgresql裡面。這裡暫時留下這個疑問,後面還有wal2json,看怎麼把wal的值轉成json格式顯示出來。
程式運行起來後我們往表裡插入和刪除數據,可以在控制臺中列印出變化來。
這裡直接貼圖
這裡也有個疑問,我對錶操作了三次,結果控制台列印出超過3條的信息,這裡應該和是否commit有關
暫時也沒有細究。
程式運行後,我們可以使用這個命令查看這個slot,
SELECT * FROM pg_replication_slots;
如果我們直接修改配置,比如把pgoutput改為別的,會提示slot flink已經存在,我們需要在postgresql裡面把它先刪除掉。
總結
總體上這個流程是打通了,但是對於裡面的細節沒有深入,比如flink怎麼消費,裡面的記錄怎麼顯示出來,它裡面實現的原理是什麼,都需要花時間去研究,先開個頭在這裡。