kafka分区策略
- kafka分区策略
- 指定具体分区号
- 不给定具体分区号,给定key值(key值不断变化)
- 不给定具体分区号,也不给对应的key
- 自定义分区
kafka分区策略
kafka分区策略决定producer生产者生产的消息最终会写入到topic的哪个分区中。
kafka分区策略有以下四种
- 指定具体分区号
- 不给定具体分区号,给定key值(key值不断变化)
- 不给定具体分区号,也不给定对应的key
- 自定义分区
实际上kafka还有一种分区策略,即随机分区,因为负载均衡不如轮训,所以很少使用。
kafka默认两种策略,根据是否有key来决定使用哪种
- 不给定具体分区号,给定key值(key值不断变化)
- 不给定具体分区号,也不给定对应的key
kafka自定义有两种策略
- 指定具体分区号
- 自定义分区
指定具体分区号需要在生产者发送消息时指定具体分区的index,很少使用
指定具体分区号
给定具体分区号,数据会写入到指定的分区中
不给定具体分区号,给定key值(key值不断变化)
不给定具体的分区号,给定key值。一般使用key的hashcode%partition分区数 作为分区号
不给定具体分区号,也不给对应的key
分区号和key都不给定,数据会以轮训的方式写入到不同分区中
自定义分区
自定义分区主要需要做以下2件事:
- 实现
类,并重写类中的partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) 方法。Partitioner.class
- 在生成kafka producer客户端的时候直接指定新的分区类
实现Partitioner.class类
public class MyParatitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
//key不能空,如果key为空的会通过轮询的方式 选择分区
if(keyBytes == null || (!(key instanceof String))){
throw new RuntimeException("key is null");
}
//获取分区列表
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//以下是上述各种策略的实现,不能共存
//随机策略
return ThreadLocalRandom.current().nextInt(partitions.size());
//不给定分区号,给定key值
//此处注意,因为hascode可能出现负数,所以最终结果要取绝对值
return Math.abs(key.hashCode()) % partitions.size();
//自定义分区策略, 比如key为123的消息,选择放入最后一个分区
if(key.toString().equals("123")){
return partitions.size()-1;
}else{
//否则随机
ThreadLocalRandom.current().nextInt(partitions.size());
}
}
@Override
public void close() {
}
}
在producer指定新的分区类