簡述
WordCount(單詞計數)一直是大資料入門的經典案例,下面用java和scala實作Flink的WordCount代碼;
采用IDEA + Maven + Flink 環境;文末附 pom 檔案和相關技術點總結;
Java實作Flink批處理版本
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class WordCountBatchByJava {
public static void main(String[] args) throws Exception {
// 建立執行環境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 加載或建立源資料
DataSet<String> text = env.fromElements("this a book", "i love china", "i am chinese");
// 轉化處理資料
DataSet<Tuple2<String, Integer>> ds = text.flatMap(new LineSplitter()).groupBy(0).sum(1);
// 輸出資料到目的端
ds.print();
// 執行任務操作
// 由于是Batch操作,當DataSet調用print方法時,源碼内部已經調用Excute方法,是以此處不再調用,如果調用會出現錯誤
//env.execute("Flink Batch Word Count By Java");
}
static class LineSplitter implements FlatMapFunction<String, Tuple2<String,Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
for (String word:line.split(" ")) {
collector.collect(new Tuple2<>(word,1));
}
}
}
}
運作輸出結果如下:
(a,1)
(am,1)
(love,1)
(china,1)
(this,1)
(i,2)
(book,1)
(chinese,1)
Java實作Flink流處理版本
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WordCountStreamingByJava {
public static void main(String[] args) throws Exception {
// 建立執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設定socket資料源
DataStreamSource<String> source = env.socketTextStream("192.168.1.111", 9999, "\n");
// 轉化處理資料
DataStream<WordWithCount> dataStream = source.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String line, Collector<WordWithCount> collector) throws Exception {
for (String word : line.split(" ")) {
collector.collect(new WordWithCount(word, 1));
}
}
}).keyBy("word")//以key分組統計
.timeWindow(Time.seconds(2),Time.seconds(2))//設定一個視窗函數,模拟資料流動
.sum("count");//計算時間視窗内的詞語個數
// 輸出資料到目的端
dataStream.print();
// 執行任務操作
env.execute("Flink Streaming Word Count By Java");
}
public static class WordWithCount{
public String word;
public int count;
public WordWithCount(){
}
public WordWithCount(String word, int count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
啟動一個shell視窗,聯通9999端口,輸入資料:
[root@spark111 flink-1.6.2]# nc -l 9999
山東 天津 北京 河北 河南 山東 上海 北京
山東 海南 青海 西藏 四川 海南
IDEA 輸出結果如下:
4> WordWithCount{word='北京', count=2}
1> WordWithCount{word='上海', count=1}
5> WordWithCount{word='天津', count=1}
4> WordWithCount{word='河南', count=1}
7> WordWithCount{word='山東', count=2}
3> WordWithCount{word='河北', count=1}
------------------------為了區分前後時間視窗結果,手動加的這條線--------------------------
8> WordWithCount{word='海南', count=2}
8> WordWithCount{word='四川', count=1}
7> WordWithCount{word='山東', count=1}
1> WordWithCount{word='西藏', count=1}
5> WordWithCount{word='青海', count=1}
Scala實作Flink批處理版本
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
object WordCountBatchByScala {
def main(args: Array[String]): Unit = {
//擷取執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
//加載資料源
val source = env.fromElements("china is the best country","beijing is the capital of china")
//轉化處理資料
val ds = source.flatMap(_.split(" ")).map((_,1)).groupBy(0).sum(1)
//輸出至目的端
ds.print()
// 執行操作
// 由于是Batch操作,當DataSet調用print方法時,源碼内部已經調用Excute方法,是以此處不再調用,如果調用會出現錯誤
//env.execute("Flink Batch Word Count By Scala")
}
}
運作結果如下:
(is,2)
(beijing,1)
(the,2)
(china,2)
(country,1)
(of,1)
(best,1)
(capital,1)
Scala實作Flink流處理版本
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
object WordCountStreamingByScala {
def main(args: Array[String]): Unit = {
//擷取執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//加載或建立資料源
val source = env.socketTextStream("192.168.1.111",9999,'\n')
//轉化處理資料
val dataStream = source.flatMap(_.split(" "))
.map((_,1))
.keyBy(0)
.timeWindow(Time.seconds(2),Time.seconds(2))
.sum(1)
//輸出到目的端
dataStream.print()
//執行操作
env.execute("Flink Streaming Word Count By Scala")
}
}
啟動shell視窗,開啟9999端口通信,輸入詞語:
[root@spark111 flink-1.6.2]# nc -l 9999
time is passed what is the time?
time is nine time passed again
4> (what,1)
5> (time,1)
8> (is,2)
5> (time?,1)
8> (passed,1)
5> (the,1)
------------------------為了區分前後時間視窗結果,手動加的這條線--------------------------
8> (is,1)
5> (time,2)
8> (passed,1)
7> (nine,1)
6> (again,1)
POM檔案
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ssrs</groupId>
<artifactId>flinkdemo</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<hadoop.version>2.8.4</hadoop.version>
<flink.version>1.6.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
</project>
總結
-
flink處理任務流程如下:
① 擷取執行環境 (Environment)
② 加載或者建立資料源(source)
③ 轉化處理資料(transformation)
④ 輸出目的端(sink)
⑤ 執行任務(execute)
- 在批進行中,如果輸出目的端,執行的 print 指令(除此之外,還有count,collect方法),則執行任務Execute不需要調用(因為這些方法内部已經調用了Execute方法);如果調用,雖然也有正确結果,但是會有錯誤資訊輸出;錯誤如下:
Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'. at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:940) at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:922) at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:85) at com.ssrs.WordCountBatchByJava.main(WordCountBatchByJava.java:27)
- 如果批處理代碼中,輸出目的端調用writeAsCsv、writeAsText等其他方法,則後面需要調用Execute;
- 批處理擷取執行環境用ExecutionEnvironment,流處理擷取環境用StreamExecutionEnvironment
- 批處理後的資料是DataSet,流處理後的資料是DataStream.
作者:ShadowFiend
出處:http://www.cnblogs.com/ShadowFiend/
本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接配接,否則保留追究法律責任的權利。如有問題或建議,請多多賜教,非常感謝。