Flink的SessionWindow怎么用
发表于:2025-01-27 作者:千家信息网编辑
千家信息网最后更新 2025年01月27日,这篇文章主要讲解了"Flink的SessionWindow怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink的SessionWindow怎
千家信息网最后更新 2025年01月27日Flink的SessionWindow怎么用
这篇文章主要讲解了"Flink的SessionWindow怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink的SessionWindow怎么用"吧!
sessionWindows会话窗口:按不活跃时间切成不同分区窗口,并进行窗口计算
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
SessionWindow.java
import com.flink.examples.DataSource;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;import org.apache.flink.streaming.api.windowing.time.Time;import java.util.List;/** * @Description sessionWindows会话窗口:按不活跃时间切成不同分区窗口,并进行窗口计算 */public class SessionWindow { /** * 遍历集合,返回会话滑动窗口下按不活跃时间切分后的,每个窗口下性别分区里最大年龄数据记录 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置流处理时间事件,对于会话窗口必需设置此时间类型,有三种类型: //1.ProcessingTime:以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间 //2.IngestionTime:以数据进入flink streaming data flow的时间为准 //3.EventTime:以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(4); DataStream> inStream = env.addSource(new MyRichSourceFunction()); DataStream > dataStream = inStream.keyBy((KeySelector , String>) k ->k.f1) //按会话窗口滚动,当2秒之内没有指定分区数据流,则计算一次 //会话窗口是根据在指定时间之后没有活跃的数据接入,则认为窗口结束,进行窗口计算 .window(EventTimeSessionWindows.withGap(Time.seconds(2))) .reduce(new ReduceFunction >() { @Override public Tuple3 reduce(Tuple3 t1, Tuple3 t2) throws Exception { //返回年龄最大的 return t1.f2 > t2.f2 ? t1: t2; } }); dataStream.print(); env.execute("flink EventTimeSessionWindows job"); } /** * 模拟数据持续输出 */ public static class MyRichSourceFunction extends RichParallelSourceFunction > { @Override public void run(SourceContext > ctx) throws Exception { List > tuple3List = DataSource.getTuple3ToList(); for (Tuple3 tuple3 : tuple3List){ ctx.collect(tuple3); //1秒钟输出一个 Thread.sleep(2 * 1000); } } @Override public void cancel() { try{ super.close(); }catch (Exception e){ e.printStackTrace(); } } }}
打印结果
2> (张三,man,20)4> (李四,girl,24)2> (王五,man,29)4> (刘六,girl,32)2> (吴八,man,30)
感谢各位的阅读,以上就是"Flink的SessionWindow怎么用"的内容了,经过本文的学习后,相信大家对Flink的SessionWindow怎么用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
时间
数据
学习
不同
最大
内容
字段
年龄
环境
示例
类型
切成
处理
输出
事件
就是
应用程序
思路
性别
情况
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
Linux数据库改密码命令
安全通过什么方法采集数据库
r630服务器怎样测试内存
萤石云服务器怎么下载
eaccess数据库备份
网络安全法由哪一年开始生效
方舟怎么找开荒服务器
网课网络技术基础
服务器主机硬盘的作用
华为上海高斯数据库
达梦数据库批量数据处理技术
数据库表结构同步
徐汇区参考软件开发咨询热线
护苗网络安全主题黑板报
服务器防护墙软件
我国网络安全形式严峻
服务器应急预案和应急演练
mac 打印服务器
plex本地服务器
闵行区推广软件开发市面价
苹果虚拟服务器
数据库确定按钮叫什么
互联网科技公司的实习日记
钱有路 服务器错误
access数据库的选项卡
宁波定制软件开发app
对网络安全与生活的认识
购买金融数据库怎样入账
云服务器管理管控考核指标
软件开发复用度