如何实现Apache Flink中Flink数据流转换
发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,本篇文章给大家分享的是有关如何实现Apache Flink中Flink数据流转换,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。Opera
千家信息网最后更新 2025年02月06日如何实现Apache Flink中Flink数据流转换
本篇文章给大家分享的是有关如何实现Apache Flink中Flink数据流转换,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
Operators操作转换一个或多个DataStream到一个新的DataStream 。
filter function
Scala
object DataStreamTransformationApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment filterFunction(env) env.execute("DataStreamTransformationApp") } def filterFunction(env: StreamExecutionEnvironment): Unit = { val data=env.addSource(new CustomNonParallelSourceFunction) data.map(x=>{ println("received:" + x) x }).filter(_%2 == 0).print().setParallelism(1) }}
数据源选择之前的任意一个数据源即可。
这里的map中没有做任何实质性的操作,filter中将所有的数都对2取模操作,打印结果如下:
received:1received:22received:3received:44received:5received:66received:7received:88
说明map中得到的所有的数据,而在filter中进行了过滤操作。
Java
public static void filterFunction(StreamExecutionEnvironment env) { DataStreamSourcedata = env.addSource(new JavaCustomParallelSourceFunction()); data.setParallelism(1).map(new MapFunction () { @Override public Long map(Long value) throws Exception { System.out.println("received:"+value); return value; } }).filter(new FilterFunction () { @Override public boolean filter(Long value) throws Exception { return value % 2==0; } }).print().setParallelism(1); }
需要先使用data.setParallelism(1)然后再进行map操作,否则会输出多次。因为我们用的是JavaCustomParallelSourceFunction(),而当我们使用JavaCustomNonParallelSourceFunction时,默认就是并行度1,可以不用设置。
Union Function
Scala
def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment// filterFunction(env) unionFunction(env) env.execute("DataStreamTransformationApp") } def unionFunction(env: StreamExecutionEnvironment): Unit = { val data01 = env.addSource(new CustomNonParallelSourceFunction) val data02 = env.addSource(new CustomNonParallelSourceFunction) data01.union(data02).print().setParallelism(1) }
Union操作将两个数据集综合起来,可以一同处理,上面打印输出如下:
11223344
Java
public static void main(String[] args) throws Exception { StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();// filterFunction(environment); unionFunction(environment); environment.execute("JavaDataStreamTransformationApp"); } public static void unionFunction(StreamExecutionEnvironment env) { DataStreamSourcedata1 = env.addSource(new JavaCustomNonParallelSourceFunction()); DataStreamSource data2 = env.addSource(new JavaCustomNonParallelSourceFunction()); data1.union(data2).print().setParallelism(1); }
Split Select Function
Scala
split可以将一个流拆成多个流,select可以从多个流中进行选择处理的流。
def splitSelectFunction(env: StreamExecutionEnvironment): Unit = { val data = env.addSource(new CustomNonParallelSourceFunction) val split = data.split(new OutputSelector[Long] { override def select(value: Long): lang.Iterable[String] = { val list = new util.ArrayList[String]() if (value % 2 == 0) { list.add("even") } else { list.add("odd") } list } }) split.select("odd","even").print().setParallelism(1) }
可以根据选择的名称来处理数据。
Java
public static void splitSelectFunction(StreamExecutionEnvironment env) { DataStreamSourcedata = env.addSource(new JavaCustomNonParallelSourceFunction()); SplitStream split = data.split(new OutputSelector () { @Override public Iterable select(Long value) { List output = new ArrayList<>(); if (value % 2 == 0) { output.add("odd"); } else { output.add("even"); } return output; } }); split.select("odd").print().setParallelism(1); }
以上就是如何实现Apache Flink中Flink数据流转换,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。
数据
多个
处理
选择
数据流
就是
数据源
更多
知识
篇文章
输出
实用
不用
两个
中将
名称
实质
实质性
工作会
文章
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
微软云服务器搭建教程
西安绿硒网络技术有限公司
做域名解析的服务器
湖南pdu服务器专用电源种类
县级网络安全协会
热搜榜cba数据库
服务器如何搬家
中小学家校共育与网络安全
小型聊天软件开发的实训报告
数字五笔软件开发
中化集团互联网科技
网络安全工程师证书好考吗
安庆鑫马网络技术有限责任公司
阳光电源科大讯飞软件开发
关于计算机软件开发文件
服务器进pe按什么键
医药共享数据库
戴尔r730服务器创建raid
软件开发找工作需要回什么
hyde数据库
数据库设计规范性
王者荣耀游戏无法连接服务器
g胖的服务器
哪个公司的软件开发
dayz社区怎么没有服务器
挪威网络安全战略
实惠的app软件开发管理
浪潮服务器QCODE1提示CC
公民信息数据库
生活中怎么注意网络安全