我們知道過去對於Kafka的定義是分散式,分區化的,帶備份機制的日誌提交服務。也就是一個分散式的消息隊列,這也是他最常見的用法。但是Kafka不止於此,打開最新的官網。 我們看到Kafka最新的定義是:Apache Kafka® is a distributed streaming platform ...
我們知道過去對於Kafka的定義是分散式,分區化的,帶備份機制的日誌提交服務。也就是一個分散式的消息隊列,這也是他最常見的用法。但是Kafka不止於此,打開最新的官網。
我們看到Kafka最新的定義是:Apache Kafka® is a distributed streaming platform
分散式流處理平臺。
這裡也清晰的描述了Kafka的特點:Kafka用於構建實時數據管道和流式應用程式。它具有水平可擴展性、容錯性、速度極快,併在數千家公司投入生產。
所以現在的Kafka已經不僅是一個分散式的消息隊列,更是一個流處理平臺。這源於它於0.9.0.0和0.10.0.0引入的兩個全新的組件Kafka Connect與Kafka Streaming。
Kafka Connect簡介
我們知道消息隊列必須存在上下游的系統,對消息進行搬入搬出。比如經典的日誌分析系統,通過flume讀取日誌寫入kafka,下游由storm進行實時的數據處理。
Kafka Connect的作用就是替代Flume,讓數據傳輸這部分工作可以由Kafka Connect來完成。Kafka Connect是一個用於在Apache Kafka和其他系統之間可靠且可靠地傳輸數據的工具。它可以快速地將大量數據集合移入和移出Kafka。
Kafka Connect的導入作業可以將資料庫或從應用程式伺服器收集的數據傳入到Kafka,導出作業可以將Kafka中的數據傳遞到查詢系統,也可以傳輸到批處理系統以進行離線分析。
Kafka Connect功能包括:
- 一個通用的Kafka連接的框架 - Kafka Connect規範化了其他數據系統與Kafka的集成,簡化了連接器開發,部署和管理
- 分散式和獨立模式 - 支持大型分散式的管理服務,也支持小型生產環境的部署
- REST界面 - 通過易用的REST API提交和管理Kafka Connect
- 自動偏移管理 - 只需從連接器獲取一些信息,Kafka Connect就可以自動管理偏移量提交過程,因此連接器開發人員無需擔心連接器開發中偏移量提交這部分的開發
- 預設情況下是分散式和可擴展的 - Kafka Connect構建在現有的組管理協議之上。可以添加擴展集群
- 流媒體/批處理集成 - 利用Kafka現有的功能,Kafka Connect是橋接流媒體和批處理數據系統的理想解決方案
運行Kafka Connect
Kafka Connect目前支持兩種運行模式:獨立和集群。
獨立模式
在獨立模式下,只有一個進程,這種更容易設置和使用。但是沒有容錯功能。
啟動:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
獨立模式配置
第一個參數config/connect-standalone.properties是一些基本的配置:
這幾個在獨立和集群模式下都需要設置:
#bootstrap.servers kafka集群列表
bootstrap.servers=localhost:9092
#key.converter key的序列化轉換器 比如json的 key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter value的序列化轉換器
value.converter=org.apache.kafka.connect.json.JsonConverter
#獨立模式特有的配置:
#offset.storage.file.filename 用於存儲偏移量的文件
offset.storage.file.filename =/home/kafka/connect.offsets
獨立模式連接器配置(配置文件)
後面的參數connector1.properties [connector2.properties ...] 可以多個,是連接器配置內容
這裡我們配置一個從文件讀取數據並存入kafka的配置:
connect-file-sink.properties
name
- 連接器的唯一名稱。嘗試再次使用相同名稱註冊將失敗。connector.class
- 連接器的Java類 此連接器的類的全名或別名。這裡我們選擇FileStreamSinktasks.max
- 應為此連接器創建的最大任務數。如果連接器無法達到此級別的並行性,則可能會創建更少的任務。key.converter
- (可選)覆蓋worker設置的預設密鑰轉換器。value.converter
- (可選)覆蓋worker設置的預設值轉換器。下麵兩個必須設置一個:
topics
- 以逗號分隔的主題列表,用作此連接器的輸入topics.regex
- 用作此連接器輸入的主題的Java正則表達式
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
可以在連接器中配置轉換器
需要指定參數:
transforms
- 轉換的別名列表,指定將應用轉換的順序。transforms.$alias.type
- 轉換的完全限定類名。transforms.$alias.$transformationSpecificConfig
轉換的配置屬性
例如,我們把剛纔的文件轉換器的內容添加欄位
首先設置connect-standalone.properties
key.converter.schemas.enable = false
value.converter.schemas.enable = false
設置connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
transforms=MakeMap, InsertSource
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.MakeMap.field=line
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source
沒有轉換前的結果:
"foo"
"bar"
"hello world"
轉換後:
{"line":"foo","data_source":"test-file-source"}
{"line":"bar","data_source":"test-file-source"}
{"line":"hello world","data_source":"test-file-source"}
常用轉換類型:
- InsertField - 使用靜態數據或記錄元數據添加欄位
- ReplaceField - 過濾或重命名欄位
- MaskField - 用類型的有效空值替換欄位(0,空字元串等)
- ValueToKey Value轉換為Key
- HoistField - 將整個事件作為單個欄位包裝在Struct或Map中
- ExtractField - 從Struct和Map中提取特定欄位,併在結果中僅包含此欄位
- SetSchemaMetadata - 修改架構名稱或版本
- TimestampRouter - 根據原始主題和時間戳修改記錄主題
- RegexRouter - 根據原始主題,替換字元串和正則表達式修改記錄主題
集群模式
集群模式下,可以擴展,容錯。
啟動:
> bin/connect-distributed.sh config/connect-distributed.properties
在集群模式下,Kafka Connect在Kafka主題中存儲偏移量,配置和任務狀態。
集群模式配置
connect-distributed.properties
#也需要基本的配置
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#還有一些配置要註意
#group.id(預設connect-cluster) - Connect的組id 請註意,這不得與使用者的組id 衝突
group.id=connect-cluster
#用於存儲偏移的主題; 此主題應具有許多分區
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
#用於存儲連接器和任務配置的主題 只能一個分區
config.storage.topic=connect-configs
config.storage.replication.factor=1
#用於存儲狀態的主題; 此主題可以有多個分區
status.storage.topic=connect-status
status.storage.replication.factor=1
在集群模式下,配置並不會在命令行傳進去,而是需要REST API來創建,修改和銷毀連接器。
集群模式連接器配置(REST API)
可以配置REST API伺服器,支持http與https
listeners=http://localhost:8080,https://localhost:8443
預設情況下,如果未listeners
指定,則REST伺服器使用HTTP協議在埠8083上運行。
以下是當前支持的REST API:
GET /connectors
- 返回活動連接器列表POST /connectors
- 創建一個新的連接器; 請求主體應該是包含字元串name
欄位的JSON對象和包含config
連接器配置參數的對象欄位GET /connectors/{name}
- 獲取有關特定連接器的信息GET /connectors/{name}/config
- 獲取特定連接器的配置參數PUT /connectors/{name}/config
- 更新特定連接器的配置參數GET /connectors/{name}/status
- 獲取連接器的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪個工作人員,錯誤信息(如果失敗)以及所有任務的狀態GET /connectors/{name}/tasks
- 獲取當前為連接器運行的任務列表GET /connectors/{name}/tasks/{taskid}/status
- 獲取任務的當前狀態,包括它是否正在運行,失敗,暫停等,分配給哪個工作人員,以及錯誤信息是否失敗PUT /connectors/{name}/pause
- 暫停連接器及其任務,這將停止消息處理,直到恢復連接器PUT /connectors/{name}/resume
- 恢復暫停的連接器(如果連接器未暫停,則不執行任何操作)POST /connectors/{name}/restart
- 重新啟動連接器(通常是因為它已經失敗)POST /connectors/{name}/tasks/{taskId}/restart
- 重啟個別任務(通常因為失敗)DELETE /connectors/{name}
- 刪除連接器,暫停所有任務並刪除其配置
連接器開髮指南
kakfa允許開發人員自己去開發一個連接器。
核心概念
要在Kafka和其他系統之間複製數據,用戶需要創建一個Connector
Connector有兩種形式:
SourceConnectors
從另一個系統導入數據,例如,JDBCSourceConnector
將關係資料庫導入Kafka
SinkConnectors
導出數據,例如,HDFSSinkConnector
將Kafka主題的內容導出到HDFS文件
和對應的Task:
SourceTask
和SinkTask
Task形成輸入輸出流,開發Task要註意偏移量的問題。
每個流應該是一系列鍵值記錄。還需要定期提交已處理的數據的偏移量,以便在發生故障時,處理可以從上次提交的偏移量恢復。Connector還需要是動態的,實現還負責監視外部系統是否存在任何更改。
開發一個簡單的連接器
開發連接器只需要實現兩個介面,即Connector
和Task
。
這裡我們簡單開發一個FileStreamConnector。
此連接器是為在獨立模式下使用,SourceConnector/
SourceTask讀取文件的每一行,
SinkConnector/
SinkTask每個記錄寫入一個文件。
連接器示例:
繼承SourceConnector,添加欄位(要讀取的文件名和要將數據發送到的主題)
public class FileStreamSourceConnector extends SourceConnector {
private String filename;
private String topic;
定義實際讀取數據的類
@Override
public Class<? extends Task> taskClass() {
return FileStreamSourceTask.class;
}
在FileStreamSourceTask
下麵定義該類。接下來,我們添加一些標準的生命周期方法,start()
和stop()
@Override
public void start(Map<String, String> props) {
// The complete version includes error handling as well.
filename = props.get(FILE_CONFIG);
topic = props.get(TOPIC_CONFIG);
}
@Override
public void stop() {
// Nothing to do since no background monitoring is required.
}
最後,實施的真正核心在於taskConfigs()
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<>();
// Only one input stream makes sense.
Map<String, String> config = new HashMap<>();
if (filename != null)
config.put(FILE_CONFIG, filename);
config.put(TOPIC_CONFIG, topic);
configs.add(config);
return configs;
}
任務示例:
源任務
實現SourceTask
創建FileStreamSourceTask繼承SourceTask
public class FileStreamSourceTask extends SourceTask {
String filename;
InputStream stream;
String topic;
@Override
public void start(Map<String, String> props) {
filename = props.get(FileStreamSourceConnector.FILE_CONFIG);
stream = openOrThrowError(filename);
topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG);
}
@Override
public synchronized void stop() {
stream.close();
}
接下來,我們實現任務的主要功能,即poll()
從輸入系統獲取事件並返回以下內容的方法List
:
@Override
public List<SourceRecord> poll() throws InterruptedException {
try {
ArrayList<SourceRecord> records = new ArrayList<>();
while (streamValid(stream) && records.isEmpty()) {
LineAndOffset line = readToNextLine(stream);
if (line != null) {
Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename);
Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset);
records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line));
} else {
Thread.sleep(1);
}
}
return records;
} catch (IOException e) {
// Underlying stream was killed, probably as a result of calling stop. Allow to return
// null, and driving thread will handle any shutdown if necessary.
}
return null;
}
接收任務
不像SourceConnector
和SinkConnector
,SourceTask
並SinkTask
有非常不同的介面,因為SourceTask
採用的是拉介面,並SinkTask
使用推介面。兩者共用公共生命周期方法,但SinkTask
完全不同:
public abstract class SinkTask implements Task {
public void initialize(SinkTaskContext context) {
this.context = context;
}
public abstract void put(Collection<SinkRecord> records);
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
}
這是一個簡單的例子,它們有簡單的結構化數據 - 每一行只是一個字元串。幾乎所有實用的連接器都需要具有更複雜數據格式的模式。要創建更複雜的數據,您需要使用Kafka Connect data
API。
Schema schema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.build();
Struct struct = new Struct(schema)
.put("name", "Barbara Liskov")
.put("age", 75);
更多Kafka相關技術文章:
什麼是Kafka?
Kafka監控工具彙總
Kafka快速入門
Kafka核心之Consumer
Kafka核心之Producer
更多實時計算,Flink,Kafka等相關技術博文,歡迎關註實時流式計算