替代Flume——Kafka Connect簡介

来源:https://www.cnblogs.com/tree1123/archive/2019/08/30/11434047.html
-Advertisement-
Play Games

我們知道過去對於Kafka的定義是分散式,分區化的,帶備份機制的日誌提交服務。也就是一個分散式的消息隊列,這也是他最常見的用法。但是Kafka不止於此,打開最新的官網。 我們看到Kafka最新的定義是:Apache Kafka® is a distributed streaming platform ...


file
我們知道過去對於Kafka的定義是分散式,分區化的,帶備份機制的日誌提交服務。也就是一個分散式的消息隊列,這也是他最常見的用法。但是Kafka不止於此,打開最新的官網。

file

我們看到Kafka最新的定義是:Apache Kafka® is a distributed streaming platform

分散式流處理平臺。

file

這裡也清晰的描述了Kafka的特點:Kafka用於構建實時數據管道和流式應用程式。它具有水平可擴展性、容錯性、速度極快,併在數千家公司投入生產。

所以現在的Kafka已經不僅是一個分散式的消息隊列,更是一個流處理平臺。這源於它於0.9.0.0和0.10.0.0引入的兩個全新的組件Kafka Connect與Kafka Streaming。

Kafka Connect簡介

我們知道消息隊列必須存在上下游的系統,對消息進行搬入搬出。比如經典的日誌分析系統,通過flume讀取日誌寫入kafka,下游由storm進行實時的數據處理。

file

Kafka Connect的作用就是替代Flume,讓數據傳輸這部分工作可以由Kafka Connect來完成。Kafka Connect是一個用於在Apache Kafka和其他系統之間可靠且可靠地傳輸數據的工具。它可以快速地將大量數據集合移入和移出Kafka。

Kafka Connect的導入作業可以將資料庫或從應用程式伺服器收集的數據傳入到Kafka,導出作業可以將Kafka中的數據傳遞到查詢系統,也可以傳輸到批處理系統以進行離線分析。

Kafka Connect功能包括:

  • 一個通用的Kafka連接的框架 - Kafka Connect規範化了其他數據系統與Kafka的集成,簡化了連接器開發,部署和管理
  • 分散式和獨立模式 - 支持大型分散式的管理服務,也支持小型生產環境的部署
  • REST界面 - 通過易用的REST API提交和管理Kafka Connect
  • 自動偏移管理 - 只需從連接器獲取一些信息,Kafka Connect就可以自動管理偏移量提交過程,因此連接器開發人員無需擔心連接器開發中偏移量提交這部分的開發
  • 預設情況下是分散式和可擴展的 - Kafka Connect構建在現有的組管理協議之上。可以添加擴展集群
  • 流媒體/批處理集成 - 利用Kafka現有的功能,Kafka Connect是橋接流媒體和批處理數據系統的理想解決方案

file

運行Kafka Connect

Kafka Connect目前支持兩種運行模式:獨立和集群。

獨立模式

在獨立模式下,只有一個進程,這種更容易設置和使用。但是沒有容錯功能。

啟動:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
獨立模式配置

第一個參數config/connect-standalone.properties是一些基本的配置:

這幾個在獨立和集群模式下都需要設置:

#bootstrap.servers   kafka集群列表
bootstrap.servers=localhost:9092
#key.converter       key的序列化轉換器  比如json的  key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter     value的序列化轉換器
value.converter=org.apache.kafka.connect.json.JsonConverter

#獨立模式特有的配置:
#offset.storage.file.filename       用於存儲偏移量的文件
offset.storage.file.filename =/home/kafka/connect.offsets
獨立模式連接器配置(配置文件)

後面的參數connector1.properties [connector2.properties ...] 可以多個,是連接器配置內容

這裡我們配置一個從文件讀取數據並存入kafka的配置:

connect-file-sink.properties

  • name - 連接器的唯一名稱。嘗試再次使用相同名稱註冊將失敗。

  • connector.class - 連接器的Java類 此連接器的類的全名或別名。這裡我們選擇FileStreamSink

  • tasks.max - 應為此連接器創建的最大任務數。如果連接器無法達到此級別的並行性,則可能會創建更少的任務。

  • key.converter - (可選)覆蓋worker設置的預設密鑰轉換器。

  • value.converter - (可選)覆蓋worker設置的預設值轉換器。

    下麵兩個必須設置一個:

    • topics - 以逗號分隔的主題列表,用作此連接器的輸入
    • topics.regex - 用作此連接器輸入的主題的Java正則表達式
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

可以在連接器中配置轉換器

需要指定參數:

  • transforms - 轉換的別名列表,指定將應用轉換的順序。
  • transforms.$alias.type - 轉換的完全限定類名。
  • transforms.$alias.$transformationSpecificConfig 轉換的配置屬性

例如,我們把剛纔的文件轉換器的內容添加欄位

首先設置connect-standalone.properties

key.converter.schemas.enable = false
value.converter.schemas.enable = false

設置connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

沒有轉換前的結果:

"foo"
"bar"
"hello world"

轉換後:

{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}

常用轉換類型:

  • InsertField - 使用靜態數據或記錄元數據添加欄位
  • ReplaceField - 過濾或重命名欄位
  • MaskField - 用類型的有效空值替換欄位(0,空字元串等)
  • ValueToKey Value轉換為Key
  • HoistField - 將整個事件作為單個欄位包裝在Struct或Map中
  • ExtractField - 從Struct和Map中提取特定欄位,併在結果中僅包含此欄位
  • SetSchemaMetadata - 修改架構名稱或版本
  • TimestampRouter - 根據原始主題和時間戳修改記錄主題
  • RegexRouter - 根據原始主題,替換字元串和正則表達式修改記錄主題

集群模式

集群模式下,可以擴展,容錯。

啟動:
> bin/connect-distributed.sh config/connect-distributed.properties

在集群模式下,Kafka Connect在Kafka主題中存儲偏移量,配置和任務狀態。

集群模式配置

connect-distributed.properties

#也需要基本的配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

#還有一些配置要註意
#group.id(預設connect-cluster) - Connect的組id 請註意,這不得與使用者的組id 衝突
group.id=connect-cluster

#用於存儲偏移的主題; 此主題應具有許多分區
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1

#用於存儲連接器和任務配置的主題  只能一個分區
config.storage.topic=connect-configs
config.storage.replication.factor=1

#用於存儲狀態的主題; 此主題可以有多個分區
status.storage.topic=connect-status
status.storage.replication.factor=1

在集群模式下,配置並不會在命令行傳進去,而是需要REST API來創建,修改和銷毀連接器。

集群模式連接器配置(REST API)

可以配置REST API伺服器,支持http與https

listeners=http://localhost:8080,https://localhost:8443

預設情況下,如果未listeners指定,則REST伺服器使用HTTP協議在埠8083上運行。

以下是當前支持的REST API:

  • GET /connectors - 返回活動連接器列表
  • POST /connectors - 創建一個新的連接器; 請求主體應該是包含字元串name欄位的JSON對象和包含config連接器配置參數的對象欄位
  • GET /connectors/{name} - 獲取有關特定連接器的信息
  • GET /connectors/{name}/config - 獲取特定連接器的配置參數
  • PUT /connectors/{name}/config - 更新特定連接器的配置參數
  • GET /connectors/{name}/status - 獲取連接器的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪個工作人員,錯誤信息(如果失敗)以及所有任務的狀態
  • GET /connectors/{name}/tasks - 獲取當前為連接器運行的任務列表
  • GET /connectors/{name}/tasks/{taskid}/status - 獲取任務的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪個工作人員,以及錯誤信息是否失敗
  • PUT /connectors/{name}/pause - 暫停連接器及其任務,這將停止消息處理,直到恢復連接器
  • PUT /connectors/{name}/resume - 恢復暫停的連接器(如果連接器未暫停,則不執行任何操作)
  • POST /connectors/{name}/restart - 重新啟動連接器(通常是因為它已經失敗)
  • POST /connectors/{name}/tasks/{taskId}/restart - 重啟個別任務(通常因為失敗)
  • DELETE /connectors/{name} - 刪除連接器,暫停所有任務並刪除其配置

連接器開髮指南

kakfa允許開發人員自己去開發一個連接器。

核心概念

要在Kafka和其他系統之間複製數據,用戶需要創建一個Connector

Connector有兩種形式:

SourceConnectors從另一個系統導入數據,例如,JDBCSourceConnector將關係資料庫導入Kafka

SinkConnectors導出數據,例如,HDFSSinkConnector將Kafka主題的內容導出到HDFS文件

和對應的Task:

SourceTaskSinkTask

Task形成輸入輸出流,開發Task要註意偏移量的問題。

每個流應該是一系列鍵值記錄。還需要定期提交已處理的數據的偏移量,以便在發生故障時,處理可以從上次提交的偏移量恢復。Connector還需要是動態的,實現還負責監視外部系統是否存在任何更改。

開發一個簡單的連接器

開發連接器只需要實現兩個介面,即ConnectorTask

這裡我們簡單開發一個FileStreamConnector。

此連接器是為在獨立模式下使用,SourceConnectorSourceTask讀取文件的每一行,SinkConnectorSinkTask每個記錄寫入一個文件。

連接器示例:

繼承SourceConnector,添加欄位(要讀取的文件名和要將數據發送到的主題)

public class FileStreamSourceConnector extends SourceConnector {
    private String filename;
    private String topic;

定義實際讀取數據的類

@Override
public Class<? extends Task> taskClass() {
    return FileStreamSourceTask.class;
}

FileStreamSourceTask下麵定義該類。接下來,我們添加一些標準的生命周期方法,start()stop()

@Override
public void start(Map<String, String> props) {
    // The complete version includes error handling as well.
    filename = props.get(FILE_CONFIG);
    topic = props.get(TOPIC_CONFIG);
}
 
@Override
public void stop() {
    // Nothing to do since no background monitoring is required.
}

最後,實施的真正核心在於taskConfigs()

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    ArrayList<Map<String, String>> configs = new ArrayList<>();
    // Only one input stream makes sense.
    Map<String, String> config = new HashMap<>();
    if (filename != null)
        config.put(FILE_CONFIG, filename);
    config.put(TOPIC_CONFIG, topic);
    configs.add(config);
    return configs;
}

任務示例:

源任務

實現SourceTask 創建FileStreamSourceTask繼承SourceTask

public class FileStreamSourceTask extends SourceTask {
    String filename;
    InputStream stream;
    String topic;
 
    @Override
    public void start(Map<String, String> props) {
        filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
        stream = openOrThrowError(filename);
        topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
    }
 
    @Override
    public synchronized void stop() {
        stream.close();
    }

接下來,我們實現任務的主要功能,即poll()從輸入系統獲取事件並返回以下內容的方法List

@Override
public List<SourceRecord> poll() throws InterruptedException {
    try {
        ArrayList<SourceRecord> records = new ArrayList<>();
        while (streamValid(stream) && records.isEmpty()) {
            LineAndOffset line = readToNextLine(stream);
            if (line != null) {
                Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
                Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
                records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
            } else {
                Thread.sleep(1);
            }
        }
        return records;
    } catch (IOException e) {
        // Underlying stream was killed, probably as a result of calling stop. Allow to return
        // null, and driving thread will handle any shutdown if necessary.
    }
    return null;
}
接收任務

不像SourceConnectorSinkConnectorSourceTaskSinkTask有非常不同的介面,因為SourceTask採用的是拉介面,並SinkTask使用推介面。兩者共用公共生命周期方法,但SinkTask完全不同:

public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }
 
    public abstract void put(Collection<SinkRecord> records);
 
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    }

這是一個簡單的例子,它們有簡單的結構化數據 - 每一行只是一個字元串。幾乎所有實用的連接器都需要具有更複雜數據格式的模式。要創建更複雜的數據,您需要使用Kafka Connect dataAPI。

Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .build();
 
Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75);

更多Kafka相關技術文章:

什麼是Kafka?
Kafka監控工具彙總
Kafka快速入門
Kafka核心之Consumer
Kafka核心之Producer

更多實時計算,Flink,Kafka等相關技術博文,歡迎關註實時流式計算

file


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 1.測試驗證環境 伺服器角色 機器名 IP SQL Server Ver 主體伺服器 WIN-TestDB4O 172.83.XXX.XXX SQL Server 2012 - 11.0.5058.0 (X64) 鏡像伺服器 WIN-TestDB5O 172.73.XXX.XXX SQL Serve ...
  • 前幾天在社區群上,有人問了一個問題 既然上游最小水印會決定視窗觸發,那如果我上游其中一條流突然沒有了數據,我的視窗還會繼續觸發嗎? 看到這個問題,我蒙了???? 對哈,因為我是選擇上游所有流中水印最小的一條作為當前水印時間,那萬一最小水印的那條流突然裡面沒有數據了 那我的最小水印不就一直不往前走了, ...
  • YARN基礎庫是其他一切模塊的基礎,它的設計直接決定了YARN的穩定性和擴展性,YARN借用了MRV1的一些底層基礎庫,比如RPC庫等,但因為引入了很多新的軟體設計方式,所以它的基礎庫更多,包括直接使用了開源序列化框架Protocol Buffers和Apache Avro,自定義的服務庫、事件庫和 ...
  • 在用戶代碼中,我們設置生成水印和事件時間的方法assignTimestampsAndWatermarks()中這裡有個方法的重載 我們傳入的對象分為兩種 AssignerWithPunctuatedWatermarks(可以理解為每條數據都會產生水印,如果不想產生水印,返回一個null的水印) As ...
  • 增 增加一條數據 如果數據是字元型,必須使用單引號或者雙引號,如:"value"。 刪 刪除一條數據 如果沒有指定 WHERE 子句,MySQL 表中的所有記錄將被刪除。 改 更新一條數據 查 查詢關鍵字的定義順序 ...
  • 一、Sqoop 基本命令 1. 查看所有命令 2. 查看某條命令的具體使用方法 二、Sqoop 與 MySQL 1. 查詢MySQL所有資料庫 通常用於 Sqoop 與 MySQL 連通測試: 2. 查詢指定資料庫中所有數據表 三、Sqoop 與 HDFS 3.1 MySQL數據導入到HDFS 1. ...
  • 一、Sqoop 簡介 Sqoop 是一個常用的數據遷移工具,主要用於在不同存儲系統之間實現數據的導入與導出: + 導入數據:從 MySQL,Oracle 等關係型資料庫中導入數據到 HDFS、Hive、HBase 等分散式文件存儲系統中; + 導出數據:從 分散式文件系統中導出數據到關係資料庫中。 ...
  • 筆者實驗環境:centos 7.4.1708,hadoop-2.6.0-cdh5.14.2. 執行hadoop命令時出現以下告警,不能載入相關庫: 檢查發現本地並沒有庫: 進入到hadoop目錄下可以看到/lib/native為空。 需要下載對應版本的庫文件:http://dl.bintray.co ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...