Flink的CoGroup如何使用
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇文章主要介绍"Flink的CoGroup如何使用",在日常操作中,相信很多人在Flink的CoGroup如何使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Fli
千家信息网最后更新 2025年02月03日Flink的CoGroup如何使用
这篇文章主要介绍"Flink的CoGroup如何使用",在日常操作中,相信很多人在Flink的CoGroup如何使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Flink的CoGroup如何使用"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
CoGroup算子:将两个数据流按照key进行group分组,并将数据流按key进行分区的处理,最终合成一个数据流(与join有区别,不管key有没有关联上,最终都会合并成一个数据流)
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
CoGroup.java
package com.flink.examples.functions;import com.flink.examples.DataSource;import com.google.gson.Gson;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.CoGroupFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;import java.time.Duration;import java.util.Arrays;import java.util.List;/** * @Description CoGroup算子:将两个数据流按照key进行group分组,并将数据流按key进行分区的处理,最终合成一个数据流(与join有区别,不管key有没有关联上,最终都会合并成一个数据流) */public class CoGroup { /** * 两个数据流集合,对相同key进行内联,分配到同一个窗口下,合并并打印 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //watermark 自动添加水印调度时间 //env.getConfig().setAutoWatermarkInterval(200); List> tuple3List1 = DataSource.getTuple3ToList(); List > tuple3List2 = Arrays.asList( new Tuple3<>("伍七", "girl", 18), new Tuple3<>("吴八", "man", 30) ); //Datastream 1 DataStream > dataStream1 = env.fromCollection(tuple3List1) //添加水印窗口,如果不添加,则时间窗口会一直等待水印事件时间,不会执行apply .assignTimestampsAndWatermarks(WatermarkStrategy . >forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((element, timestamp) -> System.currentTimeMillis())); //Datastream 2 DataStream > dataStream2 = env.fromCollection(tuple3List2) //添加水印窗口,如果不添加,则时间窗口会一直等待水印事件时间,不会执行apply .assignTimestampsAndWatermarks(WatermarkStrategy . >forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner >() { @Override public long extractTimestamp(Tuple3 element, long timestamp) { return System.currentTimeMillis(); } }) ); //对dataStream1和dataStream2两个数据流进行关联,没有关联也保留 //Datastream 3 DataStream newDataStream = dataStream1.coGroup(dataStream2) .where(new KeySelector , String>() { @Override public String getKey(Tuple3 value) throws Exception { return value.f1; } }) .equalTo(t3->t3.f1) .window(TumblingEventTimeWindows.of(Time.seconds(1))) .apply(new CoGroupFunction , Tuple3 , String>() { @Override public void coGroup(Iterable > first, Iterable > second, Collector out) throws Exception { StringBuilder sb = new StringBuilder(); Gson gson = new Gson(); //datastream1的数据流集合 for (Tuple3 tuple3 : first) { sb.append(gson.toJson(tuple3)).append("\n"); } //datastream2的数据流集合 for (Tuple3 tuple3 : second) { sb.append(gson.toJson(tuple3)).append("\n"); } out.collect(sb.toString()); } }); newDataStream.print(); env.execute("flink CoGroup job"); }}
打印结果
{"f0":"张三","f1":"man","f2":20}{"f0":"王五","f1":"man","f2":29}{"f0":"吴八","f1":"man","f2":30}{"f0":"吴八","f1":"man","f2":30}{"f0":"李四","f1":"girl","f2":24}{"f0":"刘六","f1":"girl","f2":32}{"f0":"伍七","f1":"girl","f2":18}{"f0":"伍七","f1":"girl","f2":18}
到此,关于"Flink的CoGroup如何使用"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!
数据
数据流
时间
水印
两个
关联
学习
事件
更多
环境
示例
算子
并将
并成
分组
处理
帮助
实用
相同
接下来
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全的表述正确的是什么
学django之前要学数据库么
mac系统搭建数据库
按需服务器
网络安全登记测评评分标准
女孩子学的网络技术
博雅数据库南京中医药大学
医院网络安全工作方案实施办法
服务器测试报错解决办法
筋斗云物流数据库设计
奉贤区品牌软件开发定制价格
易云游网络技术有限公司
一年级网络安全保护视力绘画作品
服务器能换轮胎吗
plc生成文件导入数据库
晶体生长过程数据服务器软件开发
在正确树立网络安全
网络安全六项能力
疫情信息网络技术
税网络安全自查报告
线上数据库死锁怎么解决
网络安全登记测评评分标准
好用的后端应用服务器框架
sntp时间服务器
绝地求生轻量版的服务器在哪
银行业网络安全宣传活动总结
服务器恢复后台管理
一百秒漫谈新思想之网络安全
娄底软件开发哪家专业
香港vpn服务器租用