千家信息网

Flink sql-client MATCH_RECOGNIZE with Kafka: an example

Published: 2025-01-25  Author: 千家信息网 editors
Last updated: 2025-01-25

Environment: Flink 1.7.2

  1. Add the following jars to the lib directory of Flink 1.7.2; otherwise you will get class-not-found errors.

    avro-1.8.2.jar
    flink-avro-1.7.2.jar
    flink-cep_2.12-1.7.2.jar
    flink-connector-kafka-0.10_2.12-1.7.2.jar
    flink-connector-kafka-0.11_2.12-1.7.2.jar
    flink-connector-kafka-0.9_2.12-1.7.2.jar
    flink-connector-kafka-base_2.12-1.7.2.jar
    flink-core-1.7.2.jar
    flink-dist_2.12-1.7.2.jar
    flink-json-1.7.2.jar
    flink-python_2.12-1.7.2.jar
    flink-table_2.12-1.7.2.jar
    kafka-clients-0.11.0.0.jar
    log4j-1.2.17.jar
    slf4j-log4j12-1.7.15.jar

  2. Edit the tables entry in sql-client-defaults.yaml

tables:
  - name: myTable
    type: source
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: im-message-topic2
      startup-mode: earliest-offset
      properties:
        - key: bootstrap.servers
          value: kafkaip:9092
        - key: group.id
          value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(sessionId STRING, fromUid STRING, toUid STRING, chatType STRING, type STRING,msgId STRING, msg STRING, timestampSend STRING)"
    schema:
      - name: sessionId
        type: STRING
      - name: fromUid
        type: STRING
      - name: toUid
        type: STRING
      - name: chatType
        type: STRING
      - name: type
        type: STRING
      - name: msgId
        type: STRING
      - name: msg
        type: STRING
      - name: rowTime
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "timestampSend"
          watermarks:
            type: "periodic-bounded"
            delay: "60"
      - name: procTime
        type: TIMESTAMP
        proctime: true

  3. Start the SQL client in embedded mode, then query the table at its prompt

./bin/sql-client.sh embedded
select * from myTable;
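
The table definition above declares a "periodic-bounded" watermark with a delay of 60 on the rowTime attribute (the Flink descriptor documentation describes this delay as milliseconds). As a rough illustration of what such a strategy computes, here is a minimal plain-Java sketch with no Flink dependency. The class and method names (BoundedWatermarkSketch, observe, currentWatermark, isLate) are hypothetical and are not Flink APIs; the sketch only assumes that the watermark trails the highest timestamp seen so far by the configured delay.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark (not a Flink class). */
public class BoundedWatermarkSketch {
    private final long delayMs;
    private long maxTimestamp;

    public BoundedWatermarkSketch(long delayMs) {
        this.delayMs = delayMs;
        // Start low, but high enough that subtracting the delay cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + delayMs;
    }

    /** Record the event-time timestamp of an incoming row. */
    public void observe(long timestampMs) {
        maxTimestamp = Math.max(maxTimestamp, timestampMs);
    }

    /** Watermark = highest timestamp seen so far minus the allowed delay. */
    public long currentWatermark() {
        return maxTimestamp - delayMs;
    }

    /** A row counts as late if its timestamp is not ahead of the watermark. */
    public boolean isLate(long timestampMs) {
        return timestampMs <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(60);
        wm.observe(1_000);
        wm.observe(1_200);
        System.out.println(wm.currentWatermark()); // 1140
        System.out.println(wm.isLate(1_100));      // true
        System.out.println(wm.isLate(1_150));      // false
    }
}
```

With a delay this small, rows that arrive more than 60 ms out of order fall behind the watermark, so a larger value may be worth considering if messages on the topic can be noticeably out of order.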

Then run the MATCH_RECOGNIZE query:

SELECT *
FROM myTable
MATCH_RECOGNIZE (
  PARTITION BY sessionId
  ORDER BY rowTime
  MEASURES
    e2.procTime as answerTime,
    LAST(e1.procTime) as customer_event_time,
    e2.fromUid as empUid,
    e1.procTime as askTime,
    1 as total_talk
  ONE ROW PER MATCH
  AFTER MATCH SKIP TO LAST e2
  PATTERN (e1 e2)
  DEFINE
    e1 as e1.type = 'yonghu',
    e2 as e2.type = 'guanjia'
);
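
To make the pattern easier to reason about, here is a minimal plain-Java sketch (no Flink dependency) of what PATTERN (e1 e2) selects within each sessionId partition: a 'yonghu' row immediately followed by a 'guanjia' row. The names AskAnswerPatternSketch, Msg and match are hypothetical, Msg carries only the fields the pattern needs, and timestamps are plain long values. It approximates the query's semantics for illustration only and reports rowTime values rather than procTime.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AskAnswerPatternSketch {
    /** Hypothetical stand-in for one row of myTable. */
    public static final class Msg {
        final String sessionId;
        final String fromUid;
        final String type;
        final long rowTime;

        public Msg(String sessionId, String fromUid, String type, long rowTime) {
            this.sessionId = sessionId;
            this.fromUid = fromUid;
            this.type = type;
            this.rowTime = rowTime;
        }
    }

    /** Returns one "sessionId,askTime,answerTime,empUid" string per match. */
    public static List<String> match(List<Msg> input) {
        // PARTITION BY sessionId
        Map<String, List<Msg>> bySession = new LinkedHashMap<>();
        for (Msg m : input) {
            bySession.computeIfAbsent(m.sessionId, k -> new ArrayList<>()).add(m);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<Msg>> entry : bySession.entrySet()) {
            List<Msg> rows = entry.getValue();
            // ORDER BY rowTime
            rows.sort((Msg a, Msg b) -> Long.compare(a.rowTime, b.rowTime));
            // PATTERN (e1 e2): e1 must be directly followed by e2
            for (int i = 0; i + 1 < rows.size(); i++) {
                Msg e1 = rows.get(i);
                Msg e2 = rows.get(i + 1);
                if ("yonghu".equals(e1.type) && "guanjia".equals(e2.type)) {
                    // ONE ROW PER MATCH
                    out.add(entry.getKey() + "," + e1.rowTime + "," + e2.rowTime + "," + e2.fromUid);
                    // SKIP TO LAST e2: the next iteration resumes at the e2 row
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Msg> input = new ArrayList<>();
        input.add(new Msg("s1", "u1", "yonghu", 1));
        input.add(new Msg("s2", "u2", "yonghu", 1));
        input.add(new Msg("s1", "u1", "yonghu", 2));
        input.add(new Msg("s2", "k7", "guanjia", 2));
        input.add(new Msg("s1", "k9", "guanjia", 3));
        input.add(new Msg("s1", "k9", "guanjia", 4));
        System.out.println(match(input)); // [s1,2,3,k9, s2,1,2,k7]
    }
}
```

In session s1 only the second 'yonghu' message pairs with the reply, because the pattern requires the two rows to be adjacent.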

The steps above use sql-client, so no code has to be written. You can of course write code instead; the corresponding program follows.

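// Imports assume the Flink 1.7.2 package layout.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// The class name is illustrative; the original shows only the main method.
public class MatchRecognizeKafkaJob {
    private static final Logger logger = LoggerFactory.getLogger(MatchRecognizeKafkaJob.class);

    public static void main(String[] arg) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Register the Kafka topic as the table source "myTable".
        tableEnv.connect(new Kafka()
                    .version("0.11")
                    .topic("im-message-topic3")
                    //.property("zookeeper.connect","")
                    .property("bootstrap.servers", "kafkaip:9092")
                    .startFromEarliest()
                    .sinkPartitionerRoundRobin() // sink-side setting: spreads Flink partitions over Kafka partitions round-robin
            ).withFormat(new Json()
                    .failOnMissingField(false)
                    .deriveSchema()
            ).withSchema(new Schema()
                    .field("sessionId", Types.STRING).from("sessionId")
                    .field("fromUid", Types.STRING).from("fromUid")
                    .field("toUid", Types.STRING).from("toUid")
                    .field("chatType", Types.STRING).from("chatType")
                    .field("type", Types.STRING).from("type")
                    .field("msgId", Types.STRING).from("msgId")
                    .field("msg", Types.STRING).from("msg")
                    //.field("timestampSend", Types.SQL_TIMESTAMP)
                    // Event-time attribute taken from timestampSend, watermark delay 1000 ms.
                    .field("rowtime", Types.SQL_TIMESTAMP)
                    .rowtime(new Rowtime()
                            .timestampsFromField("timestampSend")
                            .watermarksPeriodicBounded(1000)
                    )
                    .field("proctime", Types.SQL_TIMESTAMP).proctime()
            ).inAppendMode().registerTableSource("myTable");

        // A 'yonghu' message directly followed by a 'guanjia' message in the same session.
        Table tb2 = tableEnv.sqlQuery(
                "SELECT " +
                        "answerTime, customer_event_time, empUid, noreply_counts, total_talk " +
                        "FROM myTable " +
                        "MATCH_RECOGNIZE ( " +
                        "PARTITION BY sessionId " +
                        "ORDER BY rowtime " +
                        "MEASURES " +
                        "e2.rowtime as answerTime, " +
                        "LAST(e1.rowtime) as customer_event_time, " +
                        "e2.fromUid as empUid, " +
                        "1 as noreply_counts, " +
                        "e1.rowtime as askTime, " +
                        "1 as total_talk " +
                        "ONE ROW PER MATCH " +
                        "AFTER MATCH SKIP TO LAST e2 " +
                        "PATTERN (e1 e2) " +
                        "DEFINE " +
                        "e1 as e1.type = 'yonghu', " +
                        "e2 as e2.type = 'guanjia' " +
                        ")"
        );

        DataStream<Row> appendStream = tableEnv.toAppendStream(tb2, Row.class);
        System.out.println("schema is:");
        tb2.printSchema();
        appendStream.writeAsText("/usr/local/whk", WriteMode.OVERWRITE);
        logger.info("stream end");

        // Second sink: dump the raw sessionId/type columns for comparison.
        Table tb3 = tableEnv.sqlQuery("select  sessionId, type  from myTable");
        DataStream<Row> temp = tableEnv.toAppendStream(tb3, Row.class);
        tb3.printSchema();
        temp.writeAsText("/usr/local/whk2", WriteMode.OVERWRITE);

        env.execute("msg test");
    }
}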

That's it, although there are quite a few pitfalls along the way.

Note: if you use TimeCharacteristic.EventTime, do not use procTime as well.
