
Big Data (076) Spark [Receiving and Processing Kafka Data with Spark Streaming]

1. Start Kafka

        We installed Kafka earlier. Now SSH into the three servers (node1, node2, node3) and run the following commands on each one to start ZooKeeper.

cd /home/kafka_2.10-0.8.2.1
zkServer.sh start
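
        To confirm each node joined the ensemble, check ZooKeeper's status on each server (assuming zkServer.sh is on the PATH, as in the start command above); the output reports whether the node is running as leader or follower.

zkServer.sh status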

        Then SSH into each server and run the following commands to start Kafka:

cd /home/kafka_2.10-0.8.2.1
bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
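
        To verify that the brokers came up, you can list the topics registered in ZooKeeper (same Kafka 0.8.2.1 installation as above):

bin/kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181 --list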

        Run the following commands to start a Kafka console producer; it sends data to the topic 20200505, which we created earlier:

cd /home/kafka_2.10-0.8.2.1
./bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic 20200505
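
        Before involving Spark at all, a console consumer in another terminal is a quick way to confirm that messages are flowing; --from-beginning also replays messages already in the topic.

./bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --topic 20200505 --from-beginning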

2. Run the Following Spark Program

package com.zjt.spark.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaReceiverWordCount {

	// ./bin/kafka-topics.sh --zookeeper spark001:2181,spark002:2181,spark003:2181 --topic wordcount --replication-factor 1 --partitions 1 --create
	// ./bin/kafka-console-producer.sh --topic wordcount --broker-list spark001:9092,spark002:9092,spark003:9092
	
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
		JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(5));
		
		// Important: this maps each topic to the number of receiver threads used to pull its data
		Map<String, Integer> topicThreadMap = new HashMap<String,Integer>();
		topicThreadMap.put("20200505", 3);
		
		// A receiver-based Kafka stream is a pair DStream with two values; the first (the message key) is usually null
		JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
				jssc, 
				"node1:2181,node2:2181,node3:2181",
				"WordcountConsumerGroup",  
				topicThreadMap);
		
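		// Split each Kafka message value (tuple._2) into individual words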
		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String,String>, String>(){

			private static final long serialVersionUID = 1L;

			@Override
			public Iterable<String> call(Tuple2<String,String> tuple) throws Exception {
			 	return Arrays.asList(tuple._2.split(" "));
			}
			
		});
		
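		// Map each word to a (word, 1) pair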
		JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){

			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Integer> call(String word) throws Exception {
				return new Tuple2<String, Integer>(word, 1);
			}
			
		});
		
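		// Sum the counts for each word within the current batch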
		JavaPairDStream<String, Integer> wordcounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){

			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
			
		});
		
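		// Print the first ten (word, count) pairs of each 5-second batch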
		wordcounts.print();
		
		jssc.start();
		jssc.awaitTermination();
		jssc.close();
	}
}
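
        To run this outside the IDE, package it and launch it with spark-submit. A minimal sketch, assuming the jar is named kafka-wordcount.jar (name is illustrative) and bundles the spark-streaming-kafka_2.10 dependency; --master is redundant here because the program hard-codes local[2]:

./bin/spark-submit \
  --class com.zjt.spark.streaming.KafkaReceiverWordCount \
  --master local[2] \
  kafka-wordcount.jar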

3. Type Random Strings into the Producer Console and Observe the Spark Program's Output

        The data entered in the producer console:

[Screenshot: strings typed into the Kafka producer console]

        The Spark Streaming output:

[Screenshot: word-count output printed by Spark Streaming]
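
        As an illustration of the shape of that output (values here are made up, not taken from the screenshot): typing hello spark hello kafka into the producer within one batch interval would make print() emit something like:

-------------------------------------------
Time: 1588660805000 ms
-------------------------------------------
(hello,2)
(spark,1)
(kafka,1)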