onJoinComplete 完成rebalance最后一步,确认最终分配结果。
protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) { log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId); // only the leader is responsible for monitoring for metadata changes (i.e. partition changes) if (!isLeader) assignmentSnapshot = null; // 获取最终确定的分区分配策略对应的分区分配器 ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy); if (assignor == null) throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy); Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions()); // 反序列化获取分区分配信息 Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer); // 拿到分配的分区 Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions()); // 拿到的分区和我们的需要订阅的不一样,就重新rebalance if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) { log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " + "that the subscription has changed since we joined the group. Will try re-join the group with current subscription", assignment.partitions(), subscriptions.prettyString()); requestRejoin(); return; } final AtomicReference<Exception> firstException = new AtomicReference<>(null); Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions); addedPartitions.removeAll(ownedPartitions); if (protocol == RebalanceProtocol.COOPERATIVE) { Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions); revokedPartitions.removeAll(assignedPartitions); log.info("Updating assignment with\n" + "now assigned partitions: {}\n" + "compare with previously owned partitions: {}\n" + "newly added partitions: {}\n" + "revoked partitions: {}\n", Utils.join(assignedPartitions, ", "), Utils.join(ownedPartitions, ", "), Utils.join(addedPartitions, ", "), Utils.join(revokedPartitions, ", ") ); if (!revokedPartitions.isEmpty()) { // revoke partitions that were previously owned but no longer assigned; // note that we should only change the assignment (or update the assignor's state) // AFTER we've triggered the revoke callback firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions)); // if revoked any partitions, need to re-join the group afterwards log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions); requestRejoin(); } } // The leader may have assigned partitions which match our subscription pattern, but which // were not explicitly requested, so we update the joined subscription here. // 以正则的方式订阅topic maybeUpdateJoinedSubscription(assignedPartitions); // give the assignor a chance to update internal state based on the received assignment // 更新本地缓存的集群元数据信息 ConsumerGroupMetadata metadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId); assignor.onAssignment(assignment, metadata); // reschedule the auto commit starting from now // 重置下次自动提交 offset 的截止时间 if (autoCommitEnabled) this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs); // 更新assignments字段 subscriptions.assignFromSubscribed(assignedPartitions); // add partitions that were not previously owned but are now assigned firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions)); if (firstException.get() != null) throw new KafkaException("User rebalance callback throws an error", firstException.get()); }