Flink Join怎么使用
发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,这篇文章主要讲解了"Flink Join怎么使用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Join怎么使用"吧!Join算子:两个数据
千家信息网最后更新 2025年01月31日Flink Join怎么使用
这篇文章主要讲解了"Flink Join怎么使用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink Join怎么使用"吧!
Join算子:两个数据流通过内部相同的key分区,将窗口内两个数据流相同key数据元素计算后,合并输出(类似于mysql表的inner join操作)
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
Join.java
package com.flink.examples.functions;import com.flink.examples.DataSource;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.FlatJoinFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.util.Collector;import java.time.Duration;import java.util.Arrays;import java.util.List;/** * @Description Join算子:两个数据流通过内部相同的key分区,将窗口内两个数据流相同key数据元素计算后,合并输出(类似于mysql表的inner join操作) */public class Join { /** * Flink支持了两种Join:Window Join(窗口连接)和Interval Join(时间间隔连接),本示例演示的为Window Join * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/stream/operators/joining.html */ /** * 两个数据流集合,对相同key进行内联,分配到同一个窗口下,合并并打印 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(4); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// //watermark 自动添加水印调度时间// env.getConfig().setAutoWatermarkInterval(200); List> tuple3List1 = DataSource.getTuple3ToList(); List > tuple3List2 = Arrays.asList( new Tuple3<>("伍七", "girl", 18), new Tuple3<>("吴八", "man", 30) ); //Datastream 1 DataStream > dataStream1 = env.fromCollection(tuple3List1) //添加水印窗口,如果不添加,则时间窗口会一直等待水印事件时间,不会执行apply .assignTimestampsAndWatermarks(WatermarkStrategy. >forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner((element, timestamp)->System.currentTimeMillis())); //Datastream 2 DataStream > dataStream2 = env.fromCollection(tuple3List2) //添加水印窗口,如果不添加,则时间窗口会一直等待水印事件时间,不会执行apply .assignTimestampsAndWatermarks(WatermarkStrategy. >forBoundedOutOfOrderness(Duration.ofSeconds(2)) .withTimestampAssigner(new SerializableTimestampAssigner >() { @Override public long extractTimestamp(Tuple3 element, long timestamp) { return System.currentTimeMillis(); } })); //Datastream 3 DataStream newDataStream = dataStream1.join(dataStream2) .where(new KeySelector , String>() { @Override public String getKey(Tuple3 value) throws Exception { System.out.println("first name:" + value.f0 + ",sex:" + value.f1); return value.f1; } }) .equalTo(new KeySelector , String>() { @Override public String getKey(Tuple3 value) throws Exception { System.out.println("second name:" + value.f0 + ",sex:" + value.f1); return value.f1; } }) .window(TumblingEventTimeWindows.of(Time.seconds(1)) .apply(new FlatJoinFunction , Tuple3 , String>() { @Override public void join(Tuple3 first, Tuple3 second, Collector out) throws Exception { out.collect(first.f0 + "|" + first.f1 + "|" + first.f2 + "|" + second.f0 + "|" + second.f1 + "|" + second.f2); } }) ; newDataStream.print(); env.execute("flink Join job"); }}
打印结果
4> 李四|girl|24|伍七|girl|184> 刘六|girl|32|伍七|girl|184> 伍七|girl|18|伍七|girl|182> 张三|man|20|吴八|man|302> 王五|man|29|吴八|man|302> 吴八|man|30|吴八|man|30
感谢各位的阅读,以上就是"Flink Join怎么使用"的内容了,经过本文的学习后,相信大家对Flink Join怎么使用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
数据
时间
相同
两个
数据流
水印
示例
学习
事件
元素
内容
环境
算子
输出
官方
就是
思路
情况
数据源
文档
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
这个看起来有点简单网络安全
北大青鸟计算机网络技术学校
恐龙岛服务器搭建平台
2网络安全黑板报幼儿园
软件开发Java参考文献
腾讯视频数据库什么意思
民歌数据库
ezcad2 数据库
路南区海航软件开发
怀旧服部落服务器战士练级天赋
云帮手服务器管理工具
台式电脑做服务器
网络安全领域的王者
科技大学互联网学校
从0开始 数据库
北极星网络技术有限公司
杨浦区参考网络技术咨询创新服务
深圳腾信互联网科技有限公司
互联网为什么不发展高科技
企业信用信息基础数据库暂行办法
mysql数据库上传图片
中国网络安全与信息化部网站
数据库查询数据属于什么方法
mirbase数据库怎么打开
怎么把数据库数据传入接口
北京中关村大数据库
数据中心服务器连接
肿瘤营养数据库
网络安全法第五章规定的是
深圳大道网络技术有限公司