天天看點

Flink standalone/yarn模式部署

1. 規劃目錄:

/home/user/flink

2. 解壓檔案并重命名

tar zxvf flink-1.11.1-bin-scala_2.11.tgz 

mv flink-1.11.1 flink

3. 配置conf

3.1 flink-conf.yaml 

在Common部分增加或設定如下:

env.java.home: /usr/java/default

state.savepoints.dir: hdfs:/flink/savepoints

jobmanager.rpc.address: 10.88.0.86 

jobmanager.rpc.port: 6123

blob.server.port: 35214

jobmanager.memory.process.size: 1600m

taskmanager.heap.mb: 20480m     --TaskManager總共能使用的記憶體大小

taskmanager.numberOfTaskSlots: 16  --伺服器CPU Core數一緻,每一台機器上能使用的 CPU 個數

parallelism.default: 1 --叢集中的總 CPU個數

3.2 masters

10.88.0.86:8081

3.3 works

10.88.0.87

10.88.0.88

3.4 zoo.cfg

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/data/zookeeper/data

clientPort=2181

server.1=10.88.1.85:2888:3888

server.2=10.88.1.86:2888:3888

server.3=10.88.1.87:2888:3888

3.5 打包并發送到workers

tar zcvf flink flink.tgz

scp flink.tgz [email protected]:/home/user

scp flink.tgz [email protected]:/home/user

3.6 在workers上加壓并略作配置的修改

vi flink-conf.yaml 

Common部分

+ taskmanager.host: 10.88.0.87

4. 啟動和使用

4.1 在所有伺服器上配置環境變量

vi /etc/profile

FLINK_HOME=/home/user/flink

HADOOP_HOME=/usr/lib/hadoop

HADOOP_MAPRED_HOME=/usr/lib/hadoop-mapreduce

HADOOP_CLASSPATH=`hadoop classpath`

PATH=$PATH:$JAVA_HOME/bin:$SPARK_HOME:$JRE_HOME/bin:$KAFKA_HOME/bin:$PHOENIX_HOME/bin:$FLINK_HOME/bin:$HADOOP_HOME/bin:/usr/local/redis/bin

export JAVA_HOME JRE_HOME PHOENIX_HOME FLINK_HOME KAFKA_HOME SPARK_HOME HADOOP_HOME HADOOP_MAPRED_HOME HADOOP_CLASSPATH CLASS_PATH PATH

4.2 在master上啟動叢集

start-cluster.sh

檢查jps,master上:StandaloneSessionClusterEntrypoint

workers上:TaskManagerRunner

4.3 停止叢集

stop-cluster.sh

4.4 啟動任務

flink run workers2hbase-0.0.1-SNAPSHOT.jar

flink run workers2hbase-0.0.1-SNAPSHOT.jar -p 2 --并發度

檢查有哪些任務

flink list -a

殺掉任務

flink stop 721edfb9b1fff06e8ee63ca9e7b9c183

5. 叢集部署後可以做一下檢查

start-scala-shell.sh remote 10.88.0.86 8081

Scala> val text = benv.fromElements(

  "To be, or not to be,--that is the question:--",

  "Whether 'tis nobler in the mind to suffer",

  "The slings and arrows of outrageous fortune",

  "Or to take arms against a sea of troubles,")

Scala> val counts = text

    .flatMap { _.toLowerCase.split("\\W+") }

    .map { (_, 1) }.groupBy(0).sum(1)

Scala> counts.print()

6.添加jobmanager和taskmanager

添加jobmanager:

bin/jobmanager.sh  ((start|start-foreground) cluster)  | stop | stop-all

添加taskmanager:

bin/taskmanager.sh start | start-foreground | stop | stop-all

7. 利用yarn進行資源排程

7.1 啟動一個YARN session用2個TaskManager(每個TaskManager配置設定1GB的堆空間)

yarn-session.sh -d -n 2 -jm 1024 -tm 1024 -s 2

參數:-d,背景運作session,flink yarn client将會隻送出任務到叢集然後關閉自己。注意:在這種情況下,無法使用flink停止yarn session,使用yarn工具來停止yarn session。

-n 2個TaskManager,每個TaskManager記憶體為1G(-tm)且占用了2個核(-s)

通過yarn application --list 檢視狀态

檢視日志檔案:yarn logs -applicationId <application ID> 

nohup flink run -c com.du.mflink2hbase workers2hbase-0.0.1-SNAPSHOT.jar --port 8020 &

7.2 以叢集模式送出任務,每次都會建立flink叢集

flink run -m yarn-cluster -c com.du.mflink2hbase  /home/user/lflink-demo-1.0-SNAPSHOT.jar

7.3 問題

Q1:Invalid resource request, requested virtual cores < 0, or requested virtual cores > max configured, requestedVirtualCores=10, maxVirtualCores=4

修改yarn-site.xml

+

<property>

    <name>yarn.scheduler.maximum-allocation-vcores</name>

    <value>32</value>

  </property>

繼續閱讀