[toc]
spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校amplab,2010年开源,2013年6月成为apache孵化项目,2014年2月成为apache顶级项目。目前,spark生态系统已经发展成为一个包含多个子项目的集合,其中包含sparksql、spark streaming、graphx、mllib等子项目,spark是基于内存计算的大数据并行计算框架。spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将spark部署在大量廉价硬件之上,形成集群。spark得到了众多大数据公司的支持,这些公司包括hortonworks、ibm、intel、cloudera、mapr、pivotal、百度、阿里、腾讯、京东、携程、优酷土豆。当前百度的spark已应用于凤巢、大搜索、直达号、百度大数据等业务;阿里利用graphx构建了大规模的图计算和图挖掘系统,实现了很多生产系统的推荐算法;腾讯spark集群达到8000台的规模,是当前已知的世界上最大的spark集群。
在这里必须对比mapreduce,mapreduce最大的性能短板就在于shuffle过程中,会将中间结果输出到磁盘上(也就是hdfs上),这个过程中至少会产生6次的io。也正是这些频繁的io使得mr的性能不尽人意。
对于spark来说,中间结果是都在内存中的(checkpoint另说),就从这点来说,就少了很多io导致的性能问题。当然这只是其中一点,后面会细说
与hadoop的mapreduce相比,spark基于内存的运算速度要快100倍以上,即使,spark基于硬盘的运算也要快10倍。spark实现了高效的dag执行引擎,从而可以通过内存来高效处理数据流。
spark支持java、python和scala的api,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且spark支持交互式的python和scala的shell,可以非常方便地在这些shell中使用spark集群来验证解决问题的方法。
spark提供了统一的解决方案。spark可以用于批处理、交互式查询(spark sql)、实时流处理(spark streaming)、机器学习(spark mllib)和图计算(graphx)。这些不同类型的处理都可以在同一个应用中无缝使用。spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。
另外spark还可以很好的融入hadoop的体系结构中可以直接操作hdfs,并提供hive on spark、pig on spark的框架集成hadoop。
spark可以非常方便地与其他的开源产品进行融合。比如,spark可以使用hadoop的yarn和apache mesos作为它的资源管理和调度器,并且可以处理所有hadoop支持的数据,包括hdfs、hbase和cassandra等。这对于已经部署hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用spark的强大处理能力。spark也可以不依赖于第三方的资源管理和调度器,它实现了standalone作为其内置的资源管理和调度框架,这样进一步降低了spark的使用门槛,使得所有人都可以非常容易地部署和使用spark。此外,spark还提供了在ec2上部署standalone的spark集群的工具。
spark生态圈:
spark core :最重要,其中最重要的是 rdd (弹性分布式数据集)
spark sql :类似于hive 使用sql语句操作rdd dataframe(表)
spark streaming : 流式计算
前面三个用到比较多,后面这两个看需求吧
spark mllib :spark机器学习类库
spark graphx : 图计算
图2.1 spark架构
spark大致有几个大组件,分别为:driver、master(cluster manager)、worker。
图2.2 spark工作任务图
上面这图说明了每个组件的功能。
spark可以部署在以上几种环境之上:
standalone:spark内置的资源管理器
yarn:hadoop的资源管理器
mesos
amazon ec2
使用scala版本为scala2.11.8,spark版本为spark-2.1.0-bin-hadoop2.7。
jdk版本1.8,hadoop版本2.8.4
解压好spark程序之后,进入解压目录下。修改配置文件:
配置完成后,启动集群:
基本和伪分布式是一样的,也就是 conf/slaves文件中配置多几个worker节点而已,然后照样启动集群就ok了。
搭建完成了可以进入 http://masterip:8080 查看集群状态
在spark中,master节点作为整个集群的管理者,是单点的,容易发生单点故障,所以为了保障master节点的可用性,需要给它实现ha
主要用于开发或测试环境。spark提供目录保存spark application和worker的注册信息,并将他们的恢复状态信息写入该目录中,这时,一旦master发生故障,就可以通过重新启动master进程(sbin/start-master.sh),恢复已运行的spark application和worker的注册信息。
基于文件系统的单点恢复,主要是在spark-env.sh里spark_daemon_java_opts设置以下内容:
要注意的是,这种方式本质上还是只有一个master节点,只不过是重启master节点时可以自动还原worker以及application信息,防止master挂了之后,所有任务都丢失执行状态,然后master重启之后需要重新从头到尾执行之前的任务。
zookeeper提供了一个leader election机制,利用这个机制可以保证虽然集群存在多个master,但是只有一个是active的,其他的都是standby。当active的master出现故障时,另外的一个standby master会被选举出来。由于集群的信息,包括worker, driver和application的信息都已经持久化到zookeeper,因此在切换的过程中只会影响新job的提交,对于正在进行的job没有任何的影响。
这里分别用两台主机配置master节点,而worker节点仍然是单节点(为了方便起见而已)。首先需保证zookeeper服务的正常运行。这里不重复讲,可以看之前zookeeper的文章。这里直接讲spark 的配置。
修改spark-env.sh配置文件
以上配置需要保证在整个spark集群的所有master和worker节点所在主机的配置一样。
配置完成后,启动集群
启动完成后,可以到两个master的管理页面上看对应的状态:
接着我们看看zookeeper上存储什么信息:
spark提供了一些实例程序,
spark提供了两个工具用于提交执行spark任务,分别是spark-shell和spark-submit
一般用在生产环境中用于提交任务到集群中执行
例子:蒙特卡罗求pi
一般在生产环境中,在ide中编写完spark程序后,会打包成jar包,然后上传到集群中。通过上面的spark-submit命令,将任务提交至集群中执行
spark-shell是spark自带的交互式shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。一般用于测试
有两种运行模式:
例子:在spark shell中编写wordcount程序
首先需要idea配置好scala开发环境。
到插件中心安装scala插件。
创建maven工程,然后add framework support添加scala支持
到project structure添加scala源码文件夹
最后右键就可以看到可以创建scala class 的选项了。
注意:本地得安装scala以及jdk
配置好scala环境后,需要添加spark对应的maven依赖,添加依赖到pom.xml中:
记住上面的关于build的配置千万不要漏掉。这里说说我之前的遇到的小坑。
小坑:
wordcount实例代码: