Message publishing API
pom.xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>
producer.properties
Configuration for producing data; put this file under resources.
# Kafka broker host names and ports
bootstrap.servers=mini1:9092,mini2:9092,mini3:9092
# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none
# wait for acknowledgement from all in-sync replicas
# (follower replicas report back to the leader after they finish syncing the message from it)
acks=all
# maximum number of retries for a failed send
retries=1
# size of a message batch
batch.size=16384
# name of the partitioner class for partitioning events; default partition spreads data randomly
# (custom class, see the sketch after this file)
partitioner.class=Partition.Mypartition
# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=
# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=
# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
linger.ms=1
# the maximum size of a request in bytes
#max.request.size=
# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=
# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
# send buffer memory size
buffer.memory=33554432
# serializer class for message keys
key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
# serializer class for message values
value.serializer=org.apache.kafka.common.serialization.StringSerializer
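The file above points partitioner.class at Partition.Mypartition, whose source is not included in these notes. Below is a minimal sketch of what such a custom partitioner might look like; the package and class name are taken from the config, but the hash-by-key routing logic is an assumption, not the original implementation.

package Partition;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

// Hypothetical reconstruction of the custom partitioner referenced in producer.properties.
public class Mypartition implements Partitioner {
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        // Assumed routing: hash the key across all partitions; keyless records go to partition 0.
        return key == null ? 0 : (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public void close() {}

    public void configure(Map<String, ?> configs) {}
}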
consumer.properties
# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
# Kafka broker addresses
bootstrap.servers=mini1:9092,mini2:9092,mini3:9092
# consumer group id
group.id=test-consumer-group
# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=
# whether offsets are committed automatically
# (the demos below commit manually; set this to false to rely on manual commits alone)
enable.auto.commit=true
# interval between automatic offset commits
auto.commit.interval.ms=500
# deserializer class for message keys
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# deserializer class for message values
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
First demo
package api;

import org.apache.kafka.clients.producer.*;

import java.io.IOException;
import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        try {
            prop.load(CustomProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // producer.properties configures IntegerSerializer for keys (used by the callback demo below);
        // this demo sends String keys, so override the key serializer accordingly.
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(prop);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("first", i + "", i + "!"));
        }
        producer.close();
    }
}
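send() is asynchronous: it returns a Future<RecordMetadata> immediately. Calling get() on that future blocks until the broker acknowledges (per the acks setting), turning the send into a synchronous one. A minimal sketch; the class name SyncProducer is made up for illustration:

package api;

import org.apache.kafka.clients.producer.*;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Hypothetical demo: synchronous send by blocking on the Future returned by send().
public class SyncProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        Properties prop = new Properties();
        prop.load(SyncProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(prop);
        // get() blocks until the broker acknowledges the record or the send fails.
        RecordMetadata meta = producer.send(
                new ProducerRecord<String, String>("first", "0", "0!")).get();
        System.out.println("partition=" + meta.partition() + ", offset=" + meta.offset());
        producer.close();
    }
}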
Demo with a callback
package api;

import org.apache.kafka.clients.producer.*;

import java.io.IOException;
import java.util.Properties;

public class CustomProducer1 {
    /**
     * The callback is invoked asynchronously once the producer receives the ack.
     * It takes two arguments, RecordMetadata and Exception: if Exception is null
     * the message was sent successfully, otherwise the send failed.
     * Note: failed sends are retried automatically (the retries setting), so there
     * is no need to retry manually inside the callback.
     */
    public static void main(String[] args) {
        Properties prop = new Properties();
        try {
            prop.load(CustomProducer1.class.getClassLoader().getResourceAsStream("producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        Producer<Integer, String> producer = new KafkaProducer<Integer, String>(prop);
        for (int i = 0; i < 100; i++) {
            final ProducerRecord<Integer, String> record
                    = new ProducerRecord<Integer, String>("second", i, i + "!");
            producer.send(record, new Callback() {
                // invoked asynchronously when the producer receives the ack
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic->" + metadata.topic() +
                                ",partition->" + metadata.partition() +
                                ",key->" + record.key());
                    } else {
                        exception.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }
}
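One caveat about the automatic retries that the comment above relies on: when more than one request may be in flight per connection, a retried batch can land after a later batch and reorder records within a partition. If strict per-partition ordering matters more than throughput, this standard producer.properties setting prevents it:

max.in.flight.requests.per.connection=1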
Message subscription API
package api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        try {
            prop.load(CustomConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        // this demo commits offsets manually, so turn auto-commit off
        prop.put("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s, partition = %s%n",
                        record.offset(),
                        record.key(),
                        record.value(),
                        record.partition()
                );
            }
            consumer.commitSync();
            /*
             * There are two ways to commit offsets manually: commitSync (synchronous)
             * and commitAsync (asynchronous). Both commit the highest offset of the
             * batch returned by the current poll. The difference: commitSync retries
             * until the commit succeeds (it can still fail for unrecoverable reasons),
             * while commitAsync has no retry mechanism, so a commit may fail.
             */
        }
    }
}
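The comment above mentions commitAsync; it accepts an optional completion callback, which is the usual way to at least log a failed commit since there is no retry. A minimal sketch; the class name AsyncCommitConsumer is made up for illustration:

package api;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

// Hypothetical demo: asynchronous offset commit with a completion callback.
public class AsyncCommitConsumer {
    public static void main(String[] args) throws IOException {
        Properties prop = new Properties();
        prop.load(AsyncCommitConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        prop.put("enable.auto.commit", "false"); // manual commits only make sense with auto-commit off
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        consumer.subscribe(Arrays.asList("first"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
            // commitAsync does not retry on failure; the callback lets us at least log it
            consumer.commitAsync(new OffsetCommitCallback() {
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        System.err.println("offset commit failed: " + exception.getMessage());
                    }
                }
            });
        }
    }
}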
Consuming from the beginning of each partition
Using the second argument of subscribe
package api;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class CustomConsumer1 {
    public static void main(String[] args) {
        Properties prop = new Properties();
        try {
            prop.load(CustomConsumer1.class.getClassLoader().getResourceAsStream("consumer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
        }
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            // once partitions are assigned, seek to the beginning of each one
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, offset = %d, key = %s, value = %s, partition = %s%n",
                        record.topic(),
                        record.offset(),
                        record.key(),
                        record.value(),
                        record.partition()
                );
            }
            // naive exit condition: treat a single empty poll as "everything consumed"
            if (records.isEmpty()) {
                break;
            }
            consumer.commitSync();
        }
        consumer.close();
    }
}
Changing this parameter works too:
prop.put("auto.offset.reset", "earliest");
With the new consumer API the valid values are earliest (also consume existing messages), latest (only consume newly produced messages), and none. The setting only takes effect when the group has no committed offset for the partition.
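For finer control than auto.offset.reset, assign() plus seek() pins the consumer to an explicit partition and offset, bypassing group rebalancing entirely. A minimal sketch; the class name SeekConsumer and the choice of topic, partition, and offset are made up for illustration:

package api;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

// Hypothetical demo: read from an explicit partition and offset with assign() + seek().
public class SeekConsumer {
    public static void main(String[] args) throws IOException {
        Properties prop = new Properties();
        prop.load(SeekConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(prop);
        TopicPartition tp = new TopicPartition("first", 0); // assumed topic "first", partition 0
        consumer.assign(Arrays.asList(tp)); // no group rebalancing in assign mode
        consumer.seek(tp, 0L);              // start reading from offset 0
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
        }
        consumer.close();
    }
}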