kafka分區政策
- kafka分區政策
- 指定具體分區号
- 不給定具體分區号,給定key值(key值不斷變化)
- 不給定具體分區号,也不給對應的key
- 自定義分區
kafka分區政策
kafka分區政策決定producer生産者生産的消息最終會寫入到topic的哪個分區中。
kafka分區政策有以下四種
- 指定具體分區号
- 不給定具體分區号,給定key值(key值不斷變化)
- 不給定具體分區号,也不給定對應的key
- 自定義分區
實際上kafka還有一種分區政策,即随機分區,因為負載均衡不如輪訓,是以很少使用。
kafka預設兩種政策,根據是否有key來決定使用哪種
- 不給定具體分區号,給定key值(key值不斷變化)
- 不給定具體分區号,也不給定對應的key
kafka自定義有兩種政策
- 指定具體分區号
- 自定義分區
指定具體分區号需要在生産者發送消息時指定具體分區的index,很少使用
指定具體分區号
給定具體分區号,資料會寫入到指定的分區中
不給定具體分區号,給定key值(key值不斷變化)
不給定具體的分區号,給定key值。一般使用key的hashcode%partition分區數 作為分區号
不給定具體分區号,也不給對應的key
分區号和key都不給定,資料會以輪訓的方式寫入到不同分區中
自定義分區
自定義分區主要需要做以下2件事:
- 實作
類,并重寫類中的partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) 方法。Partitioner.class
- 在生成kafka producer用戶端的時候直接指定新的分區類
實作Partitioner.class類
public class MyParatitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
//key不能空,如果key為空的會通過輪詢的方式 選擇分區
if(keyBytes == null || (!(key instanceof String))){
throw new RuntimeException("key is null");
}
//擷取分區清單
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//以下是上述各種政策的實作,不能共存
//随機政策
return ThreadLocalRandom.current().nextInt(partitions.size());
//不給定分區号,給定key值
//此處注意,因為hascode可能出現負數,是以最終結果要取絕對值
return Math.abs(key.hashCode()) % partitions.size();
//自定義分區政策, 比如key為123的消息,選擇放入最後一個分區
if(key.toString().equals("123")){
return partitions.size()-1;
}else{
//否則随機
ThreadLocalRandom.current().nextInt(partitions.size());
}
}
@Override
public void close() {
}
}
在producer指定新的分區類