零售商家為什麼要建設線上商城? 傳統的實體門店服務範圍有限,只能吸引周邊500米以內的消費者。因此,如何拓展服務範圍,吸引更多的消費者到店,成為了店家迫切需要解決的問題。 缺乏忠實顧客,客戶基礎不穩,往往是一次性購物,門店無法形成有效的顧客迴流。在當前的市場環境下,構建並維護粉絲群體,成為了商家的核 ...
一、背景
在Kafka的組成部分(Broker、Consumer、Producer)中,設計理念迥異,每個部分都有自己獨特的思考。而把這些部分有機地組織起來,使其成為一個整體的便是「網路傳輸」。區別於其他消息隊列的方式(RocketMQ處理網路部分直接使用成熟的組件Netty),Kafka則是直接對java的NIO進行了二次包裝,從而實現了高效的傳輸
然而處理網路相關的工作是非常複雜的,本文我們只聚焦於網路傳輸的Producer端,而Producer端也只聚焦在消息發送的部分,使用場景帶入的方式來分析一下消息發送的環節,Producer是如何將其扔給網路併發送出去的
二、概述
我們首先回想一下Producer消息發送的整體流程
- 客戶端線程會不斷地寫入數據,當前線程並不會阻塞,而是馬上返回。這個時候消息被Producer放在了緩存內,消息並沒有真正發送出去
- Producer內部為每個Partition維護了一個RecordBatch的隊列,先進先出的模式,統稱為RecordAccumulator,數據第一步會先放在這個組件中。放在這個組件中的數據什麼時候會真正發送出去呢?這裡其實是“大巴車”邏輯(大巴車邏輯是指:1、如果某輛大巴車已經坐滿人了,這個時候無條件立即發車。2、雖然本輛大巴車沒有坐滿人,但是車已經在原地等待了1個小時,即便是只有1位乘客,也要立即發車),以下2個條件滿足其一即可:
- RecordBatch已滿,無法寫入新數據
- RecordBatch雖然還未滿,但是已超時
- 為了給RecordAccumulator提速,又引申出了MemoryPool的概念,它主要的作用是一次性分配了一塊記憶體池,避免每次RecordBatch新建時,臨時開闢記憶體空間
- 為什麼開闢記憶體空間會很慢呢? 單純分配一個連續的記憶體空間不會有太多的耗時,這裡主要是JDK為ByteBuffer的置0操作,byte[]數組開闢空間耗時也是同理。例如DirectByteBuffer.java的構造函數是這樣實現的
unsafe.setMemory(base, size, (byte) 0);
- 以上所有部分均不涉及網路相關操作,真正網路發送/接收的邏輯是放在了Sender.java線程中。Sender線程會不斷地從RecordAccumulator中拉取滿足條件的消息記錄,從而與Broker建聯併發送
以上是Producer發送消息的主流程,本文將會聚焦在Sender網路線程及其相關的邏輯
三、Java NIO
因為Kafka的網路實現是對Java NIO的二次封裝,因此在真正開始分析之前,我們有必要先對NIO做個簡單回顧
3.1、Server端
public void startServer() throws IOException {
// Selector選擇器,NIO中的核心組件,也就是用它來監聽所有的網路事件
selector = Selector.open();
// 打開服務端的ServerSocketChannel,只有服務端需要打開ServerSocket
server = ServerSocketChannel.open();
// 為服務端綁定埠,之後所有的client均連接次埠
server.bind(new InetSocketAddress(PORT));
// 這裡配置為非阻塞
server.configureBlocking(false);
// 主要是將ServerSocketChannel與selector進行綁定
server.register(selector, SelectionKey.OP_ACCEPT);
SocketChannel client;
SelectionKey key;
Iterator<SelectionKey> keyIterator;
while (true) {
// 調用select()方法,如果有新的請求,則馬上返回,否則最多阻塞100ms
int select = selector.select(100);
if (select == 0) {
continue;
}
// 將所有的事件拿出來,準備迭代
keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
key = keyIterator.next();
if (key.isAcceptable()) {
// 處理新進來的鏈接請求
operateConnectEvent(key);
}
if (key.isReadable()) {
// 處理讀取事件的請求
operateReadEvent(key);
}
// 事件處理完後,將其移除
keyIterator.remove();
}
}
}
想必大家對上面的demo不會太陌生,這就是JDK給我們提供的server端啟動埠監聽的經典demo,這裡需要註意的是:處理讀寫請求時,儘量開闢線程來非同步處理,避免影響selector監聽線程
3.2、Client端
public void startClient() throws IOException {
// 客戶端打開自己的Selector
selector = Selector.open();
// 與Server端開始建聯
socketChannel = SocketChannel.open(new InetSocketAddress(port));
// 設置非阻塞
socketChannel.configureBlocking(false);
// 註冊讀時間到selector上,這樣通過selector就可以監聽網路讀取的事件了
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
// attach方法並沒有實質的含義,一般是將自己業務中的輔助類綁定在key上,方便後續的讀取及處理
selectionKey.attach(null);
Iterator<SelectionKey> ikeys;
SelectionKey key;
SocketChannel client;
try {
while (flag) {
// 調用此方法一直阻塞,直到有channel可用
selector.select();
ikeys = selector.selectedKeys().iterator();
while (ikeys.hasNext()) {
key = ikeys.next();
if (key.isReadable()) {
// 處理讀事件,一般開多線程處理
handleReadEvent();
}
ikeys.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
主要思想也是通過selector監聽不同的事件,一旦獲取到事件,便開線程非同步處理,不阻塞selector,從而達到高效通信的目的
Server端+Client端不足百行的demo代碼,會被Kafka如何包裝呢?
四、Sender線程概述
我們在具體介紹Sender線程之前,首先對其有個全貌的認知,也就是先整體後細節。而本節就是一個整體認知,雖然很多細節不會展開,但是對網路線程的理解至關重要
4.1、單網路線程
Sender線程是在KafkaProducer對象初始化啟動的,啟動後便是一個無限迴圈的調用。run()方法如下
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
.... // 此處是一些優雅關機的代碼
}
由此我們可以得出結論
- Sender線程是一個單線程,一個KafkaProducer只會對應一個網路線程
- Sender伴隨Producer的啟動而啟動,同時也伴隨Producer的消亡而消亡
其實這也就解釋了當我們啟動一個Producer進行壓測的時候,通常不能把帶寬打滿。因為網路線程所做的事兒,無非是將用戶生產的數據從JVM中拷貝至網卡,而這個動作又是典型的cpu密集型,單個線程確實無法提高資源利用率
而相比較Broker而言,Broker提供了更為豐富的線程數量配置策略,比如網路線程數的配置num.network.threads及IO線程數的配置num.io.threads等,為什麼Producer的網路線程要配置為單個呢?我個人認為主要是以下原因:
- Producer客戶端的定位;雖然Producer與Broker都存在網路傳輸的功能,但是Producer終究只是一個客戶端,通常客戶端所在的機器配置是比不上Broker端的,因此單個網路線程足以滿足絕大部分場景的需求
- 容易橫向擴展;我們可以通過簡單地啟動多個Producer從而實現性能的提升,而這個操作是輕量的
- 開發的複雜性;即便是在單網路線程的case下,Producer需要處理數據收集、發送、接收、維護元數據等,複雜性已然不低,如果需要單KafkaProducer同時啟動多個網路線程的話,勢必帶來較大的複雜性
4.2、Sender概覽
由上節我們知道Sender線程一直在重覆調用runOnce()方法,這個方法又做了什麼操作呢?
void runOnce() {
if (transactionManager != null) {......}
long currentTimeMs = time.milliseconds();
// 拉取已經準備好的數據
long pollTimeout = sendProducerData(currentTimeMs);
// 執行網路數據發送
client.poll(pollTimeout, currentTimeMs);
}
簡單概括就是兩件事兒:
- 將RecordAccumulator中已經準備好的數據,進行位元組粒度的協議編碼,最終放入待發送區
- 真正執行NIO數據發送、接收響應
五、線程相關類
在NIO中很重要的2個概念是:Selector及SelectionKey,即一個是多路復用器,一個可以簡單認為是Channel。Kafka分別對這個概念進行了包裝,整體的類圖如下:
5.1、NetworkClient
顧名思義,這個類是為客戶端而服務的,它是所有客戶端訪問網路的總入口,他只實現了一個介面KafkaClient,也是KafkaClient的唯一實現類。介面有如下方法:
- boolean isReady(Node node, long now);
- boolean ready(Node node, long now);
- long connectionDelay(Node node, long now);
- long pollDelayMs(Node node, long now);
- boolean connectionFailed(Node node);
- AuthenticationException authenticationException(Node node);
- void send(ClientRequest request, long now);
- List<ClientResponse> poll(long timeout, long now);
- void disconnect(String nodeId);
- void close(String nodeId);
- Node leastLoadedNode(long now);
- int inFlightRequestCount();
- boolean hasInFlightRequests();
- int inFlightRequestCount(String nodeId);
- boolean hasInFlightRequests(String nodeId);
- boolean hasReadyNodes(long now);
- void wakeup();
- ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> requestBuilder,long createdTimeMs, boolean expectResponse);
- ClientRequest newClientRequest(String nodeId,AbstractRequest.Builder<?> requestBuilder,long createdTimeMs,boolean expectResponse,int requestTimeoutMs,RequestCompletionHandler callback);
- void initiateClose();
- boolean active();
可以看到大部分都是與網路相關的,而裡面比較重要且高頻的方法不外乎以下兩個,也是本文著重展開介紹的2個方法
- void send(ClientRequest request, long now);
- List<ClientResponse> poll(long timeout, long now);
字面含義,即一個發送、一個接受,然而這裡封裝的這2個方法卻是更偏重業務語義
void send(ClientRequest request, long now); |
|
List<ClientResponse> poll(long timeout, long now); |
而poll方法中則包含了真正網路的發送與接收,也就是只有poll方法會調用Java NIO的相關api介面
|
後續的分析也主要圍繞這兩個入口方法展開
NetworkClient作為網路請求的入口類,很多關於底層網路的操作都是交由Selector來實現的,而NetworkClient則更多的將自己的職責放在了網路上層的建設上,諸如維護髮送列表、inFlightRequests等業務控制上
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);
5.2、Selector
註意,與nio包中Selector不同的是,這裡的Selector的全路徑是org.apache.kafka.common.network.Selector
,我們看一下它的主要成員變數,也大致能猜到它在網路請求中發揮哪些功能
private final java.nio.channels.Selector nioSelector;
private final Map<String, KafkaChannel> channels;
private final Set<KafkaChannel> explicitlyMutedChannels;
private final List<NetworkSend> completedSends;
private final LinkedHashMap<String, NetworkReceive> completedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
private final Map<String, KafkaChannel> closingChannels;
private Set<SelectionKey> keysWithBufferedRead;
private final Map<String, ChannelState> disconnected;
private final List<String> connected;
private final List<String> failedSends;
private final ChannelBuilder channelBuilder;
.........
- 它持有nio的java.nio.channels.Selector引用,這樣可以直接進行一些底層的網路操作,比如真正與broker建聯
- 所有已建聯的channel列表,這樣通過nodeId可以快速找到對應的KafkaChannel
- 保存已完成發送的列表,當NetworkClient需要處理這些請求時,可以直接在這裡獲取到;註意這個list的實現類就是簡單的ArrayList,因為網路線程是單線程,所以不存在併發衝突的問題
- 同樣存儲了接收請求列表LinkedHashMap<String, NetworkReceive>,不再展開
- 以及存儲正在關閉的channel、已經關閉的Channel等
- 。。。。。。
Selector相對比NetworkClient來說,它做了更貼合網路的操作,比如真正與broker建聯、維護Channel列表、處理超時時,關閉Channel等等
值得一提的是,Selector對於SocketChannel的配置,我們看一下關於配置的方法org.apache.kafka.common.network.Selector#configureSocketChannel
private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize)
throws IOException {
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setSendBufferSize(sendBufferSize);
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);
}
有這樣幾個配置項:
- socketChannel.configureBlocking(false); 配置為非阻塞
- socket.setKeepAlive(true); 保持連接
- socket.setSendBufferSize(sendBufferSize); 配置發送緩衝區,預設為128K
- socket.setReceiveBufferSize(receiveBufferSize); 配置接收緩衝區,預設32K
- socket.setTcpNoDelay(true); 非Delay
我們註意到,上面幾個配置都比較好理解,包括發送的緩衝區比接收緩衝區大了不少,因為Producer是典型的發送多,接收少的場景;而TcpNoDelay屬性為什麼要設置為true呢? 設置為false不是能提高網路性能嗎?
其實在TCP中有個Nagle演算法,這個演算法的目的是減少網路中小包的數量,通俗點來講就是將諸多的小包整合為一個大包進行傳輸,這樣做的好處是,提高網路利用率,使得整體發送速率更快;弊端是可能會增加某些包的延遲。那Kafka為什麼要禁用這個演算法呢?其實本質原因就是Kafka在Producer內部已經完成了消息攢批,這些待發送的包無非以下兩種狀態:
- 攢批充足,預設是16K,包足夠大,不需要Nagle演算法的干預
- 攢批不理想,待發送的數據可能很小;但是即便是數據很少,這些消息也在批次中停留了足夠久的時間,這個時候唯一要做的就是儘快把數據發出去,不要再幫我為了優化TCP而增加延遲了,否則超過Producer等待的最大時長,Producer會將本次請求標記超時。其次這種攢批不理想的場景,通常是Producer發送量很小,數據在時間軸上很稀疏,即便是啟用了Nagle演算法,大概率也不能產生TCP變大包的預期
總結一句話就是Producer的聚批做的已經足夠好了,Nagle演算法的介入只會帶來負優化
5.3、KafkaChannel
上文可知,Selector管理了所有的Channel,而KafkaChannel則是與某個broker連接的具體通道。Selector的各類網路請求也均交給KafkaChannel來執行。
需要說明的是,KafkaChannel其實是通道註冊的時候attach在SelectionKey上的,也就是KafkaChannel與SelectionKey是1對1的關係。以下是KafkaChannel所對外暴露的方法
除了網路相關的操作外,KafkaChannel還有一個很重要的特性,就是緩存當天通道的即將要發送的數據,以及接收到的數據
- private NetworkSend send;
- private NetworkReceive receive;
當然放在這裡的數據都是已經經過編解碼完畢的數據
小結:3個類的總代碼行數超過了3000行,是Producer發送消息的核心類,我們先對這3個類有個大致的概念,以及它們每個類主要實現的功能點,這樣非常有益於對整體的認知。至於Kafka將網路操作的相關操作抽象為這3個類是否合理,就仁者見仁智者見智了,其實這3個類是nio及Kafka自身業務的混合體,它的核心就是為Kafka的客戶端網路請求提供服務;筆者認為雖然這樣的設計並沒有把一些特性或者功能的界限畫的很清楚,但是整體運行Kafka的網路功能還是游刃有餘的
六、數據準備
單純地將數據放入聚合器Accumulator還不算是數據準備完成,消息發送需要將其轉換為面向位元組的網路協議的格式,才是真正具備待發送的前提
6.1、消息編碼
發送請求的編碼在類org.apache.kafka.common.protocol.SendBuilder中:
private static Send buildSend(
Message header,
short headerVersion,
Message apiMessage,
short apiVersion
) {
ObjectSerializationCache serializationCache = new ObjectSerializationCache();
MessageSizeAccumulator messageSize = new MessageSizeAccumulator();
header.addSize(messageSize, serializationCache, headerVersion);
apiMessage.addSize(messageSize, serializationCache, apiVersion);
SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
builder.writeInt(messageSize.totalSize());
header.write(builder, serializationCache, headerVersion);
apiMessage.write(builder, serializationCache, apiVersion);
return builder.build();
}
編碼分2部分,首先需要計算當前類型編碼占用空間的總大小,然後將內容進行逐一填充;這裡Kafka將各類消息定義了統一的介面 org.apache.kafka.common.protocol.Message,這樣每種類型的消息都寫好各自的編解碼協議即可,類似於命令模式,對各類編解碼進行瞭解耦,方便後續的擴展與維護。而Message介面的實現類居然有 357 個之多,可見其軟體設計的複雜度
如此之多的實現類,我們無法窮舉,因此以發送消息類型的Message為demo來闡述一下編碼的格式;任何消息類型的網路編碼格式均分為header與body兩部分,發送消息類型的header與body分別為
org.apache.kafka.common.message.RequestHeaderData
org.apache.kafka.common.message.ProduceRequestData
6.1.1、header編碼
首先看header部分的編碼
下麵分別對上述欄位進行簡要說明
- api key
- 協議的類型編碼,即每種類型的協議編碼均不同,例如消息發送-0,消息拉取-1,獲取位點-2,獲取metadata-3等等,2.8.2版本的kafka的協議類型已經到達了64種,具體可以查看枚舉類
org.apache.kafka.common.message.ApiMessageType
- api key version
- 根據調整會不斷迭代升級,2.8.2版本已經升級到了9
- correlation id
- 相關性id,客戶端指定,從0開始累加,服務端返回時,也需要回應同樣的編號,這樣客戶端就可以將一次request跟response聯繫起來。這裡可以看到,相關性id的長度是4個位元組,難道不怕越界嗎?其實相關性id不需要保證唯一性,當達到了int最大值後,下一次從0繼續即可
- client id length
- 客戶端id其實是一個字元串,比如“producer-1”,因為是可變內容,因此照例使用length+content的方式進行存儲
- client id content
- 同上,真正存儲 client id 的內容
- tagged field
- 標記位,一般情況下,這個屬性通常是empty的,主要是為了提供RPC協議的靈活性,例如一些可有可無的欄位,在沒有這個欄位時如何節省傳輸帶寬;另外協議傳輸的擴展性也可以依賴這個欄位,例如我們想在一次消息發送的時候,順便將traceID也帶上,等等
- 背景可以參考 KIP-482 https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
另外我們還留意到某些欄位使用了varint,這個又是什麼鬼呢?簡單來說就是可變長度的int,還有varlong等類型。當我們想將一個欄位設置為int型,而大多數的情況下,它可能只占用了很小的空間,例如0、1、2等,這個時候可變長度就派上了用場,本文不再對此展開,相關的論文讀者可參考 http://code.google.com/apis/protocolbuffers/docs/encoding.html
Kafka中varint的實現:
public static void writeUnsignedVarint(int value, ByteBuffer buffer) {
while ((value & 0xffffff80) != 0L) {
byte b = (byte) ((value & 0x7f) | 0x80);
buffer.put(b);
value >>>= 7;
}
buffer.put((byte) value);
}
6.1.2、body編碼
消息發送對應的body編碼實現類是org.apache.kafka.common.message.ProduceRequestData
雖然body真正的內容會比header多很多,但是其編碼協議並不複雜,大部分欄位上圖均已說明。值得一提的是,Kafka協議為了壓縮協議體積,可謂“無所不用其極”,還有很多很多細節層面的優化,也正是由於這些一點一滴的積累,成就了Kafka在消息隊列中行業大佬的地位
6.2、TCP粘包/拆包
這裡沒有太多需要展開論述的,Kafka處理粘包/拆包的問題,採用的就是經典的length+content的方式;也就是首先寫入4個位元組的包長度,後面緊跟消息內容;而拆包時,先讀取4個位元組,繼而完整讀取後續的消息
6.3、零拷貝?
細心的讀者可能會發現,在構造Send類的時候,也就是數據編碼的時候org.apache.kafka.common.protocol.SendBuilder#buildSend
,有這樣的代碼:
SendBuilder builder = new SendBuilder(messageSize.sizeExcludingZeroCopy() + 4);
builder.writeInt(messageSize.totalSize());
header.write(builder, serializationCache, headerVersion);
apiMessage.write(builder, serializationCache, apiVersion);
感覺像是所有的協議層均用堆記憶體的ByteBuffer來構建,但是消息體本身卻讓它走零拷貝。消息發送也可以零拷貝嗎?消息本身不是已經進入到了JVM堆記憶體中了嗎?零拷貝又是如何實現呢
而且我們在類org.apache.kafka.common.message.ProduceRequestData.PartitionProduceData#addSize
中也發現了操作零拷貝的代碼_size.addZeroCopyBytes(records.sizeInBytes())
,但是在真正寫入消息記錄的時候,卻只是將消息記錄放入了SendBuilder的成員變數中org.apache.kafka.common.protocol.SendBuilder#buffers
。究竟是怎麼回事兒呢?
原來這裡鬧了烏龍,SendBuilder也僅是復用了org.apache.kafka.common.protocol.MessageSizeAccumulator
類的功能,而這個類除了給Producer使用外,還會給消息拉取、同broker內同步流量等使用,在消費的場景確實用到了零拷貝的能力,而Producer卻是一定無法執行零拷貝的,因為消息體已經存在於了JVM堆中
6.4、放入待發送區
經過上述一大圈工作,Kafka將編碼好的消息封裝進入了org.apache.kafka.common.network.ByteBufferSend
中,接下來就是放入網路待發送區了
其實放入待發送區就是將ByteBufferSend放入KafkaChannel的成員變數send中,org.apache.kafka.common.network.KafkaChannel#setSend
public void setSend(NetworkSend send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
因為馬上就要執行write操作了,因此開始監聽channel的OP_WRITE寫入事件
有個細節需要註意下,這裡會判斷send != null
,這個網路send會有不等於null的時候嗎?這裡先賣個關子,後文會提及。
另外有同學說,“Producer有兩個隊列,一個攢批隊列,一個網路待發送的隊列”,我們通過讀源碼發現,這個說法肯定是有問題的,Sender網路線程只有一個,也不存在網路待發送隊列,而是每個Channel在同一時刻只會發送一個Send
七、執行發送
7.1、發送流程
真正執行發送的邏輯反而變得簡單,入口方法為org.apache.kafka.clients.NetworkClient#poll
最終會調用至KafkaChannel的write()方法
public long write() throws IOException {
if (send == null)
return 0;
midWrite = true;
return send.writeTo(transportLayer);
}
這裡簡單提一下介面org.apache.kafka.common.network.TransportLayer
,這個介面的實現類有2個:PlaintextTransportLayer、SslTransportLayer,也就是Kafka抽象了一個TransportLayer層,通過對TransportLayer層的切換來實現網路加解密的動作,這樣把對SSL的操作放在了最底層,即便於維護,又不會對上層的代碼產生影響,的確是高
繼續往下追的話,我們在PlaintextTransportLayer發現了NIO的jdk調用
public int write(ByteBuffer[] srcs) throws IOException {
return socketChannel.write(srcs);
}
至此寫入動作完結
7.2、只發送一次?
通常我們向網路或者文件發送ByteBuffer時,為了確保ByteBuffer中的內容全部發送完畢,會一直判斷ByteBuffer的remaining()已經為空,才停止發送,這樣能保證將ByteBuffer中的數據全部發送出去,如下:
while (byteBuffer.remaining() > 0) {
socketChannel.write(byteBuffer);
}
但在真正執行發送的代碼中,我們註意到,調用網路請求發送的代碼中,只觸發了一次
public long writeTo(TransferableChannel channel) throws IOException {
long written = channel.write(buffers);
if (written < 0)
throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
pending = channel.hasPendingWrites();
return written;
}
再結合6.4小節中,如果發現send屬性不為空,則會拋出異常:
public void setSend(NetworkSend send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
拋出異常後,Producer就會把當前的Channel關閉。那我們能否得出這樣一個結論:一旦某個Channel中的數據不能一次性發送完畢,Kafka就會拋出異常,然後關閉當前Channel ?
答案是否定的,因為在數據準備準備階段,Kafka會判斷,如果當前的Channel中有待發送數據,則不會從Accumulator中拉取該Channel的數據,而在發送階段,會任然將未發送出去的數據再次執行發送
org.apache.kafka.clients.NetworkClient#isReady
的方法的相關調用如下
// org.apache.kafka.clients.NetworkClient#isReady
@Override
public boolean isReady(Node node, long now) {
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
}
// org.apache.kafka.clients.NetworkClient#canSendRequest
private boolean canSendRequest(String node, long now) {
return connectionStates.isReady(node, now) && selector.isChannelReady(node) &&
inFlightRequests.canSendMore(node);
}
// org.apache.kafka.clients.InFlightRequests#canSendMore
public boolean canSendMore(String node) {
Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
// org.apache.kafka.common.network.ByteBufferSend#completed
public boolean completed() {
return remaining <= 0 && !pending;
}
Kafka為什麼要這樣設計,一次性將數據發送出去不好嗎?一定要這樣費盡周章的各種判斷嗎?
這裡正印證了那句老話:細節之處見功力!通常情況下,當我們調用write()介面,向操作系統發送網路數據時,只要網路不太繁忙,一般都是能夠一次性將數據發完的,OS也會儘量將數據全部發送出去。而如果一次沒有將緩衝區的數據發送完畢,那很有可能印證出當前channel的網路壓力非常大,如果採用不發送完畢不罷休的邏輯,那很有可能當前Sender線程要在這裡“卡”很久,而Sender我們知道是單線程的,他是會向多個Broker也就是多個Channel發送數據的,我們不能因為一個Broker的網路阻塞,而影響了整個Producer的吞吐。當然未發送的數據也不是不管了,而是等到下一個周期再嘗試發送,這樣即便是某個Broker網路擁塞了,不會影響其他Broker的吞吐
Kafka在接受網路數據的時候,同樣也是採用“只接受一次”的策略
八、處理響應
處理響應的過程與發送數據一樣,入口均在org.apache.kafka.clients.NetworkClient#poll
中,相比較發送而言,處理響應是一個相對輕量的操作,主要經歷以下幾個步驟:
- 接收網路read信號
- 這個的觸發代碼就是JDK的select,
this.nioSelector.selectedKeys()
,只要有read信號就會進來
- 這個的觸發代碼就是JDK的select,
- 讀取4位元組的header
- 有同學會說,4個位元組,還有可能讀不滿嗎?的確,因為雖然只讀4個位元組,但也是通過TCP打包進行傳輸,很有可能出現粘包的情況,只讀取到1、2個位元組。如果遇到讀不滿的情況,Producer會將已讀取的數據暫時放入ByteBuffer緩存中,等待下一次的Sender線程的輪循,直到讀取滿4個位元組
- 讀取消息體X位元組
- 消息體的讀取跟header一致,只不過讀取header的時候,明確知道是4個位元組,而讀取body的時候,是一個動態值,需要讀取從header獲取
- 將消息放入待處理列表
- 消息協議解碼
- 這裡正好是消息發送時的逆操作,進行協議解碼。需要註意,消息解碼分兩步,一個是header解碼,一個是body解碼
- 主要註意的是,header解碼的時候,會判斷response的correlationId與request的是否一致,如果發現匹配不上,將會拋出異常
- 解碼的入口方法為
org.apache.kafka.common.requests.AbstractResponse#parseResponse(org.apache.kafka.common.protocol.ApiKeys, java.nio.ByteBuffer, short)
裡面根據請求類型的不同,分別做了對應的處理,此處不再贅述
- 執行回調函數
- Producer的
org.apache.kafka.clients.NetworkClient#inFlightRequests
成員變數存儲了所有已經發送給Broker但是還未收到響應的請求列表,這個列表預設大小是5,即最多同時允許5個請求沒有收到響應,且裡面存儲了回調函數org.apache.kafka.clients.NetworkClient.InFlightRequest#callback
,方便收到響應後,觸發後續的業務邏輯,比如歸還MemoryPool等
九、總結
至此,一次網路的發送與接收便告一段落,不過需要指明的是,本文僅僅是闡述了消息發送時候,涉及網路相關部分的主流程,還有很多細節並沒有展開,比如Selector是如何關閉Channel的、Channel遇到不預期異常後如何觸發重連等。另外Producer還有很多請求類型,比如獲取meta數據、心跳、sync api version等,不過這些連接類型雖然與消息發送不一樣,但是網路流程均是相通的。網路策略是Kafka的基建,對網路部分有個整體的瞭解,有利於我們更快地學習、吸收Kafka
雖然 talk is cheap, show me the code,但我本人不喜歡在技術的文章中,黏貼大量的代碼段,因為要讀源碼的話,讀者直接在github上拉取對應版本代碼閱讀更方便。不過由於Kafka網路部分相對還是比較複雜的,也摻雜了很多業務處理的邏輯,本文還是黏貼了很多關鍵部分的代碼以及出處的method,所幸代碼篇幅都很短,希望不要給閱讀帶來不便
簡單總結一下Producer網路部分的特點
- 非同步;非同步處理也是Producer的整體特點,Producer將網路線程與發送線程獨立開來,也就為消息攢批提供了底層支持;同時接收到消息響應後,非同步調用callback等,設計都非常合理
- 細節;我理解Kafka的高吞吐並不是某個設計帶來的巨集利,而是對於諸多細節完美處理而實現的。例如屏蔽Nagle演算法、一次發送/接收、可變長度編碼、使用隊列緩存等等
- 原生JDK NIO;Kafka對NIO的介面進行了二次封裝,不過封裝的很輕量,基本上均是使用JDK的介面。此外使用header+body的方式解決了TCP粘包/拆包的問題