Kafka集群調優+能力探底

来源:https://www.cnblogs.com/xijiu/archive/2023/12/05/17878078.html
-Advertisement-
Play Games

2.7Python(目前ArcGIS使用)代碼轉化為3.5Python(目前ArcGIS Pro使用)代碼 Analyze Tools For Pro (2to3命令) 基本操作 調用ArcToolbox的兩種形式 #arcpy.ToolboxAlias.ToolName() #arcpy.Tool ...


一、前言

我們需要對4個規格的kafka能力進行探底,即其可以承載的最大吞吐;4個規格對應的單節點的配置如下:

  • 標準版: 2C4G
  • 鉑金版: 4C8G
  • 專業版: 8C16G
  • 企業版: 16C32G

另外,一般來講,在同配置下,kafka的讀性能是要優於寫性能的,寫操作時,數據要從網卡拷貝至堆記憶體,然後進行一堆數據校驗、解析後,會將數據拷貝至堆外記憶體,然後再拷貝至操作系統的page cache,最後操作系統非同步刷盤至設備中。而讀操作時,kafka使用了零拷貝技術,數據會從disk或page cache直接拷貝到網卡,節省了大量的記憶體拷貝。因此我們這次探底將聚焦於鏈路的短板,即kafka的寫操作進行壓測

註:本文不是專業的壓測報告,而是針對不同集群調優,以獲取其最大的吞吐能力

二、磁碟能力探底

在真正開始對kafka壓測前,我們首先對磁碟的能力進行一個摸底。因為kafka是典型的數據型應用,是強依賴磁碟性能的,一旦有了這個數據,那麼這個就是kafka的性能天花板。如果磁碟是傳統的機械磁碟,那麼瓶頸毫無懸念一般都會落在磁碟上;但如果磁碟類型是SSD,而且性能很高的話,操作系統會極力壓榨cpu,從而獲取一個最大刷盤吞吐,因此瓶頸在哪就很難講了

要測試磁碟吞吐的話,2個變參的影響較大:

  1. 單次寫入磁碟的數據量
  2. 寫盤的線程數

2.1、單次刷盤大小

現在的硬體廠商,對於磁碟的優化,基本上都是4K對齊的,因此我們的參數一般也要設置為4K的整數倍,例如4K\8K\16K... 如果單次寫入量小於4K,例如只寫了10byte,那麼底層刷盤的時候,也會刷4K的量,這就是臭名昭著的寫放大

而具體單次寫多少數據量能達到最優呢? 這就需要不斷的benchmark了

2.2、刷盤線程數

我們知道kafka的broker通過參數num.io.threads來控制io的線程數量,通常是cpu * 2,不過這個參數並不能真實反應在同一時刻寫盤的線程數,因此我們探底的時候,也需要動態修改這個參數,從而獲取磁碟真實的吞吐

2.3、探底工具

public class DiskMain2 {
    private static final long EXE_KEEP_TIME = 30 * 1000;

    public static void main(String[] args) throws Exception {
        new DiskMain2().begin(args);
    }

    private void begin(String[] args) throws Exception {
        AtomicLong totalLen = new AtomicLong();
        long begin = System.currentTimeMillis();
        int threadNum = args.length > 0 ? Integer.parseInt(args[0]) : 4;
        int msgK = args.length > 1 ? Integer.parseInt(args[1]) : 16;
        int size = msgK * 1024;
        List<Thread> threadList = new ArrayList<>();
        for (int j = 0; j < threadNum; j++) {
            Thread thread = new Thread(() -> {
                try {
                    File file = new File("/bitnami/kafka/" + UUID.randomUUID() + ".txt");
                    file.createNewFile();
                    FileChannel channel = FileChannel.open(Paths.get(file.getPath()), StandardOpenOption.WRITE);
                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(size);
                    for (int i = 0; ; i++) {
                        byteBuffer.clear();
                        byteBuffer.position(size);
                        byteBuffer.flip();
                        channel.write(byteBuffer);
                        if (i % 100 == 0) {
                            long cost = System.currentTimeMillis() - begin;
                            if (cost > EXE_KEEP_TIME) {
                                break;
                            }
                        }
                    }
                    channel.force(false);
                    totalLen.addAndGet(file.length());
                    file.delete();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            thread.start();
            threadList.add(thread);
        }
        for (Thread thread : threadList) {
            thread.join();
        }
        long cost = (System.currentTimeMillis() - begin) / 1000;
        System.out.println((totalLen.get() / 1024 / 1024) + " MB");
        System.out.println(cost + " sec");
        System.out.println(totalLen.get() / cost / 1024 / 1024 + " MB/sec");
    }
}

簡單描述下這個工具做的事兒:啟動M(可配)個線程,每個線程單次往磁碟中寫入N(可配)K的數據,整個過程持續30秒,然後統計所有寫入文件的總大小,最後除以時間,從而計算磁碟吞吐

幾個註意點:

  • 大塊的磁碟寫入,一定要使用FileChannel,與kafka中的log寫入對齊
  • 儘量減少cpu的使用,將壓力下放給磁碟,demo中使用的ByteBuffer,通過修改position的值模擬大塊數據
  • 使用DirectByteBuffer,減少一次堆記憶體 --> 對外記憶體的拷貝

2C4G

[root@jmc-pod kafka]# java DiskMain 2 4 802 MB/sec

[root@jmc-pod kafka]# java DiskMain 2 8 1016 MB/sec

[root@jmc-pod kafka]# java DiskMain 2 16 1105 MB/sec

[root@jmc-pod kafka]# java DiskMain 2 32 942 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 4 1013 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 8 941 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 16 1062 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 32 1076 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 4 916 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 8 993 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 16 1035 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 32 982 MB/sec

4C8G

[root@jmc-pod kafka]# java DiskMain 2 16 1320 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 4 2172 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 8 2317 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 16 2580 MB/sec

[root@jmc-pod kafka]# java DiskMain 4 32 2271 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 4 2150 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 8 2315 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 16 2498 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 32 2536 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 64 2434 MB/sec

8C16G

[root@jmc-pod kafka]# java DiskMain 4 16 2732 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 4 3440 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 8 3443 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 16 3531 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 32 3561 MB/sec

[root@jmc-pod kafka]# java DiskMain 8 64 3562 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 4 3515 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 8 3573 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 16 3659 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 32 3673 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 64 3674 MB/sec

[root@jmc-pod kafka]# java DiskMain 12 16 3672 MB/sec

16C32G

[root@jmc-pod kafka]# java DiskMain 16 16 3918 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 8 3814 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 32 3885 MB/sec

[root@jmc-pod kafka]# java DiskMain 16 64 3894 MB/sec

[root@jmc-pod kafka]# java DiskMain 24 8 4053 MB/sec

[root@jmc-pod kafka]# java DiskMain 24 16 4039 MB/sec

[root@jmc-pod kafka]# java DiskMain 24 32 4080 MB/sec

[root@jmc-pod kafka]# java DiskMain 24 64 4050 MB/sec

[root@jmc-pod kafka]# java DiskMain 32 16 4042 MB/sec

[root@jmc-pod kafka]# java DiskMain 32 32 4078 MB/sec

通過反覆壓測,得出如下結論:

規格

最大吞吐量

cpu

參數

2C4G

1105 MB/sec

200%

2線程,16K

4C8G

2580 MB/sec

400%

4線程,16K

8C16G

3674 MB/sec

790%

16線程,64K

16C32G

4080 MB/sec

950%

24線程,32K

在kafka 3副本的經典協議下,上述表格便是其吞吐量的天花板。其中16C32G在算力上雖然比8C16G強了1倍,但其落盤速度卻基本持平,在大數據的壓力下,瓶頸終究會落在磁碟,因此可以大膽預測,其性能不會比8C16G高出太多

分析上述數據可知:

  • 2C4G:磁碟的吞吐量約1G/s,遠沒有達到上限,此時的瓶頸在cpu
  • 4C8G:吞吐量雖上升了一倍不止,不過cpu飈滿,瓶頸還在cpu
  • 8C16G:吞吐量約為3.5G/s,相比較4C8G並沒有翻倍,後端的cpu幾乎吃滿,光看這組數據不好定位瓶頸
  • 16C32G:終於探到磁碟的底了,在cpu還有大量剩餘的前提下,磁碟明顯寫不動了

磁碟的性能是真好,居然能壓出 4 GB/s的速率,嘆嘆

三、Kafka吞吐量概述

一般描述某個kafka集群的吞吐量時,通常寫為 3*n MB/s,例如 3*100 MB/s。 之所以習慣這樣描述,是基於kafka自身的3副本協議,即1主2備的模式,leader收到業務流量n後,2個follower還需要從leader將數據同步過來,這樣在集群角度看來,是一共處理了3*n流量

某個topic所擁有的副本數,理論上是不能大於整個集群的broker數量的,因為副本本質上是做高可用的,當topic的副本數大於整個集群的broker數量後,那勢必某個broker存在2個及以上副本,這樣也就喪失了高可用的初衷

3.1、集群橫向擴容

所謂集群橫向擴容,是指為集群添加同構broker,集群的broker數量初始為3,擴容後可能變為了6,這裡的broker數量與topic的replica不是同一個概念,註意區分。

某個topic副本數過多,將帶來集群內部大量的數據流轉,而副本數過少,例如單副本,又存在一些高可用的風險,因此即便隨著broker數量的增多,kafka最佳實踐還是建議將topic的副本數設置為3,這樣每增加3個broker,集群的能力將會得到橫向的擴容

這裡的橫向擴容出來的能力跟broker數量是嚴格呈線性關係的,本文不會對橫向擴容進行壓測對比

3.2、集群縱向擴容

縱向擴容是指集群的broker數量不變,但是提升broker的配置。例如之前集群是3 * 2C4G的規格,進行縱向擴容後,集群將變為3 * 4C8G

broker的配置線性提升了,其提供的吞吐能力也會隨著線性提升嗎? 答案是否定的;如果磁碟用的是機械磁碟,我們可能很快能夠斷言瓶頸將卡在disk上,但SSD的高吞吐也是非常吃cpu的,內容比較複雜,記憶體、磁碟、cpu等都息息相關,這裡沒有很好的捷徑,只能benchmark

縱向壓測、對比也是本文的重心

四、發壓準備

4.1、客戶端準備

4.1.1、發壓程式

發壓程式使用官方的工具kafka-producer-perf-test.sh,這個工具實際調用的是kafka內核中的類:

org.apache.kafka.tools.ProducerPerformance

當然,單個Producer的pool、開闢記憶體、與server端的連接等都是有上限的,因此真正發壓時,需要啟動多個發壓進程。發壓腳本如下

bash kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.10:9094,10.0.0.11:9094,10.0.0.12:9094 \
	acks=1  \
  buffer.memory=134217728 \
--producer.config=admin5.conf \
--topic topic_6 \
--throughput=-1  \
--num-records 100000000 \
--record-size 1024000

admin5.conf 配置如下(因為開啟了ACL認證,因此需要申明SASL配置)

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-a9asfbx5pl" password="a9asfbxmsn";

關於發壓腳本的參數配置做一下說明

  • bootstrap.servers 連接集群的接入點
  • acks 響應方式,這個對性能影響非常大,這裡使用預設的1。有3種配置
    • 1 : leader收到消息後便返回成功
    • 0 : 不需要等待任何副本確認
    • -1 : 生產者將等待所有的副本接收到消息併進行確認
  • buffer.memory 這裡是設置了producer的128M的緩衝區,預設為32M
  • producer.config 因為目標集群開啟了acl,這裡需要存放一些相關配置
  • throughput 發送流量的一個上限值,-1表示不設置上限
  • num-records 發送消息的條數,因為要壓測,所以這裡放一個大值
  • record-size 每條消息的大小。這裡配置的1M,因為需要壓測集群的極限值,這裡直接設置一個相對大的值
    • 註意,如果消息大小配置較小的話,可以通過調整batch.size及linger.ms參數來控制攢批

4.1.2、發壓機器

因為kafka實例是被k8s孵化出來的,因此獨立開闢了 5台ECS發壓,其配置

10.252.128.183

8C 16G

10.252.128.185

8C 16G

10.252.128.136

4C 8G

10.252.128.140

4C 8G

10.252.128.176

4C 8G

因為發壓程式不會占用大量cpu及記憶體,當開啟多進程壓測時,只要網路帶寬不是瓶頸就OK

4.2、服務端準備

4.2.1、常規壓測配置

集群新建出來後,有一些關鍵的配置還是需要留意設置一下的,否則性能會打很大的折扣

配置項

建議值

說明

num.network.threads

與cpu核數相近

broker端的網路線程,負責將數據從網卡搬運至broker堆記憶體中,這個過程涉及記憶體的拷貝,是一個典型的吃cpu的操作。設置太小,網卡搬運工作將會成為瓶頸,設置太大,又會造成頻繁的線程切換,建議將其設置為cpu+1

num.io.threads

2*cpu + 1

broker端典型的IO線程,所有寫log的操作都是該線程觸發的

log.retention.bytes

-1

parition目錄的最大閾值,當partition目錄超過該值時就會觸發刪除老消息的操作。這個是kafka提供的原生配置,設置為-1,代表不對其做限制

log.flush.interval.messages

Long.MaxValue

接收到指定條數的消息後刷盤,這裡建議配置為最大值,即刷盤的行為留給操作系統自己去控制。此值切勿設置的過小,否則將會導致磁碟頻繁的sync,對性能影響很大

log.flush.interval.ms

null

達到指定時間後將記憶體中的消息刷盤。這個配置項與log.flush.interval.messages類似,也是刷盤的策略,這裡同樣建議不對其設置,交由操作系統來控制已獲得最大的吞吐性能

message.max.bytes

10M

這個值限定了單條消息在broker端的上限,同樣在producer也有一個參數來配置max.request.size,producer端的這個配置一定是要小於server端的

num.replica.fetchers

cpu+1

這個值沒有特定大小,根據不同的場景來設定,如果想保證配置acks=-1的性能較高,那麼需要提高此值,建議設置為cpu+1,否則就是預設的1即可

replica.fetch.max.bytes

10M

設置副本單次拉取的最大位元組數,這個值需要大於單條消息的最大值,否則可能導致性能偏低

4.2.2、副本同步

前文說過,kafka選擇不同的副本同步策略、同步副本數量,對性能影響很大;如果選擇單副本的話,那麼最大吞吐就是上文使用工具測出來的磁碟性能,而如果選擇多副本的話,則整體吞吐的計算公式是是業務流量*副本數,後文我們將針對不同acksnum.replica.fetchers參數以及不同的副本數分別進行壓測,最終得出一個多維參考值

4.2.3、服務端監控

服務端監控主要是查看集群整體流量、broker cpu、記憶體參數。我們通過top命令可以快速獲取cpu、memory參數,而集群整體流量,為了快速獲取,可通過JMX去拉取kafka原生監控項

public class JMXMain {
    public static void main(String[] args) throws Exception {
        new JMXMain().begin(args);
    }

    private void begin(String[] args) throws Exception {
        String metricName = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
        List<MBeanServerConnection> connectionList = initConnectionList(args);

        while (true) {
            long total = 0L;
            long[] arr = new long[3];
            int index = 0;
            for (MBeanServerConnection connection : connectionList) {
                Optional<ObjectName> first = connection.queryNames(new ObjectName(metricName), null).stream().findFirst();
                if (first.isPresent()) {
                    Object oneMinuteRate = connection.getAttribute(first.get(), "OneMinuteRate");
                    long single = transToM(oneMinuteRate);
                    arr[index++] = single;
                    total += single;
                } else {
                    System.out.println("none");
                }
            }

            System.out.println(Arrays.toString(arr));
            System.out.println("rate is " + total + " MB/sec");
            
            Thread.sleep(10000);
        }
    }

    private List<MBeanServerConnection> initConnectionList(String[] args) throws Exception {
        // 10.0.0.21:9094,10.0.0.22:9094,10.0.0.23:9094
        List<String> connList = new ArrayList<>();
        if (args != null) {
            for (String ip : args) {
                String s = "service:jmx:rmi:///jndi/rmi://" + ip + ":5555/jmxrmi";
                connList.add(s);
            }
        }
        List<MBeanServerConnection> resultList = new ArrayList<>();
//        String[] arr = {"service:jmx:rmi:///jndi/rmi://10.0.0.19:5555/jmxrmi", "service:jmx:rmi:///jndi/rmi://10.0.0.20:5555/jmxrmi", "service:jmx:rmi:///jndi/rmi://10.0.0.21:5555/jmxrmi"};
        for (String jmxUrl : connList) {
            System.out.println(jmxUrl);
            JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxUrl));
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            resultList.add(connection);
        }
        return resultList;
    }
    
    public static long transToM(Object count) {
        if (count == null) {
            return 0;
        } else {
            double v = Double.parseDouble(count.toString()) / 1024 / 1024;
            return (long) v;
        }
    }
}

每隔10秒列印一下集群整體的流量及每個broker各自流量,例如:

[406, 407, 405]
rate is 1218 MB/sec
[409, 409, 407]
rate is 1225 MB/sec
[408, 408, 408]
rate is 1224 MB/sec
[406, 406, 405]
rate is 1217 MB/sec

註意:這裡列印的流量,僅包含leader處理的業務流量,不包括follower從leader同步的備份流量,例如,我創建一個單partition,3副本的topic,然後向集群寫入100MB/s的流量,因為設置了3副本,因此雖然只會向其中某個broker發送數據,但是另外2個broker中同時也均會有100MB/s的備份流量,但是使用上述工具則只會列印leader的流量: [100, 0, 0]

五、發壓

5.1、2C4G

5.1.1、單partition/單副本

最小配置,首先測試一下單partition、單replica 的性能,從而與磁碟極限性能做個對比;這個值體現了kafka單broker的極限能力

創建topic:【1 partition、1 replica、acks=1】

發壓命令

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \ 
--producer-props \
	bootstrap.servers=10.0.0.10:9094,10.0.0.11:9094,10.0.0.12:9094 \
  acks=0 \
  max.request.size=2048000   \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin5.conf \
--topic topic_1_1 \
--throughput=-1  --num-records 100000000 --record-size 1924000

當啟動了8個producer客戶端後,監控集群的吞吐峰值來到了 550MB/s;其實啟動了4個客戶端後,吞吐量就達到了530,後續通過增加客戶端數量帶來的收益越來越小,說明broker端能力已達上限

[0, 546, 0]
rate is 546 MB/sec
[0, 545, 0]
rate is 545 MB/sec
[0, 550, 0]
rate is 550 MB/sec

簡單做個對比

規格

理論峰值

Kafka吞吐

2C4G

1105 MB/sec

550MB/s

在開始對磁碟用工具進行壓測時候,2C4G的規格就因為cpu成為了短板,壓測工具自身沒有消耗cpu的邏輯,幾乎全量的cpu都消耗在了刷盤的操作中

而kafka的構成要比磁碟工具複雜很多,涉及記憶體的數據拷貝、數據解析、正確性驗證、刷盤等,而這些操作無疑會消耗大量cpu,cpu本身就是短板,因此壓測出來的kafka吞吐量會比理論值低很多

因此當前2C4G的3節點的極限能力是 3 * 550MB/s

5.1.2、多partition/三副本/all acks

當選項acks設置為all時,代表只有當3個副本的消息都落盤後,才會response,這個設置也是嚴格的保證了數據的高可用,不會有任何數據的丟失,同時這種配置也是效率最低的,我們創建一個 6 partition,3副本的topic,同時將acks設置為all,再查看此時的性能,做一個對比

創建topic:【6 partition、3 replica、acks=all】

發壓命令

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.10:9094,10.0.0.11:9094,10.0.0.12:9094 \
  acks=-1 \
  max.request.size=2048000   \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin5.conf \
--topic topic_6_3 \
--throughput=-1  --num-records 100000000 --record-size 1924000

啟動了4個producer客戶端後,監控集群的吞吐峰值來到了 3 * 320MB/s

[106, 107, 105]
rate is 318 MB/sec
[107, 107, 106]
rate is 320 MB/sec
[108, 108, 106]
rate is 322 MB/sec
[108, 107, 107]
rate is 322 MB/sec
[108, 107, 107]
rate is 322 MB/sec
[108, 106, 107]
rate is 321 MB/sec

4個客戶端的延遲都已經很高,達到了400ms左右,說明數據都積攢到了broker端排隊處理,4個客戶端數據採樣:

terminal-1:
230 records sent, 45.8 records/sec (84.07 MB/sec), 395.2 ms avg latency, 2228.0 ms max latency.
223 records sent, 44.4 records/sec (81.56 MB/sec), 437.7 ms avg latency, 1771.0 ms max latency.

terminal-4:
216 records sent, 43.1 records/sec (79.11 MB/sec), 429.5 ms avg latency, 1628.0 ms max latency.
225 records sent, 45.0 records/sec (82.54 MB/sec), 399.1 ms avg latency, 1279.0 ms max latency.

可見,acks參數的不同,對最終結果的影響甚大

因cpu核數只有2,因此調整num.replica.fetchers參數對最終的壓測影響不大,後續等cpu核數增加後可以考慮增加此值

5.1.3、多partition/三副本/leader

然而我們實際生產中,通常既不會將topic的副本數設置為1,也不會將acks設置為all,那麼這個時候的最大流量值,體現的便是集群能夠處理業務流量的峰值,一旦這個值超過了550MB/s,那麼follower一定出現不同程度的落後leader的現象,等流量回落後,follower再逐步進行追趕,因此這個值也是具有相當重要的參考價值

創建topic:【3 partition、3 replica、acks=1】

發壓命令

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=10.0.0.10:9094,10.0.0.11:9094,10.0.0.12:9094 acks=1 max.request.size=2048000   --producer.config=/root/kafka_2.12-2.8.2/bin/admin5.conf --topic topic_3_3 --throughput=-1  --num-records 100000000 --record-size 1924000

啟動了12個producer客戶端後,監控集群的業務流量峰值來到約了 1GB/s

[347, 349, 343]
rate is 1039 MB/sec
[340, 345, 342]
rate is 1027 MB/sec
[339, 344, 341]
rate is 1024 MB/sec
[343, 347, 344]
rate is 1034 MB/sec

客戶端的延遲都已經達到了400ms左右,說明瓶頸不在客戶端側,客戶端數據採樣:

terminal-1:
228 records sent, 45.6 records/sec (83.60 MB/sec), 412.7 ms avg latency, 1173.0 ms max latency.
268 records sent, 53.5 records/sec (98.11 MB/sec), 334.0 ms avg latency, 774.0 ms max latency.

terminal-3:
218 records sent, 43.4 records/sec (79.62 MB/sec), 394.7 ms avg latency, 1422.0 ms max latency.
252 records sent, 50.3 records/sec (92.35 MB/sec), 377.1 ms avg latency, 1226.0 ms max latency.

註意:我這裡並沒有使用 3 * 1GB/s 的描述,是因為雖然leader確實已經接受了1GB/s的流量,但是其並沒有在同一時刻事實同步給follower,事實上,隨著時間的推移,follower已經落後的越來越多

5.1.4、整理總結

能力

描述

流量

單broker磁碟能力

當前cpu+磁碟所具備的極限寫入能力,作為判斷kafka能力的參考

1105 MB/s

嚴格寫入能力

嚴格高可用地寫入數據,即acks=all的方式寫入數據,這種模式下,3個broker中的數據齊頭併進

3 * 320 MB/s

常規集群能力

用的最多的方式, acks=1,即寫leader的模式,也是討論最多的模式

3 * 550 MB/s

可應對峰值能力

這種模式下,leader的流量將遠大於follower的,

1000 MB/s

5.2、4C8G

相關認證配置 admin7.conf

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-acegzesojh" password="acegzesfmj";

5.2.1、單partition/單副本

創建topic:【1 partition、1 replica、acks=1】

當啟動了8個producer端時,集群的流量來到580 MB/s左右,這個值與2C4G的基本持平,難道它們兩個的性能相當嗎?其實不盡然,因為單partition、單副本的case,註定broker將會是同時寫入1個文件,此時的瓶頸將落在IO上,因此,單純的加cpu是不會提升吞吐的

[0, 584, 0]
rate is 584 MB/sec
[0, 589, 0]
rate is 589 MB/sec
[0, 586, 0]
rate is 586 MB/sec

看一下cpu的使用率

cpu基本維持在180%上下,而4C的上限是400%

5.2.2、multi 【單partition/單副本】

創建topic:4 * 【1 partition、1 replica、acks=1】

既然1個topic無法探知broker的上限,那我們就創建多個【單partition/單副本】的topic,使其落在同一個broker上,然後再向這個broker發壓即可。(也可以通過手動指定將某個topic的分區都分佈在1台broker上實現)

查看topic_1_1的ISR分佈情況:

bash /root/kafka_2.12-2.8.2/bin/kafka-topics.sh \
--bootstrap-server 10.0.0.5:9094,10.0.0.9:9094,10.0.0.18:9094 \
--command-config  /root/kafka_2.12-2.8.2/bin/admin7.conf  \
--describe --topic topic_1_1

返回結果

Topic: topic_1_1	Partition: 0	Leader: 1000	Replicas: 1000	Isr: 1000

通過這種方式,選取4個topic:topic_1_1、topic_i_1_1、topic_j_1_1、topic_n_1_1,然後每個topic啟動4個producer進行發壓,也就是一共開啟了16個client端發壓

首先看一下broker端的流量統計指標,單broker的流量來到了 1GB/s

[0, 1009, 0]
rate is 1009 MB/sec
[0, 1016, 0]
rate is 1016 MB/sec
[0, 1021, 0]
rate is 1021 MB/sec
[0, 1016, 0]
rate is 1016 MB/sec
[0, 1015, 0]
rate is 1015 MB/sec

再觀察一下cpu利用率,維持在400%,已經打滿

客戶端的監控日誌採樣。延遲也高達500ms,說明壓力已經完全給到了broker

196 records sent, 38.8 records/sec (71.24 MB/sec), 457.4 ms avg latency, 560.0 ms max latency.
167 records sent, 33.4 records/sec (61.21 MB/sec), 536.9 ms avg latency, 649.0 ms max latency.
147 records sent, 29.1 records/sec (53.35 MB/sec), 623.0 ms avg latency, 782.0 ms max latency.
184 records sent, 36.6 records/sec (67.19 MB/sec), 489.4 ms avg latency, 588.0 ms max latency.
189 records sent, 37.8 records/sec (69.33 MB/sec), 478.6 ms avg latency, 563.0 ms max latency.

至此,探知當前配置的單broker的處理上限為 1 GB/s,因此集群的最大吞吐為 3 * 1 GB/s

5.2.3、多partition/三副本/all acks

創建topic:【6 partition、3 replica、acks=-1】

發壓命令

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.27:9094,10.0.0.28:9094,10.0.0.29:9094 \
  acks=-1 max.request.size=2048000 \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin77.conf  \
--topic topic_a_6_3 --throughput=-1  \
--num-records 100000000 --record-size 1924000

吞吐停留在330 MB/s,怎麼跟2C4G的相差不大呢?

[112, 111, 111]
rate is 334 MB/sec
[112, 111, 111]
rate is 334 MB/sec
[112, 111, 112]
rate is 335 MB/sec

這裡別忘了一個參數num.replica.fetchers,這個參數預設為1,調大這個參數將加快follower從leader拉取數據的速率;我們首先看下當前這個參數的設置:

sh kafka-configs.sh \
--bootstrap-server 10.0.0.27:9094,10.0.0.28:9094,10.0.0.29:9094 \
--command-config  admin77.conf --all \
--entity-type brokers --describe  |  grep "num.replica.fetchers"

最終返回

num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}
num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}
num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}

這個參數是可以調用命令進行直接修改的,我們將其修改為cpu核數

sh kafka-configs.sh \
--bootstrap-server 10.0.0.27:9094,10.0.0.28:9094,10.0.0.29:9094 \
--command-config  admin77.conf \
--alter --entity-type brokers --entity-default \
--add-config 'num.replica.fetchers=4' 

最終broker的性能提升至了550 MB/s

[181, 182, 182]
rate is 545 MB/sec
[183, 184, 183]
rate is 550 MB/sec
[183, 184, 183]
rate is 550 MB/sec

cpu利用率也相當低,大量的耗時都停留在三副本sync上

5.2.4、多partition/三副本/leader

將參數“num.replica.fetchers”調整為預設值後,同時將acks設置為1,再次發壓

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.27:9094,10.0.0.28:9094,10.0.0.29:9094 \
  acks=1 max.request.size=2048000 \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin77.conf  \
--topic topic_a_6_3 --throughput=-1  \
--num-records 100000000 --record-size 1924000

共啟動了16個客戶端,流量來到了2200 MB/s

[734, 737, 733]
rate is 2204 MB/sec
[734, 736, 744]
rate is 2214 MB/sec
[737, 745, 741]
rate is 2223 MB/sec

同時cpu被打滿

部分發壓程式日誌採樣。隨著producer的增多,吞吐量維持在恆定值

432 records sent, 86.2 records/sec (158.12 MB/sec), 198.7 ms avg latency, 1601.0 ms max latency.
378 records sent, 75.4 records/sec (138.33 MB/sec), 238.2 ms avg latency, 1297.0 ms max latency.
413 records sent, 82.4 records/sec (151.17 MB/sec), 222.9 ms avg latency, 1315.0 ms max latency.
353 records sent, 70.5 records/sec (129.39 MB/sec), 268.1 ms avg latency, 1442.0 ms max latency.

5.2.5、整理總結

能力

描述

流量

單broker磁碟能力

當前cpu+磁碟所具備的極限寫入能力,作為判斷kafka能力的參考

2580 MB/s

嚴格寫入能力

嚴格高可用地寫入數據,即acks=all的方式寫入數據,這種模式下,3個broker中的數據齊頭併進

3 * 550 MB/s

常規集群能力

用的最多的方式, acks=1,即寫leader的模式,也是討論最多的模式

3 * 1000 MB/s

可應對峰值能力

這種模式下,leader的流量將遠大於follower的,會產生了流量傾斜

2250 MB/s

5.3、8C16G

用到的配置信息 admin6.conf

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-acntgcoin9" password="acntgcozqb";

5.3.1、multi 【單partition/單副本】

創建topic:【12 partition、1 replica、acks=1】

通過命令創建topic,將12個分區全部都放在第一個broker上

bash /root/kafka_2.12-2.8.2/bin/kafka-topics.sh \
--bootstrap-server 10.0.0.5:9094,10.0.0.6:9094,10.0.0.7:9094 \
--command-config  /root/kafka_2.12-2.8.2/bin/admin6.conf  \
--create --topic topic_1_1 \
--replica-assignment 1000,1000,1000,1000,1000,1000,1000,1000,1000,1000,1000,1000

啟動28個producer端發壓客戶端後,broker流量趨於穩定

[1964, 0, 0]
rate is 1964 MB/sec
[1959, 0, 0]
rate is 1959 MB/sec
[1962, 0, 0]
rate is 1962 MB/sec

查看對應pod的cpu使用率,已經打滿

最終得出結論,單台broker的吞吐上限為 1.9 GB/s

5.3.2、多partition/三副本/all acks

創建topic:【12 partition、3 replica、acks=-1】

當前規格配置較高,需要創建更多的partition以壓榨更多的cpu資源

查看參數num.replica.fetchers

sh kafka-configs.sh \
--bootstrap-server 10.0.0.5:9094,10.0.0.6:9094,10.0.0.7:9094 \
--command-config  admin6.conf --all \
--entity-type brokers --describe  |  grep "num.replica.fetchers"

返回

  num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}
  num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}
  num.replica.fetchers=1 sensitive=false synonyms={DEFAULT_CONFIG:num.replica.fetchers=1}

用同樣的方法查看參數replica.fetch.max.bytes,返回

replica.fetch.max.bytes=30240000 sensitive=false synonyms={STATIC_BROKER_CONFIG:replica.fetch.max.bytes=30240000, DEFAULT_CONFIG:replica.fetch.max.bytes=1048576}
replica.fetch.max.bytes=30240000 sensitive=false synonyms={STATIC_BROKER_CONFIG:replica.fetch.max.bytes=30240000, DEFAULT_CONFIG:replica.fetch.max.bytes=1048576}
replica.fetch.max.bytes=30240000 sensitive=false synonyms={STATIC_BROKER_CONFIG:replica.fetch.max.bytes=30240000, DEFAULT_CONFIG:replica.fetch.max.bytes=1048576}

將參數num.replica.fetchers修改為cpu核數

sh kafka-configs.sh \
--bootstrap-server 10.0.0.5:9094,10.0.0.6:9094,10.0.0.7:9094 \
--command-config  admin6.conf \
--alter --entity-type brokers --entity-default \
--add-config 'num.replica.fetchers=8' 

發壓

bash /root/kafka_2.12-2.8.2/bin/kafka-producer-perf-test.sh \
--producer-props \
	bootstrap.servers=10.0.0.5:9094,10.0.0.6:9094,10.0.0.7:9094 \
	acks=-1  \
max.request.size=2048000   \
--producer.config=/root/kafka_2.12-2.8.2/bin/admin6.conf \
--topic topic_a_12_3 --throughput=-1  --num-records 100000000 --record-size 1924000

一共啟動了24個producer壓測,強勁的cpu發揮了作用,寫入速率達到了950 M/s

[315, 315, 315]
rate is 945 MB/sec
[315, 317, 318]
rate is 950 MB/sec
[313, 317, 317]
rate is 947 MB/sec

5.3.3、多partition/三副本/leader

創建topic:【12 partition、3 replica、acks=1】

按照常規,我們壓一下三副本寫leader的case;此時num.replica.fetchers同樣設置為8

啟動28個壓測客戶端後,流量趨於穩定

1034, 1032, 1028]
rate is 3094 MB/sec
[1034, 1038, 1034]
rate is 3106 MB/sec
[1036, 1039, 1040]
rate is 3115 MB/sec

客戶端延遲1s+

426 records sent, 85.1 records/sec (156.14 MB/sec), 204.5 ms avg latency, 1791.0 ms max latency.

broker cpu使用率接近飽和

您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 如今,大規模、高時效、智能化數據處理已是“剛需”,企業需要更強大的數據平臺,來應對數據查詢、數據處理、數據挖掘、數據展示以及多種計算模型並行的挑戰,湖倉一體方案應運而生。 《實時湖倉實踐五講》是袋鼠雲打造的系列直播活動,將圍繞實時湖倉的建設趨勢和通用問題,邀請奮戰於企業數字化一線的核心產品&技術專家 ...
  • 隨著業務飛速發展,某汽車製造企業業務系統數量、複雜度和數據量都在呈幾何級數的上漲,這就對於企業IT能力和IT架構模式的要求越來越高。加之企業大力發展數字化營銷、新能源車等業務,希望通過持續優化客戶體驗,創造可持續發展的數字化轉型之路。 為更好應對數字化變革所帶來的挑戰,現有的豎井架構的數據體系難以滿 ...
  • 大家好,我是獨孤風,從本周開始,爭取每周為大家帶來一個優秀的開源項目推薦。 開源項目不僅促進了技術的發展和普及,還為全球範圍內的開發者和用戶社區建立了一個共用知識、協作和創新的平臺。站在巨人的肩膀上才能看的更遠,我們平時也應該多多關註開源項目,不僅學習其豐富的知識,也要找機會為開源事業做出自己的貢獻 ...
  • 蘋果在 iPhone 14 Pro 及 iPhone 14 Pro MAX 上推出了靈動島,是一次交互玩法的革新。本文從靈動島的展現形式、場景限制、適配情況和遠程通知更新數據幾個方面全面帶你走進靈動島 ...
  • 這裡給大家分享我在網上總結出來的一些知識,希望對大家有所幫助 背景 最近心血來潮,想要在本地開發時,也用CDN的方式引入 Vue,想著既然通過CDN引入了,那麼在項目中就沒必要再 import Vue,然後把項目中引入 Vue 的地方都刪掉,結果改完後,界面看似正常運行,但數據變更後,界面沒有重新渲 ...
  • 項目代碼同步至碼雲 weiz-vue3-template pina 是 vue3 官方推薦的狀態管理庫,由 Vue 核心團隊維護,旨在替代 vuex。pina 的更多介紹,可從 pina官網 查看 特點 更簡潔直接的 API,提供組合式風格的 API 支持模塊熱更新和服務端渲染 對TS支持更為友好 ...
  • 一、CSS簡介 CSS:層疊樣式表(英文全稱:Cascading Style Sheets):是一種用來表現HTML樣式的電腦語言。CSS不僅可以靜態地修飾網頁,還可以配合各種腳本語言動態地對網頁各元素進行格式化。 二、CSS選擇器 2.1基本選擇器(三種) 1.標簽選擇器 <style> p { ...
  • 前言 這是第三次博客作業,總結了近三次PTA大作業的完成情況,作業7、8次的大作業的小題目圍繞著HashMap、ArrayList和自定義介面來展開,大題目則是課程成績程式的第二次第三次迭代,因為第一次課程成績的程式寫的結構不太好,於是重新寫的,第三次迭代並沒有拿到滿分,後面也沒有時間改了。期末考試 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...