Spark Streaming 实时计算框架
-
- 一、实时计算概述
-
- 1.1 什么是实时计算?
- 1.2 常用的实时计算框架
- 二、Spark Streaming
-
- 2.1 Spark Streaming 介绍
- 2.2 Spark Streaming 工作原理
- 2.2 Spark Streaming工作机制
- 2.3 Spark Streaming程序的基本步骤
- 2.4 创建 StreamingContext对象
- 三、Spark Streaming 操作
-
- 3.1 DStream简介
- 3.2 DStream 编程模型
- 3.3 DStream 转换操作
- 3.4 DStream API 转换操作
- 3.5 IDEA安装依赖
- 3.6 启动服务端监听 Socket 服务
- 3.7 实现 transform() 方法,分割多个单词
- 3.8 UpdateStateByKeyTest 更新值
- 3.9 Dstream 窗口操作
- 3.10 DStream 输出操作
1、MySQL安装教程
2、Spark Streaming 实现网站热词排序
3、Spark Streaming 整合 Kafka,实现交流
一、实时计算概述
近年来,在Web应用、网络监控、传感监测、电信金融、生产制造等领域,增强了对数据实时处理的需求,而 Spark 中的 Spark Streaming 实时计算框架就是为实现对数据实时处理的需求而设计。
在电子商务中,淘宝、京东网站从用户点击的行为和浏览的历史记录中发现用户的购买意图和兴趣,然后通过 Spark Streaming 实时计算框架的分析处理,为之推荐相关商品,从而有效地提高商品的销售量,同时也增加了用户的满意度,可谓是“一举二得”。
本章主要对 Spark Streaming 实时计算框架相关知识进行介绍。
1.1 什么是实时计算?
在传统的数据处理流程(离线计算)中,复杂的业务处理流程会造成结果数据密集,结果数据密集则存在数据反馈不及时,若是在实时搜索的应用场景中,需要实时数据做决策,而传统的数据处理方式则并不能很好地解决问题,这就引出了一种新的数据计算——实时计算,它可以针对海量数据进行实时计算,无论是在数据采集还是数据处理中,都可以达到秒级别的处理要求。
简单来说,实时计算就是在数据采集与数据处理中,都可以达到秒级别的处理要求。
1.2 常用的实时计算框架
-
Apache Spark Streaming
Apache公司开源的实时计算框架。Apache Spark Streaming主要是把输入的数据按时间进行切分,切分的数据块并行计算处理,处理的速度可以达到秒级别。
-
Apache Storm
Apache公司开源的实时计算框架,它具有简单、高效、可靠地实时处理海量数据,处理数据的速度达到毫秒级别,并将处理后的结果数据保存到持久化介质中(如数据库、HDFS)。
-
Apache Flink
Apache公司开源的实时计算框架。Apache Spark Streaming 主要是把输入的数据按时间进行切分,切分的数据块并行计算处理,处理的速度可以达到秒级别。
-
Yahoo! s4
Yahoo公司开源的实时计算平台。Yahoo ! S4是通用的、分布式的、可扩展的,并且还具有容错和可插拔能力,供开发者轻松地处理源源不断产生的数据。
二、Spark Streaming
2.1 Spark Streaming 介绍
Spark Streaming 是构建在Spark上的实时计算框架,且是对Spark Core API的一个扩展,它能够实现对流数据进行实时处理,并具有很好的可扩展性、高吞吐量和容错性。Spark Streaming具有易用性、容错性及易整合性的显著特点。
- 易用性:
- 容错性
- 易整合性
2.2 Spark Streaming 工作原理
Spark Streaming支持从多种数据源获取数据,包括 Kafka 、 Flume 、Twitter 、ZeroMQ 、Kinesis、TCP Sockets 数据源。当Spark Streaming 从数据源获取数据之后,则可以使用诸如map 、reduce 、join和window等高级函数进行复杂的计算处理,最后将处理结果存储到分布式文件系统、数据库中,最终利用实时计算实现操作。
Spark Streaming 会对输入的数据源进行处理,然后将结果输出,Spark Streaming 在接受实时传入的数据流时,会将数据按批次(batch)进行划分,然后,再将这部分数据交由Spark引擎进行处理,处理完成后将结果输出到外部文件。
2.2 Spark Streaming工作机制
- 在Spark Streaming中,会有一个组件Receiver,作为一个长期运行的task跑在一个Executor上;
- 每个Receiver都会负责一个input DStream(比如从文件中读取数据的文件流,比如套接字流,或者从Kafka中读取的一个输入流等等);
- Spark Streaming通过input DStream与外部数据源进行连接,读取相关数据。
2.3 Spark Streaming程序的基本步骤
- 通过创建输入DStream来定义输入源;
- 通过对DStream应用转换操作和输出操作来定义流计算;
- 用streamingContext.start()来开始接收数据和处理流程;
- 通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束);
- 可以通过streamingContext.stop()来手动结束流计算进程。
- 运行一个Spark Streaming程序,首先要生成一个 StreamingContext 对象,它是Spark Streaming程序的主入口
- 可以从一个SparkConf对象创建一个StreamingContext对象
- 登录Linux系统后,启动spark-shell。进入spark-shell以后,就已经获得了一个默认的SparkConext,也就是sc。
2.4 创建 StreamingContext对象
在交互式界面:
import org.apache.spark.streaming._ # 导包
val ssc = new StreamingContext(sc, Seconds(1)) # 创建对象,间隔时间1秒
Spark Streaming程序代码:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("主程序名").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
三、Spark Streaming 操作
3.1 DStream简介
Spark Streaming 提供了一个高级抽象的流,即DStream (离散流) 。DStream 表示连续的数据流,可以通过 Kafka 、Flume、kinesis 等数据源创建,也可以通过现有DStream的高级操作来创建。DStream 的内部结构是由一系列连续的RDD组成,每个RDD都是一小段时间分隔开来的数据集。对DStream的任何操作,最终都会转变成对底层RDDs的操作。
3.2 DStream 编程模型
批处理引擎 Spark Core 把输入的数据按照一定的时间片(如1s) 分成一段一段的数据,每一段数据都会转换成RDD输入到Spark Core中,然后将 DStream 操作转换为RDD算子的相关操作,即转换操作、窗口操作、输出操作。RDD算子操作产生的中间结果数据会保存在内存中,也可以将中间的结果数据输出到外部存储系统中进行保存。
3.3 DStream 转换操作
Spark Streaming 中对 DStream 的转换操作都要转换为对RDD的转换操作。
其中, lines 表示转换操作前的DStream , words 表示转换操作后生成的 DStream 。对lines做flatMap转换操作,也就是对它内部的所有RDD做flatMap转换操作。
3.4 DStream API 转换操作
3.5 IDEA安装依赖
<properties>
<scala.version>2.11.8</scala.version>
<hadoop.version>2.7.3</hadoop.version>
<spark.version>2.4.0</spark.version>
<hbase.version>1.2.4</hbase.version>
</properties>
<dependencies>
<!--Scala-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>
<!--Spark & Kafka-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<!--Spark & flume-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.3.0</version>
</dependency>
<!--Spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!--Hadoop-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--Hbase-->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
</dependency>
</dependencies>
在之前安装过的只需要安装:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.4.0</version>
</dependency>
3.6 启动服务端监听 Socket 服务
命令:
nc -lk 9999
3.7 实现 transform() 方法,分割多个单词
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object TransformTest {
def main(args: Array[String]): Unit = {
// 1.创建SparkConf对象
val sparkConf: SparkConf = new SparkConf()
.setAppName("TransformTest").setMaster("local[2]")
// 2.创建SparkContext对象,它是所有任务计算的源头
val sc: SparkContext = new SparkContext(sparkConf)
// 3.设置日志级别
sc.setLogLevel("WARN")
// 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
// 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
// 以上是固定搭配结构
val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
// 6.使用RDD-to-RDD函数,返回新的DStream对象(即words),并空格切分每行
val words: DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))
// 7.打印输出结果
words.print()
// 8.开启流式计算
ssc.start()
// 9.让程序一直运行,除非人为干预停止
ssc.awaitTermination()
}
}
3.8 UpdateStateByKeyTest 更新值
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object UpdateStateByKeyTest {
//newValues 表示当前批次汇总成的(word,1)中相同单词的所有1
//runningCount 表示历史的所有相同key的value总和
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount =runningCount.getOrElse(0)+newValues.sum
Some(newCount)
}
def main(args: Array[String]): Unit = {
// 1.创建SparkConf对象 设置appName和master地址 local[2] 表示本地采用2个线程运行任务
val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")
// 2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
val sc: SparkContext = new SparkContext(sparkConf)
// 3.设置日志级别
sc.setLogLevel("WARN")
// 4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
// 5.配置检查点目录,使用updateStateByKey方法必须配置检查点目录
ssc.checkpoint("./")
// 6.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
// 7.按空格进行切分每一行,并将切分的单词出现次数记录为1
val wordAndOne: DStream[(String, Int)] = dstream.flatMap(_.split(" ")).map(word =>(word,1))
// 8.调用updateStateByKey操作,统计单词在全局中出现的次数
var result: DStream[(String, Int)] = wordAndOne.updateStateByKey(updateFunction)
// 9.打印输出结果
result.print()
// 10.开启流式计算
ssc.start()
// 11.让程序一直运行,除非人为干预停止
ssc.awaitTermination()
}
}
3.9 Dstream 窗口操作
- 事先设定一个滑动窗口的长度(也就是窗口的持续时间);
- 设定滑动窗口的时间间隔(每隔多长时间执行一次计算),让窗口按照指定时间间隔在源DStream上滑动;
- 每次窗口停放的位置上,都会有一部分Dstream(或者一部分RDD)被框入窗口内,形成一个小段的Dstream;
- 可以启动对这个小段DStream的计算。
方法名称 | 相关说明 |
---|---|
window(windowLength, slideInterval) | 基于源DStream产生的窗口化的批数据,计算得到一个新的Dstream; |
countByWindow(windowLength, slideInterval) | 返回流中元素的一个滑动窗口数; |
reduceByWindow(func, windowLength, slideInterval) | 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算; |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 更加高效的 reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce‖操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)。 |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 当应用到一个(K,V)键值对组成的DStream上,返回一个由(K,V)键值对组成的新的DStream;每个key的值都是它们在滑动窗口中出现的频率。 |
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream,ReceiverInputDStream}
object WindowTest {
def main(args: Array[String]): Unit = {
// 1.创建SparkConf对象
val sparkConf: SparkConf = new SparkConf()
.setAppName("WindowTest ").setMaster("local[2]")
// 2.创建SparkContext对象,它是所有任务计算的源头
val sc: SparkContext = new SparkContext(sparkConf)
// 3.设置日志级别
sc.setLogLevel("WARN")
// 4.创建StreamingContext,需要两个参数,分别为SparkContext和批处理时间间隔
val ssc: StreamingContext = new StreamingContext(sc,Seconds(1))
// 5.连接socket服务,需要socket服务地址、端口号及存储级别(默认的)
val dstream: ReceiverInputDStream[String] = ssc
.socketTextStream("192.168.142.128",9999)
// 6.按空格进行切分每一行
val words: DStream[String] = dstream.flatMap(_.split(" "))
// 7.调用window操作,需要两个参数,窗口长度和滑动时间间隔
val windowWords: DStream[String] = words.window(Seconds(3),Seconds(1))
// 8.打印输出结果
windowWords.print()
// 9.开启流式计算
ssc.start()
// 10.让程序一直运行,除非人为干预停止
ssc.awaitTermination()
}
}
3.10 DStream 输出操作
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
object SaveAsTextFilesTest {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
//1.创建SparkConf对象 设置appName和master地址 local[2] 表示本地采用2个线程运行任务
val sparkConf: SparkConf = new SparkConf().setAppName("SaveAsTextFilesTest").setMaster("local[2]")
//2.创建SparkContext对象,它是所有任务计算的源头,它会创建DAGScheduler和TaskScheduler
val sc: SparkContext = new SparkContext(sparkConf)
//3.设置日志级别
sc.setLogLevel("WARN")
//4.创建StreamingContext,需要2个参数,一个是SparkContext,一个是批处理的时间间隔
val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
//5.对接socket数据创建DStream对象,需要socket服务的地址、端口号及存储级别(默认的)
val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.142.128",9999)
//6.调用saveAsTextFiles操作,将nc窗口输出的内容保存到HDFS上
dstream.saveAsTextFiles("hdfs://master:8020//saveAsTextFiles/satf","txt")
//7.开启流式计算
ssc.start()
//8.让程序一直运行,除非人为干预停止
ssc.awaitTermination()
}
}