
Spark Operator Operations Analysis 4

Study notes for the Developer Academy course "Big Data Real-Time Computing Framework Spark Quick Start: Spark Operator Operations Analysis 4". The notes follow the course closely so that users can pick up the material quickly.

Course address:

https://developer.aliyun.com/learning/course/100/detail/1690

Spark Operator Operations Analysis 4

With a combiner, network transfer is much more efficient: instead of shipping, say, ten thousand 1s, each key in a partition only needs to send one pre-aggregated number, and the amount of computation on the reduce side drops accordingly.

Summing values can be done with reduceByKey. But suppose we have the three numbers 1, 2, 3, with 1 and 2 in one partition and 3 in another, and we try to compute an average with reduceByKey. reduceByKey comes with a built-in combiner, so the averaging logic already runs on the map side: 1 and 2 are averaged to 1.5, which is then averaged with 3, and the result is no longer the true average 2.

aggregateByKey:

When the map-side and reduce-side logic are not the same, aggregateByKey can be used directly.
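As a minimal sketch of what this looks like in practice, the average above can be computed safely by aggregating (sum, count) pairs per key and dividing only at the end. The data, the variable names, and the sc context below are assumptions made for illustration, written in the same Java style as the examples later in this lesson.

// Illustrative data only: three scores for one key, spread over two partitions.
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("xuruyun", 1),
        new Tuple2<String, Integer>("xuruyun", 2),
        new Tuple2<String, Integer>("xuruyun", 3)), 2);

// Zero value (0, 0); seqOp folds one score into the local (sum, count);
// combOp merges partial (sum, count) pairs coming from different partitions.
JavaPairRDD<String, Tuple2<Integer, Integer>> sumCounts = scores.aggregateByKey(
        new Tuple2<Integer, Integer>(0, 0),
        new Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> acc, Integer v) {
                return new Tuple2<Integer, Integer>(acc._1() + v, acc._2() + 1);
            }
        },
        new Function2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                return new Tuple2<Integer, Integer>(a._1() + b._1(), a._2() + b._2());
            }
        });

The division sum / count happens only after the partial pairs from both partitions are merged, so the result is the true average 2.0 rather than the 2.25 obtained by averaging 1.5 with 3.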

sample performs random sampling. With false it draws the given fraction of the whole RDD without replacement (no element is picked twice); if changed to true, elements may be sampled repeatedly.

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks])

Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.

Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.

Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
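To make a few of the transformations listed above concrete, here is a minimal sketch in the same Java style as the examples below; the datasets and variable names are invented for illustration, and a JavaSparkContext named sc like the one created below is assumed.

// Illustrative data only.
JavaRDD<Integer> a = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> b = sc.parallelize(Arrays.asList(3, 4, 5, 6));

a.union(b).collect();            // 1, 2, 3, 4, 3, 4, 5, 6  (duplicates are kept)
a.intersection(b).collect();     // 3, 4                    (deduplicated, order not guaranteed)
a.union(b).distinct().collect(); // 1, 2, 3, 4, 5, 6        (order not guaranteed)

JavaPairRDD<String, Integer> scores = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, Integer>("xuruyun", 150),
        new Tuple2<String, Integer>("wangfei", 80)));
JavaPairRDD<String, String> classes = sc.parallelizePairs(Arrays.asList(
        new Tuple2<String, String>("xuruyun", "class1"),
        new Tuple2<String, String>("wangfei", "class2")));

// join pairs up the values that share a key: ("xuruyun", (150, "class1")), ("wangfei", (80, "class2"))
JavaPairRDD<String, Tuple2<Integer, String>> joined = scores.join(classes);

// cogroup collects all values from both RDDs per key into a pair of Iterables.
JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<String>>> cogrouped = scores.cogroup(classes);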

JavaSparkContext sc = new JavaSparkContext(conf);

List<Tuple2<String, Integer>> scoreList = Arrays.asList(
        new Tuple2<String, Integer>("xuruyun", 150),
        new Tuple2<String, Integer>("liangyongqi", 100),
        new Tuple2<String, Integer>("wangfei", 100),
        new Tuple2<String, Integer>("wangfei", 80));

JavaPairRDD<String, Integer> rdd = sc.parallelizePairs(scoreList);

// reduceByKey sums the values per key; the same function also runs as a
// map-side combiner before the shuffle.
rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
    private static final long serialVersionUID = 1L;

    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(Tuple2<String, Integer> t) throws Exception {
        System.out.println(t);
    }
});
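With the score list above, this prints one tuple per key, e.g. (xuruyun,150), (liangyongqi,100) and (wangfei,180): the two wangfei values 100 and 80 are combined into a single sum, and the order of the printed lines is not guaranteed.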

package com.shsxt.study.operator;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class AggregateByKeyOperator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("AggregateByKeyOperator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> lines = sc.textFile("CHANGES.txt");

        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // aggregateByKey is essentially the same idea as reduceByKey; reduceByKey is a
        // simplified version of aggregateByKey (compare reduce vs. foldLeft).
        // aggregateByKey takes three arguments:
        // 1. the initial ("zero") value for each key
        // 2. the seq function: how the shuffle map side does local aggregation
        // 3. the comb function: how the shuffle reduce side does global aggregation
        JavaPairRDD<String, Integer> wordCounts = pairs.aggregateByKey(0,
                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                },


                new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });

        List<Tuple2<String, Integer>> list = wordCounts.collect();
        for (Tuple2<String, Integer> wc : list) {
            System.out.println(wc);
        }

        sc.close();
    }
}
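Since the seq function and the comb function above are both plain addition, this particular aggregateByKey(0, ...) call yields the same word counts as a plain reduceByKey. A minimal sketch for comparison, reusing the pairs RDD from the class above (the variable name sameCounts is an illustrative assumption):

// Same result in this case, because local and global aggregation are the same addition.
JavaPairRDD<String, Integer> sameCounts = pairs.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });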

public class SampleOperator {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SampleOperator")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<String> names = Arrays
                .asList("xurunyun", "liangyongqi", "wangfei", "yasaka", "xurunyun", "liangyongqi");

        JavaRDD<String> nameRDD = sc.parallelize(names, 2);

        // Sample roughly a third of the names without replacement (false = no duplicates).
        nameRDD.sample(false, 0.33).foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String name) throws Exception {
                System.out.println(name);
            }
        });

        sc.close();
    }
}

/**
 * Return a sampled subset of this RDD.
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
 * @param fraction expected size of the sample as a fraction of this RDD's size
 *   without replacement: probability that each element is chosen; fraction must be [0, 1]
 *   with replacement: expected number of times each element is chosen; fraction must be >= 0
 * @param seed seed for the random number generator
 */
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = withScope {
  require(fraction >= 0.0, "Negative fraction value: " + fraction)
  if (withReplacement) {
    new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
  } else {
    new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
  }
}

/**
 * Randomly splits this RDD with the provided weights.
 *
 * @param weights weights for splits, will be normalized if they don't sum to 1
 * @param seed random seed
 *
 * @return split RDDs in an array
 */
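This appears to be the documentation of RDD.randomSplit. A minimal sketch of calling it from the Java API; the 0.7/0.3 weights, the seed, and the variable names are illustrative assumptions, reusing a nameRDD like the one in the sample example above:

// Split into roughly 70% / 30% pieces; the weights are normalized if they don't sum to 1.
JavaRDD<String>[] parts = nameRDD.randomSplit(new double[]{0.7, 0.3}, 42L);
JavaRDD<String> bigger = parts[0];
JavaRDD<String> smaller = parts[1];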
