1. Plan the directory layout:
/home/user/flink
2. Extract the archive and rename the directory
tar zxvf flink-1.11.1-bin-scala_2.11.tgz
mv flink-1.11.1 flink
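If the tarball is not already on the machine, it can be fetched from the Apache archive first (the URL follows the standard archive layout for this release; run it before the tar step above):
wget https://archive.apache.org/dist/flink/flink-1.11.1/flink-1.11.1-bin-scala_2.11.tgz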
3. Edit the configuration files under conf/
3.1 flink-conf.yaml
Add or set the following in the Common section:
env.java.home: /usr/java/default
state.savepoints.dir: hdfs:///flink/savepoints
jobmanager.rpc.address: 10.88.0.86
jobmanager.rpc.port: 6123
blob.server.port: 35214
jobmanager.memory.process.size: 1600m
taskmanager.memory.process.size: 20480m   # total memory for the TaskManager process (replaces the pre-1.10 taskmanager.heap.mb)
taskmanager.numberOfTaskSlots: 16   # usually set to the number of CPU cores on each machine
parallelism.default: 1   # default job parallelism; can be raised up to the cluster's total slot count
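With 16 slots per TaskManager and two workers, the cluster exposes 32 slots in total, which bounds how far -p can be raised later. A quick way to review the keys just edited (plain grep, nothing Flink-specific):
grep -E 'jobmanager|taskmanager|parallelism|savepoints' conf/flink-conf.yaml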
3.2 masters
10.88.0.86:8081
3.3 workers
10.88.0.87
10.88.0.88
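start-cluster.sh launches the processes on these hosts over SSH, so passwordless login from the master is needed; a minimal sketch, assuming an SSH key pair already exists for user:
ssh-copy-id [email protected]
ssh-copy-id [email protected]
ssh [email protected] hostname   # should return without prompting for a password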
3.4 zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
clientPort=2181
server.1=10.88.1.85:2888:3888
server.2=10.88.1.86:2888:3888
server.3=10.88.1.87:2888:3888
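Each ZooKeeper node also needs a myid file in dataDir whose value matches its server.N line; a sketch for the first node (use 2 and 3 on the others):
mkdir -p /data/zookeeper/data
echo 1 > /data/zookeeper/data/myid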
3.5 Package and copy to the workers
tar zcvf flink.tgz flink
scp flink.tgz [email protected]:/home/user
scp flink.tgz [email protected]:/home/user
3.6 Extract on the workers and adjust the configuration
tar zxvf flink.tgz
vi flink/conf/flink-conf.yaml
In the Common section, add (use each worker's own IP; 10.88.0.88 on the second worker):
+ taskmanager.host: 10.88.0.87
4. Start and use the cluster
4.1 Configure environment variables on all servers
vi /etc/profile
FLINK_HOME=/home/user/flink
HADOOP_HOME=/usr/lib/hadoop
HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce
HADOOP_CLASSPATH=`hadoop classpath`
PATH=$PATH:$JAVA_HOME/bin:$SPARK_HOME/bin:$JRE_HOME/bin:$KAFKA_HOME/bin:$PHOENIX_HOME/bin:$FLINK_HOME/bin:$HADOOP_HOME/bin:/usr/local/redis/bin
export JAVA_HOME JRE_HOME PHOENIX_HOME FLINK_HOME KAFKA_HOME SPARK_HOME HADOOP_HOME HADOOP_MAPRED_HOME HADOOP_CLASSPATH CLASS_PATH PATH
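JAVA_HOME, JRE_HOME, SPARK_HOME, KAFKA_HOME and PHOENIX_HOME are assumed to be defined earlier in /etc/profile. To apply and spot-check the new settings:
source /etc/profile
echo $FLINK_HOME   # expect /home/user/flink
which flink        # expect /home/user/flink/bin/flink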
4.2 Start the cluster on the master
start-cluster.sh
Check with jps. On the master you should see: StandaloneSessionClusterEntrypoint
On the workers: TaskManagerRunner
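The web UI port configured in the masters file offers another check; /overview is part of Flink's monitoring REST API and reports the registered TaskManagers and slots:
curl http://10.88.0.86:8081/overview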
4.3 Stop the cluster
stop-cluster.sh
4.4 Submit jobs
flink run workers2hbase-0.0.1-SNAPSHOT.jar
flink run -p 2 workers2hbase-0.0.1-SNAPSHOT.jar   # -p sets the parallelism; flink run options must come before the jar
List jobs (-a includes completed ones):
flink list -a
Stop a job:
flink stop 721edfb9b1fff06e8ee63ca9e7b9c183
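flink stop drains the job and writes a savepoint (to state.savepoints.dir from section 3.1) before shutting it down. To kill a job without a savepoint, or to snapshot one that keeps running, the CLI also offers:
flink cancel <jobId>      # hard cancel, no savepoint
flink savepoint <jobId>   # trigger a savepoint while the job keeps running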
5. Checks to run after the cluster is deployed
start-scala-shell.sh remote 10.88.0.86 8081
Scala> val text = benv.fromElements(
"To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer",
"The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
Scala> val counts = text
.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, 1) }.groupBy(0).sum(1)
Scala> counts.print()
6. Adding a JobManager or TaskManager at runtime
Add a JobManager:
bin/jobmanager.sh ((start|start-foreground) cluster) | stop | stop-all
Add a TaskManager:
bin/taskmanager.sh start | start-foreground | stop | stop-all
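For example, to scale out with one more TaskManager: copy the flink directory to the new host, set its taskmanager.host as in section 3.6, and run on that host:
bin/taskmanager.sh start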
7. Resource scheduling with YARN
7.1 Start a YARN session with 2 TaskManagers (1 GB of heap allocated to each)
yarn-session.sh -d -n 2 -jm 1024 -tm 1024 -s 2
Options: -d runs the session detached in the background; the Flink YARN client then only submits the session to the cluster and exits. Note: in that case the session cannot be stopped through Flink itself; stop it with YARN tooling (yarn application -kill <application ID>).
-n requests 2 TaskManagers, each with 1 GB of memory (-tm) and 2 task slots (-s). Note that from Flink 1.10 onward -n is deprecated and effectively ignored; TaskManagers are allocated dynamically as jobs demand slots.
Check the status with: yarn application -list
View the log files: yarn logs -applicationId <application ID>
Submit a job to the running session (arguments after the jar are passed to the program):
nohup flink run -c com.du.mflink2hbase workers2hbase-0.0.1-SNAPSHOT.jar --port 8020 &
7.2 Submit a job in per-job cluster mode; a fresh Flink cluster is created on YARN for every submission
flink run -m yarn-cluster -c com.du.mflink2hbase /home/user/lflink-demo-1.0-SNAPSHOT.jar
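Resources for the per-job cluster can be set with the y-prefixed shortcuts that flink run accepts in this release (the values below are placeholders, not tuned figures):
flink run -m yarn-cluster -yjm 1024m -ytm 2048m -ys 2 -c com.du.mflink2hbase /home/user/lflink-demo-1.0-SNAPSHOT.jar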
7.3 Troubleshooting
Q1:Invalid resource request, requested virtual cores < 0, or requested virtual cores > max configured, requestedVirtualCores=10, maxVirtualCores=4
Fix: add the following to yarn-site.xml:
<property>
<name>yarn.scheduler.maximum-allocation-vcores</name>
<value>32</value>
</property>
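yarn.scheduler.maximum-allocation-vcores is read by the ResourceManager at startup, so YARN has to be restarted for the new limit to take effect; with a plain Hadoop install that is roughly (paths vary by distribution):
$HADOOP_HOME/sbin/stop-yarn.sh
$HADOOP_HOME/sbin/start-yarn.sh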