flink yarn模式下的 HA 和 故障重启

Apache Flink

HA

​ JobManager协调每个Flink部署。它负责调度和资源管理。默认情况下,每个Flink集群只有一个JobManager实例。这会创建一个单点故障(SPOF):如果JobManager崩溃,就不能提交新的程序,运行中的程序也会失败。使用JobManager高可用性,您可以从JobManager故障中恢复,从而消除SPOF。您可以为独立集群和纱线集群配置高可用性。

img

​ 由于生产环境下通常使用yarn模式,所以这里介绍yarn模式下的高可用配置,并测试杀死jobmanager进程后重启的过程。通常高可用需要配置zookeeper参数,所以这里分别对yarn-site.xmlflink-conf.yaml进行配置:

yarn-site.xml增加jobmanageer失败后最大的重试次数

1
2
3
4
5
6
7
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>10</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>

flink-conf.yaml中设置zookeeper参数:

1
2
3
4
5
high-availability: zookeeper
high-availability.zookeeper.quorum: hadoop01:2181,hadoop02:2181,hadoop03:2181
high-availability.storageDir: hdfs:///flink/ha
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10

注意yarn.resourcemanager.am.max-attempts应用程序重新启动的上限。因此,Flink中设置的应用程序尝试次数不能超过启动YARN的YARN群集设置。(yarn.resourcemanager.am.max-attemps)。

yarn.application-attempts <= yarn.resourcemanager.am.max-attempts

配置成功并分发至各节点,然后分别启动zookeeper,yarn,并开启session cluster:

1
bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d

​ 查看控制台:

​ 查看job manager信息,可以看到zookeeper中注册了flink节点信息:

在控制台杀死YarnSessionClusterEntrypoint进程,flink UI界面无法访问

查看控制台发现在hadoop03又出现一个YarnSessionClusterEntrypoint进程:

通过yarn application发现又出现一个application,打开:

故障重启

flink在消费kafka源源不断产生的消息时,难免会遇到程序挂掉的情况,这时候如果无人值守,那么kafka生产的消息会一直积压无法消费,所以需要开启故障重启来自动恢复任务。

flink重启策略有四种:下面是在flink-conf.yaml配置和代码中进行配置:

固定延迟重启策略

1
2
3
4
5
6
7
8
9
# fixed-delay:固定延迟策略
restart-strategy: fixed-delay

# 尝试5次,默认Integer.MAX_VALUE
restart-strategy.fixed-delay.attempts: 5

# 设置延迟时间10s,默认为 akka.ask.timeout时间
restart-strategy.fixed-delay.delay: 10s

1
2
3
4
5
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 5表示最大重试次数为5次,10s为延迟时间
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,Time.of(10, TimeUnit.SECONDS)));

故障率重启策略

1
2
3
4
5
6
7
8
9
10
11
12
# 设置重启策略为failure-rate
restart-strategy: failure-rate

# 失败作业之前的给定时间间隔内的最大重启次数,默认1
restart-strategy.failure-rate.max-failures-per-interval: 3

# 测量故障率的时间间隔。默认1min
restart-strategy.failure-rate.failure-rate-interval: 5min

# 两次连续重启尝试之间的延迟,默认akka.ask.timeout时间
restart-strategy.failure-rate.delay: 10s

1
2
3
4
5
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 3为最大失败次数;5min为测量的故障时间;10s为2次间的延迟时间
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5, TimeUnit.MINUTES),Time.of(10, TimeUnit.SECONDS)));

后备重启策略

​ 使用群集定义的重新启动策略。这对于启用检查点的流式传输程序很有帮助。默认情况下,如果没有定义其他重启策略,则选择固定延迟重启策略。

无重启策略

1
restart-strategy: none
1
2
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.noRestart());
Donate
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.

扫一扫,分享到微信

微信分享二维码
  • Copyrights © 2020-2021 ycfn97
  • Visitors: | Views:

请我喝杯咖啡吧~

支付宝
微信