How Does Flink Submit a Task?


This article walks through how Flink submits a task, tracing the code path from the cluster entrypoint, through the Dispatcher and JobMaster, down to the TaskExecutor that runs each operator.

1. Key Components

Three components play key roles during job submission: Dispatcher, JobMaster, and JobManagerRunnerImpl. The following call path leads to the MiniDispatcher:

YarnJobClusterEntrypoint.main() -> ClusterEntrypoint.runCluster() -> DefaultDispatcherResourceManagerComponentFactory.create() -> DefaultDispatcherRunnerFactory.createDispatcherRunner() -> DefaultDispatcherRunner.grantLeadership() -> JobDispatcherLeaderProcess.onStart() -> DefaultDispatcherGatewayServiceFactory.create() -> JobDispatcherFactory.createDispatcher() -> MiniDispatcher.start()

(1) Dispatcher

Receives job submission requests and hands each job to a JobManager (JobMaster) for execution.

When the Dispatcher starts, it runs startRecoveredJobs() to launch the jobs that need to be recovered. In Flink on YARN per-job mode, the MiniDispatcher passes the current job in as a job to be recovered, which is how the job ends up being submitted and started.
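
The mechanism is easiest to see as a toy sketch (not Flink's actual Dispatcher API, just the idea): a per-job dispatcher is handed exactly one job up front and treats it as a job to recover, so simply starting the dispatcher starts the job.

    // Toy sketch only: illustrates the "recovered jobs" idea, not Flink's real Dispatcher classes.
    import java.util.Collection;
    import java.util.List;

    class ToyDispatcher {

        private final Collection<String> recoveredJobs; // stand-in for a collection of job graphs

        ToyDispatcher(Collection<String> recoveredJobs) {
            this.recoveredJobs = recoveredJobs;
        }

        // Mirrors the onStart() -> startRecoveredJobs() step: every "recovered" job is launched.
        void start() {
            recoveredJobs.forEach(this::runJob);
        }

        private void runJob(String jobGraph) {
            System.out.println("starting a JobMaster for " + jobGraph);
        }

        public static void main(String[] args) {
            // In per-job mode the dispatcher is constructed with the single submitted job
            // as its only "recovered" job, so starting the dispatcher submits and starts it.
            new ToyDispatcher(List.of("my-job-graph")).start();
        }
    }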

(2) JobManagerRunner

Responsible for running the JobMaster.

(3) JobMaster

Responsible for running the job; it corresponds to the JobManager in older versions.

Each job has its own JobMaster.

2. JobMaster Executes the Job

Inside the JobMaster, the Scheduler and Execution components execute the job: each operator node of the job DAG is assigned to a TaskExecutor in a TaskManager to run.

Execution's deploy() method calls the TaskExecutor's submitTask() method remotely via RPC:

    public void deploy() throws JobException {
        ......
        try {
            ......
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
                vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

            CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity())
                .whenCompleteAsync(
                    .....,
                    jobMasterMainThreadExecutor);
        }
        catch (Throwable t) {
            ......
        }
    }
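
One detail worth unpacking in deploy(): taskManagerGateway.submitTask(...) itself returns a future, so supplyAsync(...) yields a CompletableFuture<CompletableFuture<Acknowledge>>, and thenCompose(Function.identity()) flattens it back into a single future. A standalone sketch of that idiom in plain Java (the names below are made up; this is not Flink code):

    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    public class ComposeIdentityDemo {

        // Stand-in for taskManagerGateway.submitTask(...): an async call that itself returns a future.
        static CompletableFuture<String> submitTask() {
            return CompletableFuture.supplyAsync(() -> "ACK");
        }

        public static void main(String[] args) {
            // supplyAsync(...) produces CompletableFuture<CompletableFuture<String>>;
            // thenCompose(Function.identity()) flattens it to CompletableFuture<String>.
            CompletableFuture<String> ack =
                CompletableFuture.supplyAsync(ComposeIdentityDemo::submitTask)
                    .thenCompose(Function.identity());

            ack.whenComplete((result, failure) -> System.out.println("submitTask result: " + result));
            ack.join();
        }
    }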

3. TaskExecutor Runs the Operator Tasks

TaskExecutor's submitTask() method creates an org.apache.flink.runtime.taskmanager.Task to run the operator task. Task's doRun() method executes the operator's processing logic through the execution class (AbstractInvokable) associated with the operator node. The AbstractInvokable class for each operator is determined on the client side at job submission time, in StreamGraph's addOperator():

    public <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName) {

        Class<? extends AbstractInvokable> invokableClass =
                operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
        addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
                outTypeInfo, operatorName, invokableClass);
    }
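
The chosen class name travels with the task's deployment information, and inside Task.doRun() the invokable is instantiated from that name and its invoke() method is called. A self-contained toy that mimics this load-by-name-then-invoke pattern (the class names below are made up; this is not Flink's actual Task code):

    // Toy version of "resolve the invokable class by name, instantiate it, call invoke()".
    public class InvokableDemo {

        // Stand-in for AbstractInvokable: something the runtime can start generically.
        public abstract static class ToyInvokable {
            public abstract void invoke() throws Exception;
        }

        public static class ToySourceTask extends ToyInvokable {
            @Override
            public void invoke() {
                System.out.println("running source-task logic");
            }
        }

        public static void main(String[] args) throws Exception {
            // In Flink the name (SourceStreamTask, OneInputStreamTask, ...) is decided on the client;
            // here we just use our toy class name.
            String nameOfInvokableClass = "InvokableDemo$ToySourceTask";

            Class<? extends ToyInvokable> invokableClass =
                Class.forName(nameOfInvokableClass).asSubclass(ToyInvokable.class);

            ToyInvokable invokable = invokableClass.getDeclaredConstructor().newInstance();
            invokable.invoke();
        }
    }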

For a streaming job, StreamTask's invoke() method is called. For a source node, the call chain is StreamTask.invoke() -> StreamTask.runMailboxLoop() -> MailboxProcessor.runMailboxLoop() -> SourceStreamTask.processInput():

    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {

        controller.suspendDefaultAction();

        // Against the usual contract of this method, this implementation is not step-wise but blocking instead for
        // compatibility reasons with the current source interface (source functions run as a loop, not in steps).
        sourceThread.setTaskDescription(getName());
        sourceThread.start();
        sourceThread.getCompletionFuture().whenComplete((Void ignore, Throwable sourceThreadThrowable) -> {
            if (isCanceled() && ExceptionUtils.findThrowable(sourceThreadThrowable, InterruptedException.class).isPresent()) {
                mailboxProcessor.reportThrowable(new CancelTaskException(sourceThreadThrowable));
            } else if (!isFinished && sourceThreadThrowable != null) {
                mailboxProcessor.reportThrowable(sourceThreadThrowable);
            } else {
                mailboxProcessor.allActionsCompleted();
            }
        });
    }

A LegacySourceFunctionThread instance is created so that data production runs in a dedicated thread. LegacySourceFunctionThread's run() method calls StreamSource's run() method:

    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector,
            final OperatorChain<?, ?> operatorChain) throws Exception {

        final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

        final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
        final long latencyTrackingInterval = getExecutionConfig().isLatencyTrackingConfigured()
            ? getExecutionConfig().getLatencyTrackingInterval()
            : configuration.getLong(MetricOptions.LATENCY_INTERVAL);

        LatencyMarksEmitter<OUT> latencyEmitter = null;
        if (latencyTrackingInterval > 0) {
            latencyEmitter = new LatencyMarksEmitter<>(
                getProcessingTimeService(),
                collector,
                latencyTrackingInterval,
                this.getOperatorID(),
                getRuntimeContext().getIndexOfThisSubtask());
        }

        final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

        this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);

        try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time, and end inputs
            // of the operator chain
            if (!isCanceledOrStopped()) {
                // in theory, the subclasses of StreamSource may implement the BoundedOneInput interface,
                // so we still need the following call to end the input
                synchronized (lockingObject) {
                    operatorChain.endHeadOperatorInput(1);
                }
            }
        } finally {
            if (latencyEmitter != null) {
                latencyEmitter.close();
            }
        }
    }

StreamSource's run() method calls userFunction.run(ctx); when the data source is Kafka, the userFunction is a FlinkKafkaConsumerBase.
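
To make the userFunction.run(ctx) contract concrete, here is a minimal custom SourceFunction (an illustrative stand-in, not FlinkKafkaConsumerBase): run() loops and emits records through the SourceContext until cancel() stops it, and it is exactly this loop that StreamSource.run() drives.

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Minimal illustrative source; StreamSource.run() ends up driving this run(ctx) loop.
    public class CountingSource implements SourceFunction<Long> {

        private volatile boolean running = true;
        private long counter = 0L;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (running) {
                // emit under the checkpoint lock so emission and checkpointed state stay consistent
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }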

3.1 userFunction and headOperator

The headOperator whose run() is ultimately executed and the user function (userFunction) that holds the operator logic are both determined when the operator is added, for example when adding a Kafka source:

 environment.addSource(new FlinkKafkaConsumer(......));

which eventually calls the addSource() method:

    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

        TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);

        boolean isParallel = function instanceof ParallelSourceFunction;

        clean(function);

        final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
        return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
    }

The headOperator is a StreamSource, and the userFunction inside that StreamSource is the FlinkKafkaConsumer.
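
Putting the pieces together, here is a minimal runnable job sketch (topic name, bootstrap servers, and group id are made-up illustration values): addSource() wraps the FlinkKafkaConsumer (the userFunction) in a StreamSource (the headOperator), and the runtime path described above takes it from there.

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class KafkaSourceJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative address
            props.setProperty("group.id", "demo-group");              // illustrative group id

            // addSource() builds a StreamSource (headOperator) around the FlinkKafkaConsumer (userFunction).
            env.addSource(new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props))
               .print();

            env.execute("kafka-source-job");
        }
    }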

That concludes this look at how Flink submits a task; the best way to deepen your understanding is to trace the code path yourself.
