天天看点

【Spark】Spark的Shuffle机制

在mapreduce框架中,shuffle是连接map和reduce之间的桥梁,map的输出要用到reduce中必须经过shuffle这个环节,shuffle的性能高低直接影响了整个程序的性能和吞吐量。

shuffle是mapreduce框架中的一个特定的phase,介于map phase和reduce phase之间,当map的输出结果要被reduce使用时。输出结果须要按key哈希。而且分发到每个reducer上去。这个过程就是shuffle。因为shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的执行效率。

下图描写叙述了mapreduce算法的整个流程,当中shuffle phase是介于map phase和reduce phase之间:

【Spark】Spark的Shuffle机制

在hadoop, 在mapper端每次当memory buffer中的数据快满的时候, 先将memory中的数据, 按partition进行划分, 然后各自存成小文件, 这样当buffer不断的spill的时候, 就会产生大量的小文件。

所以hadoop后面直到reduce之前做的全部的事情事实上就是不断的merge, 基于文件的多路并归排序,在map端的将同样partition的merge到一起, 在reduce端, 把从mapper端copy来的数据文件进行merge, 以用于终于的reduce

多路归并排序, 达到两个目的。

merge, 把同样key的value都放到一个arraylist里面;sort, 终于的结果是按key排序的。

这个方法扩展性非常好, 面对大数据也没有问题, 当然问题在效率, 毕竟须要多次进行基于文件的多路归并排序,多轮的和磁盘进行数据读写。

【Spark】Spark的Shuffle机制

spark中的shuffle是把一组无规则的数据尽量转换成一组具有一定规则的数据。

spark计算模型是在分布式的环境下计算的。这就不可能在单进程空间中容纳全部的计算数据来进行计算。这样数据就依照key进行分区。分配成一块一块的小分区,打散分布在集群的各个进程的内存空间中,并非全部计算算子都满足于依照一种方式分区进行计算。

当须要对数据进行排序存储时。就有了又一次依照一定的规则对数据又一次分区的必要。shuffle就是包裹在各种须要重分区的算子之下的一个对数据进行又一次组合的过程。

在逻辑上还能够这样理解:因为又一次分区须要知道分区规则。而分区规则依照数据的key通过映射函数(hash或者range等)进行划分,由数据确定出key的过程就是map过程,同一时候map过程也能够做数据处理。比如,在join算法中有一个非常经典的算法叫map side join,就是确定数据该放到哪个分区的逻辑定义阶段。shuffle将数据进行收集分配到指定reduce分区,reduce阶段依据函数对对应的分区做reduce所需的函数处理。

【Spark】Spark的Shuffle机制

* 首先每个mapper会依据reducer的数量创建出对应的bucket,bucket的数量是m×r,当中m是map的个数,r是reduce的个数。

* 其次mapper产生的结果会依据设置的partition算法填充到每个bucket中去。

这里的partition算法是能够自己定义的,当然默认的算法是依据key哈希到不同的bucket中去。

* 当reducer启动时,它会依据自己task的id和所依赖的mapper的id从远端或是本地的block manager中取得对应的bucket作为reducer的输入进行处理。

这里的bucket是一个抽象概念,在实现中每个bucket能够对应一个文件。能够对应文件的一部分或是其它等。

继续阅读