引言 Kafka是一款很棒的消息系統,今天我們就來深入瞭解一下它的實現細節,首先關註Producer這一方。 要使用kafka首先要實例化一個KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、ba ...





public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);//①
    return doSend(interceptedRecord, callback);//②



private final String topic;//主題
private final Integer partition;//分區
private final Headers headers;//頭
private final K key;//鍵
private final V value;//值
private final Long timestamp;//時間戳


①中ProducerInterceptors(有0 ~ 無窮多個,形成一個攔截鏈)對ProducerRecord進行攔截處理(比如打上時間戳,進行審計與統計等操作)

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
    ProducerRecord<K, V> interceptRecord = record;
    for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
        try {
            interceptRecord = interceptor.onSend(interceptRecord);
        } catch (Exception e) {
            // 不拋出異常,繼續執行下一個攔截器
            if (record != null)
                log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                log.warn("Error executing interceptor onSend callback", e);
    return interceptRecord;


private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // 序列化 key 和 value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
        // 計算分區獲得主題與分區
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        // 回調與事務處理省略。
        Header[] headers = record.headers().toArray();
        // 消息追加到RecordAccumulator中
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        // 該批次滿了或者創建了新的批次就要喚醒IO線程發送該批次了,也就是sender的wakeup方法
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
        return result.future;
    } catch (Exception e) {
        // 攔截異常並拋出
        this.interceptors.onSendError(record, tp, e);
        throw e;


private int partition(ProducerRecord<K, V> record, 
byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    // 消息有分區就直接使用,否則就使用分區器計算
    return partition != null ?
            partition :
                    record.topic(), record.key(), serializedKey,
                     record.value(), serializedValue, cluster);

預設的分區器DefaultPartitioner實現方式是如果partition存在就直接使用,否則根據key計算partition,如果key也不存在就使用round robin演算法分配partition。

 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose a partition in a round-robin fashion
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {//key為空 
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分區
            if (availablePartitions.size() > 0) {//有分區,取模就行
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {// 無分區,
                return Utils.toPositive(nextValue) % numPartitions;
        } else {// key 不為空,計算key的hash並取模獲得分區
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
        return counter.getAndIncrement();//返回並加一,在取模的配合下就是round robin




long pollTimeout = sendProducerData(now);//③
client.poll(pollTimeout, now);//④


public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;

// transportLayer的一種實現中的相關方法
public void addInterestOps(int ops) {
    key.interestOps(key.interestOps() | ops);


public void poll(long timeout) throws IOException {
    /* check ready keys */
    long startSelect = time.nanoseconds();
    int numReadyKeys = select(timeout);//wakeup使其停止阻塞
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        // Poll from channels that have buffered data (but nothing more from the underlying socket)
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
            pollSelectionKeys(toPoll, false, endSelect);

        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
        // Clear all selected keys so that they are included in the ready count for the next select

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    } else {
        madeReadProgressLastPoll = true; //no work is also "progress"

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());


public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    return result;
private boolean send(Send send) throws IOException {
    if (send.completed())
    return send.completed();


public long writeTo(GatheringByteChannel channel) throws IOException {
    long written = channel.write(buffers);
    if (written < 0)
        throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
    remaining -= written;
    pending = TransportLayers.hasPendingWrites(channel);
    return written;

public int write(ByteBuffer src) throws IOException {
    return socketChannel.write(src);

到此就把Producer的業務相關邏輯處理和非業務相關的網路 2方面的主要流程梳理清楚了。其他額外的功能是通過一些配置保證的。


public boolean canSendMore(String node) {
    Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
    return queue == null || queue.isEmpty() ||
           (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);


  RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());//調用completeBatch
     * 完成或者重試投遞,這裡如果acks不對就會重試
     * @param batch The record batch
     * @param response The produce response
     * @param correlationId The correlation id for the request
     * @param now The current POSIX timestamp in milliseconds
    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                               long now, long throttleUntilTimeMs) {
    public class ProduceResponse extends AbstractResponse {
         * Possible error code:



