寫入流程 Kafka 服務端通過 的`handle() ApiKeys handle() PRODUCE`類型,表示有生產者客戶端發送了消息,之後將消息傳遞給副本管理器處理。 副本管理器會將消息追加到分區 leader 副本的日誌文件中,然而實際上並不 ...
寫入流程
Kafka 服務端通過KafkaApis
的handle()
方法來統一處理請求,ApiKeys
枚舉了能被handle()
方法處理的請求類型嗎,如果是PRODUCE
類型,表示有生產者客戶端發送了消息,之後將消息傳遞給副本管理器處理。
副本管理器會將消息追加到分區 leader 副本的日誌文件中,然而實際上並不是直接寫入磁碟的,Kafka 會將日誌的段 segment 緩存到跳躍表ConcurrentSkipListMap
。寫入日誌時,首先會從緩存中查找段,如果能找到,則向該段中追加記錄,記錄包含日誌文件、索引文件、時間戳文件。日誌文件會被寫入FileChannel
中,索引文件和時間戳文件會被寫入MappedByteBuffer
中。最後,後臺調度程式會周期地將段文件刷新到磁碟持久化。
如果段緩存中沒有找到合適的段,則通過FileChannel
來打開一個新的段,返回磁碟文件的段映射,封裝為 segment 後,再緩存到跳躍表中,供下一次追加日誌時使用。
日誌的寫入處理流程
索引和偏移量的寫入處理流程
讀取流程
如果請求是FETCH
類型,表示有消費者客戶端發送了拉取請求,同樣,將請求傳遞給副本管理器處理。
副本管理器在寫入日誌時,將段緩存到了跳躍表中,因此讀取時,可以直接從跳躍表中獲取段,向該段發起讀取操作。首先,利用二分查找演算法,查找mmap
中的索引文件,根據索引文件記錄的日誌偏移量,遍歷查找FileChannel
中的日誌文件所在的位置,最後,根據偏移量和需要拉取的大小獲取日誌片段,返回給消費者。