千家信息网

Flink中CoProcessFunction如何使用

发表于:2024-12-03 作者:千家信息网编辑
千家信息网最后更新 2024年12月03日,今天就跟大家聊聊有关Flink中CoProcessFunction如何使用,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。本文是《Flink处理
千家信息网最后更新 2024年12月03日Flink中CoProcessFunction如何使用

今天就跟大家聊聊有关Flink中CoProcessFunction如何使用,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

  • 本文是《Flink处理函数实战》系列的第五篇,学习内容是如何同时处理两个数据源的数据;

  • 试想在面对两个输入流时,如果这两个流的数据之间有业务关系,该如何编码实现呢,例如下图中的操作,同时监听99989999端口,将收到的输出分别处理后,再由同一个sink处理(打印):

  • Flink支持的方式是扩展CoProcessFunction来处理,为了更清楚认识,我们把KeyedProcessFunctionCoProcessFunction的类图摆在一起看,如下所示:

  • 从上图可见,CoProcessFunction和KeyedProcessFunction的继承关系一样,另外CoProcessFunction自身也很简单,在processElement1和processElement2中分别处理两个上游流入的数据即可,并且也支持定时器设置;

编码实战

接下来咱们开发一个应用来体验CoProcessFunction,功能非常简单,描述如下:

  1. 建两个数据源,数据分别来自本地99989999端口;

  2. 每个端口收到类似aaa,123这样的数据,转成Tuple2实例,f0是aaa,f1是123

  3. 在CoProcessFunction的实现类中,对每个数据源的数据都打日志,然后全部传到下游算子;

  4. 下游操作是打印,因此99989999端口收到的所有数据都会在控制台打印出来;

  5. 整个demo的功能如下图所示:

  • 接下来编码实现上述功能;

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

名称链接备注
项目主页https://github.com/zq2599/blog_demos该项目在GitHub上的主页
git仓库地址(https)https://github.com/zq2599/blog_demos.git该项目源码的仓库地址,https协议
git仓库地址(ssh)git@github.com:zq2599/blog_demos.git该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章的应用在flinkstudy文件夹下,如下图红框所示:

Map算子

  1. 做一个map算子,用来将字符串aaa,123转成Tuple2实例,f0是aaa,f1是123

  2. 算子名为WordCountMap.java

package com.bolingcavalry.coprocessfunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.util.StringUtils;public class WordCountMap implements MapFunction> {    @Override    public Tuple2 map(String s) throws Exception {        if(StringUtils.isNullOrWhitespaceOnly(s)) {            System.out.println("invalid line");            return null;        }        String[] array = s.split(",");        if(null==array || array.length<2) {            System.out.println("invalid line for array");            return null;        }        return new Tuple2<>(array[0], Integer.valueOf(array[1]));    }}

便于扩展的抽象类

  • 开发一个抽象类,将前面图中提到的监听端口、map处理、keyby处理、打印都做到这个抽象类中,但是CoProcessFunction的逻辑却不放在这里,而是交给子类来实现,这样如果我们想进一步实践和扩展CoProcessFunction的能力,只要在子类中专注做好CoProcessFunction相关开发即可,如下图,红色部分交给子类实现,其余的都是抽象类完成的:

  • 抽象类AbstractCoProcessFunctionExecutor.java,源码如下,稍后会说明几个关键点:

package com.bolingcavalry.coprocessfunction;import org.apache.flink.api.java.tuple.Tuple;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.KeyedStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.co.CoProcessFunction;/** * @author will * @email zq2599@gmail.com * @date 2020-11-09 17:33 * @description 串起整个逻辑的执行类,用于体验CoProcessFunction */public abstract class AbstractCoProcessFunctionExecutor {    /**     * 返回CoProcessFunction的实例,这个方法留给子类实现     * @return     */    protected abstract CoProcessFunction<            Tuple2,            Tuple2,            Tuple2> getCoProcessFunctionInstance();    /**     * 监听根据指定的端口,     * 得到的数据先通过map转为Tuple2实例,     * 给元素加入时间戳,     * 再按f0字段分区,     * 将分区后的KeyedStream返回     * @param port     * @return     */    protected KeyedStream, Tuple> buildStreamFromSocket(StreamExecutionEnvironment env, int port) {        return env                // 监听端口                .socketTextStream("localhost", port)                // 得到的字符串"aaa,3"转成Tuple2实例,f0="aaa",f1=3                .map(new WordCountMap())                // 将单词作为key分区                .keyBy(0);    }    /**     * 如果子类有侧输出需要处理,请重写此方法,会在主流程执行完毕后被调用     */    protected void doSideOutput(SingleOutputStreamOperator> mainDataStream) {    }    /**     * 执行业务的方法     * @throws Exception     */    public void execute() throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 并行度1        env.setParallelism(1);        // 监听9998端口的输入        KeyedStream, Tuple> stream1 = buildStreamFromSocket(env, 9998);        // 监听9999端口的输入        KeyedStream, Tuple> stream2 = buildStreamFromSocket(env, 9999);        SingleOutputStreamOperator> mainDataStream = stream1                // 两个流连接                .connect(stream2)                // 执行低阶处理函数,具体处理逻辑在子类中实现                .process(getCoProcessFunctionInstance());        // 将低阶处理函数输出的元素全部打印出来        mainDataStream.print();        // 侧输出相关逻辑,子类有侧输出需求时重写此方法        doSideOutput(mainDataStream);        // 执行        env.execute("ProcessFunction demo : CoProcessFunction");    }}
  • 关键点之一:一共有两个数据源,每个源的处理逻辑都封装到buildStreamFromSocket方法中;

  • 关键点之二:stream1.connect(stream2)将两个流连接起来;

  • 关键点之三:process接收CoProcessFunction实例,合并后的流的处理逻辑就在这里面;

  • 关键点之四:getCoProcessFunctionInstance是抽象方法,返回CoProcessFunction实例,交给子类实现,所以CoProcessFunction中做什么事情完全由子类决定;

  • 关键点之五:doSideOutput方法中啥也没做,但是在主流程代码的末尾会被调用,如果子类有侧输出(SideOutput)的需求,重写此方法即可,此方法的入参是处理过的数据集,可以从这里取得侧输出;

子类决定CoProcessFunction的功能

  1. 子类CollectEveryOne.java如下所示,逻辑很简单,将每个源的上游数据直接输出到下游算子:

package com.bolingcavalry.coprocessfunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.functions.co.CoProcessFunction;import org.apache.flink.util.Collector;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class CollectEveryOne extends AbstractCoProcessFunctionExecutor {    private static final Logger logger = LoggerFactory.getLogger(CollectEveryOne.class);    @Override    protected CoProcessFunction, Tuple2, Tuple2> getCoProcessFunctionInstance() {        return new CoProcessFunction, Tuple2, Tuple2>() {            @Override            public void processElement1(Tuple2 value, Context ctx, Collector> out) {                logger.info("处理1号流的元素:{},", value);                out.collect(value);            }            @Override            public void processElement2(Tuple2 value, Context ctx, Collector> out) {                logger.info("处理2号流的元素:{}", value);                out.collect(value);            }        };    }    public static void main(String[] args) throws Exception {        new CollectEveryOne().execute();    }}
  1. 上述代码中,CoProcessFunction后面的泛型定义很长:, Tuple2, Tuple2> ,一共三个Tuple2,分别代表一号数据源输入、二号数据源输入、下游输出的类型;

验证

  1. 分别开启本机的99989999端口,我这里是MacBook,执行nc -l 9998nc -l 9999

  2. 启动Flink应用,如果您和我一样是Mac电脑,直接运行CollectEveryOne.main方法即可(如果是windows电脑,我这没试过,不过做成jar在线部署也是可以的);

  3. 在监听9998和9999端口的控制台分别输入aaa,111bbb,222

  4. 以下是flink控制台输出的内容,可见processElement1和processElement1方法的日志代码已经执行,并且print方法作为最下游,将两个数据源的数据都打印出来了,符合预期:

12:45:38,774 INFO CollectEveryOne - 处理1号流的元素:(aaa,111),(aaa,111)12:45:43,816 INFO CollectEveryOne - 处理2号流的元素:(bbb,222)(bbb,222)

看完上述内容,你们对Flink中CoProcessFunction如何使用有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

数据 处理 子类 端口 输出 两个 方法 实例 数据源 逻辑 监听 元素 关键 关键点 输入 内容 地址 源码 算子 项目 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 任丘市盘古网络技术有限公司 网络安全专科录取分数线 derby数据库语法 数据库系统概论第5版第五章 教学网络安全 税务专用网络安全助手 智能语音控制软件开发要多少钱 软件开发员工工作感谢信 北京有竹网络技术有限公司 大数据时代数据库主要是什么 数组怎么存储到数据库 sap连接外部数据库 网页找别的服务器 缓存服务器作用是什么 四川专业软件开发中心 利用网络安全模型可以构建02 怎么查看keil的数据库 数据库安装提示选择许可模式 电子电器架构室软件开发组 江阴定制软件开发内容 传奇服务端清除数据库 碧橙网络技术有限公司股东 基本数据库类型有哪些内容 软件开发驱动开发 软件开发科技企业账务处理 网络安全周宣传活动倡议书 安琪网络安全手抄报图片 创渠网络技术上海有限公司 临沂用友软件开发公司地址 试题保存选项怎么保存数据库
0