千家信息网

Flink怎么执行用户程序

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,本篇内容主要讲解"Flink怎么执行用户程序",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink怎么执行用户程序"吧!执行用户程序CliFronten
千家信息网最后更新 2025年02月06日Flink怎么执行用户程序

本篇内容主要讲解"Flink怎么执行用户程序",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink怎么执行用户程序"吧!

执行用户程序

CliFrontend生成Configuration对象

以flink on yarn为例:

(1)在CliFrontend的main()方法中,生成GenericCLI、FlinkYarnSessionCli、DefaultCLI三种命令行对象,依次放入ArrayList对象customCommandLines中

        public static void main(final String[] args) {                EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);                        ......                // 3. load the custom command lines                final List customCommandLines = loadCustomCommandLines(                        configuration,                        configurationDirectory);                ......        }

在后面 run() -> validateAndGetActiveCommandLine()方法中依次从customCommandLines对象中取出命令行对象,调用isActive()方法,判断是哪一种命令行

    public CustomCommandLine validateAndGetActiveCommandLine(CommandLine commandLine) {                for (CustomCommandLine cli : customCommandLines) {                        if (cli.isActive(commandLine)) {                                return cli;                        }                }                throw new IllegalStateException("No valid command-line found.");        }

FlinkYarnSessionCli的isActive()方法中,会去判断运行bin/flink脚本时是否传入了-m参数,其值是否为yarn-cluster

      @Override        public boolean isActive(CommandLine commandLine) {                final String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);                final boolean yarnJobManager = ID.equals(jobManagerOption);                final boolean hasYarnAppId = commandLine.hasOption(applicationId.getOpt())                                || configuration.getOptional(YarnConfigOptions.APPLICATION_ID).isPresent();                final boolean hasYarnExecutor = YarnSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET))                                || YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));                return hasYarnExecutor || yarnJobManager || hasYarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);        }

addressOption为匹配-m,ID为yarn-cluster

(2)在CliFrontend的run()方法中,通过getEffectiveConfiguration()方法得到Configuration对象,传入的命令行对象activeCommandLine即为上面第一个步骤中得到的FlinkYarnSessionCli;

在getEffectiveConfiguration()方法中会调用FlinkYarnSessionCli的applyCommandLineOptionsToConfiguration()方法来增加和yarn相关的配置,代码如下:

      public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {                // we ignore the addressOption because it can only contain "yarn-cluster"                final Configuration effectiveConfiguration = new Configuration(configuration);                applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);                final ApplicationId applicationId = getApplicationId(commandLine);                if (applicationId != null) {                        final String zooKeeperNamespace;                        if (commandLine.hasOption(zookeeperNamespace.getOpt())){                                zooKeeperNamespace = commandLine.getOptionValue(zookeeperNamespace.getOpt());                        } else {                                zooKeeperNamespace = effectiveConfiguration.getString(HA_CLUSTER_ID, applicationId.toString());                        }                        effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);                        effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));                        effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);                } else {                        effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);                }                ......        ......}

其中关键配置DeploymentOptions.TARGET,即程序目标运行环境;YarnJobClusterExecutor.NAME 值为

public enum YarnDeploymentTarget {        PER_JOB("yarn-per-job"),    .....}

给StreamExecutionEnvironment设置Configuration对象

在ClientUitls的executeProgram()中通过下面代码设置:

 public static void executeProgram(                        PipelineExecutorServiceLoader executorServiceLoader,                        Configuration configuration,                        PackagedProgram program,                        boolean enforceSingleJobExecution,                        boolean suppressSysout) throws ProgramInvocationException {                ......                try {                        ......                        StreamContextEnvironment.setAsContext(                                executorServiceLoader,                                configuration,                                userCodeClassLoader,                                enforceSingleJobExecution,                                suppressSysout);                        ......                } finally {                        Thread.currentThread().setContextClassLoader(contextClassLoader);                }        }

运行用户程序main()方法

ClientUtils的executeProgram()方法中调用PackagedProgram的invokeInteractiveModeForExecution(),来执行用户main()方法

    private static void callMainMethod(Class entryClass, String[] args) throws ProgramInvocationException {                Method mainMethod;                if (!Modifier.isPublic(entryClass.getModifiers())) {                        ......                }                try {                        mainMethod = entryClass.getMethod("main", String[].class);                } catch (NoSuchMethodException e) {                        ......                } catch (Throwable t) {                        ......                }                if (!Modifier.isStatic(mainMethod.getModifiers())) {                        ......                }                if (!Modifier.isPublic(mainMethod.getModifiers())) {                        ......                }                try {                        mainMethod.invoke(null, (Object) args);                } catch (IllegalArgumentException e) {                        ......                } catch (IllegalAccessException e) {                        ......                } catch (InvocationTargetException e) {                        ......                } catch (Throwable t) {                                        }        }

到此,相信大家对"Flink怎么执行用户程序"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0