千家信息网

flinksql env的定义

发表于:2024-11-20 作者:千家信息网编辑
千家信息网最后更新 2024年11月20日,本篇内容介绍了"flinksql env的定义"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1、编写
千家信息网最后更新 2024年11月20日flinksql env的定义

本篇内容介绍了"flinksql env的定义"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1、编写 pom

    4.0.0    org.example    flinksqldemo    1.0-SNAPSHOT                    UTF-8        UTF-8        2.11        2.11.8        0.10.2.1        1.12.0        2.7.3                compile                                            org.apache.maven.plugins                maven-compiler-plugin                                    8                    8                                                                        org.apache.flink            flink-table-planner-blink_2.11            1.12.0                            org.apache.flink            flink-java            ${flink.version}            ${setting.scope}                            org.apache.flink            flink-streaming-java_2.11            ${flink.version}            ${setting.scope}                            org.apache.flink            flink-clients_2.11            ${flink.version}            ${setting.scope}                            org.apache.flink            flink-connector-kafka-0.10_${scala.binary.version}            ${flink.version}                            org.apache.flink            flink-streaming-scala_${scala.binary.version}            ${flink.version}            ${setting.scope}                            org.apache.flink            flink-connector-filesystem_${scala.binary.version}            ${flink.version}                                                    org.apache.kafka            kafka_${scala.binary.version}            ${kafka.version}            ${setting.scope}                                            org.apache.hadoop            hadoop-common            ${hadoop.version}            ${setting.scope}                            org.apache.hadoop            hadoop-hdfs            ${hadoop.version}            ${setting.scope}                            org.apache.hadoop            hadoop-client            ${hadoop.version}            ${setting.scope}                                    org.slf4j            slf4j-api            1.7.25                            com.alibaba            fastjson            1.2.72                            redis.clients            jedis            2.7.3                            com.google.guava            guava            29.0-jre            

2、编写代码

package com.jd.data;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.TableEnvironment;import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkTableApiDemo {    public static void main(String[] args) {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        DataStreamSource stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");//        1、创建表执行环节        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//        ==============================================//        1.1 老版本planner的流式查询        EnvironmentSettings set = EnvironmentSettings.newInstance()                .useOldPlanner() //用老版本                .inStreamingMode() //流式处理                .build();//        老版本的流式处理执行环境        StreamTableEnvironment oldStreamingEnv = StreamTableEnvironment.create(env, set);//      1.2 老版本批处理环境        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(executionEnvironment);//        =========================================================//        1.3 blink 版本的流式查询        EnvironmentSettings settings = EnvironmentSettings.newInstance()                .useBlinkPlanner()                .inStreamingMode()                .build();        StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, settings);//        1.4 blink 版本的批处理查询        EnvironmentSettings bsettings = EnvironmentSettings.newInstance()                .useBlinkPlanner()                .inBatchMode()                .build();        TableEnvironment blinkBatchTableEnvironment = TableEnvironment.create(settings);    }}

"flinksql env的定义"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0