pom.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kaven</groupId>
<artifactId>kafka</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>
測試代碼:
package com.kaven.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class ProducerTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
send("new-topic-user");
}
public static void send(String name) throws ExecutionException, InterruptedException {
Producer<String, String> producer = ProducerTest.createProducer();
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
name,
"key-" + i,
"value-" + i
);
// 異步發送并回調
producer.send(producerRecord, (metadata, exception) -> {
if(exception == null) {
System.out.println("partition: " + metadata.partition() + " offset: " + metadata.offset());
}
else {
exception.printStackTrace();
}
});
}
// 要關閉Producer執行個體
producer.close();
}
public static Producer<String, String> createProducer() {
// Producer的配置
Properties properties = new Properties();
// 服務位址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.7:9092");
// KEY的序列化器類
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// VALUE的序列化器類
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 分區器類
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kaven.kafka.producer.PartitionLoadBalancer");
return new KafkaProducer<>(properties);
}
}
Producer
自定義
Partition
負載均衡需要實作
org.apache.kafka.clients.producer.Partitioner
接口。
package com.kaven.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class PartitionLoadBalancer implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int num = Integer.parseInt(((String) key).split("-")[1]) % cluster.partitionCountForTopic(topic);
System.out.println(key + " : " + num);
return num;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
PartitionLoadBalancer
根據消息
key
的最後一位數字(這裡根據自己的需求來設計)來選擇分區。
key-0 : 0
key-1 : 1
key-2 : 2
key-3 : 0
key-4 : 1
key-5 : 2
key-6 : 0
key-7 : 1
key-8 : 2
key-9 : 0
partition: 2 offset: 37
partition: 2 offset: 38
partition: 2 offset: 39
partition: 0 offset: 42
partition: 0 offset: 43
partition: 0 offset: 44
partition: 0 offset: 45
partition: 1 offset: 50
partition: 1 offset: 51
partition: 1 offset: 52