1. Starting Kafka
We installed Kafka earlier. Now connect remotely to the three servers (node1, node2, node3) and run the following commands on each one to start ZooKeeper.
cd /home/kafka_2.10-0.8.2.1
zkServer.sh start
Then connect remotely to each server and run the following commands to start Kafka.
cd /home/kafka_2.10-0.8.2.1
bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
Run the following commands to start a Kafka console producer that sends data to the topic 20200505, which we created earlier.
cd /home/kafka_2.10-0.8.2.1
./bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic 20200505
2. The Spark program is shown below; run it.
package com.zjt.spark.streaming;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
public class KafkaReceiverWordCount {
// ./bin/kafka-topics.sh --zookeeper spark001:2181,spark002:2181,spark003:2181 --topic wordcount --replication-factor 1 --partitions 1 --create
// ./bin/kafka-console-producer.sh --topic wordcount --broker-list spark001:9092,spark002:9092,spark003:9092
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(5));
// Important: this maps each topic to the number of threads used to pull its data
Map<String, Integer> topicThreadMap = new HashMap<String,Integer>();
topicThreadMap.put("20200505", 3);
// A stream created from Kafka this way comes in pair form, with two values; the first (the key) is usually null
JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
jssc,
"node1:2181,node2:2181,node3:2181",
"WordcountConsumerGroup",
topicThreadMap);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String,String>, String>(){
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(Tuple2<String,String> tuple) throws Exception {
return Arrays.asList(tuple._2.split(" "));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordcounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
wordcounts.print();
jssc.start();
jssc.awaitTermination();
jssc.close();
}
}
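Conceptually, each 5-second micro-batch flows through flatMap (split each line on spaces), mapToPair (emit (word, 1)), and reduceByKey (sum the counts). The following is a minimal plain-Java sketch of that per-batch logic with no Spark dependency; the class name, method name, and sample lines are illustrative only, not part of the program above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchWordCount {
    // Simulates what the DStream pipeline does to one micro-batch:
    // flatMap (split on spaces) -> mapToPair (word, 1) -> reduceByKey (sum)
    static Map<String, Integer> countWords(List<String> batchLines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : batchLines) {              // the value of each (null, line) Kafka pair
            for (String word : line.split(" ")) {     // flatMap step
                counts.merge(word, 1, Integer::sum);  // mapToPair + reduceByKey combined
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("hello spark", "hello kafka");
        // hello appears twice, spark and kafka once each
        System.out.println(countWords(batch));
    }
}
```

In the real job this computation runs per batch interval, so each print() shows counts for only the lines received in the last 5 seconds, not a running total.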
3. Type some random strings into the Kafka producer console and observe the Spark program's output.
The produced data looks like this:

The Spark Streaming output looks like this: