Kafka2.0服務端啟動源碼

来源:https://www.cnblogs.com/bigshark/archive/2019/07/18/11204428.html
-Advertisement-
Play Games

  Kafka 服務端通過 的主函數 方法啟動。 類提供讀取配置文件、啟動/停止服務的方法。而啟動/停止服務最終調用的是 的`startup/shutdown`方法。 啟動流程 1. 啟動 zk 客戶端。 2. 啟動動態配置。 3. 啟動調度線程池。 4. 啟動日誌管理器的後臺線 ...


  Kafka 服務端通過Kafka.scala的主函數main方法啟動。KafkaServerStartable類提供讀取配置文件、啟動/停止服務的方法。而啟動/停止服務最終調用的是KafkaServerstartup/shutdown方法。

啟動流程

  1. 啟動 zk 客戶端。
  2. 啟動動態配置。
  3. 啟動調度線程池。
  4. 啟動日誌管理器的後臺線程,包括日誌清理、日誌刷盤、日誌刪除、日誌壓縮。
  5. 啟動 NIO Socket 服務
    1. 初始化一個接收器Acceptor,即啟動 NIO Socket。
    2. 添加num.network.threads個接收器到請求通道RequestChannel的處理器緩存ConcurrentHashMap,key 為遞增編號,value 為處理器Processor
    3. Acceptor執行CountDownLatch.await等待通知啟動。
    4. 緩存AcceptorConcurrentHashMap,key 為EndPoint,value 為Acceptor
  6. 啟動副本管理器。
  7. 在 zk 註冊 broker。
  8. 啟動控制器。
  9. 啟動組協調器。
  10. 啟動事務協調器。
  11. 初始化KafkaApis
  12. 初始化處理器線程緩存池
    1. 啟動num.io.threads個請求處理器線程KafkaRequestHandler
    2. 從阻塞隊列ArrayBlockingQueue獲取請求,調用KafkaApis.handle方法,進行集中處理請求。
  13. 啟動處理器線程
    1. 首先CountDownLatch.countDown通知喚醒Acceptor線程。
      1. 使用NIO.select輪詢。
      2. 如果有可接收就緒的事件,則將當前的SocketChannel加入緩存隊列ConcurrentLinkedQueue
    2. 從上述緩存隊列取出SocketChannel,綁定到KafkaChannel
    3. 將接收到的請求緩存到限長阻塞隊列ArrayBlockingQueue

請求處理流程

服務端請求處理流程

詳細源碼分析

Acceptor 線程

def run() {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 註冊接收事件
  startupComplete() // 通知 Acceptor 線程
  var currentProcessor = 0
  while (isRunning) {
    val ready = nioSelector.select(500) // 輪詢事件
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iter = keys.iterator()
      while (iter.hasNext && isRunning) {
        val key = iter.next
        iter.remove()
        if (key.isAcceptable) { // 有可接受事件
          val processor = synchronized {
            currentProcessor = currentProcessor % processors.size
            processors(currentProcessor) // 緩存 Processor 
          }
          accept(key, processor) // 將 SocketChannel 緩存到隊列
        }
      }
    }
  }
}

Processor 線程

override def run() {
  startupComplete() // CountDownLatch.countDown 喚醒 Acceptor 線程。
  while (isRunning) {
    configureNewConnections() // 從緩存隊列取出 SocketChannel,綁定到 KafkaChannel
    processNewResponses() // 處理返回客戶端的響應
    poll() // Kafka.Selector 輪詢讀取/寫入事件
    processCompletedReceives() // 處理客戶端的請求,放到阻塞隊列
    processCompletedSends() // 處理返回客戶端響應後的回調
    processDisconnected() // 斷開連接後的處理
  }
}

KafkaRequestHandler 線程阻塞隊列

def run() {
  while (!stopped) {
    val startSelectTime = time.nanoseconds
    // 從阻塞隊列拉取請求
    val req = requestChannel.receiveRequest(300) 

    req match {
      case request: RequestChannel.Request =>
        try {
          apis.handle(request) // 調用`KafkaApis.handle`方法,進行集中處理請求。
        }
    }
  }
}

KSelector

  參考客戶端源碼分析。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 快速熱門指導,櫥窗註意細節 雙語字幕添加,資源對接擔保 全國抖音聯盟內部唯一官網 : http://douyinlianmeng.tk ...
  • 接上篇內容描述談談平臺設計思路及方法。前一篇簡單的介紹了一下整體的實現思路。那從本節開始開始說明如何引入資料庫表視圖等,因為我們不管做什麼項目軟體,設計的時候能直接將頁面控制項綁定到指定的欄位,那樣後續的操作就會很方便。至於實現的方法,有很多種,每個人可能有各自不同的思路,我不去評價別人是怎麼做的,僅 ...
  • 1、背景 友情鏈接:https://www.cnblogs.com/Agui520/p/11187972.html https://blog.csdn.net/fd2025/article/details/79863390 以支付、電商下單為例子。一個電商系統包含了好幾大類模塊,就比如有用戶模塊、商 ...
  • 我的那些年(13)~主推微服務架構 整個系統走向微服務架構 網關 服務註冊與發現 配置中心 熔斷器 鏈路跟蹤 授權與鑒權 服務間的通訊 同步feign 服務間的通訊 非同步消息 日誌收集 個系統走向微服務架構 公司系統比較多,耦合度比較大,將這些模塊進行拆分,各個負責自己的模塊,減少相互之間的直接依賴 ...
  • 架構雜談《四》 分散式一致性協議 一、引言 在分散式系統中,為了保證數據的高可用,通常會將數據保留多個副本(replica),這些個副本會放在不同的物理機上,為了對用戶提供正確的數據,我們需要保證這些放在不同物理機上的副本是一致的。為瞭解決這種分散式一致性問題,提出了很多經典的協議和演算法,比較著名的 ...
  • 1.怎麼 判斷元素是否存在? 判斷元素是否存在和是否出現不同, 判斷是否存在意味著如果這個元素壓根就不存在, 就會拋出NoSuchElementException 這樣就可以使用try catch,如果catch到NoSuchElementException 就返回false 2.如何判斷元素是否出 ...
  • 此套視頻信息量非常大,我畫了個思維導圖,除了上面的知識點,還有大量的練習和實踐項目。 下載地址 此套視頻信息量非常大,我畫了個思維導圖,除了上面的知識點,還有大量的練習和實踐項目。 下載地址 ...
  • 真正從事開發的第二個月,回顧之前所學的一些知識,從最開始的登錄註冊開始做,沒想到竟然會如此的慘烈。各種問題頻出,也算是增長了一點知識 首先從最開始的創建項目開始:遇到問題一:右鍵點擊新建沒有創建Java類的按鈕,真是第一次遇到,後通過查詢才知道 IDEA開發需要先將文件夾轉換為Sources文件夾才 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...