天天看點

Java用戶端程式設計 kafka生産者與消費者代碼記錄

Kafka生産者代碼:往topic裡面寫東西

import java.util.Properties;
import scala.collection.Seq;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerDemo {
   
public static void main(String []args) throws InterruptedException    {
	   //配置檔案
	   Properties props =new Properties();
	   
	   //zzookeeper伺服器清單
	   props.put("zk.connect", "192.168.146.100:2181,192.168.146.101:2181,192.168.146.102:2181");
	   
	   //borker 清單
	   props.put("metadata.broker.list", "192.168.146.100:9092,192.168.146.101:9092,192.168.146.102:9092");
	   
	   //消息序列化機制   資料類型為String就需要用kafka.serializer.StringEncoder,資料類型為别的就要用别的,系統沒有的自己寫一個序列化
	   props.put("serializer.class", "kafka.serializer.StringEncoder");
	   
	   //把  props重新封裝成 生産者對象
	   ProducerConfig config = new ProducerConfig(props);
	   
	   //資料類型為String
		Producer<String, String> producer = new Producer<String, String>(config);
		//發送100條資訊
		for (int i = 1; i <= 100; i++) {
			Thread.sleep(500);
			//發送消息 test1 為已經建立的topic
			KeyedMessage<String, String> messages=new KeyedMessage<String, String>("test1",
					"This is test1 Num:" + i );
			producer.send( messages  );
		}
   }
}
           

錯誤記錄:導入priducer包得時候導入 import kafka.javaapi.producer.Producer,我導入得時候為import kafka.producer.Producer; 提示send錯誤,利用提示改正還是運作錯誤

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
	SLF4J: Defaulting to no-operation (NOP) logger implementation
	SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
	Exception in thread "main" java.lang.ClassCastException: kafka.producer.KeyedMessage 				   cannot be cast to scala.collection.Seq
		at ProducerDemo.main(ProducerDemo.java:26)
           

消息實時檢視資訊:

Java用戶端程式設計 kafka生産者與消費者代碼記錄

Kafka消費者代碼:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;


public class ConsumerDemo {
	public static void main(String []agrs){
		
	Properties props =new Properties();
	props.put("zookeeper.connect", "192.168.146.100:2181,192.168.146.101:2181,192.168.146.102:2181");
	
	//給消費者分組,這裡分了一組 也可以不分
	props.put("group.id", "1");
	
	//讀的資訊,從開頭開始讀
	props.put("auto.offset.reset","smallest");
	
	//将props封裝為消費者的配置對象
	ConsumerConfig config=new ConsumerConfig(props);
	//拿到一個消費者用戶端
	ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);
 	
	Map<String,Integer>topicCountMap=new HashMap<String,Integer>();
	// topic名稱 ,第二個參數 是線程數量
	topicCountMap.put("test1", 2);  
	
	//可以傳入好多topic
	//topicCountMap.put("test2", 1);
	
	//獲得消息流
	Map<String,List<KafkaStream<byte[],byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);//需要傳一個Map參數
	
	//get map裡面的topic  -test1  ,拿出來是test1裡面的消息,為什麼是t<KafkaStream<byte[],byte[]>,兩個元素,因為卡夫卡KafkaStream包含消息和中繼資料。 比如一些消息管理資料
	List<KafkaStream<byte[],byte[]>>streams=consumerMap.get("test1");
	
	for(final KafkaStream<byte[],byte[]> kafkaStream : streams){
		new Thread(new Runnable(){

			@Override
			public void run() {
			//周遊流kafkaStream對象
				for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
				//mm消息拿出來是序列化的,必須用String格式轉換
					String msg = new String(mm.message());
					System.out.println(msg);
				}
			}
		}).start();
		
/*  關于Runnable的另一種寫法
	Runnable runnble=new Runnable( )  {
			@Override
			public void run() {
				for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){
					String msg = new String(mm.message());
					System.out.println(msg);
				}
			}} ;
		Thread thread =new Thread(runnble);
		thread.start();
*/
           

資訊:

在生産者發送

Java用戶端程式設計 kafka生産者與消費者代碼記錄

資訊實時顯示

Java用戶端程式設計 kafka生産者與消費者代碼記錄

Runnable()是接口在上面的寫法并不是執行個體化,看起來是直接new一個接口,實際上是匿名内部類

同等與這種寫法,執行個體化了一個Runnable接口子類的執行個體。

Thread t=new Thread(new MyRunnable());

public class MyRunnable implements Runnable{

@Override

public void run() {

//具體實作

}

}