pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Kafka producer demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kaven</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Compile with Java 8 language level and bytecode target. -->
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!--
          The Java sources contain non-ASCII comments; pin the source encoding so the
          build does not depend on the platform default charset.
        -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Kafka client library providing the Producer / Partitioner APIs used by the demo. -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>
</project>
测试代码:
package com.kaven.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Demo that publishes a small batch of keyed messages to a Kafka topic using a
 * producer configured with a custom partitioner.
 */
public class ProducerTest {

    /** Address of the Kafka broker used to bootstrap the connection. */
    private static final String BOOTSTRAP_SERVERS = "192.168.1.7:9092";

    /** Serializer class used for both message keys and values. */
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";

    /** Fully-qualified name of the custom partitioner class. */
    private static final String PARTITIONER_CLASS =
            "com.kaven.kafka.producer.PartitionLoadBalancer";

    /** Number of messages sent by {@link #send(String)}. */
    private static final int MESSAGE_COUNT = 10;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        send("new-topic-user");
    }

    /**
     * Sends {@code key-i} / {@code value-i} messages for {@code i} in {@code [0, 10)} to the
     * given topic asynchronously and prints the partition and offset of each acknowledged
     * record (or the stack trace of the failure).
     *
     * @param name the topic to publish to
     * @throws ExecutionException   declared for interface compatibility with existing callers
     * @throws InterruptedException declared for interface compatibility with existing callers
     */
    public static void send(String name) throws ExecutionException, InterruptedException {
        // try-with-resources guarantees the producer is closed even when send() throws
        // synchronously (e.g. a failure inside the serializer or the custom partitioner),
        // which would otherwise leak the producer's resources.
        try (Producer<String, String> producer = ProducerTest.createProducer()) {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                        name,
                        "key-" + i,
                        "value-" + i
                );
                // Send asynchronously; the callback runs once the send completes or fails.
                producer.send(producerRecord, (metadata, exception) -> {
                    if (exception == null) {
                        System.out.println("partition: " + metadata.partition() + " offset: " + metadata.offset());
                    } else {
                        exception.printStackTrace();
                    }
                });
            }
        }
    }

    /**
     * Builds a String/String producer pointing at the demo broker and using the custom
     * partitioner. The caller is responsible for closing the returned producer.
     *
     * @return a newly created producer
     */
    public static Producer<String, String> createProducer() {
        // Producer configuration.
        Properties properties = new Properties();
        // Broker address.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // Key serializer class.
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
        // Value serializer class.
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER);
        // Partitioner class.
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, PARTITIONER_CLASS);
        return new KafkaProducer<>(properties);
    }
}
Producer 自定义 Partition 负载均衡需要实现 org.apache.kafka.clients.producer.Partitioner 接口。
package com.kaven.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
 * Partitioner that routes a record by the number embedded in its key.
 *
 * <p>For a String key of the form {@code prefix-N} (for example {@code key-7}) the
 * partition is {@code N % partitionCount}. Keys that do not match that shape (null,
 * non-String, no {@code -} separator, or a non-numeric segment) fall back to a hash of
 * the serialized key bytes instead of failing the send.
 */
public class PartitionLoadBalancer implements Partitioner {

    /** Separator between the key prefix and its numeric part. */
    private static final String KEY_DELIMITER = "-";

    /**
     * Chooses the partition for a record and prints the {@code key : partition} decision.
     *
     * @param topic      the topic the record is sent to
     * @param key        the record key, may be {@code null}
     * @param keyBytes   the serialized key, may be {@code null}
     * @param value      the record value (unused)
     * @param valueBytes the serialized value (unused)
     * @param cluster    cluster metadata used to look up the topic's partition count
     * @return a partition index in {@code [0, partitionCount)}
     * @throws IllegalStateException if no partition count is available for the topic
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // The lookup returns a boxed Integer; guard against null/zero before using it as a
        // divisor so the failure is explicit rather than an NPE or ArithmeticException.
        Integer partitionCount = cluster.partitionCountForTopic(topic);
        if (partitionCount == null || partitionCount <= 0) {
            throw new IllegalStateException("No partitions available for topic: " + topic);
        }
        int num = choosePartition(key, keyBytes, partitionCount);
        // Demo output showing which partition each key was routed to.
        System.out.println(key + " : " + num);
        return num;
    }

    /** Maps a key to a partition: numeric key segment when present, key-bytes hash otherwise. */
    private static int choosePartition(Object key, byte[] keyBytes, int partitionCount) {
        if (key instanceof String) {
            String[] segments = ((String) key).split(KEY_DELIMITER);
            if (segments.length > 1) {
                try {
                    return Math.floorMod(Integer.parseInt(segments[1]), partitionCount);
                } catch (NumberFormatException ignored) {
                    // Segment after the delimiter is not a number; use the hash fallback below.
                }
            }
        }
        // Arrays.hashCode is content-based and returns 0 for a null array, so the fallback
        // is deterministic for equal keys; floorMod keeps the result non-negative.
        return Math.floorMod(java.util.Arrays.hashCode(keyBytes), partitionCount);
    }

    /** No resources to release. */
    @Override
    public void close() {
    }

    /** No configuration is required by this partitioner. */
    @Override
    public void configure(Map<String, ?> configs) {
    }
}
PartitionLoadBalancer 取消息 key 中“-”后面的数字,对主题的分区数取模来选择分区(这里可以根据自己的需求来设计)。
key-0 : 0
key-1 : 1
key-2 : 2
key-3 : 0
key-4 : 1
key-5 : 2
key-6 : 0
key-7 : 1
key-8 : 2
key-9 : 0
partition: 2 offset: 37
partition: 2 offset: 38
partition: 2 offset: 39
partition: 0 offset: 42
partition: 0 offset: 43
partition: 0 offset: 44
partition: 0 offset: 45
partition: 1 offset: 50
partition: 1 offset: 51
partition: 1 offset: 52