Kafka 生产者模块(一):KafkaProducer 以及Metadata、Cluster定义

    科技2022-07-10  88

    public class KafkaProducer<K, V> implements Producer<K, V> { private final Logger log; /** clientId 生成器,如果没有明确指定客户端 ID,则使用该字段顺序生成一个 */ private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE = new AtomicInteger(1); private static final String JMX_PREFIX = "kafka.producer"; public static final String NETWORK_THREAD_PREFIX = "kafka-producer-network-thread"; public static final String PRODUCER_METRIC_GROUP_NAME = "producer-metrics"; /** 生产者唯一标识(对应 client.id 属性配置 ) */ private final String clientId; // Visible for testing final Metrics metrics; /** 分区选择器(对应 partitioner.class 属性配置),如果未明确指定分区,则基于一定的策略为消息选择合适的分区 */ private final Partitioner partitioner; /** 消息的最大长度(对应 max.request.size 配置,包含消息头、序列化之后的 key 和 value) */ private final int maxRequestSize; /** 发送单条消息的缓冲区大小(对应 buffer.memory 配置) */ private final long totalMemorySize; /** kafka 集群元数据 */ private final ProducerMetadata metadata; /** 消息收集器,用于收集并缓存消息,等待 Sender 线程的发送 */ private final RecordAccumulator accumulator; /** 消息发送线程对象 */ private final Sender sender; /** 消息发送线程 */ private final Thread ioThread; /** 压缩算法(对应 compression.type 配置) */ private final CompressionType compressionType; private final Sensor errors; /** 时间戳工具 */ private final Time time; /** key 序列化器(对应 key.serializer 配置) */ private final Serializer<K> keySerializer; /** value 序列化器(对应 value.serializer 配置) */ private final Serializer<V> valueSerializer; /** 封装配置信息 */ private final ProducerConfig producerConfig; /** 等待更新 kafka 集群元数据的最大时长 */ private final long maxBlockTimeMs; /** 发送拦截器(对应 interceptor.classes 配置),用于待发送的消息进行拦截并修改,也可以对 ACK 响应进行拦截处理 */ private final ProducerInterceptors<K, V> interceptors; private final ApiVersions apiVersions; private final TransactionManager transactionManager; }

    生产者初始化如下:

    KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) { ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)); try { // 获取用户配置信息 Map<String, Object> userProvidedConfigs = config.originals(); this.producerConfig = config; this.time = time; String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ? (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null; // 尝试获取用户配置的 clientId,如果未配置则基于 PRODUCER_CLIENT_ID_SEQUENCE 顺序生成一个 this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId); LogContext logContext; if (transactionalId == null) logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId)); else logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId)); log = logContext.logger(KafkaProducer.class); log.trace("Starting the Kafka producer"); Map<String, String> metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) .tags(metricTags); List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class, Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); reporters.add(new JmxReporter(JMX_PREFIX)); this.metrics = new Metrics(metricConfig, reporters, time); // 获取配置的分区器对象(反射创建) this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); // 获取生产者重试间隔 long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); // 如果参数未指定 key 序列化器,则尝试从配置中获取 key 序列化器对象(反射创建) if (keySerializer == null) { this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class); this.keySerializer.configure(config.originals(), true); } else { config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); this.keySerializer = keySerializer; } // 如果参数未指定 value 序列化器,则尝试从配置中获取 value 序列化器对象(反射创建) if (valueSerializer == null) { this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class); this.valueSerializer.configure(config.originals(), false); } else { config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); this.valueSerializer = valueSerializer; } // load interceptors and make sure they get clientId userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false); // 获取注册的拦截器列表 List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances( ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); if (interceptors != null) this.interceptors = interceptors; else this.interceptors = new ProducerInterceptors<>(interceptorList); ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters); // 获取并设置生产者发送请求的大小 this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); // 获取并设置生产者内存缓冲区大小,用于缓存要发送到服务器的消息 this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); // 获取并设置消息压缩算法,可以设置为 snappy、gzip 或 lz4,默认不压缩。 this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG); this.transactionManager = configureTransactionState(config, logContext, log); int deliveryTimeoutMs = configureDeliveryTimeout(config, log); this.apiVersions = new ApiVersions(); // 创建消息收集器,用于异步发送消息 this.accumulator = new RecordAccumulator(logContext, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, metrics, PRODUCER_METRIC_GROUP_NAME, time, apiVersions, transactionManager, new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME)); // 获取 kafka 集群主机列表 List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)); if (metadata != null) { this.metadata = metadata; } else { // 创建并更新 kafka 集群的元数据信息 this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), logContext, clusterResourceListeners, Time.SYSTEM); this.metadata.bootstrap(addresses); } this.errors = this.metrics.sensor("errors"); // 创建并启动 Sender 线程 this.sender = newSender(logContext, kafkaClient, this.metadata); String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId; this.ioThread = new KafkaThread(ioThreadName, this.sender, true); this.ioThread.start(); // 打印未使用的配置 config.logUnused(); AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds()); log.debug("Kafka producer started"); } catch (Throwable t) { // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121 close(Duration.ofMillis(0), true); // now propagate the exception throw new KafkaException("Failed to construct kafka producer", t); } }

    Metadata 定义         保存集群元数据信息的 Metadata 类的字段定义:

    public class Metadata implements Closeable { private final Logger log; /** 元数据最小更新时间间隔,默认是 100 毫秒,防止更新太频繁 */ private final long refreshBackoffMs; /** 元数据更新时间间隔,默认为 5 分钟 */ private final long metadataExpireMs; /** 元数据版本号,每更新成功一次则版本号加 1 */ private int updateVersion; // bumped on every metadata response /** 上一次更新元数据的时间戳,不管成功还是失败 */ private long lastRefreshMs; private int requestVersion; // bumped on every new topic addition /** 上一次成功更新元数据的时间戳 */ private long lastSuccessfulRefreshMs; private KafkaException fatalException; private Set<String> invalidTopics; private Set<String> unauthorizedTopics; private MetadataCache cache = MetadataCache.empty(); /** 标记是否需要更新集群元数据信息 */ private boolean needUpdate; private final ClusterResourceListeners clusterResourceListeners; private boolean isClosed; private final Map<TopicPartition, Integer> lastSeenLeaderEpochs; }

    Cluster          Cluster 类是对集群节点、topic、分区等信息的一个封装,其字段定义如下:

    public final class Cluster { private final boolean isBootstrapConfigured; /** kafka 集群中的节点信息列表(包括 id、host、port 等信息) */ private final List<Node> nodes; /** 未授权的 topic 集合 */ private final Set<String> unauthorizedTopics; private final Set<String> invalidTopics; /** 内部 topic 集合 */ private final Set<String> internalTopics; private final Node controller; /** 记录 topic 分区与分区详细信息的映射关系 */ private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition; /** 记录 topic 及其分区信息的映射关系 */ private final Map<String, List<PartitionInfo>> partitionsByTopic; /** 记录 topic 及其分区信息的映射关系(必须包含 leader 副本) */ private final Map<String, List<PartitionInfo>> availablePartitionsByTopic; /** 记录节点 ID 与分区信息的映射关系 */ private final Map<Integer, List<PartitionInfo>> partitionsByNode; /** key 是 brokerId,value 是 broker 节点信息,方便基于 brokerId 获取对应的节点信息 */ private final Map<Integer, Node> nodesById; private final ClusterResource clusterResource; }

    RecordAccumulator          消息收集器,可以将其看做是一个本地缓存消息的队列,消息收集线程将消息最终记录到收集器中,而 Sender 线程会定期定量从收集器中取出缓存的消息,并投递给 Kafka 集群。

    public final class RecordAccumulator { private final Logger log; /** 标识当前收集器是否被关闭,对应 producer 被关闭 */ private volatile boolean closed; /** 记录正在执行 flush 操作的线程数 */ private final AtomicInteger flushesInProgress; /** 记录正在执行 append 操作的线程数 */ private final AtomicInteger appendsInProgress; /** 指定每个 RecordBatch 中 ByteBuffer 的大小 */ private final int batchSize; /** 消息压缩类型 */ private final CompressionType compression; /** 通过参数 linger.ms 指定,当本地消息缓存时间超过该值时,即使消息量未达到阈值也会进行投递 */ private final int lingerMs; /** 生产者重试时间间隔 */ private final long retryBackoffMs; private final int deliveryTimeoutMs; /** 缓存(ByteBuffer)管理工具 */ private final BufferPool free; /** 时间戳工具 */ private final Time time; private final ApiVersions apiVersions; /** 记录 topic 分区与 RecordBatch 的映射关系,对应的消息都是发往对应的 topic 分区 */ private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches; /** 记录未发送完成(即未收到服务端响应)的消息集合 */ private final IncompleteBatches incomplete; // The following variables are only accessed by the sender thread, so we don't need to protect them. /** * 消息顺序性保证, * 缓存当前待发送消息的目标 topic 分区,防止对于同一个 topic 分区同时存在多个未完成的消息,可能导致消息顺序性错乱 */ private final Map<TopicPartition, Long> muted; /** 记录 drain 方法批量导出消息时上次的偏移量 */ private int drainIndex; private final TransactionManager transactionManager; private long nextBatchExpiryTimeMs = Long.MAX_VALUE; // the earliest time (absolute) a batch will expire. }

    ProducerBatch

           从上面 RecordAccumulator 类的字段列表中我们看到有一个 ConcurrentMap<TopicPartition, Deque<ProducerBatch>> 类型的 batches 字段,这里的 key 对应 topic 的某个分区,而 value 是一个 Deque 类型,其中封装了一批 ProducerBatch 对象,这些对象中记录了待发送的消息集合,而这些消息的一个共同点就是都是发往相同的 topic 分区。ProducerBatch 类字段定义如下:

    public final class ProducerBatch { private static final Logger log = LoggerFactory.getLogger(ProducerBatch.class); private enum FinalState { ABORTED, FAILED, SUCCEEDED } /** 当前 RecordBatch 创建的时间戳 */ final long createdMs; /** 当前缓存的消息的目标 topic 分区 */ final TopicPartition topicPartition; /** 标识当前 RecordBatch 发送之后的状态 */ final ProduceRequestResult produceFuture; /** 消息的 Callback 队列,每个消息都对应一个 Callback 对象 */ private final List<Thunk> thunks = new ArrayList<>(); /** 用来存储数据的 {@link MemoryRecords} 对应的 builder 对象 */ private final MemoryRecordsBuilder recordsBuilder; /** 发送当前 RecordBatch 的重试次数 */ private final AtomicInteger attempts = new AtomicInteger(0); private final boolean isSplitBatch; private final AtomicReference<FinalState> finalState = new AtomicReference<>(null); /** 记录保存的 record 个数 */ int recordCount; /** 记录最大的 record 字节数 */ int maxRecordSize; /** 最后一次重试发送的时间戳` */ private long lastAttemptMs; /** 追后一次向当前 RecordBatch 追加消息的时间戳 */ private long lastAppendTime; /** 记录上次投递当前 BatchRecord 的时间戳 */ private long drainedMs; /** 标记是否正在重试 */ private boolean retry; private boolean reopened; }

     

    Processed: 0.015, SQL: 8