Kafka producer code: writing messages to a topic
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class ProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        // configuration
        Properties props = new Properties();
        // ZooKeeper server list
        props.put("zk.connect", "192.168.146.100:2181,192.168.146.101:2181,192.168.146.102:2181");
        // broker list
        props.put("metadata.broker.list", "192.168.146.100:9092,192.168.146.101:9092,192.168.146.102:9092");
        // Message serialization: String payloads need kafka.serializer.StringEncoder;
        // other types need a matching encoder, and you write your own if none exists
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // wrap props into a producer configuration object
        ProducerConfig config = new ProducerConfig(props);
        // payload type is String
        Producer<String, String> producer = new Producer<String, String>(config);
        // send 100 messages
        for (int i = 1; i <= 100; i++) {
            Thread.sleep(500);
            // send a message; "test1" is a topic that already exists
            KeyedMessage<String, String> message = new KeyedMessage<String, String>("test1",
                    "This is test1 Num:" + i);
            producer.send(message);
        }
    }
}
Error record: the producer import must be import kafka.javaapi.producer.Producer. I had imported kafka.producer.Producer instead, which made send() show a compile error; after "fixing" it per the IDE hint, it still failed at runtime:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.ClassCastException: kafka.producer.KeyedMessage cannot be cast to scala.collection.Seq
at ProducerDemo.main(ProducerDemo.java:26)
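The serializer.class setting above (kafka.serializer.StringEncoder) simply turns each String payload into bytes, and the consumer later reverses it with new String(...). A minimal plain-JDK sketch of that round trip (no Kafka dependency; the class and method names here are made up for illustration only):

```java
import java.nio.charset.StandardCharsets;

public class EncodeDemo {
    // roughly what a String encoder does on the producer side: String -> byte[]
    static byte[] toBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // what the consumer side undoes: byte[] -> String
    static String fromBytes(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = toBytes("This is test1 Num:1");
        System.out.println(fromBytes(payload)); // prints: This is test1 Num:1
    }
}
```

Using an explicit charset (UTF-8) on both sides avoids platform-default surprises.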
Messages viewed in real time:
Kafka consumer code:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "192.168.146.100:2181,192.168.146.101:2181,192.168.146.102:2181");
        // consumer group id; here a single group "1" is used (grouping is optional)
        props.put("group.id", "1");
        // where to start reading: "smallest" means from the beginning
        props.put("auto.offset.reset", "smallest");
        // wrap props into a consumer configuration object
        ConsumerConfig config = new ConsumerConfig(props);
        // obtain a consumer client
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // key is the topic name, value is the number of consumer threads
        topicCountMap.put("test1", 2);
        // several topics can be passed in at once
        //topicCountMap.put("test2", 1);
        // obtain the message streams; takes the Map built above as its parameter
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        // get the streams for topic "test1"; the element type is KafkaStream<byte[], byte[]>
        // (two type parameters) because each stream carries both the message payload and
        // its metadata, such as bookkeeping data Kafka attaches to every message
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("test1");
        for (final KafkaStream<byte[], byte[]> kafkaStream : streams) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    // iterate over the kafkaStream object
                    for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                        // the payload arrives as serialized bytes; convert it back to a String
                        String msg = new String(mm.message());
                        System.out.println(msg);
                    }
                }
            }).start();
        }
        /* An alternative way to write the Runnable:
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                for (MessageAndMetadata<byte[], byte[]> mm : kafkaStream) {
                    String msg = new String(mm.message());
                    System.out.println(msg);
                }
            }
        };
        Thread thread = new Thread(runnable);
        thread.start();
        */
    }
}
Output: as the producer sends, the messages are displayed in real time.
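topicCountMap.put("test1", 2) asks for two streams, and the loop starts one worker thread per stream. The threading pattern itself can be sketched with plain JDK types, using a BlockingQueue as a stand-in for a KafkaStream and a poison-pill message to stop the workers (all names here are hypothetical, not Kafka API):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class StreamWorkers {
    static final String STOP = "__stop__";          // poison pill that tells a worker to exit
    static final AtomicInteger processed = new AtomicInteger(); // messages handled, for checking

    public static void main(String[] args) throws InterruptedException {
        // the queue plays the role of a KafkaStream feeding the workers
        final BlockingQueue<String> stream = new LinkedBlockingQueue<String>();
        int threads = 2; // mirrors topicCountMap.put("test1", 2)
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // blocking take() stands in for iterating a KafkaStream
                        for (String msg = stream.take(); !msg.equals(STOP); msg = stream.take()) {
                            System.out.println(msg);
                            processed.incrementAndGet();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            workers[t].start();
        }
        // "producer" side: put a few messages, then one pill per worker
        for (int i = 1; i <= 5; i++) {
            stream.put("This is test1 Num:" + i);
        }
        for (int t = 0; t < threads; t++) {
            stream.put(STOP);
        }
        for (Thread w : workers) {
            w.join();
        }
    }
}
```

Unlike this sketch, the real consumer gives each thread its own stream rather than one shared queue, but the start-then-iterate worker structure is the same.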
new Runnable() { ... } above is not instantiating the interface itself, even though it looks like "new"-ing an interface directly; it is actually an anonymous inner class. It is equivalent to instantiating a named subclass of Runnable:
Thread t = new Thread(new MyRunnable());
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        // concrete implementation
    }
}
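The equivalence described above can be checked with a small runnable example: both the named class and the anonymous inner class are just Runnable implementations handed to a Thread (class names here are illustrative):

```java
public class RunnableDemo {
    // named subclass form
    static class MyRunnable implements Runnable {
        @Override
        public void run() {
            System.out.println("named class");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new MyRunnable());
        // anonymous inner class form; the compiler generates a hidden
        // class for it (something like RunnableDemo$1)
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("anonymous class");
            }
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
```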