千家信息网

Flink中ProcessFunction类如何使用

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章将为大家详细讲解有关Flink中ProcessFunction类如何使用,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。Flink处理函数实战系
千家信息网最后更新 2025年02月04日Flink中ProcessFunction类如何使用

这篇文章将为大家详细讲解有关Flink中ProcessFunction类如何使用,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

Flink处理函数实战系列链接

  1. 深入了解ProcessFunction的状态操作(Flink-1.10);

  2. ProcessFunction;

  3. KeyedProcessFunction类;

  4. ProcessAllWindowFunction(窗口处理);

  5. CoProcessFunction(双流处理);

关于处理函数(Process Function)

如下图,在常规的业务开发中,SQL、Table API、DataStream API比较常用,处于Low-level的Porcession相对用得较少,从本章开始,我们一起通过实战来熟悉处理函数(Process Function),看看这一系列的低级算子可以带给我们哪些能力?

关于ProcessFunction类

处理函数有很多种,最基础的应该ProcessFunction类,来看看它的类图,可见有RichFunction的特性open、close,然后自己有两个重要的方法processElement和onTimer: 常用特性如下所示:

  1. 处理单个元素;

  2. 访问时间戳;

  3. 旁路输出;

接下来写两个应用体验上述功能;

版本信息

  1. 开发环境操作系统:MacBook Pro 13寸, macOS Catalina 10.15.3

  2. 开发工具:IDEA ULTIMATE 2018.3

  3. JDK:1.8.0_211

  4. Maven:3.6.0

  5. Flink:1.9.2

源码下载

如果您不想写代码,整个系列的源码可在GitHub下载到,地址和链接信息如下表所示(https://github.com/zq2599/blog_demos):

名称链接备注
项目主页https://github.com/zq2599/blog_demos该项目在GitHub上的主页
git仓库地址(https)https://github.com/zq2599/blog_demos.git该项目源码的仓库地址,https协议
git仓库地址(ssh)git@github.com:zq2599/blog_demos.git该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,本章的应用在flinkstudy文件夹下,如下图红框所示:

创建工程

执行以下命令创建一个flink-1.9.2的应用工程:

mvn \archetype:generate \-DarchetypeGroupId=org.apache.flink \-DarchetypeArtifactId=flink-quickstart-java \-DarchetypeVersion=1.9.2

按提示输入groupId:com.bolingcavalry,architectid:flinkdemo

第一个demo

第一个demo用来体验以下两个特性:

  1. 处理单个元素;

  2. 访问时间戳;

创建Simple.java,内容如下:

package com.bolingcavalry.processfunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.ProcessFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.util.Collector;public class Simple {    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        // 并行度为1        env.setParallelism(1);        // 设置数据源,一共三个元素        DataStream> dataStream = env.addSource(new SourceFunction>() {            @Override            public void run(SourceContext> ctx) throws Exception {                for(int i=1; i<4; i++) {                    String name = "name" + i;                    Integer value = i;                    long timeStamp = System.currentTimeMillis();                    // 将将数据和时间戳打印出来,用来验证数据                    System.out.println(String.format("source,%s, %d, %d\n",                            name,                            value,                            timeStamp));                    // 发射一个元素,并且戴上了时间戳                    ctx.collectWithTimestamp(new Tuple2(name, value), timeStamp);                    // 为了让每个元素的时间戳不一样,每发射一次就延时10毫秒                    Thread.sleep(10);                }            }            @Override            public void cancel() {            }        });        // 过滤值为奇数的元素        SingleOutputStreamOperator mainDataStream = dataStream                .process(new ProcessFunction, String>() {                    @Override                    public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {                        // f1字段为奇数的元素不会进入下一个算子                        if(0 == value.f1 % 2) {                            out.collect(String.format("processElement,%s, %d, %d\n",                                    value.f0,                                    value.f1,                                    ctx.timestamp()));                        }                    }                });        // 打印结果,证明每个元素的timestamp确实可以在ProcessFunction中取得        mainDataStream.print();        env.execute("processfunction demo : simple");    }}

这里对上述代码做个介绍:

  1. 创建一个数据源,每个10毫秒发出一个元素,一共三个,类型是Tuple2,f0是个字符串,f1是整形,每个元素都带时间戳;

  2. 数据源发出元素时,提前把元素的f0、f1、时间戳打印出来,和后面的数据核对是否一致;

  3. 在后面的处理中,创建了ProcessFunction的匿名子类,里面可以处理上游发来的每个元素,并且还能取得每个元素的时间戳(这个能力很重要),然后将f1字段为奇数的元素过滤掉;

  4. 最后将ProcessFunction处理过的数据打印出来,验证处理结果是否符合预期;

直接执行Simple类,结果如下,可见过滤和提取时间戳都成功了:

第二个demo

第二个demo是实现旁路输出(Side Outputs),对于一个DataStream来说,可以通过旁路输出将数据输出到其他算子中去,而不影响原有的算子的处理,下面来演示旁路输出:

创建SideOutput类:

package com.bolingcavalry.processfunction;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.ProcessFunction;import org.apache.flink.util.Collector;import org.apache.flink.util.OutputTag;import java.util.ArrayList;import java.util.List;public class SideOutput {    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 并行度为1        env.setParallelism(1);        // 定义OutputTag        final OutputTag outputTag = new OutputTag("side-output"){};        // 创建一个List,里面有两个Tuple2元素        List> list = new ArrayList<>();        list.add(new Tuple2("aaa", 1));        list.add(new Tuple2("bbb", 2));        list.add(new Tuple2("ccc", 3));        //通过List创建DataStream        DataStream> fromCollectionDataStream = env.fromCollection(list);        //所有元素都进入mainDataStream,f1字段为奇数的元素进入SideOutput        SingleOutputStreamOperator mainDataStream = fromCollectionDataStream                .process(new ProcessFunction, String>() {                    @Override                    public void processElement(Tuple2 value, Context ctx, Collector out) throws Exception {                        //进入主流程的下一个算子                        out.collect("main, name : " + value.f0 + ", value : " + value.f1);                        //f1字段为奇数的元素进入SideOutput                        if(1 == value.f1 % 2) {                            ctx.output(outputTag, "side, name : " + value.f0 + ", value : " + value.f1);                        }                    }                });        // 禁止chanin,这样可以在页面上看清楚原始的DAG        mainDataStream.disableChaining();        // 取得旁路数据        DataStream sideDataStream = mainDataStream.getSideOutput(outputTag);        mainDataStream.print();        sideDataStream.print();        env.execute("processfunction demo : sideoutput");    }}

这里对上述代码做个介绍:

  1. 数据源是个集合,类型是Tuple2,f0字段是字符串,f1字段是整形;

  2. ProcessFunction的匿名子类中,将每个元素的f0和f1拼接成字符串,发给主流程算子,再将f1字段为奇数的元素发到旁路输出;

  3. 数据源发出元素时,提前把元素的f0、f1、时间戳打印出来,和后面的数据核对是否一致;

  4. 将主流程和旁路输出的元素都打印出来,验证处理结果是否符合预期;

执行SideOutput看结果,如下图,main前缀的都是主流程算子,一共三条记录,side前缀的是旁路输出,只有f1字段为奇数的两条记录,符合预期: 上面的操作都是在IDEA上执行的,还可以将flink单独部署,再将上述工程构建成jar,提交到flink的jobmanager,可见DAG如下:

至此,处理函数中最简单的ProcessFunction类的学习

关于Flink中ProcessFunction类如何使用就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

元素 处理 数据 时间 字段 旁路 输出 奇数 算子 函数 地址 数据源 结果 项目 两个 主流 主流程 仓库 源码 代码 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 MySQL数据库表没函数计算 在广东网络技术能不能考二建 软件开发行业新手入门 用了云数据库还需要DBS java程序多服务器部署 商户服务器证书下载安装 网络安全事故预防 长春网络安全支队 战地4哪些服务器不会被炸服 山西聚搜网络技术有限公司 常用的解决网络安全技术有哪些 14.7苹果连接服务器出现问题 保交所软件开发 网络安全事关国家 栖霞区网络软件开发质量保证 学习数据库都需要学习什么 微信二维码下载找不到服务器 贸易公司风险数据库 软件开发的阶段划分 绿盟科技大学网络安全研讨会 网络安全知识有奖竞答答案 ASP从数据库读取图片滚动 java程序多服务器部署 数据库安全技术平时测验题1 以30开头的数据库查询 塔城网络服务器维保公司 湖南软件开发首推岚鸿 河东软件开发培训教学哪家好 白银网络安全工程师岗位职责 数据库为什么不能分离
0