kafka发送消息分区选择策略详解

刘超 13天前 ⋅ 92 阅读   编辑

一、描述

  发送消息时kafka采用什么分区策略?

  有以下几种情况:
  a、如果不手动指定分区选择策略,则会使用默认分区策略DefaultPartitioner。
  b、如果不指定消息key,则消息发送到的分区是随着时间不停变换的(AtomicInteger产生数字/分区数)
  c、如果指定了消息key,则会根据消息的hash值和topic的分区数取模来获取分区的。
  d、如果应用有消息顺序性的需要,则可以通过指定消息的key和自定义分区类来将符合某种规则的消息发送到同一个分区。同一个分区消息是有序的,同一个分区只有一个消费者就可以保证消息的顺序性消费。如果一个topic只有一个分区(这会降低性能)或者将同一个特征的数据写到一个分区,可以实现全局有序

二、证实

  下面通过阅读源码来证实一下(基于kafka-0.10.1.1)

  1、创建test分区

/home/zhao/software/kafka_2.11-0.10.2.1/bin/kafka-topics.sh --zookeeper server-1:2181 --create --topic test --partitions 3 --replication-factor 1

  有如下发送消息例子代码

public class PartitionSelect {
    public static void main(String[] args) {
    	String topicName = "test";
    	// 设置配置属性
    	Properties props = new Properties();
    	props.put("bootstrap.servers", "10.7.6.25:9092");
    	props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    	props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    	props.put("request.required.acks", "1");
    	// 创建producer
    	Producer<String, String> producer = new KafkaProducer<String, String>(props);

    	for (int i = 0; i < 10; i++) {
    		producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
    	}

    	System.out.println("Message sent successfully");
    	producer.close();
    }
}

  1、kafka 生产者发送消息分区选择策略

  要知道kafka发送消息的分区选择策略,我们就从send方法入手,通过跟踪send方法,发现KafkaProducer是通过内部的私有方法doSend来发送消息的,里面有一行代码:

int partition = partition(record, serializedKey, serializedValue, cluster);

  这行代码的功能其实就是选择分区,partition方法的代码逻辑如下:

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

  从上面的代码逻辑我们可以看出,如果record指定了分区则使用指定的分区,如果没有则使用partitioner分区器来选择分区。如果没指定partitioner.class,那默认使用的分区选择器实现类是:DefaultPartitioner.class, 该类的分区选择策略如下:

public class DefaultPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = counter.getAndIncrement();
            List availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {}

}

  分区选择策略分为两种:
  1、消息的key为null,如果key为null,则获取自增值nextValue,并加一。然后判断topic的可用分区数是否大于0,如果大于0则使用获取的nextValue的值和可用分区数进行取模操作。 如果topic的可用分区数小于等于0,则用获取的nextValue的值和总分区数进行取模操作(其实就是随机选择了一个不可用分区)。
  2、消息的key不为null,不为null的选择策略很简单,就是根据hash算法murmur2就算出key的hash值,然后和分区数进行取模运算。

三、参考文章

1、https://leokongwq.github.io/2017/02/27/mq-kafka-producer-partitioner.html


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: