天天看點

[kafka掃盲]---(6)kafka源碼閱讀之分區器

Author:趙志乾
Date:2018-10-21
Declaration:All Right Reserved!!!
           

DefaultPartitioner.java

該類實作了Partitioner接口,核心方法為partition():用于給未指定分區号的消息記錄生成分區号,其生成政策也比較簡單。其依據是否指定key值采用兩種不同的政策:如果指定key值,則按照key的hash來生成分區号,如果未指定key值,則按照輪詢政策來生成分區号。

為了實作輪詢政策,該類内部維護了一個執行個體字段:topicCounterMap,其類型為ConcurrentMap。該字段用于維護topic名稱到topic在目前用戶端中所持有的計數器。

如果要實作自定義的分區器,可以通過實作Partitioner接口來完成。

package org.apache.kafka.clients.producer.internals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

/*該類的執行個體是由kafka用戶端預設的分區器,提供預設的分區政策:如果生産者要釋出的消息記錄指定了分區
号,則直接使用該分區号進行二級分屬劃分;如果沒有指定分區号,而是指定了key值,則使用該key的hash值
來生成分區号,進行消息記錄的二級分屬劃分;如果分區号和key都沒有指定,則通過輪詢的方式生成一個分區
号,進行二級分屬劃分*/
public class DefaultPartitioner implements Partitioner {
    //主題計數器,用于輪詢政策使用
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /*為指定消息記錄計算分區号,所需參數包括:主題、key、key序列化後的位元組數組、value、value序
列化後的位元組數組、kafka叢集中繼資料*/
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[]
 valueBytes, Cluster cluster) {
        //擷取消息記錄所屬主題在叢集中的目前分區資訊
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //目前主題擁有的分區數
        int numPartitions = partitions.size();
        //如果沒有指定key值,便采用輪詢的方式生成分區号
        if (keyBytes == null) {
            //擷取主題下一個計數值
            int nextValue = nextValue(topic);
            //擷取叢集上指定主題下的可用分區資訊
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic
(topic);    
            //如果存在可用分區
            if (availablePartitions.size() > 0) {
                //按輪詢政策得到分區号
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                //沒有可用分區時,按主題下所有分區參與輪詢計算,傳回一個不可用分區号            
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            如果存在key值,則通過對key值hash的方式傳回一個分區号            
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    //擷取主題的下一個計數值,傳回值用于後續的輪詢政策
    private int nextValue(String topic) {
        //擷取指定主題的計數器
        AtomicInteger counter = topicCounterMap.get(topic);
        //如果計數器不存在,代表首次向該主題釋出消息
        if (null == counter) {
            //為主題生成一個計數器執行個體
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        //傳回主題下一個計數值
        return counter.getAndIncrement();
    }

    public void close() {}
}