高吞吐量的分散式發佈訂閱消息系統Kafka之Producer源碼分析

来源:https://www.cnblogs.com/MonsterJ/archive/2020/05/30/12994418.html
-Advertisement-
Play Games

引言 Kafka是一款很棒的消息系統,今天我們就來深入瞭解一下它的實現細節,首先關註Producer這一方。 要使用kafka首先要實例化一個KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、ba ...


引言

Kafka是一款很棒的消息系統,今天我們就來深入瞭解一下它的實現細節,首先關註Producer這一方。

要使用kafka首先要實例化一個KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、batch.size等非必要Properties,通過這個簡單的介面可以控制Producer大部分行為,實例化後就可以調用send方法發送消息了。

核心實現是這個方法:

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//①
    return doSend(interceptedRecord, callback);//②
}

通過不同的模式可以實現發送即忘(忽略返回結果)、同步發送(獲取返回的future對象,回調函數置為null)、非同步發送(設置回調函數)三種消息模式。

我們來看看消息類ProducerRecord有哪些屬性:

private final String topic;//主題
private final Integer partition;//分區
private final Headers headers;//頭
private final K key;//鍵
private final V value;//值
private final Long timestamp;//時間戳

它有多個構造函數,可以適應不同的消息類型:比如有無分區、有無key等。

①中ProducerInterceptors(有0 ~ 無窮多個,形成一個攔截鏈)對ProducerRecord進行攔截處理(比如打上時間戳,進行審計與統計等操作)

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // 不拋出異常,繼續執行下一個攔截器
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
            else
                log.warn("Error executing interceptor onSend callback", e);
        }
    }
    return interceptRecord;
}

如果用戶有定義就進行處理並返回處理後的ProducerRecord,否則直接返回本身。
然後②中doSend真正發送消息,並且是非同步的(源碼太長只保留關鍵):

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 序列化 key 和 value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
        }
        // 計算分區獲得主題與分區
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        // 回調與事務處理省略。
        Header[] headers = record.headers().toArray();
        // 消息追加到RecordAccumulator中
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        // 該批次滿了或者創建了新的批次就要喚醒IO線程發送該批次了,也就是sender的wakeup方法
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (Exception e) {
        // 攔截異常並拋出
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

下麵是計算分區的方法:

private int partition(ProducerRecord<K, V> record, 
byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // 消息有分區就直接使用,否則就使用分區器計算
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey,
                     record.value(), serializedValue, cluster);
}

預設的分區器DefaultPartitioner實現方式是如果partition存在就直接使用,否則根據key計算partition,如果key也不存在就使用round robin演算法分配partition。

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
 */
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {//key為空 
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分區
            if (availablePartitions.size() > 0) {//有分區,取模就行
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {// 無分區,
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {// key 不為空,計算key的hash並取模獲得分區
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();//返回並加一,在取模的配合下就是round robin
    }
}

以上就是發送消息的邏輯處理,接下來我們再看看消息發送的物理處理。

Sender(是一個Runnable,被包含在一個IO線程ioThread中,該線程不斷從RecordAccumulator隊列中的讀取消息並通過Selector將數據發送給Broker)的wakeup方法,實際上是KafkaClient介面的wakeup方法,由NetworkClient類實現,採用了NIO,也就是java.nio.channels.Selector.wakeup()方法實現。

Sender的run中主要邏輯是不停執行準備消息和等待消息:

long pollTimeout = sendProducerData(now);//③
client.poll(pollTimeout, now);//④

③完成消息設置並保存到通道中,然後監聽感興趣的key,由KafkaChannel實現。

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

// transportLayer的一種實現中的相關方法
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);
}

④主要是Selector的poll,其select被wakeup喚醒:

public void poll(long timeout) throws IOException {
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int numReadyKeys = select(timeout);//wakeup使其停止阻塞
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        // Poll from channels that have buffered data (but nothing more from the underlying socket)
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
            pollSelectionKeys(toPoll, false, endSelect);
        }

        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
        // Clear all selected keys so that they are included in the ready count for the next select
        readyKeys.clear();

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    } else {
        madeReadProgressLastPoll = true; //no work is also "progress"
    }

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
}

其中pollSelectionKeys方法會調用如下方法完成消息發送:

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}
private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}

Send是一次數據發包,一般由ByteBufferSend或者MultiRecordsSend實現,其writeTo調用transportLayer的write方法,一般由PlaintextTransportLayer或者SslTransportLayer實現,區分是否使用ssl:

public long writeTo(GatheringByteChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    pending = TransportLayers.hasPendingWrites(channel);
    return written;
}

public int write(ByteBuffer src) throws IOException {
    return socketChannel.write(src);
}

到此就把Producer的業務相關邏輯處理和非業務相關的網路 2方面的主要流程梳理清楚了。其他額外的功能是通過一些配置保證的。

比如順序保證就是max.in.flight.requests.per.connection,InFlightRequests的doSend會進行判斷(由NetworkClient的canSendRequest調用),只要該參數設為1即可保證當前包未確認就不能發送下一個包從而實現有序性

public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}

再比如可靠性,通過設置acks,Sender中sendProduceRequest的clientRequest加入了回調函數:

  RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());//調用completeBatch
        }
    };
    
     /**
     * 完成或者重試投遞,這裡如果acks不對就會重試
     *
     * @param batch The record batch
     * @param response The produce response
     * @param correlationId The correlation id for the request
     * @param now The current POSIX timestamp in milliseconds
     */
    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                               long now, long throttleUntilTimeMs) {
    }
    
    public class ProduceResponse extends AbstractResponse {
      /**
         * Possible error code:
         * INVALID_REQUIRED_ACKS (21)
         */
    }

kafka源碼一層一層包裝很多,錯綜複雜,如有錯誤請大家不吝賜教。


您的分享是我們最大的動力!

-Advertisement-
Play Games
更多相關文章
  • 圖中是暗黑領域,非常牛逼的技能。 背景 DDD中出現的名詞: 領域,子領域,核心域,通用域,支撐域,限界上下文,聚合,聚合根,實體,值對象 都是關鍵概念,但是又比較晦澀,在開始DDD之前,搞清楚這些關鍵概念名詞非常的重要。 那它們作用體現在哪裡呢? 領域-子領域 領域是: 從事專門活動或者事業的範圍 ...
  • #include<vector> // 包含頭文件vector ... using namespace std; // vector包含在std中,因此必須包含std::vector vector <int> vi; // create a zero-size array of int int n; ...
  • python的變數是存在作用域的,在代碼中不同位置的變數作用的範圍會有所不同,比如有的變數在整段代碼中都可以使用,有的變數卻只在函數內部使用。python中把能夠在整段代碼任意位置有效的變數稱為全局變數,只在函數內部使用的變數稱作局部變數。 全局變數: a = 520 #此時a作為全局變數 def ...
  • 1. 應用測試的介紹 一般我們在寫完代碼之後,代碼的測試是會給專門的測試人員來測試的,如果一個測試跑到你的工位上對你說,你的代碼好像有Bug,你肯定會不爽,反正我就是這樣的🙃。所以為了顯示自己的代碼質量高一點,在功能提交給測試之前,我們會自己測試一下,接下來給大家介紹一下 Spring Boot ...
  • 前言 本文的文字及圖片來源於網路,僅供學習、交流使用,不具有任何商業用途,版權歸原作者所有,如有問題請及時聯繫我們以作處理。 當需要進行大規模查詢時(比如目前遇到的情形:查詢某個省所有發債企業的YY評級分數),人工查詢顯然太過費時,那就寫個爬蟲吧。 由於該爬蟲實在過於簡單,就只簡單概述下。 一、請求 ...
  • 幾乎所有語言的第一個程式都是"HelloWorld" 就像所有單片機初學者一樣,點亮第一個LED燈開始 而起初我們編寫/學習Java程式,都是通過記事本來編寫的,這裡推薦一個Editplus(提取碼:qq1t)記事本文件給大家 這裡要分清楚一個概念,所有Java源程式的尾碼都是*.Java,可以新建 ...
  • 對於動漫愛好者來說,海賊王、火影、死神三大動漫神作你肯定肯定不陌生了。小編身邊很多的同事仍然深愛著這些經典神作,可見“中毒”至深。今天小編利用Python大法帶大家分析一下這些神作,看看這些神作到底在講些神馬。 人生苦短,我用Python。小編利用Python網路爬蟲爬取了豆瓣網,將網站上關於這三部 ...
  • Linux命令之top命令介紹 top命令是Linux下常用的性能分析工具,能夠實時顯示系統中各個進程的資源占用狀況,類似於Windows的任務管理器。下麵詳細介紹它的使用方法。 top是一個動態顯示過程,即可以通過用戶按鍵來不斷刷新當前狀態.如果在前臺執行該命令,它將獨占前臺,直到用戶終止該程式為 ...
一周排行
    -Advertisement-
    Play Games
  • 移動開發(一):使用.NET MAUI開發第一個安卓APP 對於工作多年的C#程式員來說,近來想嘗試開發一款安卓APP,考慮了很久最終選擇使用.NET MAUI這個微軟官方的框架來嘗試體驗開發安卓APP,畢竟是使用Visual Studio開發工具,使用起來也比較的順手,結合微軟官方的教程進行了安卓 ...
  • 前言 QuestPDF 是一個開源 .NET 庫,用於生成 PDF 文檔。使用了C# Fluent API方式可簡化開發、減少錯誤並提高工作效率。利用它可以輕鬆生成 PDF 報告、發票、導出文件等。 項目介紹 QuestPDF 是一個革命性的開源 .NET 庫,它徹底改變了我們生成 PDF 文檔的方 ...
  • 項目地址 項目後端地址: https://github.com/ZyPLJ/ZYTteeHole 項目前端頁面地址: ZyPLJ/TreeHoleVue (github.com) https://github.com/ZyPLJ/TreeHoleVue 目前項目測試訪問地址: http://tree ...
  • 話不多說,直接開乾 一.下載 1.官方鏈接下載: https://www.microsoft.com/zh-cn/sql-server/sql-server-downloads 2.在下載目錄中找到下麵這個小的安裝包 SQL2022-SSEI-Dev.exe,運行開始下載SQL server; 二. ...
  • 前言 隨著物聯網(IoT)技術的迅猛發展,MQTT(消息隊列遙測傳輸)協議憑藉其輕量級和高效性,已成為眾多物聯網應用的首選通信標準。 MQTTnet 作為一個高性能的 .NET 開源庫,為 .NET 平臺上的 MQTT 客戶端與伺服器開發提供了強大的支持。 本文將全面介紹 MQTTnet 的核心功能 ...
  • Serilog支持多種接收器用於日誌存儲,增強器用於添加屬性,LogContext管理動態屬性,支持多種輸出格式包括純文本、JSON及ExpressionTemplate。還提供了自定義格式化選項,適用於不同需求。 ...
  • 目錄簡介獲取 HTML 文檔解析 HTML 文檔測試參考文章 簡介 動態內容網站使用 JavaScript 腳本動態檢索和渲染數據,爬取信息時需要模擬瀏覽器行為,否則獲取到的源碼基本是空的。 本文使用的爬取步驟如下: 使用 Selenium 獲取渲染後的 HTML 文檔 使用 HtmlAgility ...
  • 1.前言 什麼是熱更新 游戲或者軟體更新時,無需重新下載客戶端進行安裝,而是在應用程式啟動的情況下,在內部進行資源或者代碼更新 Unity目前常用熱更新解決方案 HybridCLR,Xlua,ILRuntime等 Unity目前常用資源管理解決方案 AssetBundles,Addressable, ...
  • 本文章主要是在C# ASP.NET Core Web API框架實現向手機發送驗證碼簡訊功能。這裡我選擇是一個互億無線簡訊驗證碼平臺,其實像阿裡雲,騰訊雲上面也可以。 首先我們先去 互億無線 https://www.ihuyi.com/api/sms.html 去註冊一個賬號 註冊完成賬號後,它會送 ...
  • 通過以下方式可以高效,並保證數據同步的可靠性 1.API設計 使用RESTful設計,確保API端點明確,並使用適當的HTTP方法(如POST用於創建,PUT用於更新)。 設計清晰的請求和響應模型,以確保客戶端能夠理解預期格式。 2.數據驗證 在伺服器端進行嚴格的數據驗證,確保接收到的數據符合預期格 ...