本文解析生产者流程图中的步骤5、6、7。
概述:Sender线程负责将ProducerBatch列表构建ProducerRequest,然后用来构造ClientRequest,最后将其交给NetworkClient处理。
首先看下Sender的定义。
Sender:
The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
接下来重点看下Sender的run方法的处理逻辑。
使用一个volatile boolean类型的变量running,判断Sender线程是否在运行。
如果running变量为true,则一直循环调用runOnce()方法。
(除非强制关闭)在Sender线程关闭之后,仍发送剩下的消息。
forceClose: 用于标识是否强制关闭。Accumulator#hasUndrained():检查批次中是否有未排空的队列。NetworkClient#inFlightRequestCount():检查是否还有等待确认的请求。hasPendingTransactionalRequests():检查是否有未完成的事务请求,并且事务状态处于进行中、正在提交、正在中止、有可中止的错误。对进行中的事务,进行中断。并且仍发送剩下的消息。
如果强制关闭,则会关闭TransactionManager、终止消息的追加(清空未完成的批次),关闭NetworkClient。
发送消息到Kafka Cluster。
这个会在下一篇NetworkClient文章中详细分析该方法。
计算pollTimeout。批次发送Produce Request。
发送Produce Request。
这个方法篇幅比较长。实际上由ProducerBatch列表创建ProducerRequest,然后用来构造ClientRequest,将其发送给NetworkClient。
首先看下第一部分的逻辑。
要求给定的ProducerBatch列表不为空。从ApiVersions获取maxUsableProduceMagic,分别与ProducerBatch列表的元素的magic进行比较,取最小值作为minUsedMagic。对ProducerBatch列表遍历。获取每个列表元素的TopicPartition。每个列表元素构造一个MemoryRecords实例,然后对该实例的magic与minUsedMagic进行比较,如果不相等,则进行转换。使用produceRecordsByPartition存储每个列表元素的TopicPartition、MemoryRecords。使用recordsByPartition存储每个列表元素的TopicPartition、ProducerBatch。接着看第二部分的逻辑:
主要是使用一些给定的参数构造ProduceRequest。进而用来构造ClientRequest实例。最后就是调用NetworkClient#send(…)方法。
从上面方法中,看到有一个RequestCompletionHandler,它是一个回调接口。它的onComplete(…)方法是当请求完成,并且收到响应时触发。
接着看它给予的回调处理方法。如下:
对于handleProduceResponse(…)方法,这里只看下正常情况(没有异常)的处理逻辑,如下:
接着看completeBatch(…)方法。
修改状态。触发回调。解除线程阻塞状态。将该ProducerBatch从InflightBatches中移除。释放资源。
对于ProducerBatch#done(…)方法:
回调的触发:
解除阻塞状态:
回过头看下maybeRemoveAndDeallocateBatch(…)方法:
从InflightBatches中移除:
释放资源: