Kafka environment setup and practice (1): installing Kafka http://zilongzilong.iteye.com/blog/2267913
Kafka environment setup and practice (2): Kafka API in practice http://zilongzilong.iteye.com/blog/2267924
1. Add the dependency to your Maven project
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
2. Integrating Kafka with Spring
Define the Kafka clients as Spring beans in spring-kafka.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="testProducer" class="org.apache.kafka.clients.producer.KafkaProducer" scope="prototype">
<constructor-arg type="java.util.Properties">
<props>
<prop key="bootstrap.servers">kafka0:9092,kafka1:9092,kafka2:9092</prop>
<prop key="acks">all</prop>
<prop key="retries">0</prop>
<prop key="batch.size">16384</prop>
<prop key="linger.ms">1</prop>
<prop key="buffer.memory">33554432</prop>
<prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
<prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
<prop key="partitioner.class">com.***.kafka.Partitioner.RandomPartitioner</prop>
</props>
</constructor-arg>
</bean>
<bean id="group1Consumer" class="org.apache.kafka.clients.consumer.KafkaConsumer" scope="prototype">
<constructor-arg type="java.util.Properties">
<props>
<prop key="bootstrap.servers">kafka0:9092,kafka1:9092,kafka2:9092</prop>
<prop key="group.id">group1</prop>
<prop key="enable.auto.commit">true</prop>
<prop key="auto.commit.interval.ms">1000</prop>
<prop key="session.timeout.ms">30000</prop>
<prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
<prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
</props>
</constructor-arg>
</bean>
<bean id="group2Consumer" class="org.apache.kafka.clients.consumer.KafkaConsumer" scope="prototype">
<constructor-arg type="java.util.Properties">
<props>
<prop key="bootstrap.servers">kafka0:9092,kafka1:9092,kafka2:9092</prop>
<prop key="group.id">group2</prop>
<prop key="enable.auto.commit">true</prop>
<prop key="auto.commit.interval.ms">1000</prop>
<prop key="session.timeout.ms">30000</prop>
<prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
<prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
</props>
</constructor-arg>
</bean>
</beans>
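The XML above does nothing Spring-specific: each bean simply passes a `java.util.Properties` instance to the `KafkaProducer`/`KafkaConsumer` constructor. For readers not using Spring, the producer configuration can be built in plain Java (a sketch mirroring the `testProducer` bean; host names are the article's):

```java
import java.util.Properties;

public class KafkaConfig {
    // Mirrors the <props> of the testProducer bean above.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka0:9092,kafka1:9092,kafka2:9092");
        props.put("acks", "all");
        props.put("retries", "0");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

`new KafkaProducer<String, String>(KafkaConfig.producerProps())` then yields the same producer as the Spring bean.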
3. Using the producer
A custom partitioning strategy class:
public class RandomPartitioner implements Partitioner {

    // Reuse one Random instance instead of allocating one per record.
    private final Random random = new Random();

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (numPartitions > 0) {
            // nextInt(bound) always returns a value in [0, bound).
            // Math.abs(random.nextInt()) % n is unsafe because
            // Math.abs(Integer.MIN_VALUE) is still negative.
            return random.nextInt(numPartitions);
        }
        return 0;
    }

    @Override
    public void close() {
    }
}
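One pitfall worth calling out with random partition selection: the expression `Math.abs(random.nextInt()) % n` can return a negative index, because `Math.abs(Integer.MIN_VALUE)` overflows in two's complement and stays negative, and Java's `%` takes the sign of the dividend. `Random.nextInt(bound)` avoids this entirely. A standalone demonstration (no Kafka required):

```java
import java.util.Random;

public class NegativeModuloDemo {
    public static void main(String[] args) {
        int n = 7;
        // Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE (overflow),
        // so the "absolute value" trick can still yield a negative index.
        int unsafe = Math.abs(Integer.MIN_VALUE) % n;
        System.out.println(unsafe); // -2: an invalid partition index

        // nextInt(bound) is guaranteed to be in [0, bound).
        int safe = new Random().nextInt(n);
        System.out.println(safe >= 0 && safe < n); // true
    }
}
```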
public class ProducerUtil {

    private static Producer<String, String> producer = SpringContextHolder.getBean("testProducer");

    // Sends a message (with no key) to the "test" topic.
    public static void produce(String message) {
        producer.send(new ProducerRecord<String, String>("test", message));
    }
}
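`SpringContextHolder` is a helper the article relies on but never shows. A minimal sketch, assuming it implements Spring's `ApplicationContextAware` and is itself registered as a bean in the application context (both the class body and its registration are my assumptions, not part of the original):

```java
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class SpringContextHolder implements ApplicationContextAware {

    private static ApplicationContext context;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Spring calls this once at startup; stash the context for static lookups.
        context = applicationContext;
    }

    // Looks up a bean by name, e.g. getBean("testProducer").
    @SuppressWarnings("unchecked")
    public static <T> T getBean(String name) {
        return (T) context.getBean(name);
    }
}
```

Because the producer and consumer beans are declared with `scope="prototype"`, each `getBean` call returns a fresh client instance.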
4. Using the consumer
public class KafkaServletContextListener implements ServletContextListener {

    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                KafkaConsumer<String, String> consumer = SpringContextHolder.getBean("group1Consumer");
                consumer.subscribe(Arrays.asList("test"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        record.key();
                        record.offset();
                        record.partition();
                        record.topic();
                        record.value();
                        //TODO process the record
                    }
                }
            }
        });
        executor.execute(new Runnable() {
            @Override
            public void run() {
                KafkaConsumer<String, String> consumer = SpringContextHolder.getBean("group2Consumer");
                consumer.subscribe(Arrays.asList("test"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        record.key();
                        record.offset();
                        record.partition();
                        record.topic();
                        record.value();
                        //TODO process the record
                    }
                }
            }
        });
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // Stop the polling threads when the webapp shuts down.
        executor.shutdownNow();
    }
}
Register the listener in web.xml:
<listener>
<listener-class>com.***.web.listener.KafkaServletContextListener</listener-class>
</listener>
5. Errors encountered with Kafka
5.1 The consumer kept reporting the following error while consuming messages:
ILLEGAL_GENERATION occurred while committing offsets for group
I found an article online (http://comments.gmane.org/gmane.comp.apache.kafka.user/10708) and adjusted auto.commit.interval.ms and session.timeout.ms as it suggested, but to no avail.
The real fix was to optimize the consumer's processing logic. My consumer used jredis, where exist and get are O(1) but smembers is O(N). I optimized the code and, wherever possible, preferred the O(1) exist and get calls over smembers. After deploying, the error no longer appeared under observation.
In short, the solution is to optimize the consumer code and reduce per-message processing time, so the consumer can report its consumption state back to zookeeper in time.
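A related mitigation worth knowing (my sketch, not part of the original troubleshooting): disable auto-commit and commit offsets manually only after a batch has been fully processed, so a slow handler can never race the background auto-commit. With the 0.9 consumer API this looks like:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka0:9092,kafka1:9092,kafka2:9092");
        props.put("group.id", "group1");
        props.put("enable.auto.commit", "false"); // commit by hand instead
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("test"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // process record.value() here; keep this fast
                }
                // Acknowledge only after the whole batch is processed.
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}
```

This trades a little throughput for the guarantee that an offset is never committed ahead of actual processing.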