Kafka之Producer網路傳輸

来源:https://www.cnblogs.com/xijiu/p/18090940
-Advertisement-
Play Games

零售商家為什麼要建設線上商城? 傳統的實體門店服務範圍有限,只能吸引周邊500米以內的消費者。因此,如何拓展服務範圍,吸引更多的消費者到店,成為了店家迫切需要解決的問題。 缺乏忠實顧客,客戶基礎不穩,往往是一次性購物,門店無法形成有效的顧客迴流。在當前的市場環境下,構建並維護粉絲群體,成為了商家的核 ...


一、背景

在Kafka的組成部分(Broker、Consumer、Producer)中,設計理念迥異,每個部分都有自己獨特的思考。而把這些部分有機地組織起來,使其成為一個整體的便是「網路傳輸」。區別於其他消息隊列的方式(RocketMQ處理網路部分直接使用成熟的組件Netty),Kafka則是直接對java的NIO進行了二次包裝,從而實現了高效的傳輸

然而處理網路相關的工作是非常複雜的,本文我們只聚焦於網路傳輸的Producer端,而Producer端也只聚焦在消息發送的部分,使用場景帶入的方式來分析一下消息發送的環節,Producer是如何將其扔給網路併發送出去的

二、概述

我們首先回想一下Producer消息發送的整體流程

  1. 客戶端線程會不斷地寫入數據,當前線程並不會阻塞,而是馬上返回。這個時候消息被Producer放在了緩存內,消息並沒有真正發送出去
  2. Producer內部為每個Partition維護了一個RecordBatch的隊列,先進先出的模式,統稱為RecordAccumulator,數據第一步會先放在這個組件中。放在這個組件中的數據什麼時候會真正發送出去呢?這裡其實是“大巴車”邏輯(大巴車邏輯是指:1、如果某輛大巴車已經坐滿人了,這個時候無條件立即發車。2、雖然本輛大巴車沒有坐滿人,但是車已經在原地等待了1個小時,即便是只有1位乘客,也要立即發車),以下2個條件滿足其一即可:
    1. RecordBatch已滿,無法寫入新數據
    2. RecordBatch雖然還未滿,但是已超時
  1. 為了給RecordAccumulator提速,又引申出了MemoryPool的概念,它主要的作用是一次性分配了一塊記憶體池,避免每次RecordBatch新建時,臨時開闢記憶體空間
    1. 為什麼開闢記憶體空間會很慢呢? 單純分配一個連續的記憶體空間不會有太多的耗時,這裡主要是JDK為ByteBuffer的置0操作,byte[]數組開闢空間耗時也是同理。例如DirectByteBuffer.java的構造函數是這樣實現的 unsafe.setMemory(base, size, (byte) 0);
  1. 以上所有部分均不涉及網路相關操作,真正網路發送/接收的邏輯是放在了Sender.java線程中。Sender線程會不斷地從RecordAccumulator中拉取滿足條件的消息記錄,從而與Broker建聯併發送

以上是Producer發送消息的主流程,本文將會聚焦在Sender網路線程及其相關的邏輯

三、Java NIO

因為Kafka的網路實現是對Java NIO的二次封裝,因此在真正開始分析之前,我們有必要先對NIO做個簡單回顧

3.1、Server端

public void startServer() throws IOException {
    // Selector選擇器,NIO中的核心組件,也就是用它來監聽所有的網路事件
    selector = Selector.open();
    // 打開服務端的ServerSocketChannel,只有服務端需要打開ServerSocket
    server = ServerSocketChannel.open();
    // 為服務端綁定埠,之後所有的client均連接次埠
    server.bind(new InetSocketAddress(PORT));
    // 這裡配置為非阻塞
    server.configureBlocking(false);
    // 主要是將ServerSocketChannel與selector進行綁定
    server.register(selector, SelectionKey.OP_ACCEPT);

    SocketChannel client;
    SelectionKey key;
    Iterator<SelectionKey> keyIterator;

    while (true) {
        // 調用select()方法,如果有新的請求,則馬上返回,否則最多阻塞100ms
        int select = selector.select(100);
        if (select == 0) {
            continue;
        }

        // 將所有的事件拿出來,準備迭代
        keyIterator = selector.selectedKeys().iterator();
        while (keyIterator.hasNext()) {
            key = keyIterator.next();
            if (key.isAcceptable()) {        
                // 處理新進來的鏈接請求
                operateConnectEvent(key);
            }
            if (key.isReadable()) {
                // 處理讀取事件的請求
                operateReadEvent(key);
            }
            // 事件處理完後,將其移除
            keyIterator.remove();               
        }
    }
}

想必大家對上面的demo不會太陌生,這就是JDK給我們提供的server端啟動埠監聽的經典demo,這裡需要註意的是:處理讀寫請求時,儘量開闢線程來非同步處理,避免影響selector監聽線程

3.2、Client端

public void startClient() throws IOException {
    // 客戶端打開自己的Selector
    selector = Selector.open();                           
    // 與Server端開始建聯
    socketChannel = SocketChannel.open(new InetSocketAddress(port));
    // 設置非阻塞
    socketChannel.configureBlocking(false);
    // 註冊讀時間到selector上,這樣通過selector就可以監聽網路讀取的事件了
    SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
    // attach方法並沒有實質的含義,一般是將自己業務中的輔助類綁定在key上,方便後續的讀取及處理
    selectionKey.attach(null);

    Iterator<SelectionKey> ikeys;
    SelectionKey key;
    SocketChannel client;
    try {
        while (flag) {
            // 調用此方法一直阻塞,直到有channel可用
            selector.select();   
            ikeys = selector.selectedKeys().iterator();
            while (ikeys.hasNext()) {
                key = ikeys.next();
                if (key.isReadable()) {    
                    // 處理讀事件,一般開多線程處理
                    handleReadEvent();
                }
                ikeys.remove();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

主要思想也是通過selector監聽不同的事件,一旦獲取到事件,便開線程非同步處理,不阻塞selector,從而達到高效通信的目的

Server端+Client端不足百行的demo代碼,會被Kafka如何包裝呢?

四、Sender線程概述

我們在具體介紹Sender線程之前,首先對其有個全貌的認知,也就是先整體後細節。而本節就是一個整體認知,雖然很多細節不會展開,但是對網路線程的理解至關重要

4.1、單網路線程

Sender線程是在KafkaProducer對象初始化啟動的,啟動後便是一個無限迴圈的調用。run()方法如下

@Override
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    ....  // 此處是一些優雅關機的代碼
}

由此我們可以得出結論

  • Sender線程是一個單線程,一個KafkaProducer只會對應一個網路線程
  • Sender伴隨Producer的啟動而啟動,同時也伴隨Producer的消亡而消亡

其實這也就解釋了當我們啟動一個Producer進行壓測的時候,通常不能把帶寬打滿。因為網路線程所做的事兒,無非是將用戶生產的數據從JVM中拷貝至網卡,而這個動作又是典型的cpu密集型,單個線程確實無法提高資源利用率

而相比較Broker而言,Broker提供了更為豐富的線程數量配置策略,比如網路線程數的配置num.network.threads及IO線程數的配置num.io.threads等,為什麼Producer的網路線程要配置為單個呢?我個人認為主要是以下原因:

  • Producer客戶端的定位;雖然Producer與Broker都存在網路傳輸的功能,但是Producer終究只是一個客戶端,通常客戶端所在的機器配置是比不上Broker端的,因此單個網路線程足以滿足絕大部分場景的需求
  • 容易橫向擴展;我們可以通過簡單地啟動多個Producer從而實現性能的提升,而這個操作是輕量的
  • 開發的複雜性;即便是在單網路線程的case下,Producer需要處理數據收集、發送、接收、維護元數據等,複雜性已然不低,如果需要單KafkaProducer同時啟動多個網路線程的話,勢必帶來較大的複雜性

4.2、Sender概覽

由上節我們知道Sender線程一直在重覆調用runOnce()方法,這個方法又做了什麼操作呢?

void runOnce() {
    if (transactionManager != null) {......}

    long currentTimeMs = time.milliseconds();
    // 拉取已經準備好的數據
    long pollTimeout = sendProducerData(currentTimeMs);
    // 執行網路數據發送
    client.poll(pollTimeout, currentTimeMs);
}

簡單概括就是兩件事兒:

  1. 將RecordAccumulator中已經準備好的數據,進行位元組粒度的協議編碼,最終放入待發送區
  2. 真正執行NIO數據發送、接收響應

五、線程相關類

在NIO中很重要的2個概念是:Selector及SelectionKey,即一個是多路復用器,一個可以簡單認為是Channel。Kafka分別對這個概念進行了包裝,整體的類圖如下:

5.1、NetworkClient

顧名思義,這個類是為客戶端而服務的,它是所有客戶端訪問網路的總入口,他只實現了一個介面KafkaClient,也是KafkaClient的唯一實現類。介面有如下方法:

  • boolean isReady(Node node, long now);
  • boolean ready(Node node, long now);
  • long connectionDelay(Node node, long now);
  • long pollDelayMs(Node node, long now);
  • boolean connectionFailed(Node node);
  • AuthenticationException authenticationException(Node node);
  • void send(ClientRequest request, long now);
  • List<ClientResponse> poll(long timeout, long now);
  • void disconnect(String nodeId);
  • void close(String nodeId);
  • Node leastLoadedNode(long now);
  • int inFlightRequestCount();
  • boolean hasInFlightRequests();
  • int inFlightRequestCount(String nodeId);
  • boolean hasInFlightRequests(String nodeId);
  • boolean hasReadyNodes(long now);
  • void wakeup();
  • ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder,long createdTimeMs, boolean expectResponse);
  • ClientRequest newClientRequest(String nodeId,AbstractRequest.Builder<?> requestBuilder,long createdTimeMs,boolean expectResponse,int requestTimeoutMs,RequestCompletionHandler callback);
  • void initiateClose();
  • boolean active();

可以看到大部分都是與網路相關的,而裡面比較重要且高頻的方法不外乎以下兩個,也是本文著重展開介紹的2個方法

  • void send(ClientRequest request, long now);
  • List<ClientResponse> poll(long timeout, long now);

字面含義,即一個發送、一個接受,然而這裡封裝的這2個方法卻是更偏重業務語義

void send(ClientRequest request, long now);

  • 對消息進行協議編碼
  • 是將消息放入待發送區,但並不會真正出髮網絡請求

List<ClientResponse> poll(long timeout, long now);

而poll方法中則包含了真正網路的發送與接收,也就是只有poll方法會調用Java NIO的相關api介面

  • 真正提交網路發送
  • 處理髮送成功的請求列表
  • 處理接收到的response
  • connection相關處理
  • .......

後續的分析也主要圍繞這兩個入口方法展開

NetworkClient作為網路請求的入口類,很多關於底層網路的操作都是交由Selector來實現的,而NetworkClient則更多的將自己的職責放在了網路上層的建設上,諸如維護髮送列表、inFlightRequests等業務控制上

List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);

5.2、Selector

註意,與nio包中Selector不同的是,這裡的Selector的全路徑是org.apache.kafka.common.network.Selector,我們看一下它的主要成員變數,也大致能猜到它在網路請求中發揮哪些功能

private final java.nio.channels.Selector nioSelector;
private final Map<String, KafkaChannel> channels;
private final Set<KafkaChannel> explicitlyMutedChannels;
private final List<NetworkSend> completedSends;
private final LinkedHashMap<String, NetworkReceive> completedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
private final Map<String, KafkaChannel> closingChannels;
private Set<SelectionKey> keysWithBufferedRead;
private final Map<String, ChannelState> disconnected;
private final List<String> connected;
private final List<String> failedSends;
private final ChannelBuilder channelBuilder;
.........
  • 它持有nio的java.nio.channels.Selector引用,這樣可以直接進行一些底層的網路操作,比如真正與broker建聯
  • 所有已建聯的channel列表,這樣通過nodeId可以快速找到對應的KafkaChannel
  • 保存已完成發送的列表,當NetworkClient需要處理這些請求時,可以直接在這裡獲取到;註意這個list的實現類就是簡單的ArrayList,因為網路線程是單線程,所以不存在併發衝突的問題
  • 同樣存儲了接收請求列表LinkedHashMap<String, NetworkReceive>,不再展開
  • 以及存儲正在關閉的channel、已經關閉的Channel等
  • 。。。。。。

Selector相對比NetworkClient來說,它做了更貼合網路的操作,比如真正與broker建聯、維護Channel列表、處理超時時,關閉Channel等等

值得一提的是,Selector對於SocketChannel的配置,我們看一下關於配置的方法org.apache.kafka.common.network.Selector#configureSocketChannel

private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize)
        throws IOException {
    socketChannel.configureBlocking(false);
    Socket socket = socketChannel.socket();
    socket.setKeepAlive(true);
    if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setSendBufferSize(sendBufferSize);
    if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socket.setReceiveBufferSize(receiveBufferSize);
    socket.setTcpNoDelay(true);
}

有這樣幾個配置項:

  • socketChannel.configureBlocking(false); 配置為非阻塞
  • socket.setKeepAlive(true); 保持連接
  • socket.setSendBufferSize(sendBufferSize); 配置發送緩衝區,預設為128K
  • socket.setReceiveBufferSize(receiveBufferSize); 配置接收緩衝區,預設32K
  • socket.setTcpNoDelay(true); 非Delay

我們註意到,上面幾個配置都比較好理解,包括發送的緩衝區比接收緩衝區大了不少,因為Producer是典型的發送多,接收少的場景;而TcpNoDelay屬性為什麼要設置為true呢? 設置為false不是能提高網路性能嗎?

其實在TCP中有個Nagle演算法,這個演算法的目的是減少網路中小包的數量,通俗點來講就是將諸多的小包整合為一個大包進行傳輸,這樣做的好處是,提高網路利用率,使得整體發送速率更快;弊端是可能會增加某些包的延遲。那Kafka為什麼要禁用這個演算法呢?其實本質原因就是Kafka在Producer內部已經完成了消息攢批,這些待發送的包無非以下兩種狀態:

  • 攢批充足,預設是16K,包足夠大,不需要Nagle演算法的干預
  • 攢批不理想,待發送的數據可能很小;但是即便是數據很少,這些消息也在批次中停留了足夠久的時間,這個時候唯一要做的就是儘快把數據發出去,不要再幫我為了優化TCP而增加延遲了,否則超過Producer等待的最大時長,Producer會將本次請求標記超時。其次這種攢批不理想的場景,通常是Producer發送量很小,數據在時間軸上很稀疏,即便是啟用了Nagle演算法,大概率也不能產生TCP變大包的預期

總結一句話就是Producer的聚批做的已經足夠好了,Nagle演算法的介入只會帶來負優化

5.3、KafkaChannel

上文可知,Selector管理了所有的Channel,而KafkaChannel則是與某個broker連接的具體通道。Selector的各類網路請求也均交給KafkaChannel來執行。

需要說明的是,KafkaChannel其實是通道註冊的時候attach在SelectionKey上的,也就是KafkaChannel與SelectionKey是1對1的關係。以下是KafkaChannel所對外暴露的方法

除了網路相關的操作外,KafkaChannel還有一個很重要的特性,就是緩存當天通道的即將要發送的數據,以及接收到的數據

  • private NetworkSend send;
  • private NetworkReceive receive;

當然放在這裡的數據都是已經經過編解碼完畢的數據

小結:3個類的總代碼行數超過了3000行,是Producer發送消息的核心類,我們先對這3個類有個大致的概念,以及它們每個類主要實現的功能點,這樣非常有益於對整體的認知。至於Kafka將網路操作的相關操作抽象為這3個類是否合理,就仁者見仁智者見智了,其實這3個類是nio及Kafka自身業務的混合體,它的核心就是為Kafka的客戶端網路請求提供服務;筆者認為雖然這樣的設計並沒有把一些特性或者功能的界限畫的很清楚,但是整體運行Kafka的網路功能還是游刃有餘的

六、數據準備

單純地將數據放入聚合器Accumulator還不算是數據準備完成,消息發送需要將其轉換為面向位元組的網路協議的格式,才是真正具備待發送的前提

6.1、消息編碼

發送請求的編碼在類org.apache.kafka.common.protocol.SendBuilder中:

private static Send buildSend(
    Message header,
    short headerVersion,
    Message apiMessage,
    short apiVersion
) {
    ObjectSerializationCache serializationCache = new ObjectSerializationCache();

    MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
    header.addSize(messageSize, serializationCache, headerVersion);
    apiMessage.addSize(messageSize, serializationCache, apiVersion);

    SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
    builder.writeInt(messageSize.totalSize());
    header.write(builder, serializationCache, headerVersion);
    apiMessage.write(builder, serializationCache, apiVersion);

    return builder.build();
}

編碼分2部分,首先需要計算當前類型編碼占用空間的總大小,然後將內容進行逐一填充;這裡Kafka將各類消息定義了統一的介面 org.apache.kafka.common.protocol.Message,這樣每種類型的消息都寫好各自的編解碼協議即可,類似於命令模式,對各類編解碼進行瞭解耦,方便後續的擴展與維護。而Message介面的實現類居然有 357 個之多,可見其軟體設計的複雜度

如此之多的實現類,我們無法窮舉,因此以發送消息類型的Message為demo來闡述一下編碼的格式;任何消息類型的網路編碼格式均分為header與body兩部分,發送消息類型的header與body分別為

  • org.apache.kafka.common.message.RequestHeaderData
  • org.apache.kafka.common.message.ProduceRequestData

6.1.1、header編碼

首先看header部分的編碼

下麵分別對上述欄位進行簡要說明

  • api key
    • 協議的類型編碼,即每種類型的協議編碼均不同,例如消息發送-0,消息拉取-1,獲取位點-2,獲取metadata-3等等,2.8.2版本的kafka的協議類型已經到達了64種,具體可以查看枚舉類org.apache.kafka.common.message.ApiMessageType
  • api key version
    • 根據調整會不斷迭代升級,2.8.2版本已經升級到了9
  • correlation id
    • 相關性id,客戶端指定,從0開始累加,服務端返回時,也需要回應同樣的編號,這樣客戶端就可以將一次request跟response聯繫起來。這裡可以看到,相關性id的長度是4個位元組,難道不怕越界嗎?其實相關性id不需要保證唯一性,當達到了int最大值後,下一次從0繼續即可
  • client id length
    • 客戶端id其實是一個字元串,比如“producer-1”,因為是可變內容,因此照例使用length+content的方式進行存儲
  • client id content
    • 同上,真正存儲 client id 的內容
  • tagged field

另外我們還留意到某些欄位使用了varint,這個又是什麼鬼呢?簡單來說就是可變長度的int,還有varlong等類型。當我們想將一個欄位設置為int型,而大多數的情況下,它可能只占用了很小的空間,例如0、1、2等,這個時候可變長度就派上了用場,本文不再對此展開,相關的論文讀者可參考 http://code.google.com/apis/protocolbuffers/docs/encoding.html

Kafka中varint的實現:

public static void writeUnsignedVarint(int value, ByteBuffer buffer) {
    while ((value & 0xffffff80) != 0L) {
        byte b = (byte) ((value & 0x7f) | 0x80);
        buffer.put(b);
        value >>>= 7;
    }
    buffer.put((byte) value);
}

6.1.2、body編碼

消息發送對應的body編碼實現類是org.apache.kafka.common.message.ProduceRequestData

雖然body真正的內容會比header多很多,但是其編碼協議並不複雜,大部分欄位上圖均已說明。值得一提的是,Kafka協議為了壓縮協議體積,可謂“無所不用其極”,還有很多很多細節層面的優化,也正是由於這些一點一滴的積累,成就了Kafka在消息隊列中行業大佬的地位

6.2、TCP粘包/拆包

這裡沒有太多需要展開論述的,Kafka處理粘包/拆包的問題,採用的就是經典的length+content的方式;也就是首先寫入4個位元組的包長度,後面緊跟消息內容;而拆包時,先讀取4個位元組,繼而完整讀取後續的消息

6.3、零拷貝?

細心的讀者可能會發現,在構造Send類的時候,也就是數據編碼的時候org.apache.kafka.common.protocol.SendBuilder#buildSend,有這樣的代碼:

SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
builder.writeInt(messageSize.totalSize());
header.write(builder, serializationCache, headerVersion);
apiMessage.write(builder, serializationCache, apiVersion);

感覺像是所有的協議層均用堆記憶體的ByteBuffer來構建,但是消息體本身卻讓它走零拷貝。消息發送也可以零拷貝嗎?消息本身不是已經進入到了JVM堆記憶體中了嗎?零拷貝又是如何實現呢

而且我們在類org.apache.kafka.common.message.ProduceRequestData.PartitionProduceData#addSize中也發現了操作零拷貝的代碼_size.addZeroCopyBytes(records.sizeInBytes())但是在真正寫入消息記錄的時候,卻只是將消息記錄放入了SendBuilder的成員變數中org.apache.kafka.common.protocol.SendBuilder#buffers。究竟是怎麼回事兒呢?

原來這裡鬧了烏龍,SendBuilder也僅是復用了org.apache.kafka.common.protocol.MessageSizeAccumulator類的功能,而這個類除了給Producer使用外,還會給消息拉取、同broker內同步流量等使用,在消費的場景確實用到了零拷貝的能力,而Producer卻是一定無法執行零拷貝的,因為消息體已經存在於了JVM堆中

6.4、放入待發送區

經過上述一大圈工作,Kafka將編碼好的消息封裝進入了org.apache.kafka.common.network.ByteBufferSend中,接下來就是放入網路待發送區了

其實放入待發送區就是將ByteBufferSend放入KafkaChannel的成員變數send中,org.apache.kafka.common.network.KafkaChannel#setSend

public void setSend(NetworkSend send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

因為馬上就要執行write操作了,因此開始監聽channel的OP_WRITE寫入事件

有個細節需要註意下,這裡會判斷send != null,這個網路send會有不等於null的時候嗎?這裡先賣個關子,後文會提及。

另外有同學說,“Producer有兩個隊列,一個攢批隊列,一個網路待發送的隊列”,我們通過讀源碼發現,這個說法肯定是有問題的,Sender網路線程只有一個,也不存在網路待發送隊列,而是每個Channel在同一時刻只會發送一個Send

七、執行發送

7.1、發送流程

真正執行發送的邏輯反而變得簡單,入口方法為org.apache.kafka.clients.NetworkClient#poll

最終會調用至KafkaChannel的write()方法

public long write() throws IOException {
    if (send == null)
        return 0;

    midWrite = true;
    return send.writeTo(transportLayer);
}

這裡簡單提一下介面org.apache.kafka.common.network.TransportLayer,這個介面的實現類有2個:PlaintextTransportLayer、SslTransportLayer,也就是Kafka抽象了一個TransportLayer層,通過對TransportLayer層的切換來實現網路加解密的動作,這樣把對SSL的操作放在了最底層,即便於維護,又不會對上層的代碼產生影響,的確是高

繼續往下追的話,我們在PlaintextTransportLayer發現了NIO的jdk調用

public int write(ByteBuffer[] srcs) throws IOException {
    return socketChannel.write(srcs);
}

至此寫入動作完結

7.2、只發送一次?

通常我們向網路或者文件發送ByteBuffer時,為了確保ByteBuffer中的內容全部發送完畢,會一直判斷ByteBuffer的remaining()已經為空,才停止發送,這樣能保證將ByteBuffer中的數據全部發送出去,如下:

while (byteBuffer.remaining() > 0) {
    socketChannel.write(byteBuffer);
}

但在真正執行發送的代碼中,我們註意到,調用網路請求發送的代碼中,只觸發了一次

public long writeTo(TransferableChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    pending = channel.hasPendingWrites();
    return written;
}

再結合6.4小節中,如果發現send屬性不為空,則會拋出異常:

public void setSend(NetworkSend send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

拋出異常後,Producer就會把當前的Channel關閉。那我們能否得出這樣一個結論:一旦某個Channel中的數據不能一次性發送完畢,Kafka就會拋出異常,然後關閉當前Channel ?

答案是否定的,因為在數據準備準備階段,Kafka會判斷,如果當前的Channel中有待發送數據,則不會從Accumulator中拉取該Channel的數據,而在發送階段,會任然將未發送出去的數據再次執行發送

org.apache.kafka.clients.NetworkClient#isReady的方法的相關調用如下

// org.apache.kafka.clients.NetworkClient#isReady
@Override
public boolean isReady(Node node, long now) {
    return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
}

// org.apache.kafka.clients.NetworkClient#canSendRequest
private boolean canSendRequest(String node, long now) {
    return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
        inFlightRequests.canSendMore(node);
}

// org.apache.kafka.clients.InFlightRequests#canSendMore
public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

// org.apache.kafka.common.network.ByteBufferSend#completed
public boolean completed() {
    return remaining <= 0 && !pending;
}

Kafka為什麼要這樣設計,一次性將數據發送出去不好嗎?一定要這樣費盡周章的各種判斷嗎?

這裡正印證了那句老話:細節之處見功力!通常情況下,當我們調用write()介面,向操作系統發送網路數據時,只要網路不太繁忙,一般都是能夠一次性將數據發完的,OS也會儘量將數據全部發送出去。而如果一次沒有將緩衝區的數據發送完畢,那很有可能印證出當前channel的網路壓力非常大,如果採用不發送完畢不罷休的邏輯,那很有可能當前Sender線程要在這裡“卡”很久,而Sender我們知道是單線程的,他是會向多個Broker也就是多個Channel發送數據的,我們不能因為一個Broker的網路阻塞,而影響了整個Producer的吞吐。當然未發送的數據也不是不管了,而是等到下一個周期再嘗試發送,這樣即便是某個Broker網路擁塞了,不會影響其他Broker的吞吐

Kafka在接受網路數據的時候,同樣也是採用“只接受一次”的策略

八、處理響應

處理響應的過程與發送數據一樣,入口均在org.apache.kafka.clients.NetworkClient#poll中,相比較發送而言,處理響應是一個相對輕量的操作,主要經歷以下幾個步驟:

  1. 接收網路read信號
    1. 這個的觸發代碼就是JDK的select,this.nioSelector.selectedKeys(),只要有read信號就會進來
  1. 讀取4位元組的header
    1. 有同學會說,4個位元組,還有可能讀不滿嗎?的確,因為雖然只讀4個位元組,但也是通過TCP打包進行傳輸,很有可能出現粘包的情況,只讀取到1、2個位元組。如果遇到讀不滿的情況,Producer會將已讀取的數據暫時放入ByteBuffer緩存中,等待下一次的Sender線程的輪循,直到讀取滿4個位元組
  1. 讀取消息體X位元組
    1. 消息體的讀取跟header一致,只不過讀取header的時候,明確知道是4個位元組,而讀取body的時候,是一個動態值,需要讀取從header獲取
  1. 將消息放入待處理列表
  2. 消息協議解碼
    1. 這裡正好是消息發送時的逆操作,進行協議解碼。需要註意,消息解碼分兩步,一個是header解碼,一個是body解碼
    2. 主要註意的是,header解碼的時候,會判斷response的correlationId與request的是否一致,如果發現匹配不上,將會拋出異常
    3. 解碼的入口方法為org.apache.kafka.common.requests.AbstractResponse#parseResponse(org.apache.kafka.common.protocol.ApiKeys, java.nio.ByteBuffer, short) 裡面根據請求類型的不同,分別做了對應的處理,此處不再贅述
  1. 執行回調函數
    1. Producer的org.apache.kafka.clients.NetworkClient#inFlightRequests成員變數存儲了所有已經發送給Broker但是還未收到響應的請求列表,這個列表預設大小是5,即最多同時允許5個請求沒有收到響應,且裡面存儲了回調函數org.apache.kafka.clients.NetworkClient.InFlightRequest#callback,方便收到響應後,觸發後續的業務邏輯,比如歸還MemoryPool等

九、總結

至此,一次網路的發送與接收便告一段落,不過需要指明的是,本文僅僅是闡述了消息發送時候,涉及網路相關部分的主流程,還有很多細節並沒有展開,比如Selector是如何關閉Channel的、Channel遇到不預期異常後如何觸發重連等。另外Producer還有很多請求類型,比如獲取meta數據、心跳、sync api version等,不過這些連接類型雖然與消息發送不一樣,但是網路流程均是相通的。網路策略是Kafka的基建,對網路部分有個整體的瞭解,有利於我們更快地學習、吸收Kafka

雖然 talk is cheap, show me the code,但我本人不喜歡在技術的文章中,黏貼大量的代碼段,因為要讀源碼的話,讀者直接在github上拉取對應版本代碼閱讀更方便。不過由於Kafka網路部分相對還是比較複雜的,也摻雜了很多業務處理的邏輯,本文還是黏貼了很多關鍵部分的代碼以及出處的method,所幸代碼篇幅都很短,希望不要給閱讀帶來不便

簡單總結一下Producer網路部分的特點

  • 非同步;非同步處理也是Producer的整體特點,Producer將網路線程與發送線程獨立開來,也就為消息攢批提供了底層支持;同時接收到消息響應後,非同步調用callback等,設計都非常合理
  • 細節;我理解Kafka的高吞吐並不是某個設計帶來的巨集利,而是對於諸多細節完美處理而實現的。例如屏蔽Nagle演算法、一次發送/接收、可變長度編碼、使用隊列緩存等等
  • 原生JDK NIO;Kafka對NIO的介面進行了二次封裝,不過封裝的很輕量,基本上均是使用JDK的介面。此外使用header+body的方式解決了TCP粘包/拆包的問題


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 原創研發flutter3+getX+mediaKit仿抖音app短視頻直播實戰Flutter-DouYin。 flutter3_dylive使用最新跨平臺技術flutter3.x+dart3+getx+get_storage+media_kit開發手機端仿抖音app小視頻直播實戰項目。實現了抖音全屏 ...
  • 一、Stack 1.概述 HarmonyOS中的層疊佈局Stack是一種可以將多個組件按照一定順序疊放的佈局。Stack佈局中的組件可以是任意類型的組件,且每個組件都可以設置在哪個位置疊放。在疊放時,後添加的組件會自動覆蓋前面添加的組件。 Stack佈局佈局中的每個子組件都可以設置偏移量、旋 ...
  • 一、是什麼 TCP/IP,傳輸控制協議/網際協議,是指能夠在多個不同網路間實現信息傳輸的協議簇 TCP(傳輸控制協議) 一種面向連接的、可靠的、基於位元組流的傳輸層通信協議 IP(網際協議) 用於封包交換數據網路的協議 TCP/IP協議不僅僅指的是TCP和IP兩個協議,而是指一個由FTP、SMTP、T ...
  • 前言 習慣了在 css 文件裡面編寫樣式,其實JavaScript 的 CSS對象模型也提供了強大的樣式操作能力, 那就隨文章一起看看,有多少能力是你不知道的吧。 樣式來源 客從八方來, 樣式呢, 樣式五方來。 chrome舊版本用戶自定義樣式目錄: %LocalAppData%/Google/Ch ...
  • VUE 腳手架 腳手架文件結構 ├── node_modules ├── public │ ├── favicon.ico: 頁簽圖標 │ └── index.html: 主頁面 ├── src │ ├── assets: 存放靜態資源 │ │ └── logo.png │ │── componen ...
  • 最近看到了許多關於 :has() 選擇器的知識點,在此總結下來。 MDN 對 :has() 選擇器 的解釋是這樣的: CSS 函數式偽類 :has() 表示一個元素,如果作為參數傳遞的任何相對選擇器在錨定到該元素時,至少匹配一個元素。這個偽類通過把可容錯相對選擇器列表作為參數,提供了一種針對引用元素 ...
  • 系統功能文檔是一種描述軟體系統功能和操作方式的文檔。它讓開發團隊、測試人員、項目管理者、客戶和最終用戶對系統行為有清晰、全面的瞭解。 通過ChatGPT,我們能讓編寫系統功能文檔的效率提升10倍以上。 用ChatGPT生成系統功能文檔 我們以線上商城系統為例,介紹如何使用ChatGPT幫我們完成系統 ...
  • isa 走點陣圖 在講 OC->Class 底層類結構之前,先看下下麵這張圖: 通過isa走點陣圖 得出的結論是: 1,類,父類,元類都包含了 isa, superclass 2,對象isa指向類對象,類對象的isa指向了元類,元類的 isa 指向了根元類,根元類 isa 指向自己 3,類的 super ...
一周排行
    -Advertisement-
    Play Games
  • 隨著Aspire發佈preview5的發佈,Microsoft.Extensions.ServiceDiscovery隨之更新, 服務註冊發現這個屬於老掉牙的話題解決什麼問題就不贅述了,這裡主要講講Microsoft.Extensions.ServiceDiscovery(preview5)以及如何 ...
  • 概述:通過使用`SemaphoreSlim`,可以簡單而有效地限制非同步HTTP請求的併發量,確保在任何給定時間內不超過20個網頁同時下載。`ParallelOptions`不適用於非同步操作,但可考慮使用`Parallel.ForEach`,儘管在非同步場景中謹慎使用。 對於併發非同步 I/O 操作的數量 ...
  • 1.Linux上安裝Docken 伺服器系統版本以及內核版本:cat /etc/redhat-release 查看伺服器內核版本:uname -r 安裝依賴包:yum install -y yum-utils device-mapper-persistent-data lvm2 設置阿裡雲鏡像源:y ...
  • 概述:WPF界面綁定和渲染大量數據可能導致性能問題。通過啟用UI虛擬化、非同步載入和數據分頁,可以有效提高界面響應性能。以下是簡單示例演示這些優化方法。 在WPF中,當你嘗試綁定和渲染大量的數據項時,性能問題可能出現。以下是一些可能導致性能慢的原因以及優化方法: UI 虛擬化: WPF提供了虛擬化技術 ...
  • 引言 上一章節介紹了 TDD 的三大法則,今天我們講一下在單元測試中模擬對象的使用。 Fake Fake - Fake 是一個通用術語,可用於描述 stub或 mock 對象。 它是 stub 還是 mock 取決於使用它的上下文。 也就是說,Fake 可以是 stub 或 mock Mock - ...
  • 為.net6在CentOS7上面做準備,先在vmware虛擬機安裝CentOS 7.9 新建CentOS764位的系統 因為CentOS8不更新了,所以安裝7;簡單就一筆帶過了 選擇下載好的操作系統的iso文件,下載地址https://mirrors.aliyun.com/centos/7.9.20 ...
  • 經過前面幾篇的學習,我們瞭解到指令的大概分類,如:參數載入指令,該載入指令以 Ld 開頭,將參數載入到棧中,以便於後續執行操作命令。參數存儲指令,其指令以 St 開頭,將棧中的數據,存儲到指定的變數中,以方便後續使用。創建實例指令,其指令以 New 開頭,用於在運行時動態生成並初始化對象。方法調用指... ...
  • LiteDB 是一個輕量級的嵌入式 NoSQL 資料庫,其設計理念與 MongoDB 類似,但它是完全使用 C# 開發的,因此與 C# 應用程式的集成非常順暢。與 SQLite 相比,LiteDB 提供了 NoSQL(即鍵值對)的數據存儲方式,並且是一個開源且免費的項目。它適用於桌面、移動以及 We ...
  • 1 開源解析和拆分文檔 第三方的工具去對文件解析拆分,去將我們的文件內容給提取出來,並將我們的文檔內容去拆分成一個小的chunk。常見的PDF word mark down, JSON、HTML。都可以有很好的一些模塊去把這些文件去進行一個東西去提取。 優勢 支持豐富的文檔類型 每種文檔多樣化選擇 ...
  • OOM是什麼?英文全稱為 OutOfMemoryError(記憶體溢出錯誤)。當程式發生OOM時,如何去定位導致異常的代碼還是挺麻煩的。 要檢查OOM發生的原因,首先需要瞭解各種OOM情況下會報的異常信息。這樣能縮小排查範圍,再結合異常堆棧、heapDump文件、JVM分析工具和業務代碼來判斷具體是哪 ...