Kafka 服務端通過 的主函數 方法啟動。 類提供讀取配置文件、啟動/停止服務的方法。而啟動/停止服務最終調用的是 的`startup/shutdown`方法。 啟動流程 1. 啟動 zk 客戶端。 2. 啟動動態配置。 3. 啟動調度線程池。 4. 啟動日誌管理器的後臺線 ...
Kafka 服務端通過Kafka.scala
的主函數main
方法啟動。KafkaServerStartable
類提供讀取配置文件、啟動/停止服務的方法。而啟動/停止服務最終調用的是KafkaServer
的startup/shutdown
方法。
啟動流程
- 啟動 zk 客戶端。
- 啟動動態配置。
- 啟動調度線程池。
- 啟動日誌管理器的後臺線程,包括日誌清理、日誌刷盤、日誌刪除、日誌壓縮。
- 啟動 NIO Socket 服務。
- 初始化一個接收器
Acceptor
,即啟動 NIO Socket。 - 添加
num.network.threads
個接收器到請求通道RequestChannel
的處理器緩存ConcurrentHashMap
,key 為遞增編號,value 為處理器Processor
。 Acceptor
執行CountDownLatch.await
等待通知啟動。- 緩存
Acceptor
到ConcurrentHashMap
,key 為EndPoint
,value 為Acceptor
。
- 初始化一個接收器
- 啟動副本管理器。
- 在 zk 註冊 broker。
- 啟動控制器。
- 啟動組協調器。
- 啟動事務協調器。
- 初始化
KafkaApis
。 - 初始化處理器線程緩存池。
- 啟動
num.io.threads
個請求處理器線程KafkaRequestHandler
。 - 從阻塞隊列
ArrayBlockingQueue
獲取請求,調用KafkaApis.handle
方法,進行集中處理請求。
- 啟動
- 啟動處理器線程。
- 首先
CountDownLatch.countDown
通知喚醒Acceptor
線程。- 使用
NIO.select
輪詢。 - 如果有可接收就緒的事件,則將當前的
SocketChannel
加入緩存隊列ConcurrentLinkedQueue
- 使用
- 從上述緩存隊列取出
SocketChannel
,綁定到KafkaChannel
。 - 將接收到的請求緩存到限長阻塞隊列
ArrayBlockingQueue
- 首先
請求處理流程
詳細源碼分析
Acceptor 線程
def run() {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 註冊接收事件
startupComplete() // 通知 Acceptor 線程
var currentProcessor = 0
while (isRunning) {
val ready = nioSelector.select(500) // 輪詢事件
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
val key = iter.next
iter.remove()
if (key.isAcceptable) { // 有可接受事件
val processor = synchronized {
currentProcessor = currentProcessor % processors.size
processors(currentProcessor) // 緩存 Processor
}
accept(key, processor) // 將 SocketChannel 緩存到隊列
}
}
}
}
}
Processor 線程
override def run() {
startupComplete() // CountDownLatch.countDown 喚醒 Acceptor 線程。
while (isRunning) {
configureNewConnections() // 從緩存隊列取出 SocketChannel,綁定到 KafkaChannel
processNewResponses() // 處理返回客戶端的響應
poll() // Kafka.Selector 輪詢讀取/寫入事件
processCompletedReceives() // 處理客戶端的請求,放到阻塞隊列
processCompletedSends() // 處理返回客戶端響應後的回調
processDisconnected() // 斷開連接後的處理
}
}
KafkaRequestHandler 線程阻塞隊列
def run() {
while (!stopped) {
val startSelectTime = time.nanoseconds
// 從阻塞隊列拉取請求
val req = requestChannel.receiveRequest(300)
req match {
case request: RequestChannel.Request =>
try {
apis.handle(request) // 調用`KafkaApis.handle`方法,進行集中處理請求。
}
}
}
}
KSelector
參考客戶端源碼分析。