Flink的SessionWindow怎么用
发表于:2024-10-15 作者:千家信息网编辑
千家信息网最后更新 2024年10月15日,这篇文章主要讲解了"Flink的SessionWindow怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink的SessionWindow怎
千家信息网最后更新 2024年10月15日Flink的SessionWindow怎么用
这篇文章主要讲解了"Flink的SessionWindow怎么用",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"Flink的SessionWindow怎么用"吧!
sessionWindows会话窗口:按不活跃时间切成不同分区窗口,并进行窗口计算
示例环境
java.version: 1.8.xflink.version: 1.11.1
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
SessionWindow.java
import com.flink.examples.DataSource;import org.apache.flink.api.common.functions.ReduceFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;import org.apache.flink.streaming.api.windowing.time.Time;import java.util.List;/** * @Description sessionWindows会话窗口:按不活跃时间切成不同分区窗口,并进行窗口计算 */public class SessionWindow { /** * 遍历集合,返回会话滑动窗口下按不活跃时间切分后的,每个窗口下性别分区里最大年龄数据记录 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置流处理时间事件,对于会话窗口必需设置此时间类型,有三种类型: //1.ProcessingTime:以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间 //2.IngestionTime:以数据进入flink streaming data flow的时间为准 //3.EventTime:以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段 env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(4); DataStream> inStream = env.addSource(new MyRichSourceFunction()); DataStream > dataStream = inStream.keyBy((KeySelector , String>) k ->k.f1) //按会话窗口滚动,当2秒之内没有指定分区数据流,则计算一次 //会话窗口是根据在指定时间之后没有活跃的数据接入,则认为窗口结束,进行窗口计算 .window(EventTimeSessionWindows.withGap(Time.seconds(2))) .reduce(new ReduceFunction >() { @Override public Tuple3 reduce(Tuple3 t1, Tuple3 t2) throws Exception { //返回年龄最大的 return t1.f2 > t2.f2 ? t1: t2; } }); dataStream.print(); env.execute("flink EventTimeSessionWindows job"); } /** * 模拟数据持续输出 */ public static class MyRichSourceFunction extends RichParallelSourceFunction > { @Override public void run(SourceContext > ctx) throws Exception { List > tuple3List = DataSource.getTuple3ToList(); for (Tuple3 tuple3 : tuple3List){ ctx.collect(tuple3); //1秒钟输出一个 Thread.sleep(2 * 1000); } } @Override public void cancel() { try{ super.close(); }catch (Exception e){ e.printStackTrace(); } } }}
打印结果
2> (张三,man,20)4> (李四,girl,24)2> (王五,man,29)4> (刘六,girl,32)2> (吴八,man,30)
感谢各位的阅读,以上就是"Flink的SessionWindow怎么用"的内容了,经过本文的学习后,相信大家对Flink的SessionWindow怎么用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
时间
数据
学习
不同
最大
内容
字段
年龄
环境
示例
类型
切成
处理
输出
事件
就是
应用程序
思路
性别
情况
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发考官面试问题
谷歌服务器可以升级吗
请简述软件开发文档
软件开发的研究方案模板
软件开发小组 管理 工具包
web服务器控件有哪些
广东计算机网络技术中专学校
湖州软件开发费用
认知神经网络技术应用
网络安全快速入行
成都c语言软件开发要多少钱
浪潮服务器管理台
瑶海区数据网络技术开发常见问题
刘强东视频软件开发
河南兆创网络技术有限公司
ios 更换数组数据库
临泝巨久网络技术
国外免费ftp服务器
什么牌子服务器笔记本好
下列关于关系数据库的说法
常州网络安全知识竞赛答案
长春燃气网络安全
服务器错误代码7是什么意思
淘米链商网络技术有限公司
技校学软件开发要求
CORBA软件开发
设计连接数据库方法
网络安全顶级研究机构
如何查看域名服务器
网络安全手抄报最简单的儿童版