Kafka Message Production Process
Kafka is an excellent message broker and stream-processing platform. In this article we walk through the source code behind Kafka's message production process, tracing the full path from producing a record in the client to sending it out. The goal is to describe the overall flow of producing and transmitting messages; many of the finer details are left for the reader to explore.
Producer
We first introduce the core components of the Kafka producer: the configuration parsing in ProducerConfig, the buffering mechanism of the record accumulator (RecordAccumulator), metadata management, the sender thread, and so on.
Example code:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaProducer<>(props);
The snippet above shows typical client usage: first build the configuration, then create the client class KafkaProducer.
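For completeness, a hedged end-to-end usage sketch; the bootstrap address, topic name, key and value below are made up for illustration, not taken from the article:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<Integer, String> record = new ProducerRecord<>("demo-topic", 1, "hello kafka");
            producer.send(record, (metadata, exception) -> {
                if (exception != null)
                    exception.printStackTrace();                  // the send failed
                else
                    System.out.printf("partition=%d offset=%d%n",
                            metadata.partition(), metadata.offset()); // acknowledged by the broker
            });
            producer.flush(); // drain the accumulator before the try-with-resources close
        }
    }
}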
ProducerConfig
Kafka defines a great many configuration options to cover users' customization needs; see the official documentation for the complete list.
The producer configurations are defined in the ProducerConfig class, and each option carries a corresponding documentation string.
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt ....."
Initializing the client configuration essentially means initializing the ProducerConfig class.
return new KafkaProducer<>(props);
//props -> Utils.propsToMap(properties)
public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs,
keySerializer, valueSerializer)),..... )
}
public ProducerConfig(Map<String, Object> props) { super(CONFIG, props); }
The constructor delegates to the parent class. CONFIG is a static field, initialized when the class is loaded, that assigns a default value to every configuration property.
Each individual option is described by a ConfigKey.
CONFIG = new ConfigDef()
.define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
.....
.defineXXX();
public static class ConfigKey {
public final String name;
public final Type type;
public final String documentation;
public final Object defaultValue;
}
The parent-class constructor merges the default configuration with the user-supplied configuration.
public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
this(definition, originals, Collections.emptyMap(), true);
}
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
this.values = definition.parse(this.originals);
this.values.putAll(configUpdates);
definition.parse(this.values);
this.definition = definition;
}
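A small hedged sketch of the effect of this merge: an option the user never sets comes back with the default registered in CONFIG. The bootstrap address is illustrative, and the snippet is a fragment (assumes java.util and the Kafka client classes are imported):
// Sketch only: values the user does not supply fall back to the defaults defined in CONFIG.
Map<String, Object> userConfig = new HashMap<>();
userConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
userConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
userConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
ProducerConfig config = new ProducerConfig(userConfig);
int batchSize = config.getInt(ProducerConfig.BATCH_SIZE_CONFIG); // 16384, the default from the define(...) call shown above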
Producer Initialization
Below is the core constructor of the producer. config is the ProducerConfig we just created, time is Time.SYSTEM, and the remaining parameters are null.
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer, Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time);
KafkaProducer is made up of several core components. We list them briefly here and explain the implementation of each in detail when it is used.
Partitioner
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
boolean enableAdaptivePartitioning = partitioner == null &&
config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
RecordAccumulator.PartitionerConfig partitionerConfig =
new RecordAccumulator.PartitionerConfig( enableAdaptivePartitioning,
config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG))
Serializers
this.keySerializer = config.getConfiguredInstance(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.valueSerializer = config.getConfiguredInstance(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
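For context, Serializer<T> is a small interface: serialize(topic, data) turns the object into bytes, with optional configure and close hooks. A minimal hedged sketch of a custom value serializer (the class name and behaviour are made up for illustration):
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

// Sketch: turn a String payload into UTF-8 bytes, upper-casing it first.
public class UpperCaseStringSerializer implements Serializer<String> {
    @Override
    public byte[] serialize(String topic, String data) {
        if (data == null)
            return null;                                   // null value stays null
        return data.toUpperCase().getBytes(StandardCharsets.UTF_8);
    }
}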
Interceptors
List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.configuredInterceptors(
config, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
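An interceptor sees every record in onSend before serialization and every acknowledgement (or failure) in onAcknowledgement. A minimal hedged sketch (the class name and counting behaviour are made up):
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Sketch of a ProducerInterceptor that counts sends and failures.
public class CountingInterceptor implements ProducerInterceptor<Integer, String> {
    private long sent = 0;
    private long failed = 0;

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        sent++;
        return record;                 // return the (possibly modified) record so the send continues
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            failed++;
    }

    @Override
    public void close() {
        System.out.printf("sent=%d failed=%d%n", sent, failed);
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}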
Compression
this.compression = configureCompression(config);
Record accumulator
this.accumulator = new RecordAccumulator(logContext,
batchSize, compression, lingerMs(config), retryBackoffMs,
retryBackoffMaxMs, deliveryTimeoutMs, partitionerConfig, metrics,
PRODUCER_METRIC_GROUP_NAME, time, apiVersions, transactionManager,
new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
Metadata management
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
this.metadata = new ProducerMetadata(retryBackoffMs,
retryBackoffMaxMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
this.metadata.bootstrap(addresses);
Sender thread
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
Network client
public static NetworkClient createNetworkClient(AbstractConfig config,
String clientId,
Metrics metrics,
String metricsGroupPrefix,
LogContext logContext,
ApiVersions apiVersions,
Time time,..);
Metadata Management
Metadata
The client needs Kafka cluster information in order to send data: cluster nodes, topics, partitions, partition leaders, and so on.
We will not go into the metadata-maintaining classes in depth; it is enough to know that this data is kept on the client side and updated as needed.
public class Metadata implements Closeable {
private volatile MetadataSnapshot metadataSnapshot = MetadataSnapshot.empty();
private List<InetSocketAddress> bootstrapAddresses;
}
public class MetadataSnapshot {
private final String clusterId;
private final Map<Integer, Node> nodes;
private final Node controller;
private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
private final Map<String, Uuid> topicIds;
private final Map<Uuid, String> topicNames;
private Cluster clusterInstance;
}
public final class Cluster {
private final List<Node> nodes;
private final Node controller;
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
private final Map<String, List<PartitionInfo>> partitionsByTopic;
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
private final Map<Integer, Node> nodesById;
private final ClusterResource clusterResource;
private final Map<String, Uuid> topicIds;
private final Map<Uuid, String> topicNames;
}
public final class TopicPartition implements Serializable {
private final int partition;
private final String topic;
}
Fetching metadata
Sending the request
The metadata request is sent from sendInternalMetadataRequest.
// -> Sender#run -> NetworkClient#poll ->
//-> NetworkClient.DefaultMetadataUpdater#maybeUpdate(long, org.apache.kafka.common.Node)
//-> NetworkClient#sendInternalMetadataRequest
void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
doSend(clientRequest, true, now);
}
Selector#send attaches the request to the KafkaChannel, where it waits to be sent.
public void send(NetworkSend send){
String connectionId = send.destinationId();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
channel.setSend(send);
}
public void setSend(NetworkSend send) {
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
Selectable#poll then handles the channel's events and actually writes the request out.
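Under the hood this is the standard java.nio pattern: keep OP_WRITE in the interest set while a send is pending, write when the selector reports the channel writable, then drop the interest. A standalone method-level sketch of that pattern (plain NIO, not Kafka's Selector):
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Queue a pending buffer and express interest in OP_WRITE (like transportLayer.addInterestOps).
void queueSend(SelectionKey key, ByteBuffer pending) {
    key.attach(pending);
    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}

// One poll pass: flush pending data on channels the selector reports as writable.
void pollOnce(Selector selector) throws Exception {
    selector.select(1000);
    for (SelectionKey key : selector.selectedKeys()) {
        if (key.isWritable()) {
            SocketChannel ch = (SocketChannel) key.channel();
            ByteBuffer pending = (ByteBuffer) key.attachment();
            ch.write(pending);                                   // may be a partial write
            if (!pending.hasRemaining())                         // done: stop asking for OP_WRITE
                key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }
    selector.selectedKeys().clear();
}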
Kafka defines all request types in the ApiKeys enum.
public enum ApiKeys {
METADATA(ApiMessageType.METADATA),
//omit
}
public Builder(MetadataRequestData data) {
super(ApiKeys.METADATA);
this.data = data;
}
Handling the response
handleMetadataResponse processes the response; the resulting data ends up in the MetadataSnapshot.
//-> NetworkClient#handleCompletedReceives
//-> NetworkClient.DefaultMetadataUpdater#handleSuccessfulResponse
//->Metadata#update
public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {
this.metadataSnapshot = handleMetadataResponse(response, isPartialUpdate, nowMs);
}
private MetadataSnapshot handleMetadataResponse(MetadataResponse metadataResponse,...) {
for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
String topicName = metadata.topic();
Uuid topicId = metadata.topicId();
for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs(), topicId, oldTopicId)
.ifPresent(partitions::add);
}
}
}
PartitionMetadata stores per-partition information such as the leader and the ISR.
public static class PartitionMetadata {
public final TopicPartition topicPartition;
public final Optional<Integer> leaderId;
public final Optional<Integer> leaderEpoch;
public final List<Integer> replicaIds;
public final List<Integer> inSyncReplicaIds;
public final List<Integer> offlineReplicaIds;
}
Broker side
Once the request has been sent, the broker takes over as the server side. KafkaApis#handle is the single entry point for all requests.
KafkaApis#handleTopicMetadataRequest collects the metadata and returns it to the client.
//kafka.server.KafkaApis#handle
def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
request.header.apiKey match {
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
}
}
def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
val topicMetadata = getTopicMetadata(request, metadataRequest.isAllTopics, allowAutoCreation, authorizedTopics,
request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)
val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName)
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
MetadataResponse.prepareResponse(
brokers.toList.asJava,
clusterId,......
completeTopicMetadata.asJava,
))
}
The various code details are covered later in the article.
Record Accumulator
KafkaProducer#doSend contains the send logic, but at this stage "sending" really means appending the record to the accumulator's buffer.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // refresh the metadata if needed
    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
    // compute the partition
    int partition = partition(record, serializedKey, serializedValue, cluster);
    // append the record to the accumulator
    RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
            serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
    // wake up the sender thread
    if (result.batchIsFull || result.newBatchCreated) {
        this.sender.wakeup();
    }
}
Computing the partition
There are four ways the partition can be determined:
- The application has explicitly specified it on the record.
- The application supplies it through a custom partitioner (a minimal sketch follows the code below).
- It is computed from the serializedKey.
- Otherwise it defaults to RecordMetadata.UNKNOWN_PARTITION, and the accumulator picks a partition with its built-in logic.
Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION, which means that the RecordAccumulator would pick a partition using built-in logic (which may take into account broker load, the amount of data produced to each partition, etc.).
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
if (record.partition() != null)
return record.partition();
if (partitioner != null) {
int customPartition = partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(...));
}
return customPartition;
}
if (serializedKey != null && !partitionerIgnoreKeys) {
// hash the keyBytes to choose a partition
return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
} else {
return RecordMetadata.UNKNOWN_PARTITION;
}
}
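As an illustration of the second path above, a custom Partitioner only has to map each record to a partition index. A minimal hedged sketch (the class name and routing rule are made up for this example):
import java.util.Arrays;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Sketch: records without a key go to partition 0, keyed records are spread by key hash.
public class KeyOrZeroPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            return 0;
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions; // mask keeps the index non-negative
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
It would be registered with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyOrZeroPartitioner.class).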
Accumulator structure
Inside the accumulator, records are grouped and accumulated by topic and partition; they are ultimately stored in ProducerBatch objects, and each partition's buffer is a double-ended queue of such batches.
public class RecordAccumulator {
private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap =
new CopyOnWriteMap<>();
}
private static class TopicInfo {
public final ConcurrentMap<Integer /*partition*/, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();
public final BuiltInPartitioner builtInPartitioner;
public TopicInfo(BuiltInPartitioner builtInPartitioner) {
this.builtInPartitioner = builtInPartitioner;
}
}
Appending records
append looks up the Deque<ProducerBatch> for the given topic and partition; tryAppend then locates the ProducerBatch to write the record into.
public RecordAppendResult append(String topic, int partition, long timestamp,
byte[] key, byte[] value,...){
//look up or create the TopicInfo for this topic
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, ......);
while(true){
effectivePartition = partition;
Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
}
}
}
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
ProducerBatch last = deque.peekLast();
if (last != null) {
int initialBytes = last.estimatedSizeInBytes();
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes);
}
return null;
}
writeTo writes the timestampDelta, offsetDelta, key, value and headers into the buffer in sequence; appendStream can be thought of as a contiguous stream of bytes.
//-> ProducerBatch#tryAppend -> MemoryRecordsBuilder#append()
//-> MemoryRecordsBuilder#appendWithOffset()
//->MemoryRecordsBuilder#appendDefaultRecord
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
int offsetDelta = (int) (offset - baseOffset);
long timestampDelta = timestamp - baseTimestamp;
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}
public static int writeTo(DataOutputStream out, int offsetDelta,
                          long timestampDelta, ByteBuffer key,
                          ByteBuffer value, Header[] headers) throws IOException {
    ByteUtils.writeVarlong(timestampDelta, out);
    ByteUtils.writeVarint(offsetDelta, out);
    int keySize = key.remaining();
    ByteUtils.writeVarint(keySize, out);
    Utils.writeTo(out, key, keySize);
    int valueSize = value.remaining();
    ByteUtils.writeVarint(valueSize, out);
    Utils.writeTo(out, value, valueSize);
    ByteUtils.writeVarint(headers.length, out);
    for (Header header : headers) {
        byte[] utf8Bytes = Utils.utf8(header.key());     // header key as UTF-8 bytes
        ByteUtils.writeVarint(utf8Bytes.length, out);
        out.write(utf8Bytes);
        byte[] headerValue = header.value();
        ByteUtils.writeVarint(headerValue.length, out);
        out.write(headerValue);
    }
    // null key/value handling, size bookkeeping and the return value are omitted here
}
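For intuition on why writing deltas is cheap: varints store 7 bits per byte with a continuation bit, and signed values are zig-zag encoded first so small negative numbers also stay short. A standalone sketch of the same idea (not Kafka's ByteUtils):
import java.io.ByteArrayOutputStream;

// Sketch of zig-zag + varint encoding, the same idea ByteUtils.writeVarint uses.
static byte[] writeVarint(int value) {
    int v = (value << 1) ^ (value >> 31);        // zig-zag: small negatives stay small
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    while ((v & 0xffffff80) != 0) {              // more than 7 bits remaining
        out.write((v & 0x7f) | 0x80);            // low 7 bits plus continuation flag
        v >>>= 7;
    }
    out.write(v);                                // final byte, continuation flag clear
    return out.toByteArray();
}
// writeVarint(1) -> 1 byte; writeVarint(300) -> 2 bytes; an offsetDelta in 0..63 always fits in a single byte.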
When an append fails because there is not enough room (or no batch exists yet), a new ProducerBatch object has to be created.
int size = Math.max(this.batchSize,....);
buffer = free.allocate(size, maxTimeToBlock);
private RecordAppendResult appendNewBatch(String topic,
int partition,
Deque<ProducerBatch> dq,...){
//omit
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());
ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callbacks, nowMs));
}
Finally, the sender thread is woken up:
if (result.batchIsFull || result.newBatchCreated) {
this.sender.wakeup();
}
Sender Thread
The sender thread loops, continuously processing whatever data is ready to be sent.
public class Sender implements Runnable {
public void run() {
while (running) {
runOnce();
}
}
}
void runOnce() {
//....
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}
Target brokers
By iterating over the accumulator, the sender finds the leader node of each target partition.
//-> Sender#sendProducerData-> RecordAccumulator#ready
public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
for (Map.Entry<String, TopicInfo> topicInfoEntry : this.topicInfoMap.entrySet()) {
final String topic = topicInfoEntry.getKey();
nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, String topic,...){
ConcurrentMap<Integer, Deque<ProducerBatch>> batches = topicInfo.batches;
for (Map.Entry<Integer, Deque<ProducerBatch>> entry : batches.entrySet()) {
TopicPartition part = new TopicPartition(topic, entry.getKey());
Node leader = metadataSnapshot.cluster().leaderFor(part);
//.....
readyNodes.add(leader);
}
}
Connections are then prepared to the brokers hosting the target leader nodes.
Iterator<Node> iter = result.readyNodes.iterator();
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
//..
}
}
Draining batches
The data to send is collected per broker node.
public Map<Integer, List<ProducerBatch>> drain(MetadataSnapshot metadataSnapshot, Set<Node> nodes, int maxSize, long now) {
Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
for (Node node : nodes) {
List<ProducerBatch> ready = drainBatchesForOneNode(metadataSnapshot, node, maxSize, now);
batches.put(node.id(), ready);
}
return batches;
}
Writing data to the send buffer
ProduceRequest wraps the batched data into the request structure, callback binds the completion callback, ApiKeys.PRODUCE identifies the API type, and client.send sends the request.
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}
private void sendProduceRequest(....., List<ProducerBatch> batches) {
ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
new ProduceRequestData() .setTopicData(tpd));
RequestCompletionHandler callback = ....;
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, ....., callback);
client.send(clientRequest, now);
}
public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
private final ProduceRequestData data;
public Builder(..... ProduceRequestData data) {
super(ApiKeys.PRODUCE, minVersion, maxVersion);
this.data = data;
}
}
Finally, doSend attaches the data to the transport layer and registers interest in the write event; the request then waits to be sent to the broker.
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
Send send = request.toSend(header);
selector.send(new NetworkSend(clientRequest.destination(), send));
}
public void send(NetworkSend send) {
String connectionId = send.destinationId();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
channel.setSend(send);
}
public void setSend(NetworkSend send) {
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
Broker-Side Message Handling
On the broker, produce requests are handled by handleProduceRequest, which iterates over entriesPerPartition, finds the corresponding partition, and appends the data to it.
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
request.header.apiKey match {
//....
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
}
}
//->ReplicaManager#handleProduceAppend
//->ReplicaManager#appendRecords
//->ReplicaManager#appendToLocalLog
private def appendToLocalLog(....){
entriesPerPartition.map { case (topicPartition, records) =>
val partition = getPartitionOrException(topicPartition)
val info = partition.appendRecordsToLeader(records, origin,....)
}
}
def appendRecordsToLeader(....){
val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch,....)
}
maybeRoll decides whether a new log segment needs to be created. LogSegment#append writes the records to the file and updates the index files when appropriate. The log field is a FileRecords, a wrapper around the underlying file.
//->UnifiedLog#append
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
//-> LogSegment#append
public void append(long largestOffset, long largestTimestampMs, long shallowOffsetOfMaxTimestamp, MemoryRecords records) {
long appendedBytes = log.append(records);
offsetIndex().append(largestOffset, physicalPosition);
timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
}
public static FileRecords open(File file,
boolean mutable,
boolean fileAlreadyExists,
int initFileSize,
boolean preallocate) throws IOException {
FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
return new FileRecords(file, channel, 0, end, false);
}
Writing the index files
//OffsetIndex#append
public void append(long offset, int position) {
mmap().putInt(relativeOffset(offset));
mmap().putInt(position);
}
//TimeIndex#maybeAppend(long, long, boolean)
public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
MappedByteBuffer mmap = mmap();
mmap.putLong(timestamp);
mmap.putInt(relativeOffset(offset));
}
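Note that the offset index stores a relative offset (the offset minus the segment's base offset) together with a physical file position, which is why two 4-byte ints per entry are enough. A tiny hedged sketch of the arithmetic (all values below are made up):
import java.nio.ByteBuffer;

// Sketch of an OffsetIndex-style entry: (relativeOffset, filePosition), 8 bytes total.
long baseOffset = 367_000_000L;        // first offset of the segment
long messageOffset = 367_001_234L;     // offset of the record batch being indexed
int relativeOffset = (int) (messageOffset - baseOffset);   // 1234, fits comfortably in an int
int physicalPosition = 57_344;         // byte position of that batch in the .log file

ByteBuffer entry = ByteBuffer.allocate(8);
entry.putInt(relativeOffset);          // same layout as mmap().putInt(relativeOffset(offset))
entry.putInt(physicalPosition);        // followed by mmap().putInt(position)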
Summary
This article has outlined the Kafka message production flow, from producer initialization through message sending to broker-side handling. The producer is initialized from its configuration, including the partitioner and serializers, and batches messages through the record accumulator. Metadata management keeps the producer's view of the cluster up to date, while the sender thread and network client handle communication with the brokers. Based on partition metadata, the producer routes each batch to the right broker's buffer and waits for the response; the broker appends the messages to the log and acknowledges them. Together these pieces deliver Kafka's high-throughput, low-latency messaging.