千家信息网

Flink中Watermarks怎么用

发表于:2025-02-04 作者:千家信息网编辑
千家信息网最后更新 2025年02月04日,这篇文章将为大家详细讲解有关Flink中Watermarks怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。Watermarks水印:为输入的数据流的设置一个时
千家信息网最后更新 2025年02月04日Flink中Watermarks怎么用

这篇文章将为大家详细讲解有关Flink中Watermarks怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案

示例环境

java.version: 1.8.xflink.version: 1.11.1

TimestampsAndWatermarks.java

import com.flink.examples.DataSource;import org.apache.commons.lang3.time.DateFormatUtils;import org.apache.flink.api.common.eventtime.*;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.RichSourceFunction;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;import java.util.Date;import java.util.Iterator;import java.util.List;/** * @Description Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案 */public class TimestampsAndWatermarks {    /**     * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html     */    /**     * 遍历集合,分别打印不同性别的信息,对于执行超时,自动触发定时器     * @param args     * @throws Exception     */    public static void main(String[] args) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        /*        TimeCharacteristic有三种时间类型:            ProcessingTime:以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间;            IngestionTime:以数据进入flink streaming data flow的时间为准;            EventTime:以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段;需要实现assignTimestampsAndWatermarks方法,并设置时间水位线;         */        //使用event time,需要指定事件的时间戳        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        env.setParallelism(1);        //设置自动生成水印的时间周期,避免数据流量大的情况下,频繁添加水印导致计算性能降低。        env.getConfig().setAutoWatermarkInterval(1000L);        List> tuple3List = DataSource.getTuple3ToList();        DataStream> inStream = env.addSource(new MyRichSourceFunction());        DataStream> dataStream = inStream                //为一个水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。                //Duration.ofSeconds(2),到数据流到达flink后,再水位线中设置延迟时间,也就是在所有数据流的最大的事件时间比window窗口结束时间大或相等时,再延迟多久触发window窗口结束;//                .assignTimestampsAndWatermarks(//                        WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(2))//                                .withTimestampAssigner((element, timestamp) -> {//                                    long times = System.currentTimeMillis() ;//                                    System.out.println(element.f1 + ","+ element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));//                                    return times;//                                })//                )                .assignTimestampsAndWatermarks(new MyWatermarkStrategy()                        .withTimestampAssigner(new SerializableTimestampAssigner>() {                            @Override                            public long extractTimestamp(Tuple3 element, long timestamp) {                                long times = System.currentTimeMillis();                                System.out.println(element.f1 + "," + element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));                                return times;                            }                        }))                //分区窗口                .keyBy((KeySelector, String>) k -> k.f1)                //触发3s滚动窗口                .window(TumblingEventTimeWindows.of(Time.seconds(3)))                //执行窗口数据,对keyBy数据流批量处理                .apply(new WindowFunction, Tuple2, String, TimeWindow>(){                    @Override                    public void apply(String s, TimeWindow window, Iterable> input, Collector> out) throws Exception {                        long times = System.currentTimeMillis() ;                        System.out.println();                        System.out.println("窗口处理时间:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));                        Iterator> iterator = input.iterator();                        int total = 0;                        int size = 0;                        String sex = "";                        while (iterator.hasNext()){                            Tuple3 tuple3 = iterator.next();                            total += tuple3.f2;                            size ++;                            sex = tuple3.f1;                        }                        out.collect(new Tuple2<>(sex, total / size));                    }                });        dataStream.print();        env.execute("flink Filter job");    }    /**     * 定期水印生成器     */    public static class MyWatermarkStrategy implements WatermarkStrategy>{        @Override        public WatermarkGenerator> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {            return new WatermarkGenerator>() {                //设置固定的延迟量3.5 seconds                private final long maxOutOfOrderness = 3500;                private long currentMaxTimestamp;                /**                 * 事件处理                 * @param event             数据流对象                 * @param eventTimestamp    事件水位线时间                 * @param output            输出                 */                @Override                public void onEvent(Tuple3 event, long eventTimestamp, WatermarkOutput output) {                    currentMaxTimestamp = Math.max(System.currentTimeMillis(), eventTimestamp);                }                @Override                public void onPeriodicEmit(WatermarkOutput output) {                    // 拿上一个水印时间 - 延迟量 = 等于给的窗口最终数据最后时间(如果在窗口到期内,未发生新的水印事件,则按window正常结束时间计算,当在最后水印时间-延迟量的时间范围内,有新的数据流进入,则会重新触发窗口内对全部数据流计算)                    output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));                }            };        }    }    /**     * 模拟数据持续输出     */    public static class MyRichSourceFunction extends RichSourceFunction> {        @Override        public void run(SourceContext> ctx) throws Exception {            List> tuple3List = DataSource.getTuple3ToList();            int j = 0;            for (int i=0;i<100;i++){                if (i%6 == 0){                    j=0;                }                ctx.collect(tuple3List.get(j));                //1秒钟输出一个                Thread.sleep(1 * 1000);                j ++;            }        }        @Override        public void cancel() {            try{                super.close();            }catch (Exception e){                e.printStackTrace();            }        }    }}

打印结果

man,张三的水位线为:2020-12-27 10:28:20girl,李四的水位线为:2020-12-27 10:28:21man,王五的水位线为:2020-12-27 10:28:22girl,刘六的水位线为:2020-12-27 10:28:23girl,伍七的水位线为:2020-12-27 10:28:24窗口处理时间:2020-12-27 10:28:25(man,20)man,吴八的水位线为:2020-12-27 10:28:25man,张三的水位线为:2020-12-27 10:28:26girl,李四的水位线为:2020-12-27 10:28:27窗口处理时间:2020-12-27 10:28:28(girl,28)窗口处理时间:2020-12-27 10:28:28(man,29)

关于"Flink中Watermarks怎么用"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

0