利用flink统计消息回复情况
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,其中用到了滑动窗口函数大小30秒,间隔15秒,且大于窗口10秒的数据,被丢弃。(实际业务这三个值 应为是 10 分钟,1分钟,5分钟)。代码先记录一下public static void main(S
千家信息网最后更新 2025年01月24日利用flink统计消息回复情况
其中用到了滑动窗口函数大小30秒,间隔15秒,且大于窗口10秒的数据,被丢弃。(实际业务这三个值 应为是 10 分钟,1分钟,5分钟)。代码先记录一下
public static void main(String[] arg) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableSysoutLogging();//开启Sysout打日志 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //设置窗口的时间单位为process time Properties props = new Properties(); props.put("bootstrap.servers", "kafkaip:9092"); props.put("group.id", "metric-group4"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "earliest"); //value 反序列化 DataStreamSource dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "im-message-topic3", //kafka topic new SimpleStringSchema(), // String 序列化 props)).setParallelism(1); DataStream bean3DataStream = dataStreamSource.map(new MapFunction() { @Override public Message map(String value) throws Exception { logger.info("receive msg:"+value); JSONObject jsonObject =JSONObject.parseObject(value); Message s= new Message( jsonObject.getString("sessionId"), jsonObject.getString("fromUid"), jsonObject.getString("toUid"), jsonObject.getString("chatType"), jsonObject.getString("type"), jsonObject.getString("msgId"), jsonObject.getString("msg"), jsonObject.getLong("timestampSend") ); return s; } }); //设置水印,并过滤数据 DataStream bean3DataStreamWithAssignTime = bean3DataStream.assignTimestampsAndWatermarks(new TruckTimestamp()).timeWindowAll(Time.seconds(30),Time.seconds(15)).apply(new AllWindowFunction() { @Override public void apply(TimeWindow window, Iterable values, Collector out) throws Exception { for (Message t: values) { logger.info("window start time:"+new Date(window.getStart()).toString()); logger.info("real time:"+new Date(t.getTimestampSend()).toString()); if(t.getTimestampSend() appendStream =tableEnv.toAppendStream(tb3, Row.class);// appendStream.addSink(new Sink()); //对过滤后的数据,使用正则匹配数据 Table tb2 = tableEnv.sqlQuery( "SELECT " + " * " + "FROM myTable" + " " + "MATCH_RECOGNIZE ( " + "PARTITION BY sessionId " + "ORDER BY rowtime " + "MEASURES " + "e2.timestampSend as answerTime, "+ "LAST(e1.timestampSend) as customer_event_time, " + "e2.fromUid as empUid, " + "e1.timestampSend as askTime," + "1 as total_talk " + "ONE ROW PER MATCH " + "AFTER MATCH SKIP TO LAST e2 " + "PATTERN (e1+ e2+?) " + "DEFINE " + "e1 as e1.type = 'yonghu', " + "e2 as e2.type = 'guanjia' " + ")"+ "" ); DataStream appendStream2 =tableEnv.toAppendStream(tb2, Row.class); appendStream2.addSink(new Sink2()); env.execute("msg v5"); } public static class TruckTimestamp extends AscendingTimestampExtractor { private static final long serialVersionUID = 1L; @Override public long extractAscendingTimestamp(Message element) { return element.getTimestampSend(); } } public static class Sink implements SinkFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"orinal time:"+value.toString()); } } public static class Sink2 implements SinkFunction { /** * */ private static final long serialVersionUID = 1L; @Override public void invoke(Row value) throws Exception { System.out.println(new Date().toString()+"new time:"+value.toString()); } }
数据
序列
三个
业务
代码
函数
单位
大小
实际
日志
时间
正则
水印
情况
消息
统计
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
湖北项目软件开发哪家专业
对于服务器的要求
c 数据库 类库
高清视频录播服务器器暖
宜兴进口软件开发专业服务
计算机网络技术课题研究
网络安全大学排名榜
msmq服务器找不到
尚米网络技术是正规吗
西藏网络安全法律
我的世界java版怎样开服务器
浙江云图网络技术
4u服务器一般多少钱
网络安全方面的黑板报
我的世界服务器权限组管理员
如何查询网络安全成绩
2019河南网络安全宣传周
梦想飞翔众筹服务器
关于网络安全宣传短信视频
计算机网络安全实际操作
数据库oracle培训
是数据库or
宜兴进口软件开发专业服务
银川新华互联网科技学校图片
服务器JODB配置
无线网络安全实现技术
长沙帝国网络技术
fz服务器下载
兴创服务器是什么意思
app软件开发选择云趣科技