最簡單流處理引擎——Kafka Streams簡介

来源:https://www.cnblogs.com/tree1123/archive/2019/09/04/11457851.html
-Advertisement-
Play Games

Kafka在0.10.0.0版本以前的定位是分散式,分區化的,帶備份機制的日誌提交服務。而kafka在這之前也沒有提供數據處理的顧服務。大家的流處理計算主要是還是依賴於Storm,Spark Streaming,Flink等流式處理框架。 Storm,Spark Streaming,Flink流處理 ...


file
Kafka在0.10.0.0版本以前的定位是分散式,分區化的,帶備份機制的日誌提交服務。而kafka在這之前也沒有提供數據處理的顧服務。大家的流處理計算主要是還是依賴於Storm,Spark Streaming,Flink等流式處理框架。

file

Storm,Spark Streaming,Flink流處理的三駕馬車各有各的優勢.

Storm低延遲,並且在市場中占有一定的地位,目前很多公司仍在使用。

Spark Streaming藉助Spark的體系優勢,活躍的社區,也占有一定的份額。

而Flink在設計上更貼近流處理,並且有便捷的API,未來一定很有發展。

file

但是他們都離不開Kafka的消息中轉,所以Kafka於0.10.0.0版本推出了自己的流處理框架,Kafka Streams。Kafka的定位也正式成為Apache Kafka® is a distributed streaming platform,分散式流處理平臺。

實時流式計算

近幾年來實時流式計算發展迅速,主要原因是實時數據的價值和對於數據處理架構體系的影響。實時流式計算包含了 無界數據 近實時 一致性 可重覆結果 等等特征。a type of data processing engine that is designed with infinite data sets in mind 一種考慮了無線數據集的數據處理引擎。

1、無限數據:一種不斷增長的,基本上無限的數據集。這些通常被稱為“流式數據”。無限的流式數據集可以稱為無界數據,相對而言有限的批量數據就是有界數據。

2、無界數據處理:一種持續的數據處理模式,應用於上面的無界數據。批量處理數據(離線計算)也可以重覆運行來處理數據,但是會有性能的瓶頸。

3、低延遲,近實時的結果:相對於離線計算而言,離線計算並沒有考慮延遲的問題。

解決了兩個問題,流處理可以提代批處理系統:

1、正確性:有了這個,就和批量計算等價了。

Streaming需要能隨著時間的推移依然能計算一定時間視窗的數據。Spark Streaming通過微批的思想解決了這個問題,實時與離線系統進行了一致性的存儲,這一點在未來的實時計算系統中都應該滿足。

2、推理時間的工具:這可以讓我們超越批量計算。

好的時間推理工具對於處理不同事件的無界無序數據至關重要。

而時間又分為事件時間和處理時間。

還有很多實時流式計算的相關概念,這裡不做贅述。

Kafka Streams簡介

Kafka Streams被認為是開發實時應用程式的最簡單方法。它是一個Kafka的客戶端API庫,編寫簡單的java和scala代碼就可以實現流式處理。

優勢:

  • 彈性,高度可擴展,容錯

  • 部署到容器,VM,裸機,雲

  • 同樣適用於小型,中型和大型用例

  • 與Kafka安全性完全集成
  • 編寫標準Java和Scala應用程式
  • 在Mac,Linux,Windows上開發

  • Exactly-once 語義

用例:

紐約時報使用Apache Kafka和Kafka Streams將發佈的內容實時存儲和分發到各種應用程式和系統,以供讀者使用。

Pinterest大規模使用Apache Kafka和Kafka Streams來支持其廣告基礎架構的實時預測預算系統。使用Kafka Streams,預測比以往更準確。

作為歐洲領先的線上時尚零售商,Zalando使用Kafka作為ESB(企業服務匯流排),幫助我們從單一服務架構轉變為微服務架構。使用Kafka處理 事件流使我們的技術團隊能夠實現近乎實時的商業智能。

荷蘭合作銀行是荷蘭三大銀行之一。它的數字神經系統Business Event Bus由Apache Kafka提供支持。它被越來越多的財務流程和服務所使用,其中之一就是Rabo Alerts。此服務會在財務事件時實時向客戶發出警報,並使用Kafka Streams構建。

LINE使用Apache Kafka作為我們服務的中央資料庫,以便彼此通信。每天產生數億億條消息,用於執行各種業務邏輯,威脅檢測,搜索索引和數據分析。LINE利用Kafka Streams可靠地轉換和過濾主題,使消費者可以有效消費的子主題,同時由於其複雜而簡單的代碼庫,保持易於維護性。

Topology

Kafka Streams通過一個或多個拓撲定義其計算邏輯,其中拓撲是通過流(邊緣)和流處理器(節點)構成的圖。

file

拓撲中有兩種特殊的處理器

  • 源處理器:源處理器是一種特殊類型的流處理器,沒有任何上游處理器。它通過使用來自這些主題的記錄並將它們轉發到其下游處理器,從一個或多個Kafka主題為其拓撲生成輸入流。
  • 接收器處理器:接收器處理器是一種特殊類型的流處理器,沒有下游處理器。它將從其上游處理器接收的任何記錄發送到指定的Kafka主題。

在正常處理器節點中,還可以把數據發給遠程系統。因此,處理後的結果可以流式傳輸回Kafka或寫入外部系統。

Kafka在這當中提供了最常用的數據轉換操作,例如mapfilterjoinaggregations等,簡單易用。

當然還有一些關於時間,視窗,聚合,亂序處理等。未來再一一做詳細介紹,下麵我們進行簡單的入門案例開發。

快速入門

首先提供WordCount的java版和scala版本。

java8+:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
 
import java.util.Arrays;
import java.util.Properties;
 
public class WordCountApplication {
 
    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
 
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
 
}

scala:

import java.util.Properties
import java.util.concurrent.TimeUnit
 
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
object WordCountApplication extends App {
  import Serdes._
 
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }
 
  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")
 
  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()
 
  sys.ShutdownHookThread {
     streams.close(10, TimeUnit.SECONDS)
  }
}

如果kafka已經啟動了,可以跳過前兩步。

1、下載

下載 2.3.0版本並解壓縮它。請註意,有多個可下載的Scala版本,我們選擇使用推薦的版本(2.12):

> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0

2、啟動

Kafka使用ZooKeeper,因此如果您還沒有ZooKeeper伺服器,則需要先啟動它。

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

啟動Kafka伺服器:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3、創建topic 啟動生產者

我們創建名為streams-plaintext-input的輸入主題和名為streams-wordcount-output的輸出主題:

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".


> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

查看:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
 
Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0

4、啟動WordCount

以下命令啟動WordCount演示應用程式:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

演示應用程式將從輸入主題stream-plaintext-input讀取,對每個讀取消息執行WordCount演算法的計算,並連續將其當前結果寫入輸出主題streams-wordcount-output。因此,除了日誌條目之外不會有任何STDOUT輸出,因為結果會寫回Kafka。

現在我們可以在一個單獨的終端中啟動控制台生成器,為這個主題寫一些輸入數據:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

並通過在單獨的終端中使用控制台使用者讀取其輸出主題來檢查WordCount演示應用程式的輸出:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

5、處理數據

我們在生產者端輸入一些數據。

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

輸出端:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1

繼續輸入:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2

我們看到隨著數據實時輸入,wordcount的結果實時的輸出了。

6、停止程式

您現在可以通過Ctrl-C按順序停止控制台使用者,控制台生產者,Wordcount應用程式,Kafka代理和ZooKeeper伺服器。

什麼是Kafka?
Kafka監控工具彙總
Kafka快速入門
Kafka核心之Consumer
Kafka核心之Producer

替代Flume——Kafka Connect簡介

更多實時計算,Flink,Kafka等相關技術博文,歡迎關註實時流式計算

file


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 本篇主要通過練習來講解子查詢的知識,進入正題之前,先熟悉數據表,表格的數據可以先不用管,主要是熟悉表格的欄位名 這裡子查詢分為三個部分: 1、where條件子查詢 這個子查詢和普通的查詢沒什麼區別,主要是先讀懂題目的意思,然後將結果集組裝起來 需求:1.查看職員名稱和名字為chang的員工一樣的所有 ...
  • 查詢後的結果 更改SQL 語句為 查詢結果 ...
  • 外鍵的使用條件: 兩個表必須是InnoDB表,MyISAM表暫時不支持外鍵外鍵列必須建立了索引,MySQL 4.1.2以後的版本在建立外鍵時會自動創建索引,但如果在較早的版本則需要顯式建立;外鍵關係的兩個表的列必須是數據類型相似,也就是可以相互轉換類型的列,比如int和tinyint可以,而int和 ...
  • [TOC] 1.使用mysqldump實現邏輯備份 2.恢復邏輯備份 3.備份/恢復案例 mysql 資料庫備份/恢復實驗一:資料庫損壞 備份: 1. mysqldump uroot p123 all databases /backup/ _all.sql 2. mysql uroot p123 e ...
  • ORA-12514 TNS 監聽程式當前無法識別連接描述符中請求服務 的解決方法 ORA-12514 TNS 監聽程式當前無法識別連接描述符中請求服務 有同事遇到這個問題,現總結一下,原因如下: 你oracle安裝成功後,一直未停止資料庫(即資料庫是啟動的),客戶端配置成功後,應該一直不會有什麼問題 ...
  • 1.避免全表掃描 對查詢進行優化,應儘量避免全表掃描,首先應考慮在where 及order by 涉及的列上建立索引。 2.避免判斷null 值 應儘量避免在where 子句中對欄位進行null 值判斷,否則將導致引擎放棄使用索引而進行全表掃描,如: select id from t where n ...
  • 在使用left join時,on and和on where條件的區別如下: 1、on條件是在生成臨時表時使用的條件,它不管on中的條件是否為真,都會返回左邊表中的記錄。 2、where條件是在臨時表生成好後,再對臨時表進行過濾的條件。這時已經沒有left join的含義(必須返回左邊表的記錄)了, ...
  • 一、御前 1 win+R DOS 輸入 net start mtsql 和 net stop mysql 啟動和停止Mysql 服務,也可通過電腦——管理——服務和應用程式——服務——MYSQL——右擊 啟動mysql服務出現服務名無效的原因及解決方法【失敗】 問題原因:mysql服務沒有安裝。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...