天天看点

kafka生产者客户端——那年我趟过的坑

前言:

最近有个需求需要用到kafka中间件做消息队列的中间件,于是看了某博客上的一些kafka生产者博文,参考地写了个程序,但是It dosn"t work.一开始还以为我编码问题,但

我发现代码很多地方都有黄色警告,提示行代码已不被推荐,再加上之前的依赖包也太老,在maven repository中已经找不到了,我才意识到是可能是方法过时了。于是我只好去官网看api,这里推荐两个网站很好地学习kafka, kafka官网和 kafka api中文翻译教程。但中间我还遇到一个坑,就是依赖包问题,官网推荐使用的maven依赖

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>
           

我试了下没有成功,就是debug时候一直停留在send方法那里,要过差不多2分钟才执行完这方法,完了后我在服务器那查看通topic的数据,还发现没看到这些数据,让我很郁闷,最后摸索了很久,换了低一点的maven依赖才得以解决。下面看我的代码

producer代码:

我的maven依赖如下,使用了低版本的0.8,另外我还试了其他的一些版本,0.10没有成功,0.9也成功了,我至今都不知道不成功的原因,如果有人知道麻烦给我评论 先感谢了!
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>0.8.2.0</version>
</dependency> 
           

最后是发送端代码,很简单,在关键处我都写了注释,

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;



public class SendDataToKafka {

	public static void main(String[] args) {
		String topic = "simon_test2";
		Properties prop = new Properties();
		prop.put("bootstrap.servers", "127.0.0.1:9092"); //主机ip:port
		prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");//key类型序列化(String型)
	        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//value类型序列化(String型)
		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
		int messageNo = 1;
		String msg = "data_";
		while(true){
			msg = msg + messageNo;
			producer.send(new ProducerRecord<String, String>(topic,msg));
			System.out.println("Send:" + msg);
			msg = "data_";
			messageNo++;
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

}
           

需要注意的是在设置kafka的send方法发送的Record里有三个概念,1.topic,2.key,3.value。 有意思的是kafka的record有点像redis中的hash数据类型,redis中的hash是key field value的方式存储的,但有点不同的kafka的key不是必须的,而且它不像redis中的hash无序,它是有序的,有点像list那样按先后顺序来存储数据。还有一点要提的是kafka支持发送两种数据类型,一种是String类型,一种是byte[]二进制数组,这在配置参数中一一对应。 String类型对应StringSerializer类,它只能传输String类型数据

prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//value类型序列化(String型)
           

byte[]类型对应DefaultSerializer类,它能传输String与byte[]类型数据

prop.put("value.serializer", "org.apache.kafka.common.serialization.DefaultSerializer");//value类型序列化(String型与byte[])
           

最后:

Kafka很多时候用ip直连的话是不被支持的,原因是kafka服务器配置时使用了host域名,这时你也要在你电脑上配置使用域名了。另外由于我的任务只需要写生产者,虽然我也研究了下消费者,但是没有深入,后续还会duikafka学习心得进行更新。