利用flink统计消息回复情况
发表于:2025-02-24 作者:千家信息网编辑
千家信息网最后更新 2025年02月24日,其中用到了滑动窗口函数大小30秒,间隔15秒,且大于窗口10秒的数据,被丢弃。(实际业务这三个值 应为是 10 分钟,1分钟,5分钟)。代码先记录一下public static void main(S
千家信息网最后更新 2025年02月24日利用flink统计消息回复情况
其中用到了滑动窗口函数大小30秒,间隔15秒,且大于窗口10秒的数据,被丢弃。(实际业务这三个值 应为是 10 分钟,1分钟,5分钟)。代码先记录一下
public static void main(String[] arg) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableSysoutLogging();//开启Sysout打日志 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置窗口的时间单位为process time Properties props = new Properties(); props.put("bootstrap.servers", "kafkaip:9092"); props.put("group.id", "metric-group4"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); //value 反序列化 DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "im-message-topic3", //kafka topic new SimpleStringSchema(), // String 序列化 props)).setParallelism(1); DataStream bean3DataStream = dataStreamSource.map(new MapFunction() { @Override public Message map(String value) throws Exception { logger.info("receive msg:"+value); JSONObject jsonObject =JSONObject.parseObject(value); Message s= new Message( jsonObject.getString("sessionId"), jsonObject.getString("fromUid"), jsonObject.getString("toUid"), jsonObject.getString("chatType"), jsonObject.getString("type"), jsonObject.getString("msgId"), jsonObject.getString("msg"), jsonObject.getLong("timestampSend") ); return s; } }); //设置水印,并过滤数据 DataStream bean3DataStreamWithAssignTime = bean3DataStream.assignTimestampsAndWatermarks(new TruckTimestamp()).timeWindowAll(Time.seconds(30),Time.seconds(15)).apply(new AllWindowFunction() { @Override public void apply(TimeWindow window, Iterable values, Collector out) throws Exception { for (Message t: values) { logger.info("window start time:"+new Date(window.getStart()).toString()); logger.info("real time:"+new Date(t.getTimestampSend()).toString()); if(t.getTimestampSend() appendStream =tableEnv.toAppendStream(tb3, Row.class);// appendStream.addSink(new Sink()); //对过滤后的数据,使用正则匹配数据 Table tb2 = tableEnv.sqlQuery( "SELECT " + " * " + "FROM myTable" + " " + "MATCH_RECOGNIZE ( " + "PARTITION BY sessionId " + "ORDER BY rowtime " + "MEASURES " + "e2.timestampSend as answerTime, "+ "LAST(e1.timestampSend) as customer_event_time, " + "e2.fromUid as empUid, " + "e1.timestampSend as askTime," + "1 as total_talk " + "ONE ROW PER MATCH " + "AFTER MATCH SKIP TO LAST e2 " + "PATTERN (e1+ e2+?) " + "DEFINE " + "e1 as e1.type = 'yonghu', " + "e2 as e2.type = 'guanjia' " + ")"+ "" ); DataStream appendStream2 =tableEnv.toAppendStream(tb2, Row.class); appendStream2.addSink(new Sink2()); env.execute("msg v5"); } public static class TruckTimestamp extends AscendingTimestampExtractor { private static final long serialVersionUID = 1L; @Override public long extractAscendingTimestamp(Message element) { return element.getTimestampSend(); } } public static class Sink implements SinkFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"orinal time:"+value.toString()); } } public static class Sink2 implements SinkFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"new time:"+value.toString()); } }
数据
序列
三个
业务
代码
函数
单位
大小
实际
日志
时间
正则
水印
情况
消息
统计
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
创科机器视觉软件开发平台
信阳oa软件开发公司
云服务器优劣势
网络安全顶尖待遇
网络安全靠师生手抄报内容
玩家丢包是服务器原因吗
汕头自主可控软件开发销售厂
北京北斗授时服务器云主机
数据库外部图应该怎样建立
什么是软件开发特点
网络安全中的偏差
如何建成云服务器
幼儿网络安全主题活动总结
山西牛云网络技术有限公司
数据库中syn什么意思
市人社局网络安全工作
南昌小型软件开发公司有哪些
什么系统适合当服务器
泛微oa数据库怎么实现
马列思政的数据库
高效的app软件开发公司
服务器中标价
网络安全伴我成长宣传片视频
济南服务器数据恢复
制造业网络安全管理
重庆合川众道生鲜软件开发
医保数据库操作
ibm服务器进入管理界面
天津齐众软件开发有限公司
wow平衡的服务器