Kafka生产者架构(三) - Sender

    科技2022-07-13  134

    文章目录

    前言源码分析run()runOnce()sendProducerData(...)sendProduceRequest(...)handleProduceResponse(...)


    前言

    本文解析生产者流程图中的步骤5、6、7。

    概述:Sender线程负责将ProducerBatch列表构建ProducerRequest,然后用来构造ClientRequest,最后将其交给NetworkClient处理。


    源码分析


    run()

    首先看下Sender的定义。

    Sender:

    The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.


    接下来重点看下Sender的run方法的处理逻辑。

    使用一个volatile boolean类型的变量running,判断Sender线程是否在运行。

    如果running变量为true,则一直循环调用runOnce()方法。


    (除非强制关闭)在Sender线程关闭之后,仍发送剩下的消息。

    forceClose: 用于标识是否强制关闭。Accumulator#hasUndrained():检查批次中是否有未排空的队列。NetworkClient#inFlightRequestCount():检查是否还有等待确认的请求。hasPendingTransactionalRequests():检查是否有未完成的事务请求,并且事务状态处于进行中、正在提交、正在中止、有可中止的错误。

    对进行中的事务,进行中断。并且仍发送剩下的消息。


    如果强制关闭,则会关闭TransactionManager、终止消息的追加(清空未完成的批次),关闭NetworkClient。


    runOnce()

    TransactionManager#resetProducerIdIfNeeded():对于非事务类型的生产者,如果之前运行失效的分区没有被完全解析成功,就会重置producer id、epoch等信息。TransactionManager#isTransactional():判断是否是事务的。实际上是判断transaction id是否为空。maybeWaitForProducerId():选择合适的节点,发送InitProducerIdRequest,如果响应没有错误,就更新ProducerIdAndEpoch。maybeSendAndPollTransactionalRequest():事务请求被发送、拉取,或者一个FindCoordinator请求入队。

    maybeAbortBatches(…):如果IncompleteBatches非空,则终止MemoryRecordsBuilder,也就是终止记录的追加。然后执行回调。如果批次不是isSplitBatch(boolean), return buffers to the pool。RecordAccumulator#abortUndrainedBatches(…):终止MemoryRecordsBuilder。然后执行回调。如果批次不是isSplitBatch(boolean), return buffers to the pool。TransactionManager.authenticationFailed(…):对于验证失败的处理。

    发送消息到Kafka Cluster。


    这个会在下一篇NetworkClient文章中详细分析该方法。


    sendProducerData(…)

    由元数据获取集群信息。获取就绪检查结果。如果有leader无法识别的分区,强制更新元数据。

    获取就绪节点迭代器,进行迭代。如果指定节点没有准备就绪,就删除该节点。获取拉取的延迟时间,从而计算没有就绪的超时时间。

    guaranteeMessageOrder:生产者是否保证在broker上的消息顺序性。 将给定节点的所有数据转化到Map集合中。将批次添加到Inflight批次中。遍历,将TopicPartition放到muted的map中。

    重置下一个批次的失效时间。获取失效的inflight批次列表。获取放置在Accumulator的失效批次列表。将失效的inflight批次列表 加入到 放置在Accumulator的失效批次列表。

    遍历失效的批次,对于每一个失效批次,TransactionManager做失败处理。如果失效的批次在重试,标记给定TopicPartition为未解析的状态,确保在解析的过程中不会有新的批次被drained。

    计算pollTimeout。批次发送Produce Request。

    发送Produce Request。


    sendProduceRequest(…)

    这个方法篇幅比较长。实际上由ProducerBatch列表创建ProducerRequest,然后用来构造ClientRequest,将其发送给NetworkClient。


    首先看下第一部分的逻辑。

    要求给定的ProducerBatch列表不为空。从ApiVersions获取maxUsableProduceMagic,分别与ProducerBatch列表的元素的magic进行比较,取最小值作为minUsedMagic。对ProducerBatch列表遍历。获取每个列表元素的TopicPartition。每个列表元素构造一个MemoryRecords实例,然后对该实例的magic与minUsedMagic进行比较,如果不相等,则进行转换。使用produceRecordsByPartition存储每个列表元素的TopicPartition、MemoryRecords。使用recordsByPartition存储每个列表元素的TopicPartition、ProducerBatch。

    接着看第二部分的逻辑:

    主要是使用一些给定的参数构造ProduceRequest。进而用来构造ClientRequest实例。最后就是调用NetworkClient#send(…)方法。


    handleProduceResponse(…)

    从上面方法中,看到有一个RequestCompletionHandler,它是一个回调接口。它的onComplete(…)方法是当请求完成,并且收到响应时触发。

    接着看它给予的回调处理方法。如下:

    对于handleProduceResponse(…)方法,这里只看下正常情况(没有异常)的处理逻辑,如下:


    接着看completeBatch(…)方法。

    修改状态。触发回调。解除线程阻塞状态。将该ProducerBatch从InflightBatches中移除。释放资源。


    对于ProducerBatch#done(…)方法:

    回调的触发:

    解除阻塞状态:

    回过头看下maybeRemoveAndDeallocateBatch(…)方法:

    从InflightBatches中移除:

    释放资源:


    Processed: 0.011, SQL: 8