一、flink介紹
Apache Flink 是一個開源的分布式流處理和批處理系統。Flink 的核心是在資料流上提供資料分發、通信、具備容錯的分布式計算。同時,Flink 在流處理引擎上建構了批處理引擎,原生支援了疊代計算、記憶體管理和程式優化。
二、部署環境
作業系統環境:
flink支援Linux, Mac OS X, 和 Windows環境部署,本次部署選擇Linux環境部署。
JDK:
要求Java 7或者更高
三、下載下傳軟體
四、部署步驟
1、JDK安裝步驟此處省略,安裝後驗證下JDK環境
$ java -version
openjdk version "1.8.0_144"
OpenJDK Runtime Environment (build 1.8.0_144-b01)
OpenJDK 64-Bit Server VM (build 25.144-b01, mixed mode)
2、安裝部署flink
本文介紹flink部署分為兩種模式:local,standalone。下面依次介紹這兩種模式的部署方式。
找到下載下傳的flink壓縮包,進行解壓
$ tar -zxvf flink-1.4.2-bin-hadoop26-scala_2.11.tgz
首先是local模式,最為簡單。
$ cd flink-1.4.2
$ bin/start-local.sh
Starting job manager
我們可以通過檢視日志确認是否啟動成功
$ tailf flink-csap-taskmanager-0-XXXX.log
2018-05-03 10:07:53,718 INFO org.apache.flink.runtime.filecache.FileCache - User file cache uses directory /tmp/flink-dist-cache-4c371de9-0f85-4889-b4d9-4a522641549c
2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager - Starting TaskManager actor at akka://flink/user/taskmanager#-524742300.
2018-05-03 10:07:53,725 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager data connection information: 2c358d6f38949f9aae31c5bddb0cc1dc @ LY1F-R021707-VM14.local (dataPort=55234)
2018-05-03 10:07:53,726 INFO org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 1 task slot(s).
2018-05-03 10:07:53,727 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 111/1024/1024 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
2018-05-03 10:07:53,730 INFO org.apache.flink.runtime.taskmanager.TaskManager - Trying to register at JobManager akka.tcp://flink@localhost:6123/user/jobmanager (attempt 1, timeout: 500 milliseconds)
2018-05-03 10:07:53,848 INFO org.apache.flink.runtime.taskmanager.TaskManager - Successful registration at JobManager (akka.tcp://flink@localhost:6123/user/jobmanager), starting network stack and library cache.
2018-05-03 10:07:53,851 INFO org.apache.flink.runtime.taskmanager.TaskManager - Determined BLOB server address to be localhost/127.0.0.1:52382. Starting BLOB cache.
2018-05-03 10:07:53,858 INFO org.apache.flink.runtime.blob.PermanentBlobCache - Created BLOB cache storage directory /tmp/blobStore-c07b9e80-41f0-490f-8126-7008144c4b0b
2018-05-03 10:07:53,861 INFO org.apache.flink.runtime.blob.TransientBlobCache - Created BLOB cache storage directory /tmp/blobStore-e0d1b687-1c47-41c4-b5bc-10ceaa39e778
JobManager程序将會在8081端口上啟動一個WEB頁面,我們可以通過浏覽器到hostname:8081中檢視相關的資訊。
可以打開頁面檢視到相關資訊,說明local模式部署是沒問題的。
下面來看一下standlone部署方式。
安裝JDK,解壓壓縮包,都是一樣的。不一樣的是我們要修改解壓後的flink配置檔案。然後在叢集主機間做免密,
免密操作方法。
修改conf/flink-conf.yaml,我們将jobmanager.rpc.address的值設定成你master節點的IP位址。此外,我們通過jobmanager.heap.mb和taskmanager.heap.mb配置參數來設定每個節點的JVM能夠配置設定的最大記憶體。從配置參數名字可以看出,這個參數的機關是MB,如果某些節點擁有比你之前設定的值更多的記憶體時,我們可以在那個節通過FLINK_TM_HEAP參數類覆寫值錢的設定。
我們需要把所有将要作為worker節點的IP位址存放在conf/slaves檔案中,在conf/slaves檔案中,每個IP位址必須放在一行,如下:
192.168.0.100
192.168.0.101
.
.
.
192.168.0.150
然後将修改好的flink包整理複制到叢集各個節點。每個節點flink路徑保持一緻。然後啟動叢集
$ bin/start-cluster.sh
檢視日志是否成功。
以上是部署方法,部署成功後,我們來跑一個demo程式,驗證一下Flink的流處理功能,對其有個初步的了解。
flink為了更好的讓大家了解,已經給大家提供了一些demo代碼,demo的jar包可以在/examples/streaming首先看一下demo代碼:
object SocketWindowWordCount {
def main(args: Array[String]) : Unit = {
// the port to connect to
val port: Int = try {
ParameterTool.fromArgs(args).getInt("port")
} catch {
case e: Exception => {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
return
}
}
// get the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// get input data by connecting to the socket
val text = env.socketTextStream("localhost", port, '\n')
// parse the data, group it, window it, and aggregate the counts
val windowCounts = text
.flatMap { w => w.split("\\s") }
.map { w => WordWithCount(w, 1) }
.keyBy("word")
.timeWindow(Time.seconds(5), Time.seconds(1))
.sum("count")
// print the results with a single thread, rather than in parallel
windowCounts.print().setParallelism(1)
env.execute("Socket Window WordCount")
}
// Data type for words with count
case class WordWithCount(word: String, count: Long)
}
這個demo是監控端口,然後對端口輸入單子進行wordcount的程式。
運作demo,首先打開一個視窗進行端口資料輸入:
$ nc -l 9001
hello
hello
word
world
然後運作demo監控端口單詞輸入統計:
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9001
運作後可以看到結果統計:
$ more flink-csap-taskmanager-0-XXX.out.1
hello : 1
hello : 1
word : 1
world : 1
以上就是flink的hello world,大家初步對flink有個了解。