![Flink与Kylin深度实践](https://wfqqreader-1252317822.image.myqcloud.com/cover/467/37323467/b_37323467.jpg)
2.5 Flink on YARN模式
Flink任务也可以运行在YARN上面,将Flink任务提交到YARN平台可以实现统一的任务资源调度管理,方便开发人员管理集群中的CPU和内存等资源。如图2-1所示,Flink on YARN也有两种模式:单个YARN Session模式和多个YARN Session模式。
环境要求:Hadoop至少为2.2版;HDFS及YARN服务启动正常。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/29_01.jpg?sign=1739032126-ME6EnzyM5KuWsMS25WnecbjUFItASDtJ-0-9143e21489b0c53cb24f283c7bbc7ff8)
●图2-1 Flink on YARN模式
2.5.1 单个YARN Session模式
这种模式需要先启动集群,然后再提交作业,接着会向YARN申请资源空间,之后资源保持不变。如果资源不足,下一个作业就无法提交,只能等到YARN中的一个作业执行完成后释放资源,所以实际工作中一般不会使用这种模式。
这种模式不需要做任何配置,可以直接将任务提交到YARN集群,这之前需要提前启动HDFS以及YARN集群。
1.修改yarn-site.xml配置文件
在node01上执行以下命令开始修改yarn-site.xml。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/29_02.jpg?sign=1739032126-UM87OPoptJBZIjTWD1so0lNYnr34J4ut-0-ca81ebb505cd833687f0f7130daedbe7)
添加以下配置属性到yarn-site.xml文件中。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/29_03.jpg?sign=1739032126-FI649A8fhFOp3plKXGyDZRQQY4F7QVs7-0-105dd74795a1354366be53146d6793cc)
然后在node01上将修改后的配置文件复制到node02与node03服务器,命令如下。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/29_04.jpg?sign=1739032126-fNywmlhGID2PVJn9uz42MFyCYNQRsSTS-0-cbf6a417a09daef841e021cfd5373a22)
之后重新启动YARN集群。
2.修改Flink配置文件
在node01上执行以下命令修改Flink配置文件。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/30_01.jpg?sign=1739032126-g6JZpbj30eBKQ4CXO4AZhQuoQ3YnEDEy-0-fb0941f5b1820789d103f496da788d07)
3.在HDFS上创建文件夹
命令如下。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/30_02.jpg?sign=1739032126-HA0LaAQvlmIDYLKG7YGnqRBxouHMjToF-0-bf1183292413fd4776d7ac00b2448da9)
4.在YARN中启动Flink集群
直接在node01上执行以下命令,在YARN中启动一个全新的Flink集群。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/30_03.jpg?sign=1739032126-asemvPdEedQUpWTVGCbzwMQorng9xJlw-0-47fc79d67b68cb471e0986c8ae4268eb)
可以直接使用yarn-session.sh这个脚本来启动。也可以使用“--help”查看更多参数设置。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/30_04.jpg?sign=1739032126-WTUok0kmYzl4ceGJbsH2oyJZ04ceT58y-0-5b019754a58c0e61620870621d43de63)
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/31_01.jpg?sign=1739032126-7pORbj5o1cL1oaTRhxEgBGVVLaGCSu4N-0-f61ec434c3e2ca545eef24083277748c)
注意:
如果启动时YARN的内存太小,则可能报出以下错误。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/31_02.jpg?sign=1739032126-dolz8TdDB1Vz4CRAoQMCWgaf6ONh5EP5-0-114157674595d75a6dc8e95a0e16d359)
此时需要修改yarn-site.xml添加以下配置,然后重启YARN。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/31_03.jpg?sign=1739032126-IN0CiBD6Dhy1dH6XPbliXD0CB73DztgY-0-4c7c9362741195eb43199f9fa799955a)
这个参数的功能主要是让YARN集群跳过集群资源检查,避免由于虚拟机内存不够而导致任务提交失败。
5.查看YARN管理界面
访问YARN的8088管理界面http://node01:8088/cluster,发现其中有一个应用,这是为Flink单独启动的一个Session。
6.提交任务
使用Flink自带的jar包实现单词统计功能。
在node01上准备单词文件。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/32_01.jpg?sign=1739032126-Cm6UkG98BzM9Crbfk4FfMSShzOauK8es-0-6fde0619f41c816b897f9af84dbf0bac)
文件内容如下。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/32_02.jpg?sign=1739032126-1Uv17rCA9mlRnRcvnQQl4tHecSEZJjBu-0-9629d10adde6510833560d75c40a10a6)
在HDFS上创建文件夹并上传文件。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/32_03.jpg?sign=1739032126-YT8S9FU1kkCg5of44VpipB3tLLvjT2B4-0-cd8fb0ff337098eb594e1209b4414fbd)
在node01上执行以下命令,提交任务到Flink集群。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/32_04.jpg?sign=1739032126-OkyR2LA6ch3o1RJ9idEPo5rpDhTTJtfQ-0-b7992638669bd0ab29d36c223c94067b)
7.验证YARN Session的高可用性
通过node01的8088界面,查看YARN Session在哪一台机器上启动,然后关闭YARN Session进程,这时YARN Session会在另外一台机器上重新启动。
找到YarnSessionClusterEntrypoint所在的服务器,然后关闭该进程。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/32_05.jpg?sign=1739032126-i0G4p7hSZUKNeOiYCIjqLfY0fCUDIehY-0-45cf3895f4f8c5e57e56a5a4abde8d26)
关闭进程之后,会发现YARN集群重新启动了一个YarnSessionClusterEntrypoint进程在其他机器上。如图2-2所示,YARN上又启动了一个新的任务。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/33_01.jpg?sign=1739032126-YlToGkJVqeFzW5HzwAprcwWlGtrNrGiM-0-33bd5d2c07b252b599d1678557268603)
●图2-2 Flink on YARN的高可用性
2.5.2 多个YARN Session模式
这种模式的优点是一个任务对应一个Job,即每提交一个Job都会根据自身情况向YARN申请资源,直到Job执行完成,并不会影响下一个Job的正常运行,除非YARN上没有任何资源。
注意:
Client端必须设置YARN_CONF_DIR、HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。
这种模式下不需要在YARN中启动任何集群,直接提交任务即可。
1.直接执行命令提交任务
编写提交任务的脚本并执行。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/33_02.jpg?sign=1739032126-MmxIEUHBX0UFwwB0DhRolNkxNKvK2hYx-0-2e85a83e1a5443b0a656db66d69abf67)
2.查看输出结果
在HDFS中执行以下命令查看输出结果。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/33_03.jpg?sign=1739032126-yHjLDZdSQy0IwUax22QkO2YWxOYUcQQG-0-db0cb73de9eb3babc011281f74c290e1)
3.查看“flink run”的帮助文档
使用“--help”查看帮助文档中的参数。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/33_04.jpg?sign=1739032126-hA29mCNs6zp8TWmatZaqUz6AgJzehehM-0-daeff3e2a14c5b2d113659989d0f7d2e)
结果如下。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/34_01.jpg?sign=1739032126-QaoH22hawMo7nWw6MRn17y6KATo0UIUH-0-4264a5ba0119837aed1fde1e70a5086f)
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/35_01.jpg?sign=1739032126-dbZZMVOaHkQ54zXv07bj8QHF3nX9TFx7-0-6a6212804801f142f7724b64ff6d9ecf)
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/36_01.jpg?sign=1739032126-bRclsE7ZcLKqMW53LmisCXInWYLCzEzN-0-d51faf8de45071d5f7311b390edaf602)
2.5.3“flink run”脚本分析
提交Flink任务时,可以加入以下这些参数。
1)默认查找当前YARN集群中已有YARN Session信息中的JobManager(所在路径:/tmp/.yarn-properties-root)。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/36_02.jpg?sign=1739032126-tCNWa36v4BNXjvn4Ix00V4SEd90WupwY-0-d197552436cd08f2ef3a932b324c00cd)
2)连接指定主机和端口的JobManager。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/36_03.jpg?sign=1739032126-YRKd3x3gZKeshfMoQfPN7BUjO4n4uKL0-0-752651686c09d593b57cc396e39eb394)
3)启动一个新的YARN-Session。
![](https://epubservercos.yuewen.com/B89B1D/19773740901350206/epubprivate/OEBPS/Images/36_04.jpg?sign=1739032126-CLxhIbcA3Sghpopl2krU8erDDG6kLMwL-0-2189195f3ac59c9a73ba4860647765b4)
注意:
YARN Session命令行的选项也可以使用“./bin/flink”获得。它们都有一个“y”或者“yarn”的前缀,例如:bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar。