Kafka分區與消費者的關係

来源:https://www.cnblogs.com/cjsblog/archive/2018/09/17/9664536.html
-Advertisement-
Play Games

1. 前言 我們知道,生產者發送消息到主題,消費者訂閱主題(以消費者組的名義訂閱),而主題下是分區,消息是存儲在分區中的,所以事實上生產者發送消息到分區,消費者則從分區讀取消息,那麼,這裡問題來了,生產者將消息投遞到哪個分區?消費者組中的消費者實例之間是怎麼分配分區的呢?接下來,就圍繞著這兩個問題一 ...


1.  前言


我們知道,生產者發送消息到主題,消費者訂閱主題(以消費者組的名義訂閱),而主題下是分區,消息是存儲在分區中的,所以事實上生產者發送消息到分區,消費者則從分區讀取消息,那麼,這裡問題來了,生產者將消息投遞到哪個分區?消費者組中的消費者實例之間是怎麼分配分區的呢?接下來,就圍繞著這兩個問題一探究竟。

2.  主題的分區數設置


在server.properties配置文件中可以指定一個全局的分區數設置,這是對每個主題下的分區數的預設設置,預設是1。

當然每個主題也可以自己設置分區數量,如果創建主題的時候沒有指定分區數量,則會使用server.properties中的設置。

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 2 --replication-factor 1

在創建主題的時候,可以使用--partitions選項指定主題的分區數量

[root@localhost kafka_2.11-2.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic abc
Topic:abc       PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: abc      Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: abc      Partition: 1    Leader: 0       Replicas: 0     Isr: 0

3.  生產者與分區


首先提出一個問題:生產者將消息投遞到分區有沒有規律?如果有,那麼它是如何決定一條消息該投遞到哪個分區的呢?

3.1.  預設的分區策略

The default partitioning strategy:

  • If a partition is specified in the record, use it
  • If no partition is specified but a key is present choose a partition based on a hash of the key
  • If no partition or key is present choose a partition in a round-robin fashion

org.apache.kafka.clients.producer.internals.DefaultPartitioner

預設的分區策略是:

  • 如果在發消息的時候指定了分區,則消息投遞到指定的分區
  • 如果沒有指定分區,但是消息的key不為空,則基於key的哈希值來選擇一個分區
  • 如果既沒有指定分區,且消息的key也是空,則用輪詢的方式選擇一個分區
/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

通過源代碼可以更加作證這一點

4.  分區與消費者


消費者以組的名義訂閱主題,主題有多個分區,消費者組中有多個消費者實例,那麼消費者實例和分區之前的對應關係是怎樣的呢?

換句話說,就是組中的每一個消費者負責那些分區,這個分配關係是如何確定的呢?

同一時刻,一條消息只能被組中的一個消費者實例消費

消費者組訂閱這個主題,意味著主題下的所有分區都會被組中的消費者消費到,如果按照從屬關係來說的話就是,主題下的每個分區只從屬於組中的一個消費者,不可能出現組中的兩個消費者負責同一個分區。

那麼,問題來了。如果分區數大於或者等於組中的消費者實例數,那自然沒有什麼問題,無非一個消費者會負責多個分區,(PS:當然,最理想的情況是二者數量相等,這樣就相當於一個消費者負責一個分區);但是,如果消費者實例的數量大於分區數,那麼按照預設的策略(PS:之所以強調預設策略是因為你也可以自定義策略),有一些消費者是多餘的,一直接不到消息而處於空閑狀態。

話又說回來,假設多個消費者負責同一個分區,那麼會有什麼問題呢?

我們知道,Kafka它在設計的時候就是要保證分區下消息的順序,也就是說消息在一個分區中的順序是怎樣的,那麼消費者在消費的時候看到的就是什麼樣的順序,那麼要做到這一點就首先要保證消息是由消費者主動拉取的(pull),其次還要保證一個分區只能由一個消費者負責。倘若,兩個消費者負責同一個分區,那麼就意味著兩個消費者同時讀取分區的消息,由於消費者自己可以控制讀取消息的offset,就有可能C1才讀到2,而C1讀到1,C1還沒處理完,C2已經讀到3了,則會造成很多浪費,因為這就相當於多線程讀取同一個消息,會造成消息處理的重覆,且不能保證消息的順序,這就跟主動推送(push)無異。

4.1.  消費者分區分配策略

org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

如果是自定義分配策略的話可以繼承AbstractPartitionAssignor這個類,它預設有3個實現

4.1.1.  range

range策略對應的實現類是org.apache.kafka.clients.consumer.RangeAssignor

這是預設的分配策略

可以通過消費者配置中partition.assignment.strategy參數來指定分配策略,它的值是類的全路徑,是一個數組

/**
 * The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
 * and the consumers in lexicographic order. We then divide the number of partitions by the total number of
 * consumers to determine the number of partitions to assign to each consumer. If it does not evenly
 * divide, then the first few consumers will have one extra partition.
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p1, t1p0, t1p1]
 * C1: [t0p2, t1p2]
 */

range策略是基於每個主題的

對於每個主題,我們以數字順序排列可用分區,以字典順序排列消費者。然後,將分區數量除以消費者總數,以確定分配給每個消費者的分區數量。如果沒有平均劃分(PS:除不盡),那麼最初的幾個消費者將有一個額外的分區。

簡而言之,就是,

1、range分配策略針對的是主題(PS:也就是說,這裡所說的分區指的某個主題的分區,消費者值的是訂閱這個主題的消費者組中的消費者實例)

2、首先,將分區按數字順序排行序,消費者按消費者名稱的字典序排好序

3、然後,用分區總數除以消費者總數。如果能夠除盡,則皆大歡喜,平均分配;若除不盡,則位於排序前面的消費者將多負責一個分區

例如,假設有兩個消費者C0和C1,兩個主題t0和t1,並且每個主題有3個分區,分區的情況是這樣的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那麼,基於以上信息,最終消費者分配分區的情況是這樣的:

C0: [t0p0, t0p1, t1p0, t1p1]

C1: [t0p2, t1p2]

為什麼是這樣的結果呢?

因為,對於主題t0,分配的結果是C0負責P0和P1,C1負責P2;對於主題t2,也是如此,綜合起來就是這個結果

上面的過程用圖形表示的話大概是這樣的:

閱讀代碼,更有助於理解:

public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
    //    主題與消費者的映射                                                            
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();    //    主題
        List<String> consumersForTopic = topicEntry.getValue();    //    消費者列表

        //    partitionsPerTopic表示主題和分區數的映射
        //    獲取主題下有多少個分區
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

        //    消費者按字典序排序
        Collections.sort(consumersForTopic);

        //    分區數量除以消費者數量
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        //    取模,餘數就是額外的分區
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            //    分配分區
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

4.1.2.  roundrobin(輪詢)

roundronbin分配策略的具體實現是org.apache.kafka.clients.consumer.RoundRobinAssignor

/**
 * The round robin assignor lays out all the available partitions and all the available consumers. It
 * then proceeds to do a round robin assignment from partition to consumer. If the subscriptions of all consumer
 * instances are identical, then the partitions will be uniformly distributed. (i.e., the partition ownership counts
 * will be within a delta of exactly one across all consumers.)
 *
 * For example, suppose there are two consumers C0 and C1, two topics t0 and t1, and each topic has 3 partitions,
 * resulting in partitions t0p0, t0p1, t0p2, t1p0, t1p1, and t1p2.
 *
 * The assignment will be:
 * C0: [t0p0, t0p2, t1p1]
 * C1: [t0p1, t1p0, t1p2]
 *
 * When subscriptions differ across consumer instances, the assignment process still considers each
 * consumer instance in round robin fashion but skips over an instance if it is not subscribed to
 * the topic. Unlike the case when subscriptions are identical, this can result in imbalanced
 * assignments. For example, we have three consumers C0, C1, C2, and three topics t0, t1, t2,
 * with 1, 2, and 3 partitions, respectively. Therefore, the partitions are t0p0, t1p0, t1p1, t2p0,
 * t2p1, t2p2. C0 is subscribed to t0; C1 is subscribed to t0, t1; and C2 is subscribed to t0, t1, t2.
 *
 * Tha assignment will be:
 * C0: [t0p0]
 * C1: [t1p0]
 * C2: [t1p1, t2p0, t2p1, t2p2]
 */

輪詢分配策略是基於所有可用的消費者和所有可用的分區的

與前面的range策略最大的不同就是它不再局限於某個主題

如果所有的消費者實例的訂閱都是相同的,那麼這樣最好了,可用統一分配,均衡分配

例如,假設有兩個消費者C0和C1,兩個主題t0和t1,每個主題有3個分區,分別是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2

那麼,最終分配的結果是這樣的:

C0: [t0p0, t0p2, t1p1]

C1: [t0p1, t1p0, t1p2]

用圖形表示大概是這樣的:

假設,組中每個消費者訂閱的主題不一樣,分配過程仍然以輪詢的方式考慮每個消費者實例,但是如果沒有訂閱主題,則跳過實例。當然,這樣的話分配肯定不均衡。

什麼意思呢?也就是說,消費者組是一個邏輯概念,同組意味著同一時刻分區只能被一個消費者實例消費,換句話說,同組意味著一個分區只能分配給組中的一個消費者。事實上,同組也可以不同訂閱,這就是說雖然屬於同一個組,但是它們訂閱的主題可以是不一樣的。

例如,假設有3個主題t0,t1,t2;其中,t0有1個分區p0,t1有2個分區p0和p1,t2有3個分區p0,p1和p2;有3個消費者C0,C1和C2;C0訂閱t0,C1訂閱t0和t1,C2訂閱t0,t1和t2。那麼,按照輪詢分配的話,C0應該負責

首先,肯定是輪詢的方式,其次,比如說有主題t0,t1,t2,它們分別有1,2,3個分區,也就是t0有1個分區,t1有2個分區,t2有3個分區;有3個消費者分別從屬於3個組,C0訂閱t0,C1訂閱t0和t1,C2訂閱t0,t1,t2;那麼,按照輪詢分配的話,C0應該負責t0p0,C1應該負責t1p0,其餘均由C2負責。

上述過程用圖形表示大概是這樣的:

為什麼最後的結果是

C0: [t0p0]

C1: [t1p0]

C2: [t1p1, t2p0, t2p1, t2p2]

這是因為,按照輪詢t0p1由C0負責,t1p0由C1負責,由於同組,C2只能負責t1p1,由於只有C2訂閱了t2,所以t2所有分區由C2負責,綜合起來就是這個結果

細想一下可以發現,這種情況下跟range分配的結果是一樣的

5.  測試代碼

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cjs.example</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafka-demo</name>
    <description></description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
package com.cjs.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class HelloProducer {

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("abc", Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (null != e) {
                        e.printStackTrace();
                    }else {
                        System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();

    }
}
package com.cjs.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class HelloConsumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.133:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
//        props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("foo", "bar", "abc"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition = %s, offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}

6.  參考


http://kafka.apache.org/documentation/#consumerconfigs

https://blog.csdn.net/feelwing1314/article/details/81097167

https://blog.csdn.net/OiteBody/article/details/80595971

https://blog.csdn.net/YChenFeng/article/details/74980531

 


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 此系列將記錄本人從開始到結束做物料管理系統的過程 登錄界面的設計 此博客將實現如下界面: 當用戶名或密碼沒輸入時將顯示相應的提示信息,採用java swing實現 代碼: ...
  • 數組中有一個數字出現的次數超過數組長度的一半,請找出這個數字。例如輸入一個長度為9的數組{1,2,3,2,2,2,5,4,2}。由於數字2在數組中出現了5次,超過數組長度的一半,因此輸出2。如果不存在則輸出0。 兩種方式: 1.定義一個新數組arr,遍曆數組給arr賦值,arr[元素]=出現的次數 ... ...
  • 已經有兩個月沒有寫博客了,也有好幾個月沒有看go相關的內容了,由於工作原因最近在做java以及大數據相關的內容,導致最近工作較忙,博客停止了更新,正好想撿起之前go的東西,所以找了一個源碼學習 這個也是之前用go寫日誌收集的時候用到的一個包 :github.com/hpcloud/tail, 這次就 ...
  • 目前主要功能是完成知乎視頻的下載. 在抓包和網頁分析發現有blob:https://...格式的視頻鏈接, 但是無法訪問, 不過知乎好像是m3u8格式的, 具體的我也不太清楚, 但這並不妨礙我們的下載工作. 其中ts就是被分割後的相對url, 拼接後就可以下載播放了, 不過這裡還要做的就是將所有被分 ...
  • 比賽鏈接 上紫啦hhh,好開心 每個題裡面都有中文翻譯,我就不說題意了。。 A 直接模擬即可 #include<cstdio> #include<algorithm> #define int long long using namespace std; const int MAXN = 1e5 + ...
  • 先說下我寫這個爬蟲的思路吧:首先是利用selenium模擬瀏覽器,然後搜索貼吧名,如果不存在就提示“沒有找到該貼吧”,然後重新輸入,如果搜到了這個貼吧,就進入該貼吧並且顯示該貼吧首頁上的所有帖子,然後我們可以選擇查看哪一個帖子,程式就會返回該帖子的內容,對於評論較多的帖子,還可以查看下一頁,如果想看 ...
  • 機器語言: 優點: 最底層,所以速度最快! 缺點: 最複雜,所以開發效率低! 彙編語言: 優點: 最底層,所以速度最快! 缺點: 最複雜,所以開發效率低! 高級語言: 1.編譯類: 優點: 語言執行速度快,不依賴語言運行環境! 缺點: 跨平臺差。 2.解釋類: 優點: 跨平臺方便,一份代碼可以到處用 ...
  • 本次爬取用到的知識點有: 1. selenium 2. pymysql 3 pyquery 正文 1. 分析目標網站 1. 打開某寶首頁, 輸入"男裝"後點擊"搜索", 則跳轉到"男裝"的搜索界面. 2. 空白處"右擊"再點擊"檢查"審查網頁元素, 點擊"Network". 1) 找到對應的URL, ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...