flink sql-clent MATCH_RECOGNIZE kafka 例子
发表于:2025-01-25 作者:千家信息网编辑
千家信息网最后更新 2025年01月25日,环境 flink1.7.2增加flink1.7.2 的lib 中的jar, 否则会报类找不到avro-1.8.2.jar flink-connector-kafka-0.10_2
千家信息网最后更新 2025年01月25日flink sql-clent MATCH_RECOGNIZE kafka 例子
环境 flink1.7.2
增加flink1.7.2 的lib 中的jar, 否则会报类找不到
avro-1.8.2.jar flink-connector-kafka-0.10_2.12-1.7.2.jar flink-connector-kafka-base_2.12-1.7.2.jar flink-json-1.7.2.jar kafka-clients-0.11.0.0.jarflink-avro-1.7.2.jar flink-connector-kafka-0.11_2.12-1.7.2.jar flink-core-1.7.2.jar flink-python_2.12-1.7.2.jar log4j-1.2.17.jarflink-cep_2.12-1.7.2.jar flink-connector-kafka-0.9_2.12-1.7.2.jar flink-dist_2.12-1.7.2.jar flink-table_2.12-1.7.2.jar slf4j-log4j12-1.7.15.jar
- 修改 sql-client-defaults.yaml 中的table 值
tables: - name: myTable type: source update-mode: append connector: property-version: 1 type: kafka version: 0.11 topic: im-message-topic2 startup-mode: earliest-offset properties: - key: bootstrap.servers value: kafkaip:9092 - key: group.id value: testGroup format: property-version: 1 type: json schema: "ROW(sessionId STRING, fromUid STRING, toUid STRING, chatType STRING, type STRING,msgId STRING, msg STRING, timestampSend STRING)" schema: - name: sessionId type: STRING - name: fromUid type: STRING - name: toUid type: STRING - name: chatType type: STRING - name: type type: STRING - name: msgId type: STRING - name: msg type: STRING - name: rowTime type: TIMESTAMP rowtime: timestamps: type: "from-field" from: "timestampSend" watermarks: type: "periodic-bounded" delay: "60" - name: procTime type: TIMESTAMP proctime: true
- 运行
./bin/sql-client.sh embedded select * from myTable;
然后使用 MATCH_RECOGNIZE 的sql
SELECT * FROM myTable MATCH_RECOGNIZE ( PARTITION BY sessionId ORDER BY rowTime MEASURES e2.procTime as answerTime, LAST(e1.procTime) as customer_event_time, e2.fromUid as empUid, e1.procTime as askTime, 1 as total_talk ONE ROW PER MATCH AFTER MATCH SKIP TO LAST e2 PATTERN (e1 e2) DEFINE e1 as e1.type = 'yonghu', e2 as e2.type = 'guanjia' );
上面是使用sql-client 不用谢代码,当然也可以写代码,下面是对应的程序
public static void main(String[] arg) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); tableEnv.connect(new Kafka() .version("0.11") .topic("im-message-topic3") //.property("zookeeper.connect","") .property("bootstrap.servers","kafkaip:9092") .startFromEarliest() .sinkPartitionerRoundRobin()//Flink分区随机映射到kafka分区 ).withFormat(new Json() .failOnMissingField(false) .deriveSchema() ).withSchema(new Schema() .field("sessionId", Types.STRING).from("sessionId") .field("fromUid", Types.STRING).from("fromUid") .field("toUid", Types.STRING).from("toUid") .field("chatType", Types.STRING).from("chatType") .field("type", Types.STRING).from("type") .field("msgId", Types.STRING).from("msgId") .field("msg", Types.STRING).from("msg")// .field("timestampSend", Types.SQL_TIMESTAMP) .field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("timestampSend") .watermarksPeriodicBounded(1000) ) .field("proctime", Types.SQL_TIMESTAMP).proctime() ).inAppendMode().registerTableSource("myTable"); Table tb2 = tableEnv.sqlQuery( "SELECT " + "answerTime, customer_event_time, empUid, noreply_counts, total_talk " + "FROM myTable" + " " + "MATCH_RECOGNIZE ( " + "PARTITION BY sessionId " + "ORDER BY rowtime " + "MEASURES " + "e2.rowtime as answerTime, "+ "LAST(e1.rowtime) as customer_event_time, " + "e2.fromUid as empUid, " + "1 as noreply_counts, " + "e1.rowtime as askTime," + "1 as total_talk " + "ONE ROW PER MATCH " + "AFTER MATCH SKIP TO LAST e2 " + "PATTERN (e1 e2) " + "DEFINE " + "e1 as e1.type = 'yonghu', " + "e2 as e2.type = 'guanjia' " + ")"+ "" ); DataStream appendStream =tableEnv.toAppendStream(tb2, Row.class); System.out.println("schema is:"); tb2.printSchema(); appendStream.writeAsText("/usr/local/whk", WriteMode.OVERWRITE); logger.info("stream end"); Table tb3 = tableEnv.sqlQuery("select sessionId, type from myTable"); DataStream temp =tableEnv.toAppendStream(tb3, Row.class); tb3.printSchema(); temp.writeAsText("/usr/local/whk2", WriteMode.OVERWRITE); env.execute("msg test"); }
大功告成,其实里面坑很多。
注意:如果使用了 TimeCharacteristic.EventTime, 请不用再使用procTime。
不用
代码
大功告成
大功
环境
程序
会报
运行
例子
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
软件开发部门的顶梁柱称为什么
青岛港软件开发招生学历
网络安全专业的视频素材
vmware虚拟机服务器管理
怎样在服务器上看到访问信息
西安软件开发外派公司
软件开发电脑配置组装
网络安全实训的总结
网络技术中专
徐汇区网络技术服务咨询口碑推荐
服务器root权限管理
软件开发没有开发思维
逻辑算符在数据库中执行的顺序是
测试岗位面试试题数据库
网络安全的重要性外国文献
个人信息基础数据库是哪个设计的
邹平机械软件开发报价
儿童网络安全顺口溜
软件开发产品经理职能
中国信通网络安全考试
创梦云服务器管理系统
云服务器到期了会有什么影响
货拉拉物流软件开发者是谁
阜阳联想服务器内存条推荐商家
景区软件开发政策
国产服务器处理器排行榜
nist动力学数据库
通讯服务器3.3v待机电源
省平台服务器错误是什么意思
北碚区综合软件开发服务公司