Flink的SessionWindow怎么用
发表于:2024-11-22 作者:千家信息网编辑
千家信息网最后更新 2024年11月22日,这篇文章主要讲解了"Flink的SessionWindow怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink的SessionWindow怎
千家信息网最后更新 2024年11月22日Flink的SessionWindow怎么用
这篇文章主要讲解了"Flink的SessionWindow怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink的SessionWindow怎么用"吧!
sessionWindows会话窗口:按不活跃时间切成不同分区窗口,并进行窗口计算
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
SessionWindow.java
import com.flink.examples.DataSource;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;import org.apache.flink.streaming.api.windowing.time.Time;import java.util.List;/** * @Description sessionWindows会话窗口:按不活跃时间切成不同分区窗口,并进行窗口计算 */public class SessionWindow { /** * 遍历集合,返回会话滑动窗口下按不活跃时间切分后的,每个窗口下性别分区里最大年龄数据记录 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置流处理时间事件,对于会话窗口必需设置此时间类型,有三种类型: //1.ProcessingTime:以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间 //2.IngestionTime:以数据进入flink streaming data flow的时间为准 //3.EventTime:以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(4); DataStream> inStream = env.addSource(new MyRichSourceFunction()); DataStream > dataStream = inStream.keyBy((KeySelector , String>) k ->k.f1) //按会话窗口滚动,当2秒之内没有指定分区数据流,则计算一次 //会话窗口是根据在指定时间之后没有活跃的数据接入,则认为窗口结束,进行窗口计算 .window(EventTimeSessionWindows.withGap(Time.seconds(2))) .reduce(new ReduceFunction >() { @Override public Tuple3 reduce(Tuple3 t1, Tuple3 t2) throws Exception { //返回年龄最大的 return t1.f2 > t2.f2 ? t1: t2; } }); dataStream.print(); env.execute("flink EventTimeSessionWindows job"); } /** * 模拟数据持续输出 */ public static class MyRichSourceFunction extends RichParallelSourceFunction > { @Override public void run(SourceContext > ctx) throws Exception { List > tuple3List = DataSource.getTuple3ToList(); for (Tuple3 tuple3 : tuple3List){ ctx.collect(tuple3); //1秒钟输出一个 Thread.sleep(2 * 1000); } } @Override public void cancel() { try{ super.close(); }catch (Exception e){ e.printStackTrace(); } } }}
打印结果
2> (张三,man,20)4> (李四,girl,24)2> (王五,man,29)4> (刘六,girl,32)2> (吴八,man,30)
感谢各位的阅读,以上就是"Flink的SessionWindow怎么用"的内容了,经过本文的学习后,相信大家对Flink的SessionWindow怎么用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
时间
数据
学习
不同
最大
内容
字段
年龄
环境
示例
类型
切成
处理
输出
事件
就是
应用程序
思路
性别
情况
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发专业初中
怎么看oracle数据库有问题
数据库的共享解决方案
小区行政归属数据库
生僻字无法存入数据库
智慧新闻网络技术有限公司
深圳服务器系统集成服务方案费用
宜章县java数据库开发
华为网络技术工程师招聘人数
电脑咋退出网络安全模式
数据库选型对业务的影响
网络技术相关的职业
软件开发要求规定
地形数据库dlg分层
计算机技术与网络技术的区别
仙侠游戏服务器修改级数
5g网络技术是哪国研发的
以太网的网络技术
阿里云服务器未识别的网络
江苏新华网络技术有限公司
现在信息网络技术
地球 服务器地址
音频在数据库中的类型
宝山区项目数据库服务清单
网络技术服务费用支出表
网络安全手抄报模板一等奖
dns缓存服务器是什么设备
计算机网络技术基础教材知识点
5g网络技术是哪国研发的
江苏新华网络技术有限公司