1 Java word count: straight to the code
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public class Testwordcount {
    // Delimiter used to split each line
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        String logFile = "D:\\ab.txt";
        SparkConf conf = new SparkConf().setMaster("local").setAppName("TestSpark");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Read the file and cache the data in memory
        JavaRDD<String> logData = sc.textFile(logFile).cache();
        // Split the data into words; this example splits on spaces
        JavaRDD<String> words = logData.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });
        // Map each word to (word, 1), then sum the counts per word
        JavaPairRDD<String, Integer> counts = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) throws Exception {
                return a + b;
            }
        });
        // counts.saveAsTextFile(outfile);
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        sc.close();
    }
}
Data in the file:
flume sd hadoop hbase kylin hdfs hadoop sd sd flume hdfs
Execution result:
20/05/12 10:10:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 23 ms on localhost (executor driver) (1/1)
20/05/12 10:10:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
20/05/12 10:10:47 INFO DAGScheduler: ResultStage 1 (collect at Testwordcount.java:58) finished in 0.024 s
20/05/12 10:10:47 INFO DAGScheduler: Job 0 finished: collect at Testwordcount.java:58, took 0.255136 s
hadoop: 2
flume: 2
kylin: 1
hdfs: 2
sd: 3
hbase: 1
2 Using common operators
Transformation operators
flatMap splits the data: it has a one-to-many effect (one input element can produce many output elements).
logData.flatMap(new FlatMapFunction<String, String>() {
    public Iterator<String> call(String s) {
        return Arrays.asList(SPACE.split(s)).iterator();
    }
});
parallelize and makeRDD both turn a local collection into an RDD; makeRDD exists on the Scala SparkContext (not the Java API) and is implemented on top of parallelize.
Example: turn a List into an RDD
ArrayList<String> list = new ArrayList<String>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
list.add("5");
JavaRDD<String> parallelize = sc.parallelize(list);
System.out.println(parallelize.collect()); // [1, 2, 3, 4, 5]
filter: filters the data. The return type is Boolean: false drops the element, true lets it pass through.
JavaRDD<String> filter = parallelize.filter(new Function<String, Boolean>() {
    @Override
    public Boolean call(String s) throws Exception {
        if (s.equals("2")) {
            return false;
        }
        return true;
    }
});
System.out.println(filter.collect()); // [1, 3, 4, 5]
Action operators
first: returns the first element of the dataset
count: returns the number of elements after processing
saveAsTextFile: saves the result to a path (written as a directory of text part files)
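A minimal sketch combining the three actions on the same five-element list used above. The output path D:\\out is hypothetical, and note that saveAsTextFile writes a directory of part-xxxxx files, so the path must not already exist:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ActionDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("ActionDemo");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("1", "2", "3", "4", "5"));
        System.out.println(rdd.first());  // first element: "1"
        System.out.println(rdd.count());  // number of elements: 5
        // Writes a directory of part files; "D:\\out" is a hypothetical path
        rdd.saveAsTextFile("D:\\out");
        sc.close();
    }
}
```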