Kafka2.0服務端啟動源碼

来源:https://www.cnblogs.com/bigshark/archive/2019/07/18/11204428.html
-Advertisement-
Play Games

  Kafka 服務端通過 的主函數 方法啟動。 類提供讀取配置文件、啟動/停止服務的方法。而啟動/停止服務最終調用的是 的`startup/shutdown`方法。 啟動流程 1. 啟動 zk 客戶端。 2. 啟動動態配置。 3. 啟動調度線程池。 4. 啟動日誌管理器的後臺線 ...


  Kafka 服務端通過Kafka.scala的主函數main方法啟動。KafkaServerStartable類提供讀取配置文件、啟動/停止服務的方法。而啟動/停止服務最終調用的是KafkaServerstartup/shutdown方法。

啟動流程

  1. 啟動 zk 客戶端。
  2. 啟動動態配置。
  3. 啟動調度線程池。
  4. 啟動日誌管理器的後臺線程,包括日誌清理、日誌刷盤、日誌刪除、日誌壓縮。
  5. 啟動 NIO Socket 服務
    1. 初始化一個接收器Acceptor,即啟動 NIO Socket。
    2. 添加num.network.threads個接收器到請求通道RequestChannel的處理器緩存ConcurrentHashMap,key 為遞增編號,value 為處理器Processor
    3. Acceptor執行CountDownLatch.await等待通知啟動。
    4. 緩存AcceptorConcurrentHashMap,key 為EndPoint,value 為Acceptor
  6. 啟動副本管理器。
  7. 在 zk 註冊 broker。
  8. 啟動控制器。
  9. 啟動組協調器。
  10. 啟動事務協調器。
  11. 初始化KafkaApis
  12. 初始化處理器線程緩存池
    1. 啟動num.io.threads個請求處理器線程KafkaRequestHandler
    2. 從阻塞隊列ArrayBlockingQueue獲取請求,調用KafkaApis.handle方法,進行集中處理請求。
  13. 啟動處理器線程
    1. 首先CountDownLatch.countDown通知喚醒Acceptor線程。
      1. 使用NIO.select輪詢。
      2. 如果有可接收就緒的事件,則將當前的SocketChannel加入緩存隊列ConcurrentLinkedQueue
    2. 從上述緩存隊列取出SocketChannel,綁定到KafkaChannel
    3. 將接收到的請求緩存到限長阻塞隊列ArrayBlockingQueue

請求處理流程

服務端請求處理流程

詳細源碼分析

Acceptor 線程

def run() {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 註冊接收事件
  startupComplete() // 通知 Acceptor 線程
  var currentProcessor = 0
  while (isRunning) {
    val ready = nioSelector.select(500) // 輪詢事件
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        val key = iter.next
        iter.remove()
        if (key.isAcceptable) { // 有可接受事件
          val processor = synchronized {
            currentProcessor = currentProcessor % processors.size
            processors(currentProcessor) // 緩存 Processor 
          }
          accept(key, processor) // 將 SocketChannel 緩存到隊列
        }
      }
    }
  }
}

Processor 線程

override def run() {
  startupComplete() // CountDownLatch.countDown 喚醒 Acceptor 線程。
  while (isRunning) {
    configureNewConnections() // 從緩存隊列取出 SocketChannel,綁定到 KafkaChannel
    processNewResponses() // 處理返回客戶端的響應
    poll() // Kafka.Selector 輪詢讀取/寫入事件
    processCompletedReceives() // 處理客戶端的請求,放到阻塞隊列
    processCompletedSends() // 處理返回客戶端響應後的回調
    processDisconnected() // 斷開連接後的處理
  }
}

KafkaRequestHandler 線程阻塞隊列

def run() {
  while (!stopped) {
    val startSelectTime = time.nanoseconds
    // 從阻塞隊列拉取請求
    val req = requestChannel.receiveRequest(300) 

    req match {
      case request: RequestChannel.Request =>
        try {
          apis.handle(request) // 調用`KafkaApis.handle`方法,進行集中處理請求。
        }
    }
  }
}

KSelector

  參考客戶端源碼分析。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 快速熱門指導,櫥窗註意細節 雙語字幕添加,資源對接擔保 全國抖音聯盟內部唯一官網 : http://douyinlianmeng.tk ...
  • 接上篇內容描述談談平臺設計思路及方法。前一篇簡單的介紹了一下整體的實現思路。那從本節開始開始說明如何引入資料庫表視圖等,因為我們不管做什麼項目軟體,設計的時候能直接將頁面控制項綁定到指定的欄位,那樣後續的操作就會很方便。至於實現的方法,有很多種,每個人可能有各自不同的思路,我不去評價別人是怎麼做的,僅 ...
  • 1、背景 友情鏈接:https://www.cnblogs.com/Agui520/p/11187972.html https://blog.csdn.net/fd2025/article/details/79863390 以支付、電商下單為例子。一個電商系統包含了好幾大類模塊,就比如有用戶模塊、商 ...
  • 我的那些年(13)~主推微服務架構 整個系統走向微服務架構 網關 服務註冊與發現 配置中心 熔斷器 鏈路跟蹤 授權與鑒權 服務間的通訊 同步feign 服務間的通訊 非同步消息 日誌收集 個系統走向微服務架構 公司系統比較多,耦合度比較大,將這些模塊進行拆分,各個負責自己的模塊,減少相互之間的直接依賴 ...
  • 架構雜談《四》 分散式一致性協議 一、引言 在分散式系統中,為了保證數據的高可用,通常會將數據保留多個副本(replica),這些個副本會放在不同的物理機上,為了對用戶提供正確的數據,我們需要保證這些放在不同物理機上的副本是一致的。為瞭解決這種分散式一致性問題,提出了很多經典的協議和演算法,比較著名的 ...
  • 1.怎麼 判斷元素是否存在? 判斷元素是否存在和是否出現不同, 判斷是否存在意味著如果這個元素壓根就不存在, 就會拋出NoSuchElementException 這樣就可以使用try catch,如果catch到NoSuchElementException 就返回false 2.如何判斷元素是否出 ...
  • 此套視頻信息量非常大,我畫了個思維導圖,除了上面的知識點,還有大量的練習和實踐項目。 下載地址 此套視頻信息量非常大,我畫了個思維導圖,除了上面的知識點,還有大量的練習和實踐項目。 下載地址 ...
  • 真正從事開發的第二個月,回顧之前所學的一些知識,從最開始的登錄註冊開始做,沒想到竟然會如此的慘烈。各種問題頻出,也算是增長了一點知識 首先從最開始的創建項目開始:遇到問題一:右鍵點擊新建沒有創建Java類的按鈕,真是第一次遇到,後通過查詢才知道 IDEA開發需要先將文件夾轉換為Sources文件夾才 ...
一周排行
    -Advertisement-
    Play Games
  • 示例項目結構 在 Visual Studio 中創建一個 WinForms 應用程式後,項目結構如下所示: MyWinFormsApp/ │ ├───Properties/ │ └───Settings.settings │ ├───bin/ │ ├───Debug/ │ └───Release/ ...
  • [STAThread] 特性用於需要與 COM 組件交互的應用程式,尤其是依賴單線程模型(如 Windows Forms 應用程式)的組件。在 STA 模式下,線程擁有自己的消息迴圈,這對於處理用戶界面和某些 COM 組件是必要的。 [STAThread] static void Main(stri ...
  • 在WinForm中使用全局異常捕獲處理 在WinForm應用程式中,全局異常捕獲是確保程式穩定性的關鍵。通過在Program類的Main方法中設置全局異常處理,可以有效地捕獲並處理未預見的異常,從而避免程式崩潰。 註冊全局異常事件 [STAThread] static void Main() { / ...
  • 前言 給大家推薦一款開源的 Winform 控制項庫,可以幫助我們開發更加美觀、漂亮的 WinForm 界面。 項目介紹 SunnyUI.NET 是一個基於 .NET Framework 4.0+、.NET 6、.NET 7 和 .NET 8 的 WinForm 開源控制項庫,同時也提供了工具類庫、擴展 ...
  • 說明 該文章是屬於OverallAuth2.0系列文章,每周更新一篇該系列文章(從0到1完成系統開發)。 該系統文章,我會儘量說的非常詳細,做到不管新手、老手都能看懂。 說明:OverallAuth2.0 是一個簡單、易懂、功能強大的許可權+可視化流程管理系統。 有興趣的朋友,請關註我吧(*^▽^*) ...
  • 一、下載安裝 1.下載git 必須先下載並安裝git,再TortoiseGit下載安裝 git安裝參考教程:https://blog.csdn.net/mukes/article/details/115693833 2.TortoiseGit下載與安裝 TortoiseGit,Git客戶端,32/6 ...
  • 前言 在項目開發過程中,理解數據結構和演算法如同掌握蓋房子的秘訣。演算法不僅能幫助我們編寫高效、優質的代碼,還能解決項目中遇到的各種難題。 給大家推薦一個支持C#的開源免費、新手友好的數據結構與演算法入門教程:Hello演算法。 項目介紹 《Hello Algo》是一本開源免費、新手友好的數據結構與演算法入門 ...
  • 1.生成單個Proto.bat內容 @rem Copyright 2016, Google Inc. @rem All rights reserved. @rem @rem Redistribution and use in source and binary forms, with or with ...
  • 一:背景 1. 講故事 前段時間有位朋友找到我,說他的窗體程式在客戶這邊出現了卡死,讓我幫忙看下怎麼回事?dump也生成了,既然有dump了那就上 windbg 分析吧。 二:WinDbg 分析 1. 為什麼會卡死 窗體程式的卡死,入口門檻很低,後續往下分析就不一定了,不管怎麼說先用 !clrsta ...
  • 前言 人工智慧時代,人臉識別技術已成為安全驗證、身份識別和用戶交互的關鍵工具。 給大家推薦一款.NET 開源提供了強大的人臉識別 API,工具不僅易於集成,還具備高效處理能力。 本文將介紹一款如何利用這些API,為我們的項目添加智能識別的亮點。 項目介紹 GitHub 上擁有 1.2k 星標的 C# ...