
Big Data (076) Spark [Spark Streaming: Receiving and Processing Kafka Data]

1. Start Kafka

        We installed Kafka earlier. Now connect remotely to the three servers (node1, node2, node3) and run the following commands on each one to start ZooKeeper:

cd /home/kafka_2.10-0.8.2.1
zkServer.sh start
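        To confirm that ZooKeeper is up on all three nodes, you can also check each node's role (assuming zkServer.sh is on the PATH, as in the start command above):

zkServer.sh status

        One node should report "Mode: leader" and the other two "Mode: follower".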

        Then, again on each server, run the following commands to start Kafka:

cd /home/kafka_2.10-0.8.2.1
bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
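        To verify that each broker started, check that the Kafka process is running and that the cluster is reachable through ZooKeeper (node1 below is just an example; any of the three ZooKeeper nodes works):

jps | grep Kafka
bin/kafka-topics.sh --zookeeper node1:2181 --list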

        Run the following commands to start a Kafka console producer that sends data to the topic 20200505, which we created earlier:

cd /home/kafka_2.10-0.8.2.1
./bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic 20200505
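        For reference, the topic was created earlier with a command of this shape; the replication factor and partition count shown here are assumptions, not the original values:

bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic 20200505 --replication-factor 2 --partitions 3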

2. The Spark program is as follows; run it

package com.zjt.spark.streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

public class KafkaReceiverWordCount {

	// ./bin/kafka-topics.sh --zookeeper spark001:2181,spark002:2181,spark003:2181 --topic wordcount --replication-factor 1 --partitions 1 --create
	// ./bin/kafka-console-producer.sh --topic wordcount --broker-list spark001:9092,spark002:9092,spark003:9092
	
	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local[2]");
		JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(5));
		
		// Important: this maps each topic to the number of receiver threads used to pull its data
		Map<String, Integer> topicThreadMap = new HashMap<String,Integer>();
		topicThreadMap.put("20200505", 3);
		
		// A stream created from Kafka is a pair of two values, but the first value (the key) is usually null
		JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
				jssc, 
				"node1:2181,node2:2181,node3:2181",
				"WordcountConsumerGroup",  
				topicThreadMap);
		
		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String,String>, String>(){

			private static final long serialVersionUID = 1L;

			@Override
			public Iterable<String> call(Tuple2<String,String> tuple) throws Exception {
			 	return Arrays.asList(tuple._2.split(" "));
			}
			
		});
		
		JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>(){

			private static final long serialVersionUID = 1L;

			@Override
			public Tuple2<String, Integer> call(String word) throws Exception {
				return new Tuple2<String, Integer>(word, 1);
			}
			
		});
		
		JavaPairDStream<String, Integer> wordcounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>(){

			private static final long serialVersionUID = 1L;

			@Override
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
			
		});
		
		wordcounts.print();
		
		jssc.start();
		jssc.awaitTermination();
		jssc.close();
	}
}
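
        The program sets its master to local[2], so it can be run directly from the IDE. If you package it into a jar instead, it can be launched with spark-submit; the jar name below is hypothetical, and the spark-streaming-kafka version (1.6.3 here) is an assumption that must match your Spark installation:

bin/spark-submit \
  --class com.zjt.spark.streaming.KafkaReceiverWordCount \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 \
  wordcount.jar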
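        A note on the API: createStream above pulls data through a receiver and tracks offsets in ZooKeeper. Spark 1.3+ also offers a receiver-less alternative, KafkaUtils.createDirectStream, which reads from the brokers directly and maps one RDD partition to each Kafka partition. A minimal sketch under those assumptions (the class name KafkaDirectWordCount is ours; the APIs are the same Spark 1.x / Kafka 0.8 artifacts used above):

package com.zjt.spark.streaming;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaDirectWordCount {

	public static void main(String[] args) {
		SparkConf conf = new SparkConf().setAppName("direct-wordcount").setMaster("local[2]");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

		// The direct API talks to the brokers, not to ZooKeeper
		Map<String, String> kafkaParams = new HashMap<String, String>();
		kafkaParams.put("metadata.broker.list", "node1:9092,node2:9092,node3:9092");

		Set<String> topics = new HashSet<String>();
		topics.add("20200505");

		// A pair stream of (key, message); the key is null here, just as with createStream
		JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
				jssc,
				String.class, String.class,
				StringDecoder.class, StringDecoder.class,
				kafkaParams,
				topics);

		lines.print();

		jssc.start();
		jssc.awaitTermination();
		jssc.close();
	}
}

        Because this is also a pair stream whose second value is the message, the flatMap / mapToPair / reduceByKey chain from the program above plugs in unchanged.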

3. Enter some random strings in the Kafka producer console and observe the Spark program's output

        The producer input is shown below.

[screenshot: strings entered in the Kafka console producer]

        The Spark Streaming output is shown below.

[screenshot: the word counts printed by the Spark Streaming job]
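        For reference, a hypothetical run: if the string typed into the producer console were, say, "hello spark hello kafka", the output of print() for that batch would look roughly like this (the timestamp varies):

-------------------------------------------
Time: 1588665605000 ms
-------------------------------------------
(hello,2)
(spark,1)
(kafka,1)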