Kafka 消费者模块(一):rebalance的触发

    科技2022-07-11  89

            在消费者调用poll拉消息的时候,消费者会先检测当前是否需要执行分区再分配操作,如果需要则直接返回空的结果,这样在不超时的情况下,方法 KafkaConsumer#pollOnce 会立即被再次调用,从而开始对当前 topic 分区执行再分配,即调用 ConsumerCoordinator#poll 方法。

    public boolean poll(Timer timer) { maybeUpdateSubscriptionMetadata(); // 触发执行注册的监听 offset 提交完成的方法 invokeCompletedOffsetCommitCallbacks(); // 确保当前是 AUTO_TOPICS 或 AUTO_PATTERN(USER_ASSIGNED 不需要再平衡)订阅模式, // 且目标 GroupCoordinator 节点可达,如果不可达,则会尝试寻找一个可用的节点 if (subscriptions.partitionsAutoAssigned()) { if (protocol == null) { throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG + " to empty while trying to subscribe for group protocol to auto assign partitions"); } // Always update the heartbeat last poll time so that the heartbeat thread does not leave the // group proactively due to application inactivity even if (say) the coordinator cannot be found. pollHeartbeat(timer.currentTimeMs()); if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { return false; } // 需要执行再平衡 if (rejoinNeededOrPending()) { // due to a race condition between the initial metadata fetch and the initial rebalance, // we need to ensure that the metadata is fresh before joining initially. This ensures // that we have matched the pattern against the cluster's topics at least once before joining. if (subscriptions.hasPatternSubscription()) { // For consumer group that uses pattern-based subscription, after a topic is created, // any consumer that discovers the topic after metadata refresh can trigger rebalance // across the entire consumer group. Multiple rebalances can be triggered after one topic // creation if consumers refresh metadata at vastly different times. We can significantly // reduce the number of rebalances caused by single topic creation by asking consumer to // refresh metadata before re-joining the group as long as the refresh backoff time has // passed. if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) { this.metadata.requestUpdate(); } if (!client.ensureFreshMetadata(timer)) { return false; } maybeUpdateSubscriptionMetadata(); } /* * 1. 检查目标 GroupCoordinator 节点是否准备好接收请求 * 2. 启动心跳线程 * 3. 执行分区再分配操作 */ // ensureActiveGroup 执行具体的分区再分配操作 if (!ensureActiveGroup(timer)) { return false; } } } else { // For manually assigned partitions, if there are no ready nodes, await metadata. // If connections to all nodes fail, wakeups triggered while attempting to send fetch // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop. // When group management is used, metadata wait is already performed for this scenario as // coordinator is unknown, hence this check is not required. if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { client.awaitMetadataUpdate(timer); } } // 异步提交 offset maybeAutoCommitOffsetsAsync(timer.currentTimeMs()); return true; }

    当我们使用 AUTO_TOPICS 或 AUTO_PATTERN 模式订阅 Kafka topic 时,我们并不需要考虑当前消费者具体消费哪个分区,Kafka 会依据分区分配策略为消费者分配一个或多个分区进行消费(一个分区至多被一个消费者消费,不允许多个消费者同时消费同一个分区)。但是消费者可能会中途加入,也可能会中途退出,topic 的分区数目也是允许改变的,此时就需要依赖分区再分配机制为注册的消费者重新分配分区。

    分区再分配操作分为 3 个阶段,并且是一个与集群交互联动的过程,这里我们以客户端视角,当消费者检测到需要重新分配分区时会触发执行:

    发送 GroupCoordinatorRequest 请求获取目标可用的 GroupCoordinator 实例所在的 broker 节点,如果没有则选择负载最小的节点并尝试建立连接;向 GroupCoordinator 实例所在节点发送 JoinGroupRequest 请求申请加入目标 group,GroupCoordinator 实例会在既定时间范围内等待消费者的申请加入请求,如果提前检测到已经接收到 group 名下所有消费者的申请,或者等待时间超时,则会返回 JoinGroupResponse 响应,主要目的是告知谁是新的 Group Leader 消费者,以及最终确定的分区分配策略;Group Leader 依据指定的分区分配策略为当前 group 名下的消费者分配分区,并向目标 GroupCoordinator 实例所在节点发送 SyncGroupRequest 请求以告知最终的分区分配结果。

    判定需要执行分区再分配操作的条件,位于 ConsumerCoordinator#rejoinNeededOrPending

    public boolean rejoinNeededOrPending() { // USER_ASSIGNED 订阅模式不需要执行分区再分配 if (!subscriptions.partitionsAutoAssigned()) return false; // we need to rejoin if we performed the assignment and metadata has changed; // also for those owned-but-no-longer-existed partitions we should drop them as lost // 再平衡过程中分区数量发生变化 if (assignmentSnapshot != null && !assignmentSnapshot.matches(metadataSnapshot)) { requestRejoin(); return true; } // we need to join if our subscription has changed since the last join // 消费者 topic 订阅信息发生变化 if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) { requestRejoin(); return true; } // 其它标识需要再平衡的操作,例如分区再分配执行失败、重置年代信息等 return super.rejoinNeededOrPending(); }

            如果判定需要执行分区再分配操作,消费者接下去会调用 AbstractCoordinator#ensureActiveGroup 方法确认所属 group 对应的目标 GroupCoordinator 实例所在节点是否准备好接收请求,如果对应节点不可用,则会发送 GroupCoordinatorRequest 请求查找负载较小且可用的节点,并与之建立连接。接着会调用 AbstractCoordinator#joinGroupIfNeeded 方法开始执行分区再分配策略,实现如下:

    boolean joinGroupIfNeeded(final Timer timer) { // 如果需要执行分区再分配,且目前正在进行中 while (rejoinNeededOrPending()) { // 再次检查目标 GroupCoordinator 节点是否准备好接收请求 if (!ensureCoordinatorReady(timer)) { return false; } // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second // time if the client is woken up before a pending rebalance completes. This must be called // on each iteration of the loop because an event requiring a rebalance (such as a metadata // refresh which changes the matched subscription set) can occur while another rebalance is // still in progress. // 执行前期准备工作 if (needsJoinPrepare) { // need to set the flag before calling onJoinPrepare since the user callback may throw // exception, in which case upon retry we should not retry onJoinPrepare either. /* * 1. 如果开启了 offset 自动提交,则同步提交 offset * 2. 调用注册的 ConsumerRebalanceListener 监听器的 onPartitionsRevoked 方法 * 3. 取消当前消费者的 leader 身份(如果是的话),恢复成为一个普通的消费者 */ needsJoinPrepare = false; onJoinPrepare(generation.generationId, generation.memberId); } // 创建并发送 JoinGroupRequest 请求,申请加入目标 group final RequestFuture<ByteBuffer> future = initiateJoinGroup(); client.poll(future, timer); if (!future.isDone()) { // we ran out of time return false; } // 执行分区分配成功 if (future.succeeded()) { Generation generationSnapshot; // Generation data maybe concurrently cleared by Heartbeat thread. // Can't use synchronized for {@code onJoinComplete}, because it can be long enough // and shouldn't block heartbeat thread. // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment} synchronized (AbstractCoordinator.this) { generationSnapshot = this.generation; } if (generationSnapshot != Generation.NO_GENERATION) { // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried. ByteBuffer memberAssignment = future.value().duplicate(); onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocol, memberAssignment); // Generally speaking we should always resetJoinGroupFuture once the future is done, but here // we can only reset the join group future after the completion callback returns. This ensures // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded. // And because of that we should explicitly trigger resetJoinGroupFuture in other conditions below. resetJoinGroupFuture(); needsJoinPrepare = true; } else { log.info("Generation data was cleared by heartbeat thread. Initiating rejoin."); resetStateAndRejoin(); resetJoinGroupFuture(); return false; } } else { resetJoinGroupFuture(); // 执行分区分配失败,依据失败类型考虑是否重试 final RuntimeException exception = future.exception(); if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException || exception instanceof MemberIdRequiredException) continue; else if (!future.isRetriable()) throw exception; timer.sleep(rebalanceConfig.retryBackoffMs); } } return true; }
    Processed: 0.013, SQL: 8