使用 Apache SeaTunnel 實現 Kafka Source 解析複雜Json 案例

来源:https://www.cnblogs.com/seatunnel/archive/2023/07/19/17566423.html
-Advertisement-
Play Games

![file](https://img2023.cnblogs.com/other/3195851/202307/3195851-20230719181113912-393860575.jpg) > 版本說明: > > SeaTunnel:apache-seatunnel-2.3.2-SNAPHOT ...


file

版本說明:

SeaTunnel:apache-seatunnel-2.3.2-SNAPHOT

引擎說明:

Flink:1.16.2

Zeta:官方自帶

前言

近些時間,我們正好接手一個數據集成項目,數據上游方是給我們投遞到Kafka,我們一開始的技術選型是SpringBoot+Flink對上游數據進行加工處理(下文簡稱:方案一),由於測試不到位,後來到線上,發現數據寫入效率完全不符合預期。後來將目光轉到開源項目SeaTunnel上面,發現Source支持Kafka,於是開始研究測試,開發環境測試了500w+數據,發現效率在10000/s左右。果斷放棄方案一,採取SeaTunnel對數據進行集成加工(下文簡稱:方案二)。在SeaTunnel研究的過程中,總結了兩種方法,方法二相較於方法一,可以實現全場景使用,無需擔心欄位值裡面各種意想不到的字元對數據落地造成錯位現象的發生。

對比

file

在方案二的基礎上又衍生出兩種方法
file

所以,在經過長時間的探索和我們線上驗證得出結論,建議使用方案二的方法二。

好了,我們進入正文,主篇幅主要介紹方案二中的兩種方法,讓大家主觀的感受SeaTunnel的神奇。

方案一 Springboot+Flink實現Kafka 複雜JSON的解析

網上案例很多,在此不做過多介紹。

方案二 SeaTunnel實現Kafka複雜JSON的解析

在開始介紹之前,我們看一下我們上游方投遞Kafka的Json樣例數據(對部分數據進行了敏感處理),如下:

    "magic": "a***G",
    "type": "D***",
    "headers": null,
    "messageSchemaId": null,
    "messageSchema": null,
    "message": {
        "data": {
            "LSH": "187eb13****l0214723",
            "NSRSBH": "9134****XERG56",
            "QMYC": "01*****135468",
            "QMZ": "1Zr*****UYGy%2F5bOWtrh",
            "QM_SJ": "2023-05-05 16:42:10.000000",
            "YX_BZ": "Y",
            "ZGHQ_BZ": "Y",
            "ZGHQ_SJ": "2023-06-26 16:57:17.000000",
            "SKSSQ": 202304,
            "SWJG_DM": "",
            "SWRY_DM": "00",
            "CZSJ": "2023-05-05 16:42:10.000000",
            "YNSRSBH": "9134****XERG56",
            "SJTBSJ": "2023-06-26 19:29:59.000",
            "SJCZBS": "I"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "12440977",
            "timestamp": "2023-06-26T19:29:59.673000",
            "streamPosition": "00****3.16",
            "transactionId": "000***0006B0002",
            "changeMask": "0FFF***FF",
            "columnMask": "0F***FFFF",
            "transactionEventCounter": 1,
            "transactionLastEvent": false
        }
    }
}

方法一、不通過UDF函數實現

存在問題:欄位值存在分隔符,例如‘,’ 則數據在落地的時候會發生錯位現象。

該方法主要使用官網 transform-v2的各種轉換插件進行實現,主要用到的插件有 ReplaceSplit以及Sql實現

ST腳本:(ybjc_qrqm.conf)

env {
  execution.parallelism = 100
  job.mode = "STREAMING"
  job.name = "kafka2mysql_ybjc"
  execution.checkpoint.interval = 60000
}
 
source {
  Kafka {
    result_table_name = "DZFP_***_QRQM1"
    topic = "DZFP_***_QRQM"
    bootstrap.servers = "centos1:19092,centos2:19092,centos3:19092"
    schema = {
      fields {
        message =  {
            data = {
                LSH = "string",
                NSRSBH =  "string",
                QMYC =  "string",
                QMZ =  "string",
                QM_SJ =  "string",
                YX_BZ =  "string",
                ZGHQ_BZ =  "string",
                ZGHQ_SJ =  "string",
                SKSSQ =  "string",
                SWJG_DM = "string",
                SWRY_DM = "string",
                CZSJ = "string",
                YNSRSBH = "string",
                SJTBSJ = "string",
                SJCZBS = "string"
            }
        }
      }  
    }
	  start_mode = "earliest"
    #start_mode.offsets = {
    #     0 = 0
    #     1 = 0
    #     2 = 0
    #}	
    kafka.config = {
      auto.offset.reset = "earliest"
      enable.auto.commit = "true"
	    # max.poll.interval.ms = 30000000
	    max.partition.fetch.bytes = "5242880"
	    session.timeout.ms = "30000"
      max.poll.records = "100000"
    }
  }
}

transform {
 Replace {
    source_table_name = "DZFP_***_QRQM1"
    result_table_name = "DZFP_***_QRQM2"
    replace_field = "message"
    pattern = "[["
    replacement = ""
    #is_regex = true
	  #replace_first = true
  }
  Replace {
    source_table_name = "DZFP_***_QRQM2"
    result_table_name = "DZFP_***_QRQM3"
    replace_field = "message"
    pattern = "]]"
    replacement = ""
    #is_regex = true
	  #replace_first = true
  }
  
  Split {
    source_table_name = "DZFP_***_QRQM3"
    result_table_name = "DZFP_***_QRQM4"
    # 存在問題: 如果欄位值存在分隔符 separator,則數據會錯位
    separator = ","
    split_field = "message"
    # 你的第一個欄位包含在zwf5裡面,,前五個占位符是固定的。
    output_fields = [zwf1,zwf2,zwf3,zwf4,zwf5,nsrsbh,qmyc,qmz,qm_sj,yx_bz,zghq_bz,zghq_sj,skssq,swjg_dm ,swry_dm ,czsj ,ynsrsbh ,sjtbsj ,sjczbs]
  }
  
  sql{
   source_table_name = "DZFP_***_QRQM4"
   query = "select replace(zwf5 ,'fields=[','') as lsh,nsrsbh,trim(qmyc) as qmyc,qmz,qm_sj,yx_bz, zghq_bz,zghq_sj,skssq,swjg_dm ,swry_dm ,czsj ,ynsrsbh ,sjtbsj ,replace(sjczbs,']}]}','') as sjczbs  from DZFP_DZDZ_QRPT_YWRZ_QRQM4 where skssq <> ' null'"
   result_table_name = "DZFP_***_QRQM5"
  }
  
}

sink {
	Console {
		source_table_name = "DZFP_***_QRQM5"
	}
	
    jdbc {
	    source_table_name = "DZFP_***_QRQM5"
        url = "jdbc:mysql://localhost:3306/dbname?serverTimezone=GMT%2b8"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "user"
        password = "pwd"
        batch_size = 200000
        database = "dbname"
        table = "tablename"
				generate_sink_sql = true
        primary_keys = ["nsrsbh","skssq"]
    } 
}

正常寫入數據是可以寫入了。

寫入成功如下:

● kafka源數據:
file
● tidb目標數據:
file
現在我們模擬給kafka發送一條數據,其中,SJTBSJ欄位我在中間設置一個, 是逗號。

原始值:2023-06-26 19:29:59.000 更改之後的值2023-06-26 19:29:59.0,00

往topic生產一條數據命令
kafka-console-producer.sh --topic DZFP_***_QRQM --broker-list centos1:19092,centos2:19092,centos3:19092

發送如下:

    "magic": "a***G",
    "type": "D***",
    "headers": null,
    "messageSchemaId": null,
    "messageSchema": null,
    "message": {
        "data": {
            "LSH": "187eb13****l0214723",
            "NSRSBH": "9134****XERG56",
            "QMYC": "01*****135468",
            "QMZ": "1Zr*****UYGy%2F5bOWtrh",
            "QM_SJ": "2023-05-05 16:42:10.000000",
            "YX_BZ": "Y",
            "ZGHQ_BZ": "Y",
            "ZGHQ_SJ": "2023-06-26 16:57:17.000000",
            "SKSSQ": 202304,
            "SWJG_DM": "",
            "SWRY_DM": "00",
            "CZSJ": "2023-05-05 16:42:10.000000",
            "YNSRSBH": "9134****XERG56",
            "SJTBSJ": "2023-06-26 19:29:59.0,00",
            "SJCZBS": "I"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "12440977",
            "timestamp": "2023-06-26T19:29:59.673000",
            "streamPosition": "00****3.16",
            "transactionId": "000***0006B0002",
            "changeMask": "0FFF***FF",
            "columnMask": "0F***FFFF",
            "transactionEventCounter": 1,
            "transactionLastEvent": false
        }
    }
}

寫入之後,發現數據錯位了。
file

結論:其實這個問題線上還是能遇到的,比如地址欄位裡面含有逗號,備註信息裡面含有逗號等等,這種現象是不可避免的,所以此種方案直接pass。對數據危害性極大!可以處理簡單的數據,當做一種思路。

方法二:通過UDF函數實現

該方法通過UDF函數擴展(https://seatunnel.apache.org/docs/2.3.2/transform-v2/sql-udf)的方式,實現嵌套kafka source json源數據的解析。可以大大簡化ST腳本的配置

ST腳本:(ybjc_qrqm_yh.conf)

env {
    execution.parallelism = 5
    job.mode = "STREAMING"
    job.name = "kafka2mysql_ybjc_yh"
    execution.checkpoint.interval = 60000
}

source {
    Kafka {
        result_table_name = "DZFP_***_QRQM1"
        topic = "DZFP_***_QRQM"
        bootstrap.servers = "centos1:19092,centos2:19092,centos3:19092"
        schema = {
        fields {
             message =  {
                data = "map<string,string>"
             }
        }  
        }
        start_mode = "earliest"
        #start_mode.offsets = {
        #     0 = 0
        #     1 = 0
        #     2 = 0
        #}	
        kafka.config = {
        auto.offset.reset = "earliest"
        enable.auto.commit = "true"
        # max.poll.interval.ms = 30000000
        max.partition.fetch.bytes = "5242880"
        session.timeout.ms = "30000"
        max.poll.records = "100000"
        }
    }
}
transform {
    sql{
        source_table_name = "DZFP_***_QRQM1"
        result_table_name = "DZFP_***_QRQM2"
        # 這裡的qdmx就是我自定義的UDF函數,具體實現下文詳細講解。。。
        query = "select qdmx(message,'lsh') as lsh,qdmx(message,'nsrsbh') as nsrsbh,qdmx(message,'qmyc') as qmyc,qdmx(message,'qmz') as qmz,qdmx(message,'qm_sj') as qm_sj,qdmx(message,'yx_bz') as yx_bz,qdmx(message,'zghq_bz') as zghq_bz,qdmx(message,'zghq_sj') as zghq_sj,qdmx(message,'skssq') as skssq,qdmx(message,'swjg_dm') as swjg_dm,qdmx(message,'swry_dm') as swry_dm,qdmx(message,'czsj') as czsj,qdmx(message,'ynsrsbh') as ynsrsbh, qdmx(message,'sjtbsj') as sjtbsj,qdmx(message,'sjczbs') as sjczbs  from  DZFP_DZDZ_QRPT_YWRZ_QRQM1"
    }
}

    
sink {
    Console {
        source_table_name = "DZFP_***_QRQM2"
    }
    jdbc {
		  source_table_name = "DZFP_***_QRQM2"
		  url = "jdbc:mysql://localhost:3306/dbname?serverTimezone=GMT%2b8"
		  driver = "com.mysql.cj.jdbc.Driver"
		  user = "user"
		  password = "pwd"
		  batch_size = 200000
		  database = "dbname"
		  table = "tablename"
		  generate_sink_sql = true
		  primary_keys = ["nsrsbh","skssq"]
	} 
}

執行腳本:查看結果,發現並沒有錯位,還在原來的欄位(sjtbsj)上面。

這種方法,是通過key獲取value值。不會出現方法一中的按照逗號分割出現數據錯位現象。
file

具體UDF函數編寫如下。

maven引入如下:

<dependencies>
  <dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-transforms-v2</artifactId>
    <version>2.3.2</version>
    <scope>provided</scope>
  </dependency>

  <dependency>
    <groupId>org.apache.seatunnel</groupId>
    <artifactId>seatunnel-api</artifactId>
    <version>2.3.2</version>
  </dependency>

  <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.20</version>
  </dependency>

  <dependency>
    <groupId>com.google.auto.service</groupId>
    <artifactId>auto-service-annotations</artifactId>
    <version>1.1.1</version>
    <optional>true</optional>
    <scope>compile</scope>
  </dependency>

  <dependency>
    <groupId>com.google.auto.service</groupId>
    <artifactId>auto-service</artifactId>
    <version>1.1.1</version>
    <optional>true</optional>
    <scope>compile</scope>
  </dependency>
</dependencies>  

UDF具體實現java代碼如下:

package org.seatunnel.sqlUDF;

import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.google.auto.service.AutoService;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.transform.sql.zeta.ZetaUDF;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@AutoService(ZetaUDF.class)
public class QdmxUDF implements ZetaUDF {

    @Override
    public String functionName() {
        return "QDMX";
    }

    @Override
    public SeaTunnelDataType<?> resultType(List<SeaTunnelDataType<?>> list) {
        return BasicType.STRING_TYPE;
    }

    // list 參數實例:(也就是kafka 解析過來的數據)
    //SeaTunnelRow{tableId=, kind=+I, fields=[{key1=value1,key2=value2,.....}]}
    @Override
    public Object evaluate(List<Object> list) {
        String str = list.get(0).toString();
        //1 Remove the prefix
        str = StrUtil.replace(str, "SeaTunnelRow{tableId=, kind=+I, fields=[{", "");
        //2 Remove the suffix
        str = StrUtil.sub(str, -3, 0);
        // 3 build Map key value
        Map<String, String> map = parseToMap(str);
        if ("null".equals(map.get(list.get(1).toString())))
            return "";
        // 4 return the value of the key
        return map.get(list.get(1).toString());
    }

    public static Map<String, String> parseToMap(String input) {
        Map<String, String> map = new HashMap<>();
        // 去除大括弧 在字元串階段去除
        // input = input.replaceAll("[{}]", "");
        // 拆分鍵值對
        String[] pairs = input.split(", ");

        for (String pair : pairs) {
            String[] keyValue = pair.split("=");
            if (keyValue.length == 2) {
                String key = keyValue[0].trim().toLowerCase();
                String value = keyValue[1].trim();
                map.put(key, value);
            }
        }
        return map;
    }
}

然後打包,打包命令如下:

mvn -T 8 clean install -DskipTests -Dcheckstyle.skip -Dmaven.javadoc.skip=true

查看META-INF/services, 看註解@AutoService 是否生成對應的spi介面:

如下:則打包成功!
file
如果沒有,則打包失敗,UDF函數無法使用.

可以參考我的打包插件:

  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-site-plugin</artifactId>
      <version>3.7</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.maven.doxia</groupId>
          <artifactId>doxia-site-renderer</artifactId>
          <version>1.8</version>
        </dependency>
      </dependencies>
    </plugin>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>3.8.1</version>
      <configuration>
        <annotationProcessorPaths>
          <path>
            <groupId>com.google.auto.service</groupId>
            <artifactId>auto-service</artifactId>
            <version>1.1.1</version>
          </path>
        </annotationProcessorPaths>
      </configuration>
    </plugin>
  </plugins>
</build>

最終打成的jar包放到 ${SEATUNNEL_HOME}/lib目錄下,由於我的UDF函數引入了第三方jar包,也需要一併上傳。如果是Zeta集群,需要重啟Zeta集群才能生效。其他引擎實時生效。
最終上傳成功如下:
file
說明:這個hutool-all的jar包可以含在java_study這個項目裡面,我圖方便,上傳了兩個。

綜上,推薦使用通過UDF函數擴展的方式,實現嵌套kafka source json源數據的解析。

本文由 白鯨開源 提供發佈支持!


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • ## 一:背景 ### 1. 講故事 如果要分析 Linux上的 .NET程式 CPU 爆高,按以往的個性我肯定是抓個 dump 下來做事後分析,這種分析模式雖然不重但也不輕,還需要一定的底層知識,那有沒有傻瓜式的 CPU 爆高分析方式呢? 相信有很多朋友知道 **B站713事件**,最終就是用 p ...
  • > 註:本文隸屬於《理解ASP.NET Core》系列文章,請查看置頂博客或[點擊此處查看全文目錄](https://www.cnblogs.com/xiaoxiaotank/p/15185288.html) # 概述 在微服務化的架構設計中,網關扮演著重要的看門人角色,它所提供的功能之一就是**限 ...
  • ## 引言 上文[編碼技巧 同步鎖對象的選定](url)中,提到了在C#中,讓線程同步有兩種方式: - 鎖(lock、Monitor等) - 信號量(EventWaitHandle、Semaphore、Mutex) 加鎖是最常用的線程同步的方法,就不再討論,本篇主要討論使用信號量同步線程。 ## W ...
  • [TOC] # PWM脈衝寬調點燈 ## 前言 對於燈等來說有很多種方法,前面介紹了一些基礎的點燈方法,比如直接點燈,按鍵控制點燈,按鍵中斷點燈,但都是比較簡單的一些方法也很基礎,要問我有沒有什麼高級點的點燈方法,答案是有的,在這我要介紹一種高級點燈的方法就是使用PWM進行點燈。 ## 1.什麼是P ...
  • 大家好,我是痞子衡,是正經搞技術的痞子。今天痞子衡給大家介紹的是**恩智浦i.MXRT1xxx系列MCU的Serial NAND啟動**。 最近越來越多的客戶在咨詢 i.MXRT1xxx 從 Serial NAND 啟動的事情,讓這個本來比較冷門的啟動設備突然火熱起來。據痞子衡的瞭解,其實客戶主要目 ...
  • # 安裝 安裝 ```bash sudo apt install samba ``` 檢查服務狀態 ```bash systemctl status smbd --no-pager -l ``` 檢查是否啟用(開機自啟動) ```bash systemctl is-enabled smbd # en ...
  • ![](https://img2023.cnblogs.com/blog/3076680/202307/3076680-20230719144557396-616589792.png) # 1. 結果集分頁 ## 1.1. 只有做過了排序,才有可能準確地從結果集中返回指定區間的記錄 ## 1.2.  ...
  • 一、約束的基本概念 1、概念:約束是作用於表中欄位上的規則,用於限制儲存在表中的數據 2、目的:保證資料庫中的數據的正確性,有效性和完整性 3、分類 非空約束(not null):限制該欄位的數據不能為null 唯一約束(unique):保證該欄位的所有數據都是唯一,不重覆的 主鍵約束(primar ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...