千家信息网

Flink SideOutput怎么使用

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,这篇文章主要介绍"Flink SideOutput怎么使用",在日常操作中,相信很多人在Flink SideOutput怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家
千家信息网最后更新 2025年02月06日Flink SideOutput怎么使用

这篇文章主要介绍"Flink SideOutput怎么使用",在日常操作中,相信很多人在Flink SideOutput怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flink SideOutput怎么使用"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

SideOutput方法:(侧输出)从主数据流中根据outputTag获取额外的输出流(分流场景下使用)

示例环境

java.version: 1.8.xflink.version: 1.11.1

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

SideOutput.java

import com.flink.examples.DataSource;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.api.java.tuple.Tuple4;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.ProcessFunction;import org.apache.flink.util.Collector;import org.apache.flink.util.OutputTag;import java.util.List;/** * @Description SideOutput方法:(侧输出)从主数据流中根据outputTag获取额外的输出流(分流场景下使用) */public class SideOutput {    /**     * 遍历集合,将数据流切分成多个流并打印     * @param args     * @throws Exception     */    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        List> tuple3List = DataSource.getTuple3ToList();        //Datastream        DataStream> dataStream = env.fromCollection(tuple3List);        //自定义man和girl两个性别tag        OutputTag> manTag = new OutputTag>("man"){};        OutputTag> girlTag = new OutputTag>("girl"){};        //将所有流数据输入到process做处理        SingleOutputStreamOperator> output = dataStream.process(new ProcessFunction, Tuple4>() {            @Override            public void processElement(Tuple3 value, Context ctx, Collector> out) throws Exception {                //将数据流按不同的性别划分,创建新的Tuple4,分别绑定到不同性别的tag                if (value.f1.equals("man")){                    ctx.output(manTag, Tuple4.of(value.f0, value.f1, value.f2, "男"));                }else {                    ctx.output(girlTag, Tuple4.of(value.f0, value.f1, value.f2, "女"));                }            }        });        //获取指定tag的数据流        DataStream> dataStream1 = output.getSideOutput(manTag);        DataStream> dataStream2 = output.getSideOutput(girlTag);        //打印        dataStream1.print();        dataStream2.print();        env.execute("flink Split job");    }}

打印结果

(张三,man,20,男)(李四,girl,24,女)(王五,man,29,男)(刘六,girl,32,女)(伍七,girl,18,女)(吴八,man,30,男)

到此,关于"Flink SideOutput怎么使用"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

数据 数据流 学习 输出 性别 方法 不同 场景 更多 环境 示例 帮助 实用 接下来 两个 多个 数据源 文章 理论 知识 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 缓存 数据库 服务器里的图片删除后去了哪里 数据库驱动框架 江苏电子网络技术规定 国家网络安全宣传开展情况 赴日软件开发课程 青年网络安全观后感150字 世界网络安全测试 广州伯纳斯互联网科技有限公司 华为安卓软件开发工程 流媒体服务器故障导致录像丢失 服务器连接交换机几条线 第9章 计算机网络安全 电子商务的网络技术历年试卷 福州学习软件开发 sql数据库重命名问题 中经网软件开发公司电话 主属性数据库怎么设置 app如何知道服务器地址 百遍不如打一遍网络安全演练 无尽的拉格朗日重选服务器 合肥安博网络技术有限公司 软件开发新报价图片 网络安全设计走线方法 部队网络安全涉密项目流程 数据库中的数据表怎么写 安装数据库 提示无法写入 网络安全和大数据管理局 方舟手游服务器招人送镰刀龙 黄埔靠谱网络安全建设
0