Apache Flink 官方文档--流(DataStream API)-旁路输出
发表于:2025-01-25 作者:千家信息网编辑
千家信息网最后更新 2025年01月25日,旁路输出(side output) 除了来自数据流算子的主流结果输出之外,可以产生任意数量的流旁路输出结果。旁路输出结果数据类型与主流结果的数据类型以及其他旁路输出结果数据类型可以是完全不同的。当你
千家信息网最后更新 2025年01月25日Apache Flink 官方文档--流(DataStream API)-旁路输出
旁路输出(side output)
除了来自数据流算子的主流结果输出之外,可以产生任意数量的流旁路输出结果。旁路输出结果数据类型与主流结果的数据类型以及其他旁路输出结果数据类型可以是完全不同的。当你需要分割数据流时,这个算子非常有用。通常需要复制流,然后从每个数据流中过滤掉不需要的数据。
当使用旁路输出时,首先需要定义一个OutputTag来标识一个旁路输出流。
Java
// this needs to be an anonymous inner class, so that we can analyze the typeOutputTag outputTag = new OutputTag("side-output") {};
Scala
val outputTag = OutputTag[String]("side-output")
注意OutputTag是如何根据旁路输出流包含的元素类型typed的。
可以通过以下函数发射数据到旁路输出。
- ProcessFunction
- CoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
可以使用Context参数(在上述函数中向用户暴露)将数据发送到OutputTag标识的旁路输出。以下是从ProcessFunction发出旁路输出数据的示例:
Java:
DataStream input = ...;final OutputTag outputTag = new OutputTag("side-output"){};SingleOutputStreamOperator mainDataStream = input .process(new ProcessFunction() { @Override public void processElement( Integer value, Context ctx, Collector out) throws Exception { // emit data to regular output out.collect(value); // emit data to side output ctx.output(outputTag, "sideout-" + String.valueOf(value)); } });
Scala:
val input: DataStream[Int] = ...val outputTag = OutputTag[String]("side-output")val mainDataStream = input .process(new ProcessFunction[Int, Int] { override def processElement( value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit = { // emit data to regular output out.collect(value) // emit data to side output ctx.output(outputTag, "sideout-" + String.valueOf(value)) } })
要读取旁路输出流,在数据流运算后使用getSideOutput(OutputTag)。此时将会获得键入旁路输出流的结果。
Java:
final OutputTag outputTag = new OutputTag("side-output"){};SingleOutputStreamOperator mainDataStream = ...;DataStream sideOutputStream = mainDataStream.getSideOutput(outputTag);
Scala:
val outputTag = OutputTag[String]("side-output")val mainDataStream = ...val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
输出
旁路
数据
结果
数据流
类型
主流
函数
标识
算子
不同
元素
参数
可以通过
数量
有用
用户
示例
发射
运算
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
在局域网内部署海康流媒体服务器
余姚嵌入式软件开发外包
云服务器建站
广西数据网络技术分类代理商
泰州智阳网络技术
软件开发岗位哪个好
软件开发质量管理制度
串口服务器网线
深圳市永图网络技术有限公司
gpu 服务器 搭建
怎么让软件开发的版权是你
中企通信网络技术有限公司
如何配置深度学习服务器
软件开发成本 会计科目
idc服务器工厂
网络安全周2021年河北
美团商家服务器错误
免费网站网络安全
国服大服务器
小程序如何关闭数据库
云数据库与分布式数据库
互联网时代科技赋能
湖州提供网络技术有哪些
网络安全网络讲座观后感
云服务器相对传统服务器优势
oauth 2.0 服务器搭建
南京网络安全方案
表空间数据库自动增量值
服务器配置推荐怎么看
介绍软件开发项目