
How to fix Flink restarts caused by ZooKeeper

Published: 2024-09-22

This article walks through how to resolve Flink job restarts caused by ZooKeeper. Many people run into this problem in day-to-day operation, so after digging through various sources, here is a simple, practical fix. Hopefully it helps; let's get started.

Background

While running jobs with Flink on Kubernetes recently, I noticed the job kept restarting at a particular moment. A scheduled task reloads a cache once a day; the cache holds a large amount of data and takes roughly 60-90 seconds to load. This scheduled task kept causing Kubernetes to restart the job, making it very unstable, so I started tuning.

Timeout and heartbeat tuning

  1. First suspicion: while the cache is loading, communication between operator senders and receivers somehow becomes unreachable. The default heartbeat timeout is 50s, so I changed the parameters: heartbeat.timeout: 180000, heartbeat.interval: 20000.

  2. The JobManager and TaskManager communicate via Akka, so I also raised akka.ask.timeout: 240s.
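
The two changes above can be collected in flink-conf.yaml (values taken directly from the steps above; where the file lives depends on your deployment, e.g. a ConfigMap on Kubernetes):

```yaml
# flink-conf.yaml — timeout settings from the steps above (Flink 1.10-era keys).
heartbeat.timeout: 180000    # ms; default is 50000 (50s)
heartbeat.interval: 20000    # ms; how often heartbeats are sent
akka.ask.timeout: 240s       # RPC ask timeout between JobManager and TaskManager
```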

After these changes, the job would still occasionally fail while loading the cache. A log excerpt:

2020-10-16 17:05:05,939 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 29068ms for sessionid 0x30135fa8005449f
2020-10-16 17:05:05,948 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 29068ms for sessionid 0x30135fa8005449f, closing socket connection and attempting reconnect
2020-10-16 17:05:07,609 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
2020-10-16 17:05:07,611 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-10-16 17:05:07,612 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job 1bb3b7bdcfbc39cf760064ed9736ea80 with leader id bed26e07640e5e79197e468c85354534 lost leadership.
2020-10-16 17:05:07,613 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-10-16 17:05:07,614 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 1bb3b7bdcfbc39cf760064ed9736ea80.
2020-10-16 17:05:07,615 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Flat Map -> Timestamps/Watermarks (15/15) (052a84a37a0647ab485baa54f149b762).
2020-10-16 17:05:07,615 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (15/15) (052a84a37a0647ab485baa54f149b762) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for 1bb3b7bdcfbc39cf760064ed9736ea80 lost the leadership.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1274)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:155)
    at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1698)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Job leader for job id 1bb3b7bdcfbc39cf760064ed9736ea80 lost leadership.
    ... 22 more

Further investigation showed this is related to ZooKeeper. When ZooKeeper switches leaders or hits a network blip, the Curator client enters the SUSPENDED state; that state triggers the "lost the leadership" error, and on that error the job fails and Kubernetes restarts it, even though ZooKeeper itself is still reachable. More digging revealed that others hit this problem long ago and even wrote a fix; the Flink project just never merged the code. Skipping the details of the investigation, the useful links are:
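
To make the failure mode concrete, here is a small standalone Java sketch. These are not the real Curator or Flink classes (the names and the simplified `reset` behavior are illustrative); it only contrasts the stock Curator 2.x behavior, which drops leadership on SUSPENDED, with the patched behavior that drops it only on LOST:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for Curator's LeaderLatch connection-state handling.
public class LeaderLatchSketch {
    enum ConnectionState { CONNECTED, SUSPENDED, RECONNECTED, LOST }

    static final class Latch {
        final AtomicBoolean hasLeadership = new AtomicBoolean(true);
        final boolean treatSuspendedAsLost; // true models stock Curator 2.x

        Latch(boolean treatSuspendedAsLost) {
            this.treatSuspendedAsLost = treatSuspendedAsLost;
        }

        void handleStateChange(ConnectionState newState) {
            switch (newState) {
                case SUSPENDED:
                    // Stock 2.x drops leadership here; the patch ignores SUSPENDED.
                    if (treatSuspendedAsLost) hasLeadership.set(false);
                    break;
                case RECONNECTED:
                    // Patched code re-enters the election only if leadership was
                    // actually lost (this set() stands in for reset()).
                    if (!hasLeadership.get()) hasLeadership.set(true);
                    break;
                case LOST:
                    // Session truly expired: leadership is genuinely gone.
                    hasLeadership.set(false);
                    break;
                default:
                    break;
            }
        }
    }

    public static void main(String[] args) {
        // A brief network blip: SUSPENDED, then RECONNECTED; session never expires.
        Latch stock = new Latch(true);
        stock.handleStateChange(ConnectionState.SUSPENDED);
        System.out.println("stock kept leadership: " + stock.hasLeadership.get());

        Latch patched = new Latch(false);
        patched.handleStateChange(ConnectionState.SUSPENDED);
        System.out.println("patched kept leadership: " + patched.hasLeadership.get());
    }
}
```

With the stock behavior a mere blip fails the job (and Kubernetes restarts it); with the patch only a real session loss (LOST) does.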

  1. https://www.cnblogs.com/029zz010buct/p/10946244.html

The useful takeaway there is to upgrade the Curator package. Flink ships Curator 2.12.0; I did not go that route for now, since the SessionConnectionStateErrorPolicy it mentions only exists in Curator 4.x, so part of the code would still have to be recompiled.

  2. https://github.com/apache/flink/pull/9066 and https://issues.apache.org/jira/browse/FLINK-10052

    This is the fix others arrived at, and the one I used as well: stop treating the SUSPENDED state as lost leadership, by modifying the handleStateChange method of LeaderLatch:

            case RECONNECTED:
            {
                try
                {
                    if (!hasLeadership.get())
                    {
                        reset();
                    }
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("Could not reset leader latch", e);
                    setLeadership(false);
                }
                break;
            }
            case LOST:
            {
                setLeadership(false);
                break;
            }

Building flink-shaded-hadoop-2-uber

Having located the code to change, the natural target was the flink-shaded-hadoop-2-uber-xxx.jar. Flink 1.10 still ships this Hadoop bundle; since 1.11 it is no longer officially supported and must be downloaded separately. Because this jar is added explicitly when building our image, it was the one to rebuild. The build process in brief:

  1. Download the source at https://github.com/apache/curator/tree/apache-curator-2.12.0, modify src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java under curator-recipes as shown above, and build it as version 2.12.0.

  2. Download the flink-shaded release-10.0 source at https://github.com/apache/flink-shaded/tree/release-10.0 and edit the pom of flink-shaded-hadoop-2-parent: add an exclusion to drop the transitive curator-recipes dependency, and add the curator-recipes you just built. Without the exclusion, the default is version 2.7.1; apparently this code has not been touched in years, so the version stayed at 2.7.1.

                <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                    <version>${hadoop.version}</version>
                    <exclusions>
                        <!-- ... other exclusions omitted ... -->
                        <exclusion>
                            <groupId>org.apache.curator</groupId>
                            <artifactId>curator-recipes</artifactId>
                        </exclusion>
                    </exclusions>
                </dependency>
                <dependency>
                    <groupId>org.apache.curator</groupId>
                    <artifactId>curator-recipes</artifactId>
                    <version>2.12.0</version>
                </dependency>
  3. Since we use the 2.8.3-10.0 build, while the source pom defaults to Hadoop 2.4.1, change hadoop.version to 2.8.3.

  4. Following the readme.md in the repository root, run mvn package -Dshade-sources in the flink-shaded-release-10.0/flink-shaded-hadoop-2-parent directory. After the build, decompile the jar to verify the SUSPENDED branch is really gone, then rebuild the image and run the job.

That concludes this walkthrough of fixing Flink restarts caused by ZooKeeper; hopefully it clears up the problem. Pairing the theory with hands-on practice is the best way to learn, so give it a try!
