千家信息网

利用flink统计消息回复情况

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,其中用到了滑动窗口函数,窗口大小为30秒,滑动间隔为15秒,超出窗口10秒的数据会被丢弃。(实际业务中这三个值应分别为10分钟、1分钟、5分钟。)代码先记录一下:public static void main(S
千家信息网最后更新 2025年01月24日利用flink统计消息回复情况

其中用到了滑动窗口函数,窗口大小为30秒,滑动间隔为15秒,超出窗口10秒的数据会被丢弃。(实际业务中这三个值应分别为10分钟、1分钟、5分钟。)代码先记录一下:

/*
 * Flink streaming job (as posted): reads chat messages from Kafka and uses a SQL
 * MATCH_RECOGNIZE query to pair customer messages with agent replies per session.
 *
 * NOTE(review): this listing is collapsed onto three physical lines and is damaged:
 *   - Generic type arguments appear to be missing (DataStreamSource, DataStream, MapFunction,
 *     AllWindowFunction, Iterable, Collector, SinkFunction and AscendingTimestampExtractor are
 *     all raw here) -- presumably stripped together with their angle brackets; confirm against
 *     the original source.
 *   - The text between "if(t.getTimestampSend()" and "appendStream =tableEnv.toAppendStream(tb3,"
 *     is missing. The rest of the window filter, the creation of tableEnv, the definition of tb3
 *     and the registration of "myTable" are therefore not visible here.
 *   - Every "//" comment now swallows the remainder of its physical line, and one string literal
 *     ("PATTERN (e1+ e2+?) ") is split across a line break. Line breaks must be restored before
 *     this can compile.
 *
 * Steps visible in the listing:
 *   1. main() obtains a StreamExecutionEnvironment, calls enableSysoutLogging() and sets the time
 *      characteristic to TimeCharacteristic.EventTime.
 *   2. A FlinkKafkaConsumer011 with SimpleStringSchema reads topic "im-message-topic3" using
 *      bootstrap.servers "kafkaip:9092", group.id "metric-group4" and auto.offset.reset
 *      "earliest"; the source parallelism is set to 1.
 *   3. Each record is logged, parsed with JSONObject.parseObject and mapped to a Message built
 *      from the fields sessionId, fromUid, toUid, chatType, type, msgId, msg (strings) and
 *      timestampSend (long).
 *   4. Timestamps/watermarks are assigned with TruckTimestamp, then
 *      timeWindowAll(Time.seconds(30), Time.seconds(15)) is applied with an AllWindowFunction
 *      that logs the window start time and each message's send time. Its filtering condition is
 *      cut off (see the note above).
 *   5. The call "appendStream.addSink(new Sink());" is preceded by "//" in the listing, i.e. it
 *      appears to be commented out.
 *   6. The MATCH_RECOGNIZE query over myTable partitions by sessionId, orders by rowtime, uses
 *      PATTERN (e1+ e2+?) with e1.type = 'yonghu' and e2.type = 'guanjia', ONE ROW PER MATCH and
 *      AFTER MATCH SKIP TO LAST e2. It emits answerTime (e2.timestampSend), customer_event_time
 *      (LAST(e1.timestampSend)), empUid (e2.fromUid), askTime (e1.timestampSend) and the
 *      constant 1 as total_talk.
 *   7. The result is converted with toAppendStream(tb2, Row.class), sent to Sink2, and the job is
 *      started with env.execute("msg v5").
 *
 * Nested classes:
 *   - TruckTimestamp: extractAscendingTimestamp returns element.getTimestampSend().
 *   - Sink:  invoke prints new Date().toString() + "orinal time:" + the row to System.out.
 *   - Sink2: invoke prints new Date().toString() + "new time:" + the row to System.out.
 *
 * English translations of the inline comments, in order of appearance:
 *   "开启Sysout打日志"                  -> enable Sysout logging
 *   "设置窗口的时间单位为process time"    -> set the window time unit to process time
 *   "key 反序列化"                      -> key deserialization
 *   "value 反序列化"                    -> value deserialization
 *   "String 序列化"                     -> String serialization
 *   "设置水印,并过滤数据"                -> assign watermarks and filter the data
 *   "对过滤后的数据,使用正则匹配数据"      -> apply pattern matching to the filtered data
 *
 * NOTE(review): two of those inline comments disagree with the code next to them:
 *   - the "process time" comment follows a call that sets TimeCharacteristic.EventTime;
 *   - the "value deserialization" comment follows the auto.offset.reset property, while the
 *     value.deserializer property on the preceding statement carries no comment.
 */
public static void main(String[] arg) throws Exception {        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.getConfig().enableSysoutLogging();//开启Sysout打日志        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置窗口的时间单位为process time        Properties props = new Properties();        props.put("bootstrap.servers", "kafkaip:9092");        props.put("group.id", "metric-group4");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //key 反序列化        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        props.put("auto.offset.reset", "earliest"); //value 反序列化        DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(                "im-message-topic3",  //kafka topic                new SimpleStringSchema(),  // String 序列化                props)).setParallelism(1);        DataStream bean3DataStream = dataStreamSource.map(new MapFunction() {                     @Override            public Message map(String value) throws Exception {                 logger.info("receive msg:"+value);                  JSONObject jsonObject =JSONObject.parseObject(value);                 Message s= new Message(                         jsonObject.getString("sessionId"),                         jsonObject.getString("fromUid"),                          jsonObject.getString("toUid"),                         jsonObject.getString("chatType"),                         jsonObject.getString("type"),                         jsonObject.getString("msgId"),                         jsonObject.getString("msg"),                         jsonObject.getLong("timestampSend")                         );                 return s;            }        });        //设置水印,并过滤数据        DataStream bean3DataStreamWithAssignTime =                 bean3DataStream.assignTimestampsAndWatermarks(new 
TruckTimestamp()).timeWindowAll(Time.seconds(30),Time.seconds(15)).apply(new AllWindowFunction() {                      @Override                    public void apply(TimeWindow window, Iterable values, Collector out)                            throws Exception {                        for (Message t: values) {                            logger.info("window start time:"+new Date(window.getStart()).toString());                            logger.info("real time:"+new Date(t.getTimestampSend()).toString());                            if(t.getTimestampSend() appendStream =tableEnv.toAppendStream(tb3, Row.class);//        appendStream.addSink(new Sink());    //对过滤后的数据,使用正则匹配数据        Table tb2 = tableEnv.sqlQuery(                "SELECT " +                        " * " +                        "FROM myTable" +                        " " +                        "MATCH_RECOGNIZE ( " +                        "PARTITION BY sessionId " +                        "ORDER BY rowtime " +                        "MEASURES " +                        "e2.timestampSend as answerTime, "+                        "LAST(e1.timestampSend) as customer_event_time, " +                        "e2.fromUid as empUid, " +                        "e1.timestampSend as askTime," +                                              "1 as total_talk " +                                  "ONE ROW PER MATCH " +                        "AFTER MATCH SKIP TO LAST e2 " +                        "PATTERN (e1+ e2+?) 
" +                        "DEFINE " +                        "e1 as e1.type = 'yonghu', " +                        "e2 as e2.type = 'guanjia' " +                        ")"+                        ""                );           DataStream appendStream2 =tableEnv.toAppendStream(tb2, Row.class);           appendStream2.addSink(new Sink2());           env.execute("msg v5");       }    public static class TruckTimestamp extends AscendingTimestampExtractor {        private static final long serialVersionUID = 1L;        @Override        public long extractAscendingTimestamp(Message element) {            return element.getTimestampSend();        }    }     public static class Sink implements SinkFunction {            /**         *          */        private static final long serialVersionUID = 1L;            @Override            public void invoke(Row value) throws Exception {                System.out.println(new Date().toString()+"orinal time:"+value.toString());            }        }     public static class Sink2 implements SinkFunction {            /**         *          */        private static final long serialVersionUID = 1L;            @Override            public void invoke(Row value) throws Exception {                System.out.println(new Date().toString()+"new time:"+value.toString());            }        }
0