Kafka Producer Module (3): The Sender Thread of KafkaProducer

    2022-07-10

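Sending a message from the client is actually an asynchronous process carried out by two cooperating threads. One thread writes the messages to be sent into a buffer; the other, the Sender thread, periodically delivers the buffered data in batches to the remote Kafka cluster and reports the delivery results.

The delivery step is handled by the Sender thread, which the earlier parts of this analysis woke up several times. Its implementation lives in the Sender class, which implements the java.lang.Runnable interface. The Sender#run method is implemented as follows: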

    public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called (that is, until KafkaProducer is closed)
        while (running) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the transaction manager, accumulator or waiting for acknowledgment,
        // wait until these are completed.
        // KafkaProducer has been closed: try to send the remaining messages, as long as
        // this is not a forced close and there are requests that are unsent or that
        // were sent and are still awaiting a response.
        while (!forceClose
                && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)
                    || hasPendingTransactionalRequests())) {
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
        while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
            if (!transactionManager.isCompleting()) {
                log.info("Aborting incomplete transaction due to shutdown");
                transactionManager.beginAbort();
            }
            try {
                runOnce();
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        if (forceClose) {
            // Forced close: ignore all requests that are unsent or still awaiting a response.
            // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
            // the futures.
            if (transactionManager != null) {
                log.debug("Aborting incomplete transactional requests due to forced shutdown");
                transactionManager.close();
            }
            log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }

        try {
            // close the network connection
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    void runOnce() {
        if (transactionManager != null) {
            try {
                transactionManager.resetProducerIdIfNeeded();

                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(
                        new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (maybeSendAndPollTransactionalRequest()) {
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, time.milliseconds());
                    return;
                } else if (transactionManager.hasAbortableError()) {
                    accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                transactionManager.authenticationFailed(e);
            }
        }

        long currentTimeMs = time.milliseconds();
        long pollTimeout = sendProducerData(currentTimeMs);
        // client.poll performs the actual network requests and handles the responses
        client.poll(pollTimeout, currentTimeMs);
    }

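The core of the Sender thread is the pair of calls made in runOnce: sendProducerData and client.poll. sendProducerData packages the messages that are actually ready to be sent, and poll is then called to perform the network I/O and the follow-up processing. The next two articles cover these two methods in detail.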
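
The three shutdown phases of run() (main loop, draining on a normal close, discarding on a forced close) can be illustrated with a much smaller model. The following is only a sketch under stated assumptions, not Kafka's code: MiniSender and its methods append, start, close(boolean), awaitTermination, delivered and pending are hypothetical names, a LinkedBlockingQueue stands in for the accumulator, and a plain list stands in for the remote cluster. Transactions, in-flight requests and the wakeup mechanism are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the Sender shutdown logic; not Kafka's real classes.
class MiniSender implements Runnable {
    // Stands in for the accumulator buffer (assumption for illustration).
    private final LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    // Stands in for the remote cluster; read it only after awaitTermination().
    private final List<String> delivered = new ArrayList<>();
    private final Thread ioThread = new Thread(this, "mini-sender-io");
    private volatile boolean running = true;
    private volatile boolean forceClose = false;

    // Application thread: only writes into the buffer and returns immediately.
    void append(String record) {
        buffer.add(record);
    }

    void start() {
        ioThread.start();
    }

    // Requests shutdown. A forced close drops whatever is still buffered.
    void close(boolean force) {
        if (force) {
            forceClose = true;
        }
        running = false;
    }

    // Waits for the I/O thread to finish.
    void awaitTermination() {
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void run() {
        // Phase 1: main loop, runs until close is requested.
        while (running) {
            runOnce();
        }
        // Phase 2: normal close, keep going until the buffer is drained.
        while (!forceClose && !buffer.isEmpty()) {
            runOnce();
        }
        // Phase 3: forced close, abandon whatever was not delivered.
        if (forceClose) {
            buffer.clear();
        }
    }

    // One iteration: wait briefly for a record, then "deliver" it.
    private void runOnce() {
        try {
            String record = buffer.poll(10, TimeUnit.MILLISECONDS);
            if (record != null) {
                delivered.add(record);
            }
        } catch (InterruptedException e) {
            close(true); // this sketch treats interruption as a forced close
        }
    }

    List<String> delivered() {
        return delivered;
    }

    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        MiniSender graceful = new MiniSender();
        graceful.start();
        for (int i = 0; i < 3; i++) {
            graceful.append("record-" + i);
        }
        graceful.close(false);
        graceful.awaitTermination();
        System.out.println("graceful: " + graceful.delivered()); // graceful: [record-0, record-1, record-2]

        MiniSender forced = new MiniSender();
        for (int i = 0; i < 3; i++) {
            forced.append("record-" + i);
        }
        forced.close(true); // requested before the thread starts, so the outcome is deterministic
        forced.start();
        forced.awaitTermination();
        System.out.println("forced: " + forced.delivered()); // forced: []
    }
}
```

In this model a normal close never loses a record that was appended before close was called, while a forced close may drop anything still buffered. That mirrors the roles of the running and forceClose flags in the run() method shown earlier; the real Sender additionally waits for in-flight requests and pending transactional requests before it exits.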
