程式部署
本地執行
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.createLocalEnvironment(3)
//2.建立DataStream
val text = env.socketTextStream("train",9999)
//3.執行DataStream的轉換算子
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
遠端部署
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream
val text = env.socketTextStream("train",9999)
//3.執行DataStream的轉換算子
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
StreamExecutionEnvironment.getExecutionEnvironment自動識别運作環境,如果運作環境是idea,系統會自動切換成本地模式,預設系統的并行度使用系統最大線程數,等價于Spark中設定的,如果是生産環境,需要使用者在送出任務的時候指定并行度
local[*]
--parallelism
- 部署方式
- WEB UI部署(略)
- 通過腳本部署
[[email protected] ~]# cd /usr/soft/flink-1.10.0/
[[email protected] flink-1.10.0]# ./bin/flink run
--class com.baizhi.quickstart.FlinkWordCountTestLocal
--detached --parallelism 4
--jobmanager train:8081 /root/flink-1.0-SNAPSHOT.jar
Job has been submitted with JobID 808021820a80d008e8fcd7e72bba1029
檢視現有任務
[[email protected] flink-1.10.0]# ./bin/flink list --running --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------
[[email protected] flink-1.10.0]# ./bin/flink list --all --jobmanager train:8081
Waiting for response...
------------------ Running/Restarting Jobs -------------------
29.02.2020 05:06:41 : 808021820a80d008e8fcd7e72bba1029 : Window Stream WordCount (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
取消指定任務
[[email protected] flink-1.10.0]# ./bin/flink cancel --jobmanager train:8081 808021820a80d008e8fcd7e72bba1029
Cancelling job 808021820a80d008e8fcd7e72bba1029.
Cancelled job 808021820a80d008e8fcd7e72bba1029.
檢視程式執行計劃
[[email protected] flink-1.10.0]# ./bin/flink info --class com.baizhi.quickstart.FlinkWordCountTestLocal --parallelism 4 /root/flink-1.0-SNAPSHOT.jar
----------------------- Execution Plan -----------------------
{"nodes":[{"id":1,"type":"Source: Socket Stream","pact":"Data Source","contents":"Source: Socket Stream","parallelism":1},{"id":2,"type":"Flat Map","pact":"Operator","contents":"Flat Map","parallelism":4,"predecessors":[{"id":1,"ship_strategy":"REBALANCE","side":"second"}]},{"id":3,"type":"Map","pact":"Operator","contents":"Map","parallelism":4,"predecessors":[{"id":2,"ship_strategy":"FORWARD","side":"second"}]},{"id":5,"type":"aggregation","pact":"Operator","contents":"aggregation","parallelism":4,"predecessors":[{"id":3,"ship_strategy":"HASH","side":"second"}]},{"id":6,"type":"Sink: Print to Std. Out","pact":"Data Sink","contents":"Sink: Print to Std. Out","parallelism":4,"predecessors":[{"id":5,"ship_strategy":"FORWARD","side":"second"}]}]}
--------------------------------------------------------------
No description provided.
⽤戶可以通路:
https://flink.apache.org/visualizer/
将json資料粘貼過去,檢視Flink執⾏計劃圖
跨平台釋出
//1.建立流計算執行環境
var jars = "D:\\ideacores\\Flink\\flink\\target\\flink-1.0-SNAPSHOT.jar"
val env = StreamExecutionEnvironment.createRemoteEnvironment("train",8081,jars)
//設定預設并行度
env.setParallelism(4)
//2.建立DataStream
val text = env.socketTextStream("train",9999)
//3.執行DataStream的轉換算子
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
在運作之前需要使用mvn重新打包程式。直接運作main函數即可
Streaming(DataStream API)
DataSource
資料源是程式讀取資料的來源,使用者可以通過
env.addSource(SourceFunction)
,将SourceFunction添加到程式中。Flink内置許多已知實作的SourceFunction,但是使用者可以自定義實作
SourceFunction
(非并行化的接口)接口或者實作
ParallelSourceFunction
(并行化)接口,如果需要有狀态管理還可以繼承
RichParallelSourceFunction
File-based
-
- 讀取文本檔案(一次) 并以字元串的形式傳回它們readTextFile(path)
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream - 細化
val text = env.readTextFile("hdfs://train:9000/demo/words")
//3.執行DataStreaem的轉換算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
-
readFile(fileInputFormat, path)
-根據指定的檔案輸入讀取(一次)檔案
格式
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream - 細化
val inputFormat = new TextInputFormat(null)
val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words")
//3.執行DataStreaem的轉換算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
-
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)
-
這是前兩個方法在内部調用的方。 它讀取路徑中的檔案
fileInputFormat
. 根據所提供的watchType,此源可以定期調用
監視(每隔ms)新資料的路徑
(
FileProcessingMode.PROCESS_CONTINUOUSLY
), 或處理目前路徑中的資料
退出檔案處理模式。
PROCESS_ONCE
)。使用路徑過濾器,使用者可以進一步
排除正在處理的檔案。
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream - 細化
val inputFormat = new TextInputFormat(null)
val text = env.readFile(inputFormat,"hdfs://train:9000/demo/words",FileProcessingMode.PROCESS_CONTINUOUSLY,1000)
//3.執行DataStreaem的轉換算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
該方法會檢查采集目錄下的檔案,如果檔案發生變化系統會重新采集。此時可能會導緻檔案的重複計算。一般來說不建議修改檔案内容,直接上傳新檔案即可。
Socket Based
-
- Reads from a socket. Elements can be separated by a delimiter.socketTextStream
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream - 細化
val text = env.socketTextStream("train",9999,'\n',3)
//3.執行DataStreaem的轉換算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
Collection-based
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream - 細化
val inputFormat = new TextInputFormat(null)
val text = env.fromCollection(List("this is a demo","hello word"))
//3.執行DataStreaem的轉換算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
UserDefinedSource
- SourceFunction(非并行化)
import java.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction
class UserDefinedNonParallelSourceFunction extends SourceFunction[String]{
@volatile //防止線程拷貝變量
var isRunning = true
var lines = Array("this is a demo","hello world","ni hao ma")
//在該方法中啟動線程,通過sourceContext的collect方法發送資料
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
while (isRunning){
Thread.sleep(100)
//輸送資料給下遊
sourceContext.collect(lines(new Random().nextInt(lines.size)))
}
}
//釋放資源
override def cancel(): Unit = {
isRunning = false
}
}
測試
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream - 細化
val inputFormat = new TextInputFormat(null)
val text = env.addSource(new UserDefinedNonParallelSourceFunction)
//3.執行DataStreaem的轉換算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
- ParallelSourceFunction(并行化)
import java.util.Random
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
class UserDefinedNonParallelSourceFunction extends ParallelSourceFunction[String]{
@volatile //防止線程拷貝變量
var isRunning = true
var lines = Array("this is a demo","hello world","ni hao ma")
//在該方法中啟動線程,通過sourceContext的collect方法發送資料
override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
while (isRunning){
Thread.sleep(100)
//輸送資料給下遊
sourceContext.collect(lines(new Random().nextInt(lines.size)))
}
}
//釋放資源
override def cancel(): Unit = {
isRunning = false
}
}
測試
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream - 細化
val inputFormat = new TextInputFormat(null)
val text = env.addSource(new UserDefinedNonParallelSourceFunction)
//3.執行DataStreaem的轉換算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.print()
//5.執行流計算任務
env.execute("Window Stream WordCount")
Kafka Source
- 引入maven
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.10.0</version>
</dependency>
-
SimpleStringSchema
該SimpleStringSchema方案隻會反序列化kafka中的value
//1.建立流計算執⾏環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream - 細化
val props = new Properties()
props.setProperty("bootstrap.servers", "CentOS:9092")
props.setProperty("group.id", "g1")
val text = env.addSource(new FlinkKafkaConsumer[String]("topic01",new
SimpleStringSchema(),props))
//3.執⾏DataStream的轉換算⼦
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制列印
counts.print()
//5.執⾏流計算任務
env.execute("Window Stream WordCount")
- KafkaDeserializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.flink.api.scala._
class UserKafkaDeserialization extends KafkaDeserializationSchema[(String,String,Int,Long)]{
override def isEndOfStream(t: (String, String, Int, Long)): Boolean = false
override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Int, Long) = {
if(consumerRecord.key()!=null){
(new String(consumerRecord.key()),new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
}else{
(null,new String(consumerRecord.value()),consumerRecord.partition(),consumerRecord.offset())
}
}
override def getProducedType: TypeInformation[(String, String, Int, Long)] = {
createTypeInformation[(String,String,Int,Long)]
}
}
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty("bootstrap.servers","train:9092")
prop.setProperty("group.id", "g1")
val text = env.addSource(new FlinkKafkaConsumer[(String,String,Int,Long)]("topic01",new UserKafkaDeserialization,prop))
text.flatMap(line=>line._2.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
.print()
env.execute("Window Stream")
- JSONKeyValueNodeDeserializationSchema
-
要求Kafka中的topic的key和value都必須是json格式,也可以在使⽤的時候,指定是否讀取中繼資料
(topic、分區、o!set等)
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty("bootstrap.servers","train:9092")
prop.setProperty("group.id", "g1")
val text = env.addSource(new FlinkKafkaConsumer[ObjectNode]("topic01",new JSONKeyValueDeserializationSchema(true),prop))
//t:{"value":{"id":1,"name":"zhangsan"},"metadata":{"offset":0,"topic":"topic01","partition":13}}
text.map(word=>(word.get("value").get("id").asInt(),word.get("value").get("name").asText()))
.print()
env.execute("Window Stream")
參考:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html
Data Sinks
Data Sink使用DataStreams并将其轉發到檔案,Socket,外部系統或列印它們。Flink帶有多種内置輸出格式,這些格式封裝在DataStreams的操作後面。
File-based
- writeAsText()/
:将元素按行寫入為字元串。這些字元串是通過調用每個元素的toString()方法獲得的。TextOutputFormat
- writeAsCsv(…) /
:将元組寫入逗号分隔的值檔案。行和字段分隔符是可配置的。每個字段的值來自對象的toString()方法CsvOutputFormat
- writeUsingOutputFormat/
:方法和自定義檔案輸出的基類。支援自定義對象到位元組的轉換。FileOutputFormat
注意DataStream上的write*()方法主要用于調試目的。
//1.建立流計算執行環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.建立DataStream - 細化
val text = env.fromCollection(List("this is a demo","hello word"))
//3.執行DataStreaem的轉換算子
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
//4.将計算的結果在控制台列印
counts.writeUsingOutputFormat(new TextOutputFormat[(String, Int)](new Path("hdfs://train:9000/flink-results")))
//5.執行流計算任務
env.execute("Window Stream WordCount")
注意事項:如果改成HDFS,需要使用者自己産生大量資料,才能看到測試效果,原因是因為HDFS檔案系統寫入時的緩沖區比較大、以上寫入檔案系統的Sink不能夠參與系統檢查點,如果在生産環境下通常使用flink-connector-filesystem寫入到外圍系統。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-filesystem_2.11</artifactId>
<version>1.10.0</version>
</dependency>
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("hdfs://train:9000/demo/words")
val bucketingSink = StreamingFileSink.forRowFormat(new Path("hdfs://train:9000/bucket-results"),
new SimpleStringEncoder[(String, Int)]("UTF-8")
).withBucketAssigner(new DateTimeBucketAssigner[(String, Int)]("yyyy-MM-dd")) //動态産生寫入路徑
.build()
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(bucketingSink)
老版本寫法
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("hdfs://train:9000/demo/words")
val bucketingSink = new BucketingSink[(String,Int)]("hdfs://train:9000/bucket-results")
bucketingSink.setBucketer(new DateTimeBucketer[(String,Int)]("yyyy-MM-dd"))
bucketingSink.setBatchSize(1024)
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(bucketingSink)
print()/printToErr()
Prints the toString() value of each element on the standard out / standard error stream. Optionally, aprefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print . If the parallelism is greater than 1, the output will also be prepended with theidentifier of the task which produced the output.
列印标準輸出/标準錯誤流中每個元素的toString()值。可選地,可以提供aprefix (msg),它預先寫入輸出。這有助于區分不同的列印調用。如果并行度大于1,輸出也将以産生輸出的任務的辨別符作為字首。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("hdfs://train:9000/demo/words")
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.setParallelism(2).print("測試")
env.execute("Window Stream")
UserDefinedSinkFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
class UserDefinedSinkFunction extends RichSinkFunction[(String,Int)]{
override def open(parameters: Configuration): Unit = {
println("打開連結...")
}
override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
println("輸出:"+value)
}
override def close(): Unit = {
println("釋放連結")
}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile("hdfs://train:9000/demo/words")
val counts = text.flatMap(line=>line.split(" "))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(new UserDefinedSinkFunction)
env.execute("Window Stream WordCount")
RedisSink
參考:
https://bahir.apache.org/docs/flink/current/flink-streaming-redis/
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
//1.建立流計算執⾏環境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//2.建立DataStream - 細化
val text = env.readTextFile("hdfs://train:9000/demo/words")
var flinkJeidsConf = new FlinkJedisPoolConfig.Builder()
.setHost("train")
.setPort(6379)
.build()
//3.執⾏DataStream的轉換算⼦
val counts = text.flatMap(line=>line.split("\\s+"))
.map(word=>(word,1))
.keyBy(0)
.sum(1)
counts.addSink(new RedisSink(flinkJeidsConf,new UserDefinedSinkFunction()))
env.execute("Window Stream")
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
class UserDefinedSinkFunction extends RedisMapper[(String,Int)]{
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.HSET,"wordcounts")
}
override def getKeyFromData(t: (String, Int)): String = t._1
override def getValueFromData(t: (String, Int)): String = t._2+""
}