新增分区对 producer 的影响 - 源码解析

kafka 源码分析

Posted by GiraffeTree on August 31, 2020

概述

题图为 8月14号晚上拍摄的英仙座流星

在分区扩容时,比较担心扩容对 client 的影响, 本节主要分析下对 producer 的影响

这里先说下小结

  1. producer 与 server 的 TCP 连接是在创建 KafkaProducer 实例时建立的
  2. producer 内部会定期向 broker 发送 metadata 请求, 更新主题分区的关系, 时长metadata.max.age.ms 控制
  3. 当 Producer 尝试给一个不存在的主题发送消息时, broker 会告诉 producer 主题不存在, producer 会发送 metadata 请求, 尝试获取最新的元数据信息

而更新了元数据后, 相当于 producer 会根据你的 key -> partition 的规则, 分发数据到不同分区

所以默认情况下, 相同 key 的数据可能由于分区扩容导致 前后数据处于不同分区

所以建议在建立 topic 时就初始化足够多的分区 ,避免之后分区扩展的麻烦

源码分析 (基于 kafka-clients-2.4.1)

producer 获取分区信息跟 metadata 请求有着密切的联系, 有兴趣可以自己调试下源码, 下面的分析也可以不用看

先看下 metadata 请求/响应的主要调用栈, 方便你们自己查看调试

  • 发送 metadata 请求
    • 主要方法是NetworkClients.sendInternalMetadataRequest

图片

  • 收到 metadata 响应
    • 主要方法是ProducerMetadata.updata

图片

Producer 发送数据时, 什么时候去拿分区信息的?

Producer 获取 分区信息, 我这边分为 两部分来讲 , 一个是主线程, 一个是 producer 子线程

这两个线程主要通过 ProducerMetadata 来共享数据, 并通过 synchronized 来做同步 (通知)

主线程部分

  1. 主线程调用 KafkaProducer 构造函数时会创建一个 kafka-producer-network-thread | producer-1的线程, 请注意和 main 主线程做区分

    图片

  2. 当主线程第一次调用  producer.send 后, 会等待版本初始化更新

    • 主要调用了KafkaProducer.waitOnMetadata方法

    图片

  3. 接着调用了ProducerMetadata.awaitUpdate方法
    1. 注意这里的 SystemTime 将 this (producerMetadata 对象本身) 传了进去, 作为同步块的同步对象
    2. 然后开始 wait
    3. 这个伏笔 this 会在之后, ProducerMetadata 更新时被唤醒

    图片

  4. 继续看到 time.waitObject , 这个方法其实是个死循环, 知道条件满足, 才会返回
    • 其实就是等待, 直到元数据被拉取下来

    图片

producer 线程部分

  1. 与此同时,在 ProducerClient的构造函数中, 创建并启动 producer 线程, 然后会循环调用 poll 方法

     @Override
     public List<ClientResponse> poll(long timeout, long now) {
         // 省略了一些状态判断的代码
         long metadataTimeout = metadataUpdater.maybeUpdate(now);
         try {
         this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
     } catch (IOException e) {
         log.error("Unexpected error during I/O", e);
         }
         long updatedNow = this.time.milliseconds();
         List<ClientResponse> responses = new ArrayList<>();
         handleCompletedSends(responses, updatedNow);
         handleCompletedReceives(responses, updatedNow);
         handleDisconnections(responses, updatedNow);
         handleConnections();
         handleInitiateApiVersionRequests(updatedNow);
         handleTimedOutRequests(responses, updatedNow);
         completeResponses(responses);
         return responses;
     }
    
  2. producer 首先需要连接到 kafka 服务器

    图片

     private void initiateConnect(Node node, long now) {
         String nodeConnectionId = node.idString();
         try {
             connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
             InetAddress address = connectionStates.currentAddress(nodeConnectionId);
             log.debug("Initiating connection to node {} using address {}", node, address);
             selector.connect(nodeConnectionId,
                     new InetSocketAddress(address, node.port()),
                     this.socketSendBuffer,
                     this.socketReceiveBuffer);
         } catch (IOException e) {
             log.warn("Error connecting to node {}", node, e);
             // Attempt failed, we'll try again after the backoff
             connectionStates.disconnected(nodeConnectionId, now);
             // Notify metadata updater of the connection failure
             metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
         }
     }
    
  3. 在连接到 kafka 服务器后, handleConnections 将 node 加进 nodesNeedingApiVersionsFetch 一个 map 之中

    • 这里的3,4,5步其实和本节探讨的问题没有关联, 直接跳到第6步看就行

    图片

     private void handleConnections() {
         for (String node : this.selector.connected()) {
             // We are now connected.  Note that we might not still be able to send requests. For instance,
             // if SSL is enabled, the SSL handshake happens after the connection is established.
             // Therefore, it is still necessary to check isChannelReady before attempting to send on this
             // connection.
             if (discoverBrokerVersions) {
                 this.connectionStates.checkingApiVersions(node);
                 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
                 log.debug("Completed connection to node {}. Fetching API versions.", node);
             } else {
                 this.connectionStates.ready(node);
                 log.debug("Completed connection to node {}. Ready.", node);
             }
         }
     }
    
  4. 在poll循环过程中, handleInitiateApiVersionRequests 会向服务器发送 ApiVersionRequest

    图片

    图片

  5. handleInitiateApiVersionRequests 会取出 nodesNeedingApiVersionsFetch 中的拉取请求, 并发送正式的客户端请求给 服务器

     private void handleInitiateApiVersionRequests(long now) {
         Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter = nodesNeedingApiVersionsFetch.entrySet().iterator();
         while (iter.hasNext()) {
             Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
             String node = entry.getKey();
             if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
                 log.debug("Initiating API versions fetch from node {}.", node);
                 ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
                 ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
                 doSend(clientRequest, true, now);
                 iter.remove();
             }
         }
     }
    
  6. 当连接上服务器后, maybeUpdata 方法中会发送内部元数据请求

    图片

  7. 收到元数据请求的响应时, 更新元数据 Metadata.update

    图片

  • ProducerMetadata.notifyAll 唤醒条件不满足的线程, 于是主线程, 继续执行, 拿到了 metadata
    • 这里其实就是一个典型的 MESA 管程应用
    • 另外, 这里要写成 notifyAll() , 否则多个线程 同时使用 一个 producer 发送数据时, 可能会导致有一些线程拿不到锁, 饿死….

    图片

如何确定消息发给的分区

  • Producer.partition 有以下代码
    • 我们可以通过实现 Partitioner接口, 自定义选择分区
    • 但我想如果自己实现一个 key -> partition 永久绑定, 得有一个存储中心来记录 key -> partition 的关系, 这样实现的话, 好像比较麻烦, 对性能/内存要求也很高, 每次一个新的 key 传进来, 都要查询下存储; 并且如果出现不同 producer 对分区扩容感应的时间不同, 可能出现 key -> partition 冲突的情况, 不过仔细一想这点在实现上可以避免
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

小结

从上面的源码分析来看 , 分区增加对于 producer 的影响可能就是会将相同 key 的消息发送到不同的分区, 可能导致一些数据处理上的问题

所以建议对可能会增加吞吐量的主题进行过度分区(在建立 topic 时就初始化足够多的分区),防止在之后进行分区扩容时, 数据被分发到不同分区上。

其他参考

  1. kafka New topic or Increase partition count
    1. https://stackoverflow.com/questions/52553133/kafka-new-topic-or-increase-partition-count
  2. 开启 org.apache.kafka.clients.Metadata 的 debug 日志

     # 我这里用的是 log4j2.yaml
     # 下面是部分配置, 完整请查看 todo
     Loggers:
         logger:
             -
                 name:  org.apache.kafka.clients.Metadata
                 level: debug
                 AppenderRef:
                 ref: STDOUT
    
    • 新增 分区时 producer 客户端 metadata 类的日志内容示例

    图片