flink
入門案例之
WordCount
,以下測試代碼都是在本地執行的
添加依賴
添加maven依賴
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.7.2</version>
</dependency>
</dependencies>
build資訊
<build>
<plugins>
<!-- 該插件用于将Scala代碼編譯成class檔案 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<!-- 聲明綁定到maven的compile階段 -->
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
批處理
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, ExecutionEnvironment, createTypeInformation}
object WordCount {
def main(args: Array[String]): Unit = {
//擷取執行環境
//建立一個執行環境,該環境代表目前在其中執行程式的上下文。
// 如果程式是獨立調用的,則此方法傳回本地執行環境。
// 如果從指令行用戶端内部調用該程式以将其送出給叢集,則此方法将傳回該叢集的執行環境。
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//建立通過逐行讀取給定檔案而産生的字元串資料集。
val inputDS: DataSet[String] = env.readTextFile("D:/test/a.txt")
val wordCounts: AggregateDataSet[(String, Int)] = inputDS.flatMap(_.split(" ")).map((_, 1)).groupBy(0).sum(1)
//将DataSet中的元素列印到調用print()方法的JVM的标準輸出流System.out中。
// 對于在群集中執行的程式,此方法需要将DataSet的内容收集回用戶端,以在用戶端列印。
//為每個元素編寫的字元串由AnyRef.toString方法定義。
//與()和()方法類似,此方法立即觸發程式執行。
wordCounts.print()
}
}
流處理
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object StreamWordCount {
def main(args: Array[String]): Unit = {
//替換為你自己的IP
val host = "192.168.xx.1x1"
val port = 9000
// 建立流處理環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 接收socket文本流
val textDstream: DataStream[String] = env.socketTextStream(host, port)
// flatMap和Map需要引用的隐式轉換
import org.apache.flink.api.scala._
val value: DataStream[(String, Int)] = textDstream.flatMap(_.split("\\s")).filter((_.nonEmpty)).map((_, 1)).keyBy(0).sum(1)
value.print().setParallelism(1)
env.execute("Socket WordCountTest")
}
}
之後在
linux
上用如下指令進行測試
總結
- 導入包的時候一定要留意導入的是
的包還是java
的包,很多函數是兩個包都有的,導入錯了讓你懷疑人生scala