千家信息网

Flink的TimeWindowAll如何使用

发表于:2025-02-09 作者:千家信息网编辑
千家信息网最后更新 2025年02月09日,本篇内容主要讲解"Flink的TimeWindowAll如何使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink的TimeWindowAll如何使
千家信息网最后更新 2025年02月09日Flink的TimeWindowAll如何使用

本篇内容主要讲解"Flink的TimeWindowAll如何使用",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Flink的TimeWindowAll如何使用"吧!

timeWindow时间窗口(滑动窗口【滑动窗口与滚动窗口的区别,在于滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠】)

示例环境

java.version: 1.8.xflink.version: 1.11.1

示例数据源 (项目码云下载)

Flink 系例 之 搭建开发环境与数据

TimeWindow.java

import com.flink.examples.DataSource;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.windowing.time.Time;import java.util.List;/** * @Description timeWindow时间窗口(滑动窗口【滑动窗口与滚动窗口的区别,在于滑动窗口会有数据元素重叠可能,而滚动窗口不存在元素重叠】) */public class TimeWindow {    /**     * 遍历集合,返回指定时间滑动窗口下每个性别分区里最大年龄数据记录     * @param args     * @throws Exception     */    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //env.setParallelism(1);        DataStream> inStream = env.addSource(new MyRichSourceFunction());        DataStream> dataStream = inStream                .keyBy((KeySelector, String>) k ->k.f1)                //按时间窗口滑动,每3秒为一个时间窗口,并每次滑动2秒(简单来说:每隔2秒对前3秒内的输入数据流),计算一次                .timeWindow(Time.seconds(3), Time.seconds(2))                //注意:计算变量为f2                .maxBy(2);        dataStream.print();        env.execute("flink TimeWindow job");    }    /**     * 模拟数据持续输出     */    public static class MyRichSourceFunction extends RichSourceFunction> {        @Override        public void run(SourceContext> ctx) throws Exception {            List> tuple3List = DataSource.getTuple3ToList();            for (Tuple3 tuple3 : tuple3List){                ctx.collect(tuple3);                //1秒钟输出一个                Thread.sleep(1 * 1000);            }        }        @Override        public void cancel() {            try{                super.close();            }catch (Exception e){                e.printStackTrace();            }        }    }}

打印结果

3> (张三,1,20)4> (李四,2,24)3> (王五,1,29)3> (王五,1,29)4> (刘六,2,32)

到此,相信大家对"Flink的TimeWindowAll如何使用"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0