千家信息网

flinksql如何链接kafka

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要介绍"flinksql如何链接kafka",在日常操作中,相信很多人在flinksql如何链接kafka问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"fli
千家信息网最后更新 2025年01月23日flinksql如何链接kafka

这篇文章主要介绍"flinksql如何链接kafka",在日常操作中,相信很多人在flinksql如何链接kafka问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"flinksql如何链接kafka"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

    4.0.0    org.example    flinksqldemo    1.0-SNAPSHOT                    UTF-8        UTF-8        2.11        2.11.8        0.10.2.1        1.12.0        2.7.3                compile                                            org.apache.maven.plugins                maven-compiler-plugin                                    8                    8                                                                        org.apache.flink            flink-table-planner-blink_2.11            1.12.0                            org.apache.flink            flink-java            ${flink.version}            ${setting.scope}                            org.apache.flink            flink-streaming-java_2.11            ${flink.version}            ${setting.scope}                            org.apache.flink            flink-clients_2.11            ${flink.version}            ${setting.scope}                            org.apache.flink            flink-connector-kafka_2.11            1.12.0                            org.apache.flink            flink-csv            1.12.0                            org.apache.flink            flink-streaming-scala_${scala.binary.version}            ${flink.version}            ${setting.scope}                                                    org.apache.kafka            kafka_${scala.binary.version}            ${kafka.version}            ${setting.scope}                                            org.apache.hadoop            hadoop-common            ${hadoop.version}            ${setting.scope}                            org.apache.hadoop            hadoop-hdfs            ${hadoop.version}            ${setting.scope}                            org.apache.hadoop            hadoop-client            ${hadoop.version}            ${setting.scope}                                    org.slf4j            slf4j-api            1.7.25                            com.alibaba            fastjson            1.2.72                            redis.clients            jedis            2.7.3                            com.google.guava            guava            29.0-jre            

代码:

package com.jd.data;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.DataTypes;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import org.apache.flink.table.descriptors.Csv;import org.apache.flink.table.descriptors.Kafka;import org.apache.flink.table.descriptors.Schema;import org.apache.flink.types.Row;public class TableApiConnectKafka04 {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);//        1、创建表执行环节        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);          tableEnv.connect(new Kafka()                .version("0.11") // 定义版本                .topic("xxx") // 定义主题                .property("zookeeper.connect", "localhost:2181")                .property("bootstrap.servers", "localhost:9092")        ).withFormat(new Csv()).withSchema(new Schema().field("a", DataTypes.STRING())  // 定义表的结构                  .field("b", DataTypes.STRING())                  .field("c", DataTypes.STRING())          )                  .inAppendMode()                  .createTemporaryTable("xxx");        Table xxx = tableEnv.from("xxx");        xxx.printSchema();        tableEnv.toAppendStream(xxx,  Row.class ).print();        env.execute("job");    }}

到此,关于"flinksql如何链接kafka"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0