Kafka is an excellent messaging middleware and stream-processing platform. In this article we dig into the source code behind Kafka's message production, tracing the full path a record takes from creation to being sent. The aim is to describe the overall flow of producing and transmitting messages; many of the finer details are left for the reader to explore.

Producer

We start with the Kafka producer's core components: how ProducerConfig options are parsed, the buffering mechanism of the record accumulator (RecordAccumulator), metadata management, the sender thread, and so on.

Example code:


Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

 return new KafkaProducer<>(props);

The snippet above is typical client usage: first build the configuration, then create the client class KafkaProducer.
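
To make this concrete, here is a minimal, self-contained usage sketch; the bootstrap address, topic name, and record contents are placeholders chosen for illustration, not values from the original text.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // try-with-resources closes the producer, flushing any buffered batches
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<Integer, String> record = new ProducerRecord<>("demo-topic", 1, "hello");
            // send() is asynchronous; the callback fires once the broker acknowledges the batch
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("sent to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
            producer.flush();
        }
    }
}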

ProducerConfig

Kafka defines a great many configuration options to cover users' customization needs; they are all listed in the official documentation.

The producer options are defined in the ProducerConfig class, and each option has an accompanying documentation string.

public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt .....";

Initializing the client configuration is essentially initializing a ProducerConfig instance.

return new KafkaProducer<>(props);

//props -> Utils.propsToMap(properties)

public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
  this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, 
  keySerializer, valueSerializer)),..... )
}

public ProducerConfig(Map<String, Object> props) { super(CONFIG, props); }

This calls the parent class constructor. CONFIG is a static field, initialized at class-loading time, that assigns a default value to each configuration option.

Each individual option is described by a ConfigKey.

CONFIG = new ConfigDef()
    .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
    .....
    .defineXXX();

public static class ConfigKey {
    public final String name;
    public final Type type;
    public final String documentation;
    public final Object defaultValue;
}

In the parent class constructor, the default configuration is merged with the user-supplied configuration.

public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
    this(definition, originals, Collections.emptyMap(), true);
 }

public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
    // (simplified) parse the user-supplied values against the definition; defaults are filled in here
    this.values = definition.parse(this.originals);
    this.values.putAll(configUpdates);
    definition.parse(this.values);
    this.definition = definition;
}
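
As a small standalone illustration of how ConfigDef fills in defaults during parse (this is not code from KafkaProducer itself; the option name and values mirror batch.size):

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;

public class ConfigDefaultsDemo {
    public static void main(String[] args) {
        // A single definition, mirroring how ProducerConfig defines batch.size
        ConfigDef def = new ConfigDef()
                .define("batch.size", ConfigDef.Type.INT, 16384,
                        ConfigDef.Range.atLeast(0), ConfigDef.Importance.MEDIUM, "batch size doc");

        // User supplies a value: the string is validated and converted to an Integer
        Map<String, Object> withUserValue = def.parse(Map.of("batch.size", "32768"));
        System.out.println(withUserValue.get("batch.size")); // 32768

        // User supplies nothing: the default declared in define() is applied
        Map<String, Object> defaultsOnly = def.parse(Map.of());
        System.out.println(defaultsOnly.get("batch.size"));  // 16384
    }
}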

Producer Initialization

Below is the core constructor for producer initialization. config is the ProducerConfig just created above, time is Time.SYSTEM, and the remaining parameters are all null.

KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer, Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors<K, V> interceptors,
              Time time);

KafkaProducer is made up of many core components. They are listed briefly first; their implementations are explained in detail when they are used.

Partitioner

this.partitioner = config.getConfiguredInstance(
        ProducerConfig.PARTITIONER_CLASS_CONFIG,
        Partitioner.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));

boolean enableAdaptivePartitioning = partitioner == null &&
                config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
                
RecordAccumulator.PartitionerConfig partitionerConfig = new RecordAccumulator.PartitionerConfig(
        enableAdaptivePartitioning,
        config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG));

Serializers

this.keySerializer = config.getConfiguredInstance(
   ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.valueSerializer = config.getConfiguredInstance(
    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
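
For reference, this is roughly what a custom serializer looks like; the class below is a hypothetical example, not part of Kafka, and would be registered through the key.serializer / value.serializer options.

import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.serialization.Serializer;

// Hypothetical example class, not part of Kafka itself.
public class UpperCaseStringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        // A serializer turns the typed key/value into the byte[] that ends up in the batch
        return data == null ? null : data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }
}
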
Interceptors

List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.configuredInterceptors(
    config, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);

Compression

this.compression = configureCompression(config);

Accumulator

this.accumulator = new RecordAccumulator(logContext,
    batchSize, compression, lingerMs(config), retryBackoffMs,
    retryBackoffMaxMs, deliveryTimeoutMs, partitionerConfig, metrics,
    PRODUCER_METRIC_GROUP_NAME, time, apiVersions, transactionManager,
    new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));

Metadata Management

List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
this.metadata = new ProducerMetadata(retryBackoffMs,
        retryBackoffMaxMs,
        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
        config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
        logContext,
        clusterResourceListeners,
        Time.SYSTEM);
this.metadata.bootstrap(addresses);

Sender Thread

this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

Network Client

public static NetworkClient createNetworkClient(AbstractConfig config,
                                              String clientId,
                                              Metrics metrics,
                                              String metricsGroupPrefix,
                                              LogContext logContext,
                                              ApiVersions apiVersions,
                                              Time time,..);

Metadata Management

Metadata

The client needs Kafka's cluster information in order to send data. This includes the cluster's nodes, topics, partitions, partition leaders, and so on.

The classes that maintain this metadata are not covered in depth here; it is enough to know that the data is kept on the client side and refreshed when needed.

public class Metadata implements Closeable {
    private volatile MetadataSnapshot metadataSnapshot = MetadataSnapshot.empty();
    private List<InetSocketAddress> bootstrapAddresses;
}

public class MetadataSnapshot {
    private final String clusterId;
    private final Map<Integer, Node> nodes;
    private final Node controller;
    private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
    private final Map<String, Uuid> topicIds;
    private final Map<Uuid, String> topicNames;
    private Cluster clusterInstance;
}

public final class Cluster {
    private final List<Node> nodes;
    private final Node controller;
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    private final Map<Integer, Node> nodesById;
    private final ClusterResource clusterResource;
    private final Map<String, Uuid> topicIds;
    private final Map<Uuid, String> topicNames;
}

public final class TopicPartition implements Serializable {
    private final int partition;
    private final String topic;
}

Fetching Metadata

Sending the request

sendInternalMetadataRequest sends the metadata synchronization request.

// -> Sender#run -> NetworkClient#poll -> 
//-> NetworkClient.DefaultMetadataUpdater#maybeUpdate(long, org.apache.kafka.common.Node)
//-> NetworkClient#sendInternalMetadataRequest

void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
    ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
    doSend(clientRequest, true, now);
}

Selector#send attaches the request to the KafkaChannel, where it waits to be sent.

public void send(NetworkSend send){
    String connectionId = send.destinationId();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    channel.setSend(send);
}

public void setSend(NetworkSend send) {
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

Selectable#poll then processes the channel's events and writes the request out over the network.

Kafka defines all of its request types in one place.

public enum ApiKeys {
    METADATA(ApiMessageType.METADATA),
    //omit
}

public Builder(MetadataRequestData data) {
    super(ApiKeys.METADATA);
    this.data = data;
}

Handling the response

handleMetadataResponse processes the response; the data ultimately ends up in a MetadataSnapshot.

//-> NetworkClient#handleCompletedReceives
//-> NetworkClient.DefaultMetadataUpdater#handleSuccessfulResponse
//->Metadata#update

public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {
    this.metadataSnapshot = handleMetadataResponse(response, isPartialUpdate, nowMs);
}

private MetadataSnapshot handleMetadataResponse(MetadataResponse metadataResponse,...) {
    for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
        String topicName = metadata.topic();
        Uuid topicId = metadata.topicId();
        for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
            updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs(), topicId, oldTopicId)
                .ifPresent(partitions::add);
        }
    }
}

PartitionMetadata stores per-partition information: the leader, the ISR (in-sync replicas), and so on.

public static class PartitionMetadata {
    public final TopicPartition topicPartition;
    public final Optional<Integer> leaderId;
    public final Optional<Integer> leaderEpoch;
    public final List<Integer> replicaIds;
    public final List<Integer> inSyncReplicaIds;
    public final List<Integer> offlineReplicaIds;
}

Broker side

Once the request has been sent, the broker, acting as the server, starts processing it. KafkaApis#handle is the single entry point for all requests.

KafkaApis#handleTopicMetadataRequest assembles the metadata and returns it to the client.

//kafka.server.KafkaApis#handle
def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
    request.header.apiKey match {
        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
    }
}

def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
    val topicMetadata = getTopicMetadata(request, metadataRequest.isAllTopics, allowAutoCreation, authorizedTopics,
      request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)

    val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName)

    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
    MetadataResponse.prepareResponse(
        brokers.toList.asJava,
        clusterId,......
        completeTopicMetadata.asJava,
    ))
 }

Further code details are discussed later in the article.

Record Accumulator

KafkaProducer#doSend holds the send logic, but "sending" here actually means appending the record to the accumulator's buffer.

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // refresh metadata if necessary
    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);

    // compute the partition
    int partition = partition(record, serializedKey, serializedValue, cluster);

    // append the record to the accumulator
    RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
            serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);

    // wake up the sender thread
    if (result.batchIsFull || result.newBatchCreated) {
        this.sender.wakeup();
    }
}

Computing the partition

The partition can be determined in four ways (the key-based path is sketched after the code below):

  1. The application specifies the partition explicitly on the record.
  2. The application supplies it through a custom partitioner.
  3. It is computed from the serializedKey.
  4. By default it is RecordMetadata.UNKNOWN_PARTITION, and the accumulator picks a partition using its built-in logic.

Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION, which means that the RecordAccumulator would pick a partition using built-in logic (which may take into account broker load, the amount of data produced to each partition, etc.).

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    if (record.partition() != null)
        return record.partition();

    if (partitioner != null) {
        int customPartition = partitioner.partition(
            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        if (customPartition < 0) {
            throw new IllegalArgumentException(String.format(...));
        }
        return customPartition;
    }

    if (serializedKey != null && !partitionerIgnoreKeys) {
        // hash the keyBytes to choose a partition
        return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
    } else {
        return RecordMetadata.UNKNOWN_PARTITION;
    }
}
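
For the key-based path, BuiltInPartitioner.partitionForKey is essentially a murmur2 hash of the serialized key, forced non-negative and taken modulo the partition count. A small sketch of that logic:

import org.apache.kafka.common.utils.Utils;

public final class KeyPartitioningSketch {
    // Mirrors BuiltInPartitioner.partitionForKey: hash the serialized key with murmur2,
    // force the result to be non-negative, then take it modulo the number of partitions.
    public static int partitionForKey(byte[] serializedKey, int numPartitions) {
        return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
    }
}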

Accumulator structure

Inside the accumulator, records are grouped by topic and partition, and the bytes ultimately end up in ProducerBatch objects. Each partition's buffer is a double-ended queue of such batches.

public class RecordAccumulator {
    private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = 
        new CopyOnWriteMap<>();
}

private static class TopicInfo {
    public final ConcurrentMap<Integer /*partition*/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();

    public final BuiltInPartitioner builtInPartitioner;
    public TopicInfo(BuiltInPartitioner builtInPartitioner) {
        this.builtInPartitioner = builtInPartitioner;
    }
}

Appending records

append resolves the Deque<ProducerBatch> for the given topic and partition.

tryAppend then appends the record to the last ProducerBatch in that deque.

public RecordAppendResult append(String topic, int partition, long timestamp,
                                 byte[] key, byte[] value, ...) {
    // look up (or create) the per-topic info
    TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, ......);
    while (true) {
        effectivePartition = partition;
        Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());

        synchronized (dq) {
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
        }
    }
}

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                        Callback callback, Deque<ProducerBatch> deque, long nowMs) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        int initialBytes = last.estimatedSizeInBytes();
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
        int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
         return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes);
    }
    return null;
}

writeTo writes the key, value, offsetDelta, and timestampDelta into the buffer in sequence.

appendStream can be thought of as one contiguous stream of bytes.

//-> ProducerBatch#tryAppend -> MemoryRecordsBuilder#append()
//-> MemoryRecordsBuilder#appendWithOffset()
//->MemoryRecordsBuilder#appendDefaultRecord
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                    Header[] headers) throws IOException {
    int offsetDelta = (int) (offset - baseOffset);
    long timestampDelta = timestamp - baseTimestamp;
    int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
    recordWritten(offset, timestamp, sizeInBytes);
}

public static int writeTo(DataOutputStream out, int offsetDelta,
                          long timestampDelta, ByteBuffer key,
                          ByteBuffer value, Header[] headers) {
    ByteUtils.writeVarlong(timestampDelta, out);
    ByteUtils.writeVarint(offsetDelta, out);

    int keySize = key.remaining();
    ByteUtils.writeVarint(keySize, out);
    Utils.writeTo(out, key, keySize);

    int valueSize = value.remaining();
    ByteUtils.writeVarint(valueSize, out);
    Utils.writeTo(out, value, valueSize);

    ByteUtils.writeVarint(headers.length, out);

    for (Header header : headers) {
        // utf8Bytes / headerValue are the header's key and value bytes (details omitted)
        ByteUtils.writeVarint(utf8Bytes.length, out);
        out.write(utf8Bytes);
        out.write(headerValue);
    }
}
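
The writeVarint / writeVarlong calls above use a zig-zag, variable-length encoding, so small deltas (including small negative ones) cost only one or two bytes. Below is an illustrative re-implementation of the int variant; the real code lives in org.apache.kafka.common.utils.ByteUtils.

import java.io.DataOutputStream;
import java.io.IOException;

public final class VarintSketch {
    public static void writeVarint(int value, DataOutputStream out) throws IOException {
        int v = (value << 1) ^ (value >> 31);      // zig-zag: maps small negatives to small positives
        while ((v & 0xffffff80) != 0) {
            out.writeByte((v & 0x7f) | 0x80);      // low 7 bits of data, high bit marks "more bytes follow"
            v >>>= 7;
        }
        out.writeByte(v);
    }
}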

When the append fails because the current batch is out of space, a new ProducerBatch has to be created.

int size = Math.max(this.batchSize,....);
buffer = free.allocate(size, maxTimeToBlock);

private RecordAppendResult appendNewBatch(String topic,
                                              int partition,
                                              Deque<ProducerBatch> dq,...){
  //omit
    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());

    ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);

    FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
            callbacks, nowMs));
}

Finally, the sender thread is woken up.

if (result.batchIsFull || result.newBatchCreated) {
   this.sender.wakeup();
}

Sender Thread

The sender thread runs in a loop, processing whatever data is ready to be sent.

public class Sender implements Runnable {
      public void run() {
        while (running) {
          runOnce();
        }
    }
}

void runOnce() {
    //....
     long pollTimeout = sendProducerData(currentTimeMs);
     client.poll(pollTimeout, currentTimeMs);
}

Target brokers

The accumulator is traversed to find the leader node of each target partition.

//-> Sender#sendProducerData-> RecordAccumulator#ready
 public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    for (Map.Entry<String, TopicInfo> topicInfoEntry : this.topicInfoMap.entrySet()) {
       final String topic = topicInfoEntry.getKey();
         nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, String topic,...){
    ConcurrentMap<Integer, Deque<ProducerBatch>> batches = topicInfo.batches;
     for (Map.Entry<Integer, Deque<ProducerBatch>> entry : batches.entrySet()) {
         TopicPartition part = new TopicPartition(topic, entry.getKey());
         Node leader = metadataSnapshot.cluster().leaderFor(part);
         //.....
        readyNodes.add(leader);
     }
}

Connections are then prepared to the brokers that host these leader nodes.

Iterator<Node> iter = result.readyNodes.iterator();
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        //..
    }
}

Collecting batches

Data to be sent is collected per broker node.

public Map<Integer, List<ProducerBatch>> drain(MetadataSnapshot metadataSnapshot, Set<Node> nodes, int maxSize, long now) {
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        List<ProducerBatch> ready = drainBatchesForOneNode(metadataSnapshot, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}

Writing data to the buffer

ProduceRequest wraps the batched data into a request structure, callback binds the completion handler, ApiKeys.PRODUCE identifies the API type, and client.send dispatches the request.

private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
  for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
     sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

private void sendProduceRequest(....., List<ProducerBatch> batches) {
     ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
                new ProduceRequestData() .setTopicData(tpd));
     RequestCompletionHandler callback = ....;
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, ....., callback);
    client.send(clientRequest, now);
}

public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
    private final ProduceRequestData data;
    public Builder(..... ProduceRequestData data) {
        super(ApiKeys.PRODUCE, minVersion, maxVersion);
        this.data = data;
    }
}

Finally, doSend hands the data to the transport layer and registers interest in the write event; the request then waits to be sent to the broker.

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    Send send = request.toSend(header);
    selector.send(new NetworkSend(clientRequest.destination(), send));
}

public void send(NetworkSend send) {
    String connectionId = send.destinationId();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    channel.setSend(send);
}

public void setSend(NetworkSend send) {
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

Broker-side message handling

The broker-side handler for produce requests is handleProduceRequest; it iterates over entriesPerPartition, finds the corresponding partition, and appends the records to it.

override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
     request.header.apiKey match {
        //....
        case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
     }
}

//->ReplicaManager#handleProduceAppend
//->ReplicaManager#appendRecords
//->ReplicaManager#appendToLocalLog
 private def appendToLocalLog(....){
    entriesPerPartition.map { case (topicPartition, records) =>
      val partition = getPartitionOrException(topicPartition)
      val info = partition.appendRecordsToLeader(records, origin,....)
    }
 }

def appendRecordsToLeader(....){
   val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch,....)
}

maybeRoll decides whether a new log segment needs to be rolled.

LogSegment#append writes the records to the log file and, when necessary, updates the index files.

The log field is a FileRecords object, a wrapper around the underlying log file.

//->UnifiedLog#append
 val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

//-> LogSegment#append
public void append(long largestOffset, long largestTimestampMs, long shallowOffsetOfMaxTimestamp, MemoryRecords records) {

    long appendedBytes = log.append(records);

    offsetIndex().append(largestOffset, physicalPosition);
    timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
}

public static FileRecords open(File file,
                                boolean mutable,
                                boolean fileAlreadyExists,
                                int initFileSize,
                                boolean preallocate) throws IOException {
    FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
    int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
    return new FileRecords(file, channel, 0, end, false);
}

Writing the index files

//OffsetIndex#append
public void append(long offset, int position) {
  mmap().putInt(relativeOffset(offset));
  mmap().putInt(position);
}

//TimeIndex#maybeAppend(long, long, boolean)
public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
    MappedByteBuffer mmap = mmap();
    mmap.putLong(timestamp);
    mmap.putInt(relativeOffset(offset));
}
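
Both index entries store the offset relative to the segment's base offset, which is why a 4-byte int is enough per entry. Conceptually it is the difference below (a simplified view of what AbstractIndex#relativeOffset does; the real code also validates the range):

public final class IndexEntrySketch {
    // Offsets are stored relative to the segment's base offset so each index entry fits in 4 bytes.
    public static int relativeOffset(long baseOffset, long offset) {
        return (int) (offset - baseOffset);
    }
}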

Summary

This article outlined the Kafka message production flow, from producer initialization through sending records to the broker accepting them. The producer is initialized from its configuration, including the partitioner and serializer components, and batches records in the accumulator. The metadata module keeps the producer's view of the cluster up to date, while the sender thread and network client handle communication with the brokers. Based on the partition information, the producer writes records toward the target broker's buffer and waits for the response; the broker appends and acknowledges the messages, giving efficient, reliable data transfer. Together these pieces realize Kafka's high-throughput, low-latency message delivery.