Kafka Producer Module (2): Sending Messages with KafkaProducer's send Interface

    Technology  2022-07-10

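            The send interface processes a message and places it in a buffer; a background Sender thread then takes messages out of the buffer and sends them to the broker. This article covers the send interface, the part that puts messages into the buffer.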
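The hand-off described above can be pictured with a small model that uses only the JDK. This is a sketch under stated assumptions, not Kafka's implementation: the class `BufferedSendSketch`, the `STOP` sentinel, and the use of a plain `BlockingQueue` of strings are all hypothetical stand-ins for the real buffer and Sender thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model: the caller only enqueues, a background thread drains.
final class BufferedSendSketch {
    // Unique sentinel object; compared by identity so no user message can match it.
    private static final String STOP = new String("stop");

    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final List<String> delivered = new ArrayList<>();
    private final Thread sender = new Thread(this::drain, "sketch-sender");

    BufferedSendSketch() {
        sender.start();
    }

    // Runs on the background thread: take messages until the sentinel arrives.
    private void drain() {
        try {
            while (true) {
                String msg = buffer.take();
                if (msg == STOP) {
                    return;
                }
                delivered.add(msg); // stands in for "send to the broker"
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns immediately; the message is only buffered here.
    void send(String msg) {
        buffer.add(msg);
    }

    // Stops the background thread and returns what it delivered, in order.
    List<String> close() {
        buffer.add(STOP);
        try {
            sender.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(delivered);
    }
}
```

The point of the split is that `send` never performs network I/O on the caller's thread; it only has to find room in the buffer.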

    With KafkaProducer's field definitions and construction process covered, we can now analyze how messages are collected. The implementation lives in the KafkaProducer#send method:

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        // Run every registered interceptor over the outgoing record; each one may modify it
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        // Delegate to doSend to start sending the message
        return doSend(interceptedRecord, callback);
    }
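The interception step can be sketched as a chain of functions applied in registration order. This is a simplified model, not Kafka's `ProducerInterceptors` class: `InterceptorChainSketch` is a hypothetical name, records are plain strings, and it assumes that a failing interceptor is skipped so that `onSend` never throws, in line with the comment in the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical model of an interceptor chain over String "records".
final class InterceptorChainSketch {
    private final List<UnaryOperator<String>> interceptors = new ArrayList<>();

    InterceptorChainSketch add(UnaryOperator<String> interceptor) {
        interceptors.add(interceptor);
        return this;
    }

    // Applies each interceptor in order; the output of one is the input of the next.
    String onSend(String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Assumption: a failing interceptor is skipped and the chain
                // continues with the last successfully intercepted record.
            }
        }
        return current;
    }
}
```

For example, a chain of "append -a", "always throw", "append -b" turns `msg` into `msg-a-b`: the middle interceptor fails without interrupting the send.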

    The KafkaProducer#doSend method implements the message-collection process:

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;
        try {
            throwIfProducerClosed();
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime;
            try {
                // 1. Fetch the Kafka cluster metadata. A request for a new topic, or for a partition
                //    beyond the known partition range, triggers a metadata update
                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            } catch (KafkaException e) {
                if (metadata.isClosed())
                    throw new KafkaException("Producer closed while send in progress", e);
                throw e;
            }
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;

            // 2. Serialize the key with the registered serializer
            byte[] serializedKey;
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }

            // 3. Serialize the value with the registered serializer
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }

            // 4. Choose a partition for the message; if none was specified explicitly,
            //    compute one with the registered partitioner
            int partition = partition(record, serializedKey, serializedValue, cluster);
            // The target topic partition for this message
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());

            /* 5. Append the message to the record accumulator (RecordAccumulator) */

            Header[] headers = record.headers().toArray();
            // Estimate the size of the message and verify that it is not too large
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            ensureValidRecordSize(serializedSize);
            // If no timestamp was set on the message, use the current time
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            if (log.isTraceEnabled()) {
                log.trace("Attempting to append record {} with callback {} to topic {} partition {}",
                        record, callback, record.topic(), partition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional()) {
                transactionManager.failIfNotReadyForSend();
            }
            // Append the message to the accumulator
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, true);

            if (result.abortForNewBatch) {
                int prevPartition = partition;
                partitioner.onNewBatch(record.topic(), cluster, prevPartition);
                partition = partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (log.isTraceEnabled()) {
                    log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}",
                            record.topic(), partition, prevPartition);
                }
                // producer callback will make sure to call both 'callback' and interceptor callback
                interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

                result = accumulator.append(tp, timestamp, serializedKey,
                        serializedValue, headers, interceptCallback, remainingWaitMs, false);
            }

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);

            /* 6. Conditionally wake up the Sender thread */

            if (result.batchIsFull || result.newBatchCreated) {
                // Wake the Sender thread if the queue holds more than one ProducerBatch,
                // the last ProducerBatch is full, or a new ProducerBatch was created
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch",
                        record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
            // handling exceptions and record the errors;
            // for API exceptions return them in the future,
            // for other exceptions throw directly
        } catch (ApiException e) {
            log.debug("Exception occurred during message send:", e);
            if (callback != null)
                callback.onCompletion(null, e);
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            return new FutureFailure(e);
        } catch (InterruptedException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw new InterruptException(e);
        } catch (BufferExhaustedException e) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (KafkaException e) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, e);
            throw e;
        } catch (Exception e) {
            // we notify interceptor about all exceptions, since onSend is called before anything else in this method
            this.interceptors.onSendError(record, tp, e);
            throw e;
        }
    }
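The two calls to `accumulator.append` in step 5 are easier to follow in isolation: the first call passes `true` so that it gives up rather than open a new batch, the partitioner is told about it through `onNewBatch`, and the second call with `false` is allowed to create the batch on the newly chosen partition. The sketch below models only that control flow. Everything in it is hypothetical: `StickyRetrySketch`, a batch capacity counted in records, and a round-robin switch of the sticky partition are assumptions made to keep the example deterministic, and the real partitioner may choose the next partition differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "try append, abort if a new batch is needed, re-partition, append again".
final class StickyRetrySketch {
    static final int BATCH_CAPACITY = 2; // records per batch (assumption)

    private final int numPartitions;
    private int stickyPartition = 0;
    // Number of records in the open batch of each partition; absent means no open batch.
    private final Map<Integer, Integer> openBatch = new HashMap<>();

    StickyRetrySketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Returns false only when a new batch would be required and abortOnNewBatch is set.
    private boolean append(int partition, boolean abortOnNewBatch) {
        Integer used = openBatch.get(partition);
        if (used != null && used < BATCH_CAPACITY) {
            openBatch.put(partition, used + 1); // room in the current batch
            return true;
        }
        if (abortOnNewBatch) {
            return false; // let the caller pick a partition again first
        }
        openBatch.put(partition, 1); // start a new batch with this record
        return true;
    }

    // Returns the partition the record finally went to.
    int send() {
        int partition = stickyPartition;
        if (!append(partition, true)) {
            // Stand-in for partitioner.onNewBatch: move the sticky partition.
            stickyPartition = (stickyPartition + 1) % numPartitions;
            partition = stickyPartition;
            append(partition, false);
        }
        return partition;
    }
}
```

With two partitions, consecutive calls to `send()` fill one batch on a partition before moving to the other, which is the batching benefit the retry is meant to preserve.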

    waitOnMetadata 

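            Fetches the cluster metadata. If the locally cached metadata does not cover the requested topic or partition, and so may be stale, this method wakes the Sender thread to refresh it and waits for the update to complete.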

    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        // Get the current cluster info
        Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        // Add the topic to the tracked set; if it is a new topic, flag the cluster metadata
        // for update, i.e. set needUpdate in metadata to true
        metadata.add(topic);

        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        // If no partition was specified, or the specified partition is within the known range,
        // return the cached cluster info
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        /* Otherwise the cached cluster metadata may be stale and must be refreshed */

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        long elapsed;
        // Issue metadata requests until we have metadata for the topic and the requested partition,
        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic);
            // Set the needUpdate field of Metadata and get the current metadata version
            int version = metadata.requestUpdate();
            // Wake the Sender thread, which is responsible for updating the metadata
            sender.wakeup();
            try {
                // Wait for the metadata update to complete
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                // Timed out while waiting
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
            }
            // Get the updated cluster info
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs) {
                // Timed out while waiting
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            metadata.maybeThrowExceptionForTopic(topic);
            // Update the remaining wait time
            remainingWaitMs = maxWaitMs - elapsed;
            // Get the partition count of the topic
            partitionsCount = cluster.partitionCountForTopic(topic);
            // Keep retrying if the topic's metadata is still missing, or if the refreshed
            // partition count still does not cover the requested partition
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }
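The shape of this loop (return at once if the cache suffices, otherwise refresh until the requested partition is covered or the time budget runs out) can be reproduced with the JDK alone. The following is a minimal sketch, not the Kafka code: `MetadataWaitSketch`, the `UNKNOWN` marker, and the injected `refresh` and `clockMs` suppliers are hypothetical, and the refresh is a plain function call where the real method wakes the Sender thread and blocks.

```java
import java.util.function.IntSupplier;
import java.util.function.LongSupplier;

// Hypothetical model of the "refresh until sufficient or timed out" loop.
final class MetadataWaitSketch {
    static final int UNKNOWN = -1; // stands in for a null partition count

    // Same predicate as the early return and the loop condition in waitOnMetadata.
    static boolean isSufficient(int partitionsCount, Integer partition) {
        return partitionsCount != UNKNOWN && (partition == null || partition < partitionsCount);
    }

    // Returns the time spent waiting; throws IllegalStateException on timeout.
    // refresh  : performs one metadata refresh and returns the new partition count
    // clockMs  : current time in milliseconds (injected so the sketch is testable)
    static long waitOnMetadata(IntSupplier refresh, LongSupplier clockMs,
                               int cachedCount, Integer partition, long maxWaitMs) {
        if (isSufficient(cachedCount, partition)) {
            return 0; // cached metadata already covers the request
        }
        long begin = clockMs.getAsLong();
        long elapsed;
        int count;
        do {
            count = refresh.getAsInt();
            elapsed = clockMs.getAsLong() - begin;
            if (elapsed >= maxWaitMs) {
                throw new IllegalStateException("metadata not available after " + maxWaitMs + " ms");
            }
        } while (!isSufficient(count, partition));
        return elapsed;
    }
}
```

The returned wait time matters to the caller: doSend subtracts it from `maxBlockTimeMs`, so the time spent here reduces the time `append` may block while allocating a buffer.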

    The RecordAccumulator#append method

            The send method ultimately calls RecordAccumulator#append to buffer the message in the RecordAccumulator.

    public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        // Count the threads currently appending messages to the accumulator
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // check if we have an in-progress batch
            // Get the Deque for this topic partition, creating it if it does not exist
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            synchronized (dq) {
                if (closed)
                    // The producer has been closed: throw
                    throw new KafkaException("Producer closed while send in progress");
                // Append the record to the last ProducerBatch in the Deque and return the RecordAppendResult.
                // A failed append usually means the batch does not have enough room left
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                // Appended successfully: return directly
                if (appendResult != null)
                    return appendResult;
            }

            // we don't have an in-progress record batch try to allocate a new batch
            if (abortOnNewBatch) {
                // Return a result that will cause another call to append.
                return new RecordAppendResult(null, false, false, true);
            }

            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();

            /* Appending failed, so try to allocate a new buffer */

            int size = Math.max(this.batchSize,
                    AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            // Allocate the new buffer
            buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new KafkaException("Producer closed while send in progress");

                // Another thread may have made a batch with room available in the meantime,
                // so try once more to append to the last ProducerBatch in the Deque
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Appended successfully: return; the finally block releases the buffer allocated above
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

                /* Still unable to append: create a new ProducerBatch for the record */

                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                // Append the record to the newly created ProducerBatch
                FutureRecordMetadata future = Objects.requireNonNull(
                        batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                dq.addLast(batch);
                // Track the batch in the set of incomplete batches
                incomplete.add(batch);

                // Don't deallocate this buffer in the finally block as it's being used in the record batch
                buffer = null;
                // Wrap the outcome in a RecordAppendResult and return it
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
            }
        } finally {
            // Release the buffer allocated above if it was not handed to a batch
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }
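Stripped of buffer pooling and the second locked retry, the core of `append` is "append to the last batch of the partition's deque, otherwise open a new batch", plus the two flags that tell the caller whether to wake the Sender. The sketch below illustrates just that. It is a simplified model with hypothetical names (`AccumulatorSketch`, `Batch`, `Result`), sizes are plain integers rather than serialized bytes, and the partition is identified by a string key.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of per-partition batching in an accumulator.
final class AccumulatorSketch {
    static final class Batch {
        final int capacity;
        int used;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        boolean tryAppend(int size) {
            if (used + size > capacity) {
                return false; // not enough room left
            }
            used += size;
            return true;
        }

        boolean isFull() {
            return used >= capacity;
        }
    }

    static final class Result {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        Result(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    private final Map<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    Result append(String topicPartition, int recordSize) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        synchronized (dq) {
            Batch last = dq.peekLast();
            if (last != null && last.tryAppend(recordSize)) {
                return new Result(dq.size() > 1 || last.isFull(), false);
            }
            // A record larger than batchSize gets a batch of its own size.
            Batch batch = new Batch(Math.max(batchSize, recordSize));
            batch.tryAppend(recordSize);
            dq.addLast(batch);
            return new Result(dq.size() > 1 || batch.isFull(), true);
        }
    }
}
```

The `Math.max` mirrors the sizing in the real method: a record larger than the configured batch size still gets a batch, sized to fit exactly that record.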

    The process above calls RecordAccumulator#tryAppend more than once. Its implementation is as follows:

    private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque) {
        // Get the last ProducerBatch in the deque
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            // Try to append the message to the end of that ProducerBatch
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)
                // Append failed: close the batch for further appends
                last.closeForRecordAppends();
            else
                // Append succeeded: wrap the outcome in a RecordAppendResult and return it
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false);
        }
        return null;
    }
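One detail worth isolating is the call to `closeForRecordAppends` on failure. The sketch below assumes that a closed batch rejects every later append, so a smaller record arriving afterwards cannot slip into the old batch ahead of the record that did not fit. That assumption, the class `TryAppendSketch`, and the integer sizes are all illustrative and are not taken from the source above.

```java
import java.util.Deque;

// Hypothetical model: a batch that rejects a record is closed for good.
final class TryAppendSketch {
    static final class Batch {
        private final int capacity;
        private int used;
        private boolean closed;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        boolean tryAppend(int size) {
            if (closed || used + size > capacity) {
                return false;
            }
            used += size;
            return true;
        }

        void closeForAppends() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Returns true if the record went into the last batch; false means the
    // caller has to create a new batch (the real method returns null here).
    static boolean tryAppend(Deque<Batch> deque, int size) {
        Batch last = deque.peekLast();
        if (last == null) {
            return false; // no batch yet for this partition
        }
        if (last.tryAppend(size)) {
            return true;
        }
        last.closeForAppends(); // assumption: no further appends after a rejection
        return false;
    }
}
```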

    This in turn calls ProducerBatch#tryAppend:

    public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
        // Check whether there is still room for this message
        if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
            // No room left: return null; the caller will then try to allocate new space
            return null;
        } else {
            // Append the message to the MemoryRecords builder, which returns the message's CRC32 checksum
            Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
            // Update the maximum record size in bytes
            this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                    recordsBuilder.compressionType(), key, value, headers));
            // Update the timestamp of the last append
            this.lastAppendTime = now;
            FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                                   timestamp, checksum,
                                                                   key == null ? -1 : key.length,
                                                                   value == null ? -1 : value.length,
                                                                   Time.SYSTEM);
            // we have to keep every future returned to the users in case the batch needs to be
            // split to several new batches and resent.
            // Wrap the Callback and the FutureRecordMetadata in a Thunk and keep it
            thunks.add(new Thunk(callback, future));
            this.recordCount++;
            return future;
        }
    }
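The thunk list is what lets one batch-level response complete many per-record callbacks. The code above passes `this.recordCount` into each `FutureRecordMetadata`, i.e. the record's position within the batch. The sketch below shows how such a relative position could be combined with a base offset once the batch is acknowledged. The completion step is not part of the code shown in this article, so `complete`, the class `ThunkSketch`, and the use of `LongConsumer` as the callback type are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical model of a batch that remembers one thunk per appended record.
final class ThunkSketch {
    private static final class Thunk {
        final LongConsumer callback; // may be null when the user gave no callback
        final int relativeOffset;    // position of the record inside the batch

        Thunk(LongConsumer callback, int relativeOffset) {
            this.callback = callback;
            this.relativeOffset = relativeOffset;
        }
    }

    private final List<Thunk> thunks = new ArrayList<>();
    private int recordCount;

    // Records the callback and returns the record's position within the batch.
    int tryAppend(LongConsumer callback) {
        int relativeOffset = recordCount;
        thunks.add(new Thunk(callback, relativeOffset));
        recordCount++;
        return relativeOffset;
    }

    // Assumed completion step: the batch is acknowledged with a base offset,
    // and every callback receives baseOffset + its relative offset.
    void complete(long baseOffset) {
        for (Thunk thunk : thunks) {
            if (thunk.callback != null) {
                thunk.callback.accept(baseOffset + thunk.relativeOffset);
            }
        }
    }
}
```

Keeping a thunk for every record, even one without a callback, keeps positions in the list aligned with positions in the batch.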

     
