Flink CDC 監聽 Postgresql表的變化

来源:https://www.cnblogs.com/dk168/archive/2023/02/17/17131614.html
-Advertisement-
Play Games

系列內容 elasticsearch 概述 elasticsearch 安裝 elasticsearch 查詢 客戶端api使用 1. elasticsearch 概述 1.1 簡介 官網: https://www.elastic.co/ Elasticsearch (簡稱ES)是一個分散式、RES ...


前言

最近看文章說如何把Postgresql的數據同步給別的數據源,可以利用它的WAL,具體怎麼操作沒有說,我自己找到一篇文章https://www.cnblogs.com/xiongmozhou/p/14817641.html 可以利用Flink CDC。 我自己正好前段時間也看過Flink,把這個知識串起來也很有意義,於是開始動手試了一下,期間也遇到些困難,也嘗試解決了,有些原理不是很清晰,記錄下來,後面看能不能解決。

Postgresql配置

我們使用上篇文章搭建的Postgresql資料庫,要讓Postgresql支持同步給其它數據源,一個最關鍵的配置是更改wal日誌方式為logical, 這個配置在postgresql.conf, 而我們docker裡面的postgresql.conf這個配置又在哪個目錄呢? 網上找到了答案:https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
進入psql後,使用如下命令

SHOW config_file;

得到如下的結果
/var/lib/postgresql/data/postgresql.conf
得到路徑後, 我打算像平時一樣用vi去修改,發現不行,這個postgresql的Image並沒有安裝vim。
如何修改呢,繼續網上找答案 https://stackoverflow.com/questions/30848670/how-to-customize-the-configuration-file-of-the-official-postgresql-docker-image
方法很多,我們用個簡單的,使用sed命令來修改

sed -i -e"s/^#wal_level = replica.*$/wal_level = logical/" /var/lib/postgresql/data/postgresql.conf

就是查找到“#wal_level = replica“,把它替換為“wal_level = logical”
修改後需要重啟postgresql,執行如下命令

su - postgres -c "PGDATA=$PGDATA /usr/lib/postgresql/15/bin/pg_ctl -w restart"

執行後會退出docker,需要重新進入

新建用戶和授予許可權參考https://www.cnblogs.com/xiongmozhou/p/14817641.html
註意文檔中使用CREATE USER user它建的用戶是user,我用的這個用戶名是不成功的,提示語法錯誤
感覺是把user當作保留命令參數了,用戶名改為user1可以成功。

我們參考官方文檔https://ververica.github.io/flink-cdc-connectors/master/content/connectors/postgres-cdc.html#connector-options
首先在已有的Flink項目中加入如下的pom

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-postgres-cdc</artifactId>
            <version>2.3.0</version>
            <scope>provided</scope>
        </dependency>

這裡代碼參考文檔

        SourceFunction postgreSQLSource = PostgreSQLSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database("postgres") // set captured database
                .tableList("postgres.market_price") // set captured table
                .username("user1")
                .password("pwd")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env
        .addSource(postgreSQLSource)
        .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print Postgres Snapshot + WAL");

有一點需要註意,官方文檔中沒有.decodingPluginName("pgoutput"),使用預設的decoderbufs,運行程式會提示
“PSQLException: ERROR: could not access file "decoderbufs": No such file or directory”, 修改成pgoutput,才能成功。 這裡應該是要安裝插件decoderbufs在Postgresql裡面。這裡暫時留下這個疑問,後面還有wal2json,看怎麼把wal的值轉成json格式顯示出來。

程式運行起來後我們往表裡插入和刪除數據,可以在控制臺中列印出變化來。
這裡直接貼圖
image

這裡也有個疑問,我對錶操作了三次,結果控制台列印出超過3條的信息,這裡應該和是否commit有關
暫時也沒有細究。

程式運行後,我們可以使用這個命令查看這個slot,
SELECT * FROM pg_replication_slots;
image

如果我們直接修改配置,比如把pgoutput改為別的,會提示slot flink已經存在,我們需要在postgresql裡面把它先刪除掉。

總結

總體上這個流程是打通了,但是對於裡面的細節沒有深入,比如flink怎麼消費,裡面的記錄怎麼顯示出來,它裡面實現的原理是什麼,都需要花時間去研究,先開個頭在這裡。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 前言:用於展示生產線數據相關信息 在車輛生產線的小屏幕上【西門子的,比1980*1080的要小一圈,比pad要大一圈,專門給生產線做的】展示數據。數據用的flex佈局,很簡單的解決了自適應的問題。效果很好。 但,image展示的圖片的寬高不勻稱,比例不同。 這是開始的代碼,只摘取了關鍵部分: <bo ...
  • defineExpose要在變數和方法聲明定義之後再使用,否則瀏覽器的控制台會輸出很多警告,並且最終將該頁面卡死。 ...
  • 今天,有個群友在群里提問,使用 CSS 能否實現下述這個圖形: emmm,中間這個酷似三次貝塞爾曲線的造型,使用 CSS 不太好實現。我的建議是切圖實現,然而群友要求一定要用 CSS 實現。 雖然麻煩,但是這個圖形勉強也是能用 CSS 實現的。本文就將探討一下上述圖形的純 CSS 實現方式,並且從中 ...
  • 在分散式系統中, 什麼是拜占庭將軍問題?產生的場景和解決方案是什麼?什麼是 Raft 共識演算法?Raft 演算法是如何解決拜占庭將軍問題的?其核心原理和演算法邏輯是什麼?除了 Raft,還有哪些共識演算法?共識問題作為分散式系統的一大難點和痛點,本文主要介紹了其產生的背景、原因,以及通用的 Raft 演算法... ...
  • 談到java中的併發,我們就避不開線程之間的同步和協作問題,談到線程同步和協作我們就不能不談談jdk中提供的AbstractQueuedSynchronizer(翻譯過來就是抽象的隊列同步器)機制; (一)、AQS中的state和Node含義: AQS中提供了一個int volatile state ...
  • 題目來源:https://www.acwing.com/problem/content/description/789/ 題目描述 給定你一個長度為 n 的整數數列。 請你使用歸併排序對這個數列按照從小到大進行排序。 並將排好序的數列按順序輸出。 輸入格式 輸入共兩行,第一行包含整數 n。 第二行包 ...
  • SpringMVC文件上傳 1.基本介紹 SpringMVC 為文件上傳提供了直接的支持,這種支持是通過即插即用的 MultipartResolver 實現的。spring 用 Jacarta Commons FileUpload 技術實現了一個 MultipartResolver 的實現類:Com ...
  • 對於廣大書蟲而言,沒有小說看是最痛苦的,你身邊有這樣的人嗎? 今天咱們分享一個小說下載器代碼,打包成exe後,發給你的小伙伴也能直接使用… 思路流程 什麼是爬蟲? 按照一定的規則, 去採集互聯網上面數據 爬蟲可以做什麼? 採集數據: 定製化採集數據 自動化腳本:自動點贊/評論/刷票/商品搶購腳本/自 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...