Kafka入門寶典(詳細截圖版)

来源:https://www.cnblogs.com/tree1123/archive/2019/09/09/11490122.html
-Advertisement-
Play Games

1、瞭解 Apache Kafka 1.1、簡介 官網:http://kafka.apache.org/ Apache Kafka 是一個開源 消息系統 ,由Scala 寫成。是由Apache 軟體基金會開發的一個開源消息系統項目。 Kafka 最初是由LinkedIn 開發,並於2011 年初開源 ...


file

1、瞭解 Apache Kafka

1.1、簡介

file

官網:http://kafka.apache.org/

  • Apache Kafka 是一個開源消息系統,由Scala 寫成。是由Apache 軟體基金會開發的一個開源消息系統項目。
  • Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2012 年10 月從Apache Incubator 畢業。該項目的目標是為處理實時數據提供一個統一、高通量、低等待(低延時)的平臺。
  • Kafka 是一個分散式消息系統:具有生產者、消費者的功能。它提供了類似於JMS 的特性,但是在設計實現上完全不同,此外它並不是JMS 規範的實現。【重點】

1.2、kafka的基本結構

file

  • Producer:消息的發送者

  • Consumer:消息的接收者

  • kafka cluster:kafka的集群。

  • Topic:就是消息類別名,一個topic中通常放置一類消息。每個topic都有一個或者多個訂閱者(消費者)。

消息的生產者將消息推送到kafka集群,消息的消費者從kafka集群中拉取消息。

1.3、kafka的完整架構

file

說明:

  • broker:集群中的每一個kafka實例,稱之為broker;
  • ZooKeeper:Kafka 利用ZooKeeper 保存相應元數據信息, Kafka 元數據信息包括如代理節點信息、Kafka集群信息、舊版消費者信息及其消費偏移量信息、主題信息、分區狀態信息、分區副本分配方案信息、動態配置信息等。
  • ConsumerGroup:在Kafka 中每一個消費者都屬於一個特定消費組( ConsumerGroup ),我們可以為每個消費者指定一個消費組,以groupld 代表消費組名稱,通過group.id 配置設置。如果不指定消費組,則該消費者屬於預設消費組test-consumer-group 。

1.4、kafka的特性

  • 消息持久化
    • Kafka 基於文件系統來存儲和緩存消息。
  • 高吞吐量
    • Kafka 將數據寫到磁碟,充分利用磁碟的順序讀寫。同時, Kafka 在數據寫入及數據同步採用了零拷貝( zero-copy )技術,採用sendFile()函數調用,sendFile()函數是在兩個文件描述符之間直接傳遞數據,完全在內核中操作,從而避免了內核緩衝區與用戶緩衝區之間數據的拷貝,操作效率極高。
    • Kafka 還支持數據壓縮及批量發送,同時Kafka 將每個主題劃分為多個分區,這一系列的優化及實現方法使得Kafka 具有很高的吞吐量。經大多數公司對Kafka 應用的驗證, Kafka 支持每秒數百萬級別的消息
  • 高擴展性
    • Kafka 依賴ZooKeeper來對集群進行協調管理,這樣使得Kafka 更加容易進行水平擴展,生產者、消費者和代理都為分散式,可配置多個。
    • 同時在機器擴展時無需將整個集群停機,集群能夠自動感知,重新進行負責均衡及數據複製。
  • 多客戶端支持
    • Kafka 核心模塊用Scala 語言開發,Kafka 提供了多種開發語言的接入,如Java 、Scala、C 、C++、Python 、Go 、Erlang 、Ruby 、Node. 等。
  • 安全機制
    • Kafka 支持以下幾種安全措施:
      • 通過SSL 和SASL(Kerberos), SASL/PLA時驗證機制支持生產者、消費者與broker連接時的身份認證;
      • 支持代理與ZooKeeper 連接身份驗證;
      • 通信時數據加密;
      • 客戶端讀、寫許可權認證;
      • Kafka 支持與外部其他認證授權服務的集成;
  • 數據備份
    • Kafka 可以為每個topic指定副本數,對數據進行持久化備份,這可以一定程度上防止數據丟失,提高可用性。
  • 輕量級
    • Kafka 的實例是無狀態的,即broker不記錄消息是否被消費,消費偏移量的管理交由消費者自己或組協調器來維護。
    • 同時集群本身幾乎不需要生產者和消費者的狀態信息,這就使得Kafka非常輕量級,同時生產者和消費者客戶端實現也非常輕量級。
  • 消息壓縮
    • Kafka 支持Gzip, Snappy 、LZ4 這3 種壓縮方式,通常把多條消息放在一起組成MessageSet,然後再把Message Set 放到一條消息裡面去,從而提高壓縮比率進而提高吞吐量。

1.5、kafka的應用場景

  • 消息系統。
    • Kafka 作為一款優秀的消息系統,具有高吞吐量、內置的分區、備份冗餘分散式等特點,為大規模消息處理提供了一種很好的解決方案。
  • 應用監控。
    • 利用Kafka 採集應用程式和伺服器健康相關的指標,如CPU 占用率、IO 、記憶體、連接數、TPS 、QPS 等,然後將指標信息進行處理,從而構建一個具有監控儀錶盤、曲線圖等可視化監控系統。例如,很多公司採用Kafka 與ELK (Elastic Search 、Logstash 和Kibana)整合構建應用服務監控系統。
  • 網站用戶行為追蹤。
    • 為了更好地瞭解用戶行為、操作習慣,改善用戶體驗,進而對產品升級改進,將用戶操作軌跡、內容等信息發送到Kafka 集群上,通過Hadoop 、Spark 或Strom等進行數據分析處理,生成相應的統計報告,為推薦系統推薦對象建模提供數據源,進而為每個用戶進行個性化推薦。
  • 流處理。
    • 需要將己收集的流數據提供給其他流式計算框架進行處理,用Kafka 收集流數據是一個不錯的選擇。
  • 持久性日誌。
    • Kafka 可以為外部系統提供一種持久性日誌的分散式系統。日誌可以在多個節點間進行備份, Kafka 為故障節點數據恢復提供了一種重新同步的機制。同時, Kafka很方便與HDFS 和Flume 進行整合,這樣就方便將Kafka 採集的數據持久化到其他外部系統。

2、Kafka的安裝與配置

準備三台虛擬機,分別是node01,node02,node03,並且修改hosts文件如下:

vim /etc/hosts
#註意: 前面的ip地址改成自己的ip地址

192.168.40.133 node01
192.168.40.134 node02
192.168.40.135 node03

#3台伺服器的時間要一致
#時間更新:
yum install -y rdate
rdate -s  time-b.nist.gov

2.1、基礎環境配置

2.1.1、JDK環境

由於Kafka 是用Scala 語言開發的,運行在JVM上,因此在安裝Kafka 之前需要先安裝JDK 。

安裝過程略過,我這裡使用的是jdk1.8。

file

2.1.2、ZooKeeper環境

2.1.2.1、安裝ZooKeeper

Kafka 依賴ZooKeeper ,通過ZooKeeper 來對服務節點、消費者上下線管理、集群、分區元數據管理等,因此ZooKeeper 也是Kafka 得以運行的基礎環境之一。

#上傳zookeeper-3.4.9.tar.gz到/export/software
cd /export/software
mkdir -p /export/servers/
tar -xvf zookeeper-3.4.9.tar.gz -C /export/servers/
#創建ZooKeeper的data目錄
mkdir /export/data/zookeeper -p
cd /export/servers/zookeeper-3.4.9/conf/
#修改配置文件
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
#設置data目錄
dataDir=/export/data/zookeeper
#啟動ZooKeeper
./zkServer.sh start
#檢查是否啟動成功
jps
2.1.2.3、搭建ZooKeeper集群
#在/export/data/zookeeper目錄中創建myid文件
vim /export/data/zookeeper/myid
#寫入對應的節點的id,如:1,2等,保存退出

#在conf下,修改zoo.cfg文件
vim zoo.cfg
#添加如下內容
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
2.1.2.3、配置環境變數
vim /etc/profile
export ZK_HOME=/export/servers/zookeeper-3.4.9
export PATH=${ZK_HOME}/bin:$PATH

#立即生效
source /etc/profile
2.1.2.4、分發到其它機器
scp /etc/profile node02:/etc/
scp /etc/profile node03:/etc/

cd /export/servers
scp -r zookeeper-3.4.9 node02:/export/servers/
scp -r zookeeper-3.4.9 node03:/export/servers/
2.1.2.5、一鍵啟動、停止腳本
mkdir -p /export/servers/onekey/zk
vim slave
#輸入如下內容
node01
node02
node03
#保存退出

vim startzk.sh
#輸入如下內容
cat /export/servers/onekey/zk/slave | while read line
do
{
 echo "開始啟動 --> "$line
 ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh start >/dev/null 2>&1 &"
}&
wait
done
echo "★★★啟動完成★★★"
#保存退出

vim stopzk.sh
#輸入如下內容
cat /export/servers/onekey/zk/slave | while read line
do
{
 echo "開始停止 --> "$line
 ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh stop >/dev/null 2>&1 &"
}&
wait
done
echo "★★★停止完成★★★"
#保存退出

#設置可執行許可權
chmod +x startzk.sh stopzk.sh

#添加到環境變數中
export ZK_ONEKEY=/export/servers/onekey
export PATH=${ZK_ONEKEY}/zk:$PATH
2.1.2.6、檢查啟動是否成功

file

發現三台機器都有“QuorumPeerMain”進程,說明機器已經啟動成功了。

檢查集群是否正常:

zkServer.sh status

file

file

file

發現,集群運行一切正常。

2.2、安裝Kafka

2.2.1、單機版Kafka安裝

第一步:上傳Kafka安裝包並且解壓

rz 上傳kafka_2.11-1.1.0.tgz到 /export/software/
cd /export/software/
tar -xvf kafka_2.11-1.1.0.tgz -C /export/servers/
cd /export/servers
mv kafka_2.11-1.1.0/ kafka

第二步:配置環境變數

vim /etc/profile

#輸入如下內容
export KAFKA_HOME=/export/servers/kafka
export PATH=${KAFKA_HOME}/bin:$PATH

#保存退出
source /etc/profile

第三步:修改配置文件

cd /export/servers/kafka
cd config
vim server.properties

# The id of the broker. This must be set to a unique integer for each broker.
# 必須要只要一個brokerid,並且它必須是唯一的。
broker.id=0

# A comma separated list of directories under which to store log files
# 日誌數據文件存儲的路徑 (如不存在,需要手動創建該目錄, mkdir -p /export/data/kafka/)
log.dirs=/export/data/kafka

# ZooKeeper的配置,本地模式下指向到本地的ZooKeeper服務即可
zookeeper.connect=node01:2181

# 保存退出

第四步:啟動kafka服務

# 以守護進程的方式啟動kafka
kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties

第五步:檢測kafka是否啟動

file

如果進程中有名為kafka的進程,就說明kafka已經啟動了。

2.2.2、驗證kafka是否安裝成功

由於kafka是將元數據保存在ZooKeeper中的,所以,可以通過查看ZooKeeper中的信息進行驗證kafka是否安裝成功。

file

file

file

2.2.3、部署kafka-manager

Kafka Manager 由 yahoo 公司開發,該工具可以方便查看集群 主題分佈情況,同時支持對 多個集群的管理、分區平衡以及創建主題等操作。

源碼托管於github:https://github.com/yahoo/kafka-manager

第一步:上傳Kafka-manager安裝包並且解壓

rz上傳kafka-manager-1.3.3.17.tar.gz到 /export/software/
cd /export/software
tar -xvf kafka-manager-1.3.3.17.tar.gz -C /export/servers/
cd /export/servers/kafka-manager-1.3.3.17/conf

第二步:修改配置文件

#修改配置文件
vim application.conf
#新增項,http訪問服務的埠
http.port=19000
#修改成自己的zk機器地址和埠
kafka-manager.zkhosts="node01:2181"
#保存退出

第三步:啟動服務

cd /export/servers/kafka-manager-1.3.3.17/bin
#啟動服務
./kafka-manager -Dconfig.file=../conf/application.conf

#製作啟動腳本
vim /etc/profile
export KAFKA_MANAGE_HOME=/export/servers/kafka-manager-1.3.3.17
export PATH=${KAFKA_MANAGE_HOME}/bin:$PATH

source /etc/profile

cd /export/servers/onekey/
mkdir kafka-manager
cd kafka-manager
vim start-kafka-manager.sh
nohup kafka-manager -Dconfig.file=${KAFKA_MANAGE_HOME}/conf/application.conf >/dev/null 2>&1 &
chmod +x start-kafka-manager.sh
vim /etc/profile
export PATH=${ZK_ONEKEY}/kafka-manager:$PATH
source /etc/profile

第四步:檢查是否啟動成功

打開瀏覽器,輸入地址:http://node01:19000/,即可看到kafka-manage管理界面。

file

2.2.4、kafka-manager的使用

進入管理界面,是沒有顯示Cluster信息的,需要添加後才能操作。

  • 添加 Cluster:

file

輸入Cluster Name、ZooKeeper信息、以及Kafka的版本信息(這裡最高只能選擇1.0.0)。

file

點擊Save按鈕保存。

file

添加成功。

  • 查看kafka的信息
    file
  • 查看Broker信息
    file
  • 查看Topic列表
    file
  • 查看單個topic信息以及操作
    file
  • 優化副本選舉
    file
  • 查看消費者信息
    file

2.2.5、搭建kafka集群

kafka集群的搭建是非常簡單的,只需要將上面的單機版的kafka分發的其他機器,並且將ZooKeeper信息修改成集群的配置以及設置不同的broker值即可。

第一步:將kafka分發到node02、node03

cd /export/servers/
scp -r kafka node02:/export/servers/
scp -r kafka node03:/export/servers/
scp /etc/profile node02:/etc/
scp /etc/profile node03:/etc/
# 分別到node02、node03機器上執行
source /etc/profile

第二步:修改node01、node02、node03上的kafka配置文件

  • node01:

    cd /export/servers/kafka/config
    vim server.properties
    zookeeper.connect=node01:2181,node02:2181,node03:2181
  • node02:

    cd /export/servers/kafka/config
    vim server.properties
    broker.id=1
    zookeeper.connect=node01:2181,node02:2181,node03:2181
  • node03:

    cd /export/servers/kafka/config
    vim server.properties
    broker.id=2
    zookeeper.connect=node01:2181,node02:2181,node03:2181

第三步:編寫一鍵啟動、停止腳本。註意:該腳本依賴於環境變數中的KAFKA_HOME。

mkdir -p /export/servers/onekey/kafka
vim slave
#輸入如下內容
node01
node02
node03
#保存退出

vim start-kafka.sh
#輸入如下內容
cat /export/servers/onekey/kafka/slave | while read line
do
{
 echo "開始啟動 --> "$line
 ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties >/dev/null 2>&1 &"
}&
wait
done
echo "★★★啟動完成★★★"
#保存退出
chmod +x start-kafka.sh

vim stop-kafka.sh
#輸入如下內容
cat /export/servers/onekey/kafka/slave | while read line
do
{
 echo "開始停止 --> "$line
 ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &"
}&
wait
done
echo "★★★停止完成★★★"
#保存退出
chmod +x stop-kafka.sh

#加入到環境變數中
export PATH=${ZK_ONEKEY}/kafka:$PATH
source /etc/profile

第四步:通過kafka-manager管理工具查看集群信息。
file

由此可見,kafka集群已經啟動完成。

3、Kafka快速入門

對kafka的操作有2種方式,一種是通過命令行方式,一種是通過API方式。

3.1、通過命令行Kafka

Kafka在bin目錄下提供了shell腳本文件,可以對Kafka進行操作,分別是:
file
通過命令行的方式,我們將體驗下kafka,以便我們對kafka有進一步的認知。

3.1.1、topic的操作

3.1.1.1、創建topic
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic

#執行結果:
Created topic "my-kafka-topic".

參數說明:

  • zookeeper:參數是必傳參數,用於配置 Kafka 集群與 ZooKeeper 連接地址。至少寫一個。
  • partitions:參數用於設置主題分區數,該配置為必傳參數。
  • replication-factor:參數用來設置主題副本數 ,該配置也是必傳參數。
  • topic:指定topic的名稱。
3.1.1.2、查看topic列表
kafka-topics.sh --list --zookeeper node01:2181

__consumer_offsets
my-kafka-topic

可以查看列表。

如果需要查看topic的詳細信息,需要使用describe命令。

kafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic
#若不指定topic,則查看所有topic的信息
kafka-topics.sh --describe --zookeeper node01:2181
3.1.1.3、刪除topic

通過kafka-topics.sh執行刪除動作,需要在server.properties文件中配置 delete.topic.enable=true,該配置預設為 false。

否則執行該腳本並未真正刪除主題 ,將該topic標記為刪除狀態 。

kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic

# 執行如下
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
Topic my-kafka-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# 如果將delete.topic.enable=true
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2
Topic my-kafka-topic2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

# 說明:雖然設置後,刪除時依然提示沒有設置為true,實際上已經刪除了。

3.1.2、生產者的操作

kafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic

可以看到,已經向topic發送了消息。

3.1.3、消費者的操作

kafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic
# 通過以上命令,可以看到消費者可以接收生產者發送的消息

# 如果需要從頭開始接收數據,需要添加--from-beginning參數
kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic

file

3.2、通過Java Api操作Kafka

除了通過命令行的方式操作kafka外,還可以通過Java api的方式操作,這種方式將更加的常用。

3.2.1、創建工程

file

導入依賴:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>itcast-bigdata</artifactId>
        <groupId>cn.itcast.bigdata</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>itcast-bigdata-kafka</artifactId>

    <dependencies>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

    </dependencies>
    
    <build>
        <plugins>
            <!-- java編譯插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>


</project>

3.2.2、topic的操作

由於主題的元數據信息是註冊在 ZooKeeper 相 應節點之中,所以對主題的操作實質是對 ZooKeeper 中記錄主題元數據信息相關路徑的操作。 Kafka將對 ZooKeeper 的相關操作封裝成一 個 ZkUtils 類 , 井封裝了一個AdrninUtils 類調用 ZkClient 類的相關方法以實現對 Kafka 元數據 的操作,包括對主題、代理、消費者等相關元數據的操作。對主題操作的相關 API調用較簡單, 相應操作都是通過調用 AdminUtils類的相應方法來完成的。

package cn.itcast.kafka;

import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;

import java.util.Properties;

public class TestKafkaTopic {

    @Test
    public void testCreateTopic() {
        ZkUtils zkUtils = null;
        try {
            //參數:zookeeper的地址,session超時時間,連接超時時間,是否啟用zookeeper安全機制
            zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());

            String topicName = "my-kafka-topic-test1";
            if (!AdminUtils.topicExists(zkUtils, topicName)) {
                //參數:zkUtils,topic名稱,partition數量,副本數量,參數,機架感知模式
                AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
                System.out.println(topicName + " 創建成功!");
            } else {
                System.out.println(topicName + " 已存在!");
            }
        } finally {
            if (null != zkUtils) {
                zkUtils.close();
            }
        }

    }
}

測試結果:

file

3.2.2.1、刪除topic
    @Test
    public void testDeleteTopic() {
        ZkUtils zkUtils = null;
        try {
            //參數:zookeeper的地址,session超時時間,連接超時時間,是否啟用zookeeper安全機制
            zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
            String topicName = "my-kafka-topic-test1";
            if (AdminUtils.topicExists(zkUtils, topicName)) {
                //參數:zkUtils,topic名稱
                AdminUtils.deleteTopic(zkUtils, topicName);
                System.out.println(topicName + " 刪除成功!");
            } else {
                System.out.println(topicName + " 不已存在!");
            }
        } finally {
            if (null != zkUtils) {
                zkUtils.close();
            }
        }

    }

測試結果:

file

3.2.3、生產者的操作

package cn.itcast.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import java.util.Properties;

public class TestProducer {

    @Test
    public void testProducer() throws InterruptedException {
        Properties config = new Properties();

        // 設置kafka服務列表,多個用逗號分隔
        config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
        // 設置序列化消息 Key 的類
        config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 設置序列化消息 value 的類
        config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 初始化
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
        for (int i = 0; i < 100 ; i++) {
            ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
            // 發送消息
            kafkaProducer.send(record);
            System.out.println("發送消息 --> " + i);

            Thread.sleep(100);
        }

        kafkaProducer.close();

    }

}

3.2.4、消費者的操作

package cn.itcast.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;

import javax.sound.midi.Soundbank;
import java.util.Arrays;
import java.util.Properties;

public class TestConsumer {

    @Test
    public void testConsumer() {
        Properties config = new Properties();
        // 設置kafka服務列表,多個用逗號分隔
        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
        // 設置消費者分組id
        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // 設置序反列化消息 Key 的類
        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 設置序反列化消息 value 的類
        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
        // 訂閱topic
        kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));

        while (true) { // 使用死迴圈不斷的拉取數據
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                String value = record.value();
                long offset = record.offset();
                System.out.println("value = " + value + ", offset = " + offset);
            }
        }

    }
}

什麼是Kafka?
Kafka監控工具彙總
Kafka快速入門
Kafka核心之Consumer
Kafka核心之Producer

替代Flume——Kafka Connect簡介
最簡單流處理引擎——Kafka Streams簡介

更多實時計算,Flink,Kafka等相關技術博文,歡迎關註實時流式計算

file


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • [20190909]完善vim的bccacl插件.txthttp://blog.itpub.net/267265/viewspace-2140886/http://blog.itpub.net/267265/viewspace-2140823/http://blog.itpub.net/267265 ...
  • 合理配置tempdb 1.tempdb在SQL Server停掉,重啟時會自動的drop,re-create. 根據model資料庫會預設建立一個新的 2.tempdb對IO的要求比較高,最好分配到高IO的磁碟上且與其他的數據文件分到不用的磁碟上,以提高讀寫效率 3.應該根據根據CPU個數來配置te ...
  • 學習T SQL時記錄的筆記,記得並不全也不詳細 <! more if和while語句 自定義函數 分為標量函數、表值函數(內聯表值函數和多語句表值函數) 標量函數:只返回一個基礎類型數據的值 表值函數:返回一個table類型的結果集 內聯表值函數 多語句表值函數 多語句表值函數可以看作是標量函數和內 ...
  • 在MySQL中常用數據類型主要分為以下幾類:數值類型、字元串類型、日期時間類型。 數值類型 字元串類型 日期時間類型 ...
  • 1.初識 1.資料庫 1. 什麼角色: 用戶名密碼 商品價格等信息 對數據的處理更便捷 2. web程式 資料庫管理員專門管理 是一個開 3. 資料庫的縮寫 db 4. DBMS 資料庫管理系統 5. mysql RDBMS 關係型資料庫管理系統 6. 解決了: 1. 文件操作的效率和便捷問題 2. ...
  • MySQL中的日誌包括:錯誤日誌、通用查詢日誌、二進位日誌、慢查詢日誌等等。這裡主要介紹下比較常用的兩個功能:通用查詢日誌和慢查詢日誌。 錯誤日誌:記錄啟動、運行或停止mysqld時出現的問題。通用日誌:記錄建立的客戶端連接和執行的語句。二進位日誌:記錄所有更改數據的語句。還用於複製。慢查詢日誌:記 ...
  • 七、多表查詢 ​ 對於查詢在之前已經學過了簡單查詢、限定查詢、查詢排序,這些都屬於 SQL 的標準語句,而上一章的單行函數,主要功能是為了彌補查詢的不足。 ​ 而從多表查詢開始就正式進入到了複雜查詢部分。 7.1、基本語法 多表查詢就是在一條查詢語句中,從多張表裡一起取出所需要的數據。如果要想進行多 ...
  • apache Directory Studio 是一個 LDAP 服務的一個圖形化客戶端工具。本文主要講解一下 apache Directory Studio 簡單的使用方法。 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...