千家信息网

从Flink client提交源码看第三方jar包的动态加载的解决方案是怎样的

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,从Flink client提交源码看第三方jar包的动态加载的解决方案是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收
千家信息网最后更新 2025年02月04日从Flink client提交源码看第三方jar包的动态加载的解决方案是怎样的

从Flink client提交源码看第三方jar包的动态加载的解决方案是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

1. flink run 提交流程源码分析

查看flink脚本找到执行run命令的入口类,如下:

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@

入口类为:org.apache.flink.client.cli.CliFrontend。 最终会调用 parseParameters(String[] args) 方法来执行命令解析,run 命令会调用 run(params) 方法,如下:

switch (action) {        case ACTION_RUN:                run(params);                return 0;        case ACTION_RUN_APPLICATION:                runApplication(params);                return 0;        case ACTION_LIST:                list(params);                return 0;        case ACTION_INFO:                info(params);                return 0;        case ACTION_CANCEL:                cancel(params);                return 0;        case ACTION_STOP:                stop(params);                return 0;        case ACTION_SAVEPOINT:                savepoint(params);                return 0;}

run 方法代码如下

protected void run(String[] args) throws Exception {                LOG.info("Running 'run' command.");                final Options commandOptions = CliFrontendParser.getRunCommandOptions();                final CommandLine commandLine = getCommandLine(commandOptions, args, true);                // evaluate help flag                if (commandLine.hasOption(HELP_OPTION.getOpt())) {                        CliFrontendParser.printHelpForRun(customCommandLines);                        return;                }                final CustomCommandLine activeCommandLine =                                validateAndGetActiveCommandLine(checkNotNull(commandLine));                final ProgramOptions programOptions = ProgramOptions.create(commandLine);        # 创建 PackagedProgram 对象                final PackagedProgram program =                                getPackagedProgram(programOptions);        #解析获取相关依赖jar                final List jobJars = program.getJobJarAndDependencies();                # 生成最终提交配置        final Configuration effectiveConfiguration = getEffectiveConfiguration(                                activeCommandLine, commandLine, programOptions, jobJars);                LOG.debug("Effective executor configuration: {}", effectiveConfiguration);                try {                        executeProgram(effectiveConfiguration, program);                } finally {                        program.deleteExtractedLibraries();                }        }

run方法根据用户传入的参数如 main函数,jar包等信息创建出 PackagedProgram 对象,这个对象封装了用户提交的信息。从 getPackagedProgram()方法里可以看出。

return PackagedProgram.newBuilder()                        .setJarFile(jarFile)                        .setUserClassPaths(classpaths)                        .setEntryPointClassName(entryPointClass)                        .setConfiguration(configuration)                        .setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())                        .setArguments(programArgs)                        .build();

查看PackagedProgram构造方法,里面会创建几个关键成员变量:

  • classpaths:用户-C 参数传入的信息

  • jarFile : 用户的主函数的jar

  • extractedTempLibraries :提取出上面主jar包里 lib/ 文件夹下的所有jar包信息,供后面classloader使用

  • userCodeClassLoader : 用户code的classloader,这个classloader会把classpaths,jarFile,extractedTempLibraries 都加入到classpath里。该userCodeClassLoader默认采用child_first优先策略

  • mainClass :用户main函数方法 该构造方法如下:

private PackagedProgram(                        @Nullable File jarFile,                        List classpaths,                        @Nullable String entryPointClassName,                        Configuration configuration,                        SavepointRestoreSettings savepointRestoreSettings,                        String... args) throws ProgramInvocationException {                this.classpaths = checkNotNull(classpaths);                this.savepointSettings = checkNotNull(savepointRestoreSettings);                this.args = checkNotNull(args);                checkArgument(jarFile != null || entryPointClassName != null, "Either the jarFile or the entryPointClassName needs to be non-null.");                // whether the job is a Python job.                this.isPython = isPython(entryPointClassName);                // load the jar file if exists                this.jarFile = loadJarFile(jarFile);                assert this.jarFile != null || entryPointClassName != null;                // now that we have an entry point, we can extract the nested jar files (if any)                this.extractedTempLibraries = this.jarFile == null ? Collections.emptyList() : extractContainedLibraries(this.jarFile);                this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(                        getJobJarAndDependencies(),                        classpaths,                        getClass().getClassLoader(),                        configuration);                // load the entry point class                this.mainClass = loadMainClass(                        // if no entryPointClassName name was given, we try and look one up through the manifest                        entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),                        userCodeClassLoader);                if (!hasMainMethod(mainClass)) {                        throw new ProgramInvocationException("The given program class does not have a main(String[]) method.");                }        }

PackagedProgram 里 getJobJarAndDependencies 方法,该方法收集了job所有依赖的jar包,这些jar包后续会提交到集群并加入到classpath路径中。

PackagedProgram对象构造完成之后,便是创建最终的Configuration对象了,如下方法

final Configuration effectiveConfiguration = getEffectiveConfiguration(                                activeCommandLine, commandLine, programOptions, jobJars);

这个方法会设置两个参数:

  • pipeline.classpaths: 值为getJobJarAndDependencies()和classpaths里的url

  • pipeline.jars: 值为getJobJarAndDependencies()返回的jar和lib文件夹下的依赖,后续提交集群的时候会根据这个把jar一起提交到集群

准备好 PackagedProgram和Configuration后,就开始执行用户程序了,

executeProgram(effectiveConfiguration, program);

详细代码如下:

public static void executeProgram(                        PipelineExecutorServiceLoader executorServiceLoader,                        Configuration configuration,                        PackagedProgram program,                        boolean enforceSingleJobExecution,                        boolean suppressSysout) throws ProgramInvocationException {                checkNotNull(executorServiceLoader);                final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();                final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();                try {# 设置用户上下文用户类加载器Thread.currentThread().setContextClassLoader(userCodeClassLoader);                        LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));                        ContextEnvironment.setAsContext(                                executorServiceLoader,                                configuration,                                userCodeClassLoader,                                enforceSingleJobExecution,                                suppressSysout);                        StreamContextEnvironment.setAsContext(                                executorServiceLoader,                                configuration,                                userCodeClassLoader,                                enforceSingleJobExecution,                                suppressSysout);                        try {                # 反射调用户的 main 函数执行job提交                                program.invokeInteractiveModeForExecution();                        } finally {                                ContextEnvironment.unsetAsContext();                                StreamContextEnvironment.unsetAsContext();                        }                } finally {                        Thread.currentThread().setContextClassLoader(contextClassLoader);                }        }

最后总结一下整个流程:

  1. 执行flink run 命名传入相关参数

  2. 创建PackagedProgram对象,准备相关jar,用户类加载器,Configuration对象

  3. 通过反射调用用户Main方法

  4. 构建Pipeline StreamGraph,提交job到集群

2. 提交job时,动态加载第三方jar(如udf等)

通过分析流程我们可以发现可以有两种方式来实现动态jar的添加

  1. 动态的 把三方jar 放入 主函数jar包的lib目录下(可以通过jar uf 命名搞定) 因为在PackagedProgram构造方法里会通过extractContainedLibraries()方法获取jar lib目录里的所有jar,并且这些jar会一并上传到集群

  2. 在用户任务main函数里,通过反射动态设置 Configuration 对象的 pipeline.classpaths , pipeline.jars 这两个属性 。并且还需要把第三方jar加载到Thread.contextClassLoader里。(可参见:https://zhuanlan.zhihu.com/p/278482766)

本人在项目中直接采用的是第一种方案,不会添加更多代码。

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

方法 用户 对象 函数 动态 集群 信息 参数 第三方 代码 命令 流程 反射 方案 源码 两个 入口 文件 文件夹 更多 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 南充服务器云存储公司 数据库打开文件只显示文件夹 海南邵赢网络技术官网 网络药理学常用数据库及网站汇总 南威互联网科技集团执行总裁 黄浦区常用网络技术信息推荐 软件开发offer知乎 数据库安全采取措施 软件开发底层中间件业务层 云服务器在哪个地方在线选购 服务器上的git 数据库简单解决问题 上海浦东互联网科技公司注册 防火墙连接不上数据库 软件开发人员外包 计算机软件开发技术的答辩内容 腾讯云香港轻量服务器 架设代理 塔式服务器放置 运城众思互联网科技有限公司 网络调试助手阿里云服务器不能用 携程误删数据库 花都区质量网络技术开发咨询报价 软件开发测量数据 宝山区软件开发生产过程 惠州5g服务器标准机柜 青岛软件开发江创 软件开发无线和软件开发区别 江西企业软件开发服务价格 共青团网络安全管理制度 网络安全的概念发生了什么变化
0