千家信息网

三、flink--DataStreamAPI原理以及用法

发表于:2024-09-30 作者:千家信息网编辑
千家信息网最后更新 2024年09月30日,一、DataStream基本概述1.1 datastream是什么?datastream是flink提供给用户使用的用于进行流计算和批处理的api,是对底层流式计算模型的api封装,便于用户编程。1.
千家信息网最后更新 2024年09月30日三、flink--DataStreamAPI原理以及用法

一、DataStream基本概述

1.1 datastream是什么?

datastream是flink提供给用户使用的用于进行流计算和批处理的api,是对底层流式计算模型的api封装,便于用户编程。

1.2 datastream运行模型

一个完整的datastream运行模型一般由三部分组成,分别为Source、Transformation、Sink。DataSource主要负责数据的读取(也就是从数据源读取,可以批数据源,也可以是流式数据数据源),Transformation主要负责对属于的转换操作(也就是正常的业务处逻辑),Sink负责最终数据的输出(计算结果的导出)。

1.3 datastream程序架构

一般来说,使用datastream api编写flink程序,包括以下流程:
1、获得一个执行环境;(Execution Environment)
2、加载/创建初始数据;(Source)
3、指定转换这些数据;(Transformation)
4、指定放置计算结果的位置;(Sink)
5、触发程序执行(这是流式计算必须的操作,如果是批处理则不需要)

二、DataStream api的使用

2.1 maven依赖配置

    4.0.0    SparkDemo    SparkDemoTest    1.0-SNAPSHOT            UTF-8        2.11.8        2.7.3        2.11        1.6.1                            org.apache.hadoop            hadoop-client            ${hadoop.version}                            mysql            mysql-connector-java            8.0.12                            junit            junit            4.12                            org.apache.logging.log4j            log4j-core            2.9.0                                            org.apache.flink            flink-java            1.6.1                                    org.apache.flink            flink-streaming-java_2.11            1.6.1                                                org.apache.flink            flink-streaming-scala_2.11            1.6.1                                    org.apache.flink            flink-scala_2.11            1.6.1                                    org.apache.flink            flink-clients_2.11            1.6.1                                    org.apache.flink            flink-table_2.11            1.6.1            provided                            org.apache.hadoop            hadoop-client            ${hadoop.version}                            com.alibaba            fastjson            1.2.22                            org.apache.flink            flink-connector-kafka-0.10_${scala.binary.version}            ${flink.version}                                                        org.scala-tools                maven-scala-plugin                2.15.2                                                                                        compile                            testCompile                                                                                                    maven-compiler-plugin                3.6.0                                    1.8                    1.8                                                        org.apache.maven.plugins                maven-surefire-plugin                2.19                                    true                                        

2.2 获取执行环境(Execution Environment)

有三种类型的执行环境:

1、StreamExecutionEnvironment.getExecutionEnvironment()创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。2、StreamExecutionEnvironment.createLocalEnvironment()返回本地执行环境,需要在调用时指定默认的并行度。3、StreamExecutionEnvironment.createRemoteEnvironment()返回集群执行环境,将Jar提交到远程服务器。需要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。

2.3 常用数据源(source)

2.3.1 基于file的数据源

1、env.readTextFile(path)
一列一列的读取遵循TextInputFormat规范的文本文件,并将结果作为String返回。

package flinktest;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ExampleDemo {    public static void main(String[] args) throws Exception {        //1、创建环境对象        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2、读取文件作为数据源        DataStreamSource fileSource = env.readTextFile("/tmp/test.txt");        //3、打印数据        fileSource.print();        //4、启动任务执行        env.execute("test file source");    }}

2、env.readFile(fileInputFormat,path)
按照指定的fileinputformat格式来读取文件。这里的fileinputformat可以自定义类

package flinktest;import org.apache.flink.api.java.io.TextInputFormat;import org.apache.flink.core.fs.Path;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ExampleDemo {    public static void main(String[] args) throws Exception {        //1、创建环境对象        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2、读取文件作为数据源        DataStreamSource fileSource = env.readFile(new TextInputFormat(new Path("/tmp/test.txt")), "/tmp/test.txt");        //3、打印数据        fileSource.print();        //4、启动任务执行        env.execute("test file source");    }}

2.3.2 基于socket数据源

socketTextStream(host,port)

package flinktest;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class ExampleDemo {    public static void main(String[] args) throws Exception {        //1、创建环境对象        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2、读取socket作为数据源        DataStreamSource sourceSocket = env.socketTextStream("127.0.0.1", 1000);        //3、打印数据        sourceSocket.print();        //4、启动任务执行        env.execute("test socket source");    }}

2.3.3 基于集合collection的数据源

1、fromCollection(Collection)
从集合中创建一个数据流,集合中所有元素的类型是一致的。

List list = new ArrayList<>();DataStreamSource sourceCollection = env.fromCollection(list);       

2、fromCollection(Iterator)
从迭代(Iterator)中创建一个数据流,指定元素数据类型的类由iterator返回。

3、fromElements(Object)
从一个给定的对象序列中创建一个数据流,所有的对象必须是相同类型的

4、generateSequence(from, to)
从给定的间隔中并行地产生一个数字序列。读取一定范围的sequnce对象

2.3.4 自定义数据源

env.addSource(SourceFuntion)
自定义一个数据源实现类,然后 addSource 到到env中。比如场景的从kafka读取数据,从mysql读取数据

2.4 常用输出(sink)

Data Sink 消费DataStream中的数据,并将它们转发到文件、套接字、外部系统或者打印出。Flink有许多封装在DataStream操作里的内置输出格式。
1、 writeAsText
将元素以字符串形式逐行写入(TextOutputFormat),这些字符串通过调用每个元素的toString()方法来获取。

2、WriteAsCsv
将元组以逗号分隔写入文件中(CsvOutputFormat),行及字段之间的分隔是可配置的。每个字段的值来自对象的toString()方法。

3、print/printToErr
打印每个元素的toString()方法的值到标准输出或者标准错误输出流中。或者也可以在输出流中添加一个前缀,这个可以帮助区分不同的打印调用,如果并行度大于1,那么输出也会有一个标识由哪个任务产生的标志。

4、 writeUsingOutputFormat
自定义文件输出的方法和基类(FileOutputFormat),支持自定义对象到字节的转换。

5、writeToSocket
根据SerializationSchema 将元素写入到socket中。

6、stream.addSink(SinkFunction)
使用自定义的sink类

2.5 常用算子(transformation operator)

2.5.1 map

DataStream → DataStream:输入一个参数经过处理产生一个新的参数

DataStream dataStream = //...dataStream.map(new MapFunction() {    @Override    //这里将每个参数 * 2,然后返回    public Integer map(Integer value) throws Exception {        return 2 * value;    }});

2.5.2 flatMap

DataStream → DataStream:输入一个参数,产生0个、1个或者多个输出。

dataStream.flatMap(new FlatMapFunction() {    @Override    public void flatMap(String value, Collector out)        throws Exception {        //切割字符串,将处理之后的数据放到 collector 中。        for(String word: value.split(" ")){            out.collect(word);        }    }});

2.5.3 filter

DataStream → DataStream:计算每个元素的布尔值,并返回布尔值为true的元素。下面这个例子是过滤出非0的元素:

dataStream.filter(new FilterFunction() {    @Override    public boolean filter(Integer value) throws Exception {        return value != 0;    }});

2.5.4 keyBy

DataStream → KeyedStream:要求输入是tuple,或者是一个复合对象,里面有多个属性(例如student类,里面有name、age等2个以上的属性),反正就是必须有作为key和value的数据。根据key进行分区,相同key的在同一个分区,在内部使用hash实现。

//有不同方式指定keydataStream.keyBy("someKey") // 指定key的字段名称,常用于复合对象中dataStream.keyBy(0) // 指定key的位置,常用于tuple中

2.5.5 reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果,也就是每一次聚合的结果都会返回,直到最后一次聚合结束,所以不是只返回最后一个聚合结果。

keyedStream.reduce(new ReduceFunction() {    @Override    public Integer reduce(Integer value1, Integer value2)    throws Exception {        return value1 + value2;    }});

2.5.6 fold

KeyedStream → DataStream
一个有初始值的分组数据流的滚动折叠操作,合并当前元素和前一次折叠操作的结果,并产生一个新的值,返回的流中包含每一次折叠的结果,而不是只返回最后一次折叠的最终结果。

DataStream result =  keyedStream.fold("start", new FoldFunction() {    @Override    public String fold(String current, Integer value) {        return current + "-" + value;    }  });运行结果为:假设数据源为 (1,2,3,4,5)结果为:start-1,start-1-2...... 

2.5.7 aggregations

KeyedStream →DataStream:分组数据流上的滚动聚合操作。min和minBy的区别是min返回的是一个最小值,而minBy返回的是其字段中包含最小值的元素(同样原理适用于max和maxBy),返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

keyedStream.sum(0);keyedStream.sum("key");keyedStream.min(0);keyedStream.min("key");keyedStream.max(0);keyedStream.max("key");keyedStream.minBy(0);keyedStream.minBy("key");keyedStream.maxBy(0);keyedStream.maxBy("key");

注意:在2.3.10之前的算子都是可以直接作用在Stream上的,因为他们不是聚合类型的操作,但是到2.3.10后你会发现,我们虽然可以对一个无边界的流数据直接应用聚合算子,但是它会记录下每一次的聚合结果,这往往不是我们想要的,其实,reduce、fold、aggregation这些聚合算子都是和Window配合使用的,只有配合Window,才能得到想要的结果。

2.5.8 connect、coMap、coFlatMap

1、connect:
DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

DataStream someStream = //...DataStream otherStream = //...ConnectedStreams connectedStreams = someStream.connect(otherStream);

2、coMap、coFlatMap
ConnectedStreams → DataStream:专门用于connect之后的stream操作的map和flatmap算子。

connectedStreams.map(new CoMapFunction() {    @Override    public Boolean map1(Integer value) {        return true;    }    @Override    public Boolean map2(String value) {        return false;    }});connectedStreams.flatMap(new CoFlatMapFunction() {   @Override   public void flatMap1(Integer value, Collector out) {       out.collect(value.toString());   }   @Override   public void flatMap2(String value, Collector out) {       for (String word: value.split(" ")) {         out.collect(word);       }   }});

2.5.9 split和select

split:
DataStream → SplitStream:将一个数据流拆分成两个或者多个数据流.并且会给每个数据流起一个别名

select:SplitStream→DataStream:从一个SplitStream中获取一个或者多个DataStream。

SplitStream split = someDataStream.split(new OutputSelector() {    @Override    public Iterable select(Integer value) {        List output = new ArrayList();        if (value % 2 == 0) {            output.add("even");        }        else {            output.add("odd");        }        return output;    }});split.select("even").print();split.select("odd").print();

2.5.10 union

DataStream → DataStream:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。注意:如果你将一个DataStream跟它自己做union操作,在新的DataStream中,你将看到每一个元素都出现两次。这和connect不一样,connect并没有合并多个stream

dataStream.union(otherStream1, otherStream2, ...);
数据 结果 元素 数据源 环境 对象 数据流 输出 文件 两个 程序 类型 运行 多个 算子 也就是 任务 参数 字段 常用 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 响应国家号召维护网络安全 联想服务器远程管理有效期 和平区项目网络技术售后保障 2020的网络安全宣传周的意义 手机有时显示未连接到服务器 各单位网络安全人落实 数据库登陆的ip地址是什么 网络技术新干线 山东办公系统软件开发价格表 我的世界手机版小游戏服务器ip 软件开发架构师用什么语言 东莞上位机软件开发培训 网络安全技术的演进 有没有云服务器招商加盟项目 网络技术应用清华版电子教材 人体外表结构图数据库 关于网络安全主题黑板报 高中生网络安全总结 软件开发安全风险分析 网络安全教育漫画图片 数据库系统包括 opc服务器连多个设备 dcs数据库01地址 网络安全专业有必要培训吗 大逃杀换服务器 阿里巴巴网络安全大赛 sql两个数据库共用一个表 为了数据库的安全稳定 梦想精灵谷神奇宝贝服务器 规划局办公软件开发公司
0