The NetworkClient#poll method performs the actual network sends and receives. Let's look at its implementation:
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    /*
     * If the metadata is due for a refresh and a lightly loaded target node is available,
     * build a MetadataRequest to refresh the locally cached cluster metadata; it will be sent on the next poll.
     */
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        /* Perform the actual network I/O */
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    /* Handle broker responses */
    long updatedNow = this.time.milliseconds();
    // Response list
    List<ClientResponse> responses = new ArrayList<>();
    // For requests that were sent successfully and expect no broker response, create a local response object and add it to responses
    handleCompletedSends(responses, updatedNow);
    /*
     * Fetch and parse broker responses:
     * - if it is a metadata response, update the locally cached cluster metadata
     * - if it is an API versions response, update the locally cached API versions supported by the target node
     * - otherwise, build a ClientResponse and add it to responses
     */
    handleCompletedReceives(responses, updatedNow);
    // Handle requests on broken connections: build the corresponding ClientResponse, add it to responses, and mark the cluster metadata as needing an update
    handleDisconnections(responses, updatedNow);
    // Walk the list of newly established connections and update the corresponding nodes' connection state
    handleConnections();
    // If the local API version information needs refreshing, build an ApiVersionsRequest; it will be sent on the next poll
    handleInitiateApiVersionRequests(updatedNow);
    // Walk the timed-out requests in inFlightRequests, build the corresponding ClientResponse, add it to responses, and mark the cluster metadata as needing an update
    handleTimedOutRequests(responses, updatedNow);
    // Invoke the onComplete method of each response,
    // which essentially calls the registered RequestCompletionHandler#onComplete
    completeResponses(responses);

    return responses;
}

Updating cluster metadata
Let's first look at how the locally cached cluster metadata is refreshed (step 1). We have already mentioned several scenarios that trigger a metadata update; those operations merely mark the metadata as needing an update, while the actual refresh happens here. The implementation lives in DefaultMetadataUpdater#maybeUpdate:
public long maybeUpdate(long now) {
    // should we update our metadata?
    // Timestamp of the next scheduled metadata refresh
    long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
    // Check whether a MetadataRequest has already been sent and is still in flight
    long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;
    // Time remaining before the next MetadataRequest should be sent
    long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
    if (metadataTimeout > 0) {
        // Not yet time to refresh
        return metadataTimeout;
    }

    // Beware that the behavior of this method and the computation of timeouts for poll() are
    // highly dependent on the behavior of leastLoadedNode.
    // Pick the least loaded available node; returns null if no node is available
    Node node = leastLoadedNode(now);
    if (node == null) {
        log.debug("Give up sending metadata request since no node is available");
        return reconnectBackoffMs;
    }

    // If we are allowed to send to the target node, build a MetadataRequest; it will go out on the next poll
    return maybeUpdate(now, node);
}

private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();

    // If we are allowed to send a request to this node
    if (canSendRequest(nodeConnectionId, now)) {
        Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion();
        // Build the MetadataRequest for refreshing the cluster metadata
        MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
        log.debug("Sending metadata request {} to node {}", metadataRequest, node);
        // Wrap the MetadataRequest in a ClientRequest; it will be sent on the next poll
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        this.inProgressRequestVersion = requestAndVersion.requestVersion;
        return defaultRequestTimeoutMs;
    }

    /* Cases where we cannot send to the target node */

    // If there's any connection establishment underway, wait until it completes. This prevents
    // the client from unnecessarily connecting to additional nodes while a previous connection
    // attempt has not been completed.
    if (isAnyNodeConnecting()) {
        // A connection attempt is already in progress, so wait a bit instead of starting another one.
        // Strictly the timeout we should return here is "connect timeout", but as we don't
        // have such application level configuration, using reconnect backoff instead.
        return reconnectBackoffMs;
    }

    /* No connection to the target node exists yet */
    if (connectionStates.canConnect(nodeConnectionId, now)) {
        // We are allowed to connect to this node, so initiate the connection
        // We don't have a connection to this node right now, make one
        log.debug("Initialize connection to node {} for sending metadata request", node);
        initiateConnect(node, now); // initiate the connection
        return reconnectBackoffMs;
    }

    // connected, but can't send more OR connecting
    // In either case, we just need to wait for a network event to let us know the selected
    // connection might be usable again.
    return Long.MAX_VALUE;
}

handleAbortedSends
Adds the ClientResponse objects recorded in the NetworkClient#abortedSends field to the result collection, then clears that field.
private void handleAbortedSends(List<ClientResponse> responses) {
    responses.addAll(abortedSends);
    abortedSends.clear();
}

handleCompletedSends

Iterates over the requests that the client has successfully sent; for those that expect no broker response, a ClientResponse can be created directly and added to the result collection.
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // if no response is expected then when the send is completed, return it
    for (Send send : this.selector.completedSends()) {
        // Look up the request cached in the inFlightRequests collection
        InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
        // Check whether the request expects a response
        if (!request.expectResponse) {
            // The request expects no broker response, so remove it from inFlightRequests
            this.inFlightRequests.completeLastSent(send.destination());
            // Build a ClientResponse for the request locally
            responses.add(request.completed(null, now));
        }
    }
}

handleCompletedReceives

Fetches and parses the broker's responses and handles them according to their type.
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        // ID of the node that returned the response
        String source = receive.source();
        // Retrieve the cached ClientRequest from the inFlightRequests collection
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        if (log.isTraceEnabled()) {
            log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                req.header.apiKey(), req.header.correlationId(), responseStruct);
        }

        // If the received response includes a throttle delay, throttle the connection.
        // Parse the response
        AbstractResponse body = AbstractResponse.
            parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
        maybeThrottle(body, req.header.apiVersion(), req.destination, now);
        if (req.isInternalRequest && body instanceof MetadataResponse)
            // Metadata response: update the locally cached cluster metadata
            metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            // API versions response: update the locally cached API versions supported by the target node
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // Otherwise, build a ClientResponse and add it to the list
            responses.add(req.completed(body, now));
    }
}

handleDisconnections

This method calls Selector#disconnected to obtain the IDs of nodes whose connections have been closed, updates those nodes' connection state to DISCONNECTED, clears locally cached data associated with them, and finally creates a disconnected-type ClientResponse for each affected in-flight request and adds it to the result collection. If any broken connections were found in this step, the locally cached cluster metadata is marked as needing an update.
private void handleDisconnections(List<ClientResponse> responses, long now) {
    for (Map.Entry<String, ChannelState> entry : this.selector.disconnected().entrySet()) {
        String node = entry.getKey();
        log.debug("Node {} disconnected.", node);
        processDisconnection(responses, node, now, entry.getValue());
    }
}

private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) {
    // Update the node's connection state to DISCONNECTED
    connectionStates.disconnected(nodeId, now);
    apiVersions.remove(nodeId);
    nodesNeedingApiVersionsFetch.remove(nodeId);
    switch (disconnectState.state()) {
        case AUTHENTICATION_FAILED:
            AuthenticationException exception = disconnectState.exception();
            connectionStates.authenticationFailed(nodeId, now, exception);
            log.error("Connection to node {} ({}) failed authentication due to: {}", nodeId,
                disconnectState.remoteAddress(), exception.getMessage());
            break;
        case AUTHENTICATE:
            log.warn("Connection to node {} ({}) terminated during authentication. This may happen " +
                "due to any of the following reasons: (1) Authentication failed due to invalid " +
                "credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS " +
                "traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.",
                nodeId, disconnectState.remoteAddress());
            break;
        case NOT_CONNECTED:
            log.warn("Connection to node {} ({}) could not be established. Broker may not be available.",
                nodeId, disconnectState.remoteAddress());
            break;
        default:
            break; // Disconnections in other states are logged at debug level in Selector
    }

    // Clear locally cached data related to this node,
    // and finally create a disconnected-type ClientResponse for each in-flight request and add it to the result collection
    cancelInFlightRequests(nodeId, now, responses);
    metadataUpdater.handleServerDisconnect(now, nodeId, Optional.ofNullable(disconnectState.exception()));
}

handleConnections

Obtains the IDs of nodes whose connections have just been established via Selector#connected and updates their connection state to READY (or, when broker API version discovery is enabled, first queues an ApiVersionsRequest for the node).
private void handleConnections() {
    for (String node : this.selector.connected()) {
        // We are now connected. Note that we might not still be able to send requests. For instance,
        // if SSL is enabled, the SSL handshake happens after the connection is established.
        // Therefore, it is still necessary to check isChannelReady before attempting to send on this
        // connection.
        if (discoverBrokerVersions) {
            this.connectionStates.checkingApiVersions(node);
            nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
            log.debug("Completed connection to node {}. Fetching API versions.", node);
        } else {
            // Update the node's connection state to READY
            this.connectionStates.ready(node);
            log.debug("Completed connection to node {}. Ready.", node);
        }
    }
}

ClientResponse#onComplete
After the ClientResponse objects for the various request types have been added to the result collection, the client iterates over that collection and invokes ClientResponse#onComplete on each element, which ultimately calls the RequestCompletionHandler#onComplete method of the handler we registered. For produce requests, onComplete essentially ends up calling Sender#handleProduceResponse, whose job is to distinguish the different kinds of responses and, for each type, set the appropriate arguments and call back into Sender#completeBatch.
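Before diving into completeBatch, it helps to see how small the dispatching itself is. The following sketch shows the call chain described above, simplified from the client code; the produce-specific handler is wired up when the Sender builds the request:

// NetworkClient#completeResponses: walk the collected responses and fire their callbacks
private void completeResponses(List<ClientResponse> responses) {
    for (ClientResponse response : responses) {
        try {
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}

// ClientResponse#onComplete: forward to the RequestCompletionHandler registered when the
// request was created; for produce requests this handler delegates to Sender#handleProduceResponse
public void onComplete() {
    if (callback != null)
        callback.onComplete(this);
}

From there, Sender#completeBatch does the real per-partition work: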
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
    Errors error = response.error;

    if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
            (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
        // If the batch is too large, we split the batch and send the split batches again. We do not decrement
        // the retry attempts in this case.
        log.warn(
            "Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
            correlationId,
            batch.topicPartition,
            this.retries - batch.attempts(),
            error);
        if (transactionManager != null)
            transactionManager.removeInFlightBatch(batch);
        // The batch is too large: split it, re-enqueue the pieces into the accumulator, and retry
        this.accumulator.splitAndReenqueue(batch);
        maybeRemoveAndDeallocateBatch(batch);
        this.sensors.recordBatchSplit();
    } else if (error != Errors.NONE) {
        if (canRetry(batch, response, now)) {
            log.warn(
                "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                correlationId,
                batch.topicPartition,
                this.retries - batch.attempts() - 1,
                error);
            if (transactionManager == null) {
                // Error response, but the batch may be retried:
                // re-enqueue it into the accumulator so it can be sent again
                reenqueueBatch(batch, now);
            } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                // If idempotence is enabled only retry the request if the current producer id is the same as
                // the producer id of the batch.
                log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
                    batch.topicPartition, batch.producerId(), batch.baseSequence());
                // Re-enqueue the batch into the accumulator so it can be sent again
                reenqueueBatch(batch, now);
            } else {
                failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                    "batch but the producer id changed from " + batch.producerId() + " to " +
                    transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
            }
        } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
            // If we have received a duplicate sequence error, it means that the sequence number has advanced beyond
            // the sequence of the current batch, and we haven't retained batch metadata on the broker to return
            // the correct offset and timestamp.
            //
            // The only thing we can do is to return success to the user and not return a valid offset and timestamp.
            completeBatch(batch, response);
        } else {
            final RuntimeException exception;
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                exception = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
            else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
            else
                exception = error.exception();
            // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
            // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
            // thus it is not safe to reassign the sequence.
            failBatch(batch, response, exception, batch.attempts() < this.retries);
        }
        if (error.exception() instanceof InvalidMetadataException) {
            if (error.exception() instanceof UnknownTopicOrPartitionException) {
                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                    "topic-partition may not exist or the user may not have Describe access to it",
                    batch.topicPartition);
            } else {
                log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                    "to request metadata update now", batch.topicPartition, error.exception().toString());
            }
            // Cluster metadata error: mark the metadata as needing an update
            metadata.requestUpdate();
        }
    } else {
        completeBatch(batch, response);
    }

    // Unmute the completed partition.
    // When strict message ordering is required, unmute the completed topic partition so the next batch can be sent
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
}

The method above checks whether the response is an error and whether it can be retried; if so, the ProducerBatch is re-enqueued into the RecordAccumulator to wait for another send attempt. For a normal response, or when no further retries are allowed, the two-argument completeBatch overload is called to finish this send.
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    if (transactionManager != null) {
        transactionManager.handleCompletedBatch(batch, response);
    }

    // The done method finishes this send
    if (batch.done(response.baseOffset, response.logAppendTime, null)) {
        maybeRemoveAndDeallocateBatch(batch);
    }
}

It essentially calls ProducerBatch#done to finish this send:
public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
    final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
    if (tryFinalState == FinalState.SUCCEEDED) {
        log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
    } else {
        log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception);
    }

    // Mark the current batch as completed
    if (this.finalState.compareAndSet(null, tryFinalState)) {
        completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
        return true;
    }

    if (this.finalState.get() != FinalState.SUCCEEDED) {
        if (tryFinalState == FinalState.SUCCEEDED) {
            // Log if a previously unsuccessful batch succeeded later on.
            log.debug("ProduceResponse returned {} for {} after batch with base offset {} had already been {}.",
                tryFinalState, topicPartition, baseOffset, this.finalState.get());
        } else {
            // FAILED --> FAILED and ABORTED --> FAILED transitions are ignored.
            log.debug("Ignored state transition {} -> {} for {} batch with base offset {}",
                this.finalState.get(), tryFinalState, topicPartition, baseOffset);
        }
    } else {
        // A SUCCESSFUL batch must not attempt another state change.
        throw new IllegalStateException("A " + this.finalState.get() + " batch must not attempt another state change to " + tryFinalState);
    }
    return false;
}

The completeFutureAndFireCallbacks method invokes the Callback that the caller passed in when sending each record.
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
    // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
    // Record the outcome of this batch
    produceFuture.set(baseOffset, logAppendTime, exception);

    // execute callbacks
    // Invoke the Callback registered for each record
    for (Thunk thunk : thunks) {
        try {
            if (exception == null) {
                RecordMetadata metadata = thunk.future.value();
                if (thunk.callback != null)
                    thunk.callback.onCompletion(metadata, null);
            } else {
                // The record failed
                if (thunk.callback != null)
                    thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }

    // Mark this request as finished (normal response, timeout, or producer shutdown)
    produceFuture.done();
}

At this point, the producer's message-send flow is complete.
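To tie this back to the user-facing API, here is a minimal, self-contained example of sending a record with a Callback; the broker address and topic name are placeholders for illustration:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerCallbackDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The Callback passed to send() is wrapped in a Thunk and eventually invoked by
            // completeFutureAndFireCallbacks once the batch it belongs to is done.
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("acked: partition=%d, offset=%d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        }
    }
}

The lambda passed to send() is exactly the Callback that ends up wrapped in a Thunk and fired by completeFutureAndFireCallbacks.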