Flink的函数有哪些
发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1. Map: 将数据流中的数据进行一个转化,形
千家信息网最后更新 2025年01月24日Flink的函数有哪些
这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
1. Map: 将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素
具体代码实现
package com.wudl.core;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName WordMap * @Description TODO map 算子实例 * @Date 2020/10/29 10:15 */public class WordMap { /** * @param args * Map 函数的用法 * 映射:将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素 *参数: Lambda 表达式或者,new MapFunction实现类 * 返回值:DataStream */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setMaxParallelism(1); env.socketTextStream("10.204.125.140", 8899) .map(new MapFunction() { @Override public String map(String s) throws Exception { String[] split = s.split(","); return split[0] + "---" + split[1]; } }).print(); env.execute(); }}
2. FlatMap:
将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素
package com.wudl.core;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;import java.util.List;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFlatMap * @Description TODO FlatMap * * FlatMap: 是一种扁平的映射,将数据流中的整体拆分成为一个个的个体使用, 消费后的元素产生零到多个元素 * * * * @Author wudl * @Date 2020/10/29 10:46 * * * 函数 FlatMap * 将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素 * 参数: lambda 表达式或者是FlatFunction的实现类 * 返回值:DataStream * * * */public class TransformFlatMap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);// DataStreamSource> listDs = env.fromCollection(Arrays.asList(// Arrays.asList(1, 2, 3),// Arrays.asList(3, 4, 5),// Arrays.asList(8,9,0)// ));// listDs.flatMap(new FlatMapFunction
, Integer>() {// @Override// public void flatMap(List
list, Collector collector) throws Exception {//// for (Integer number : list) {// collector.collect(number + 100);// }//// }// }).print(); DataStreamSource strDs = env.socketTextStream("10.204.125.140", 8899); strDs.flatMap(new FlatMapFunction () { @Override public void flatMap(String s, Collector collector) throws Exception { String[] split = s.split(","); collector.collect(split[0]+split[1]); } }).print(); env.execute(); }}
第三种:Filter 对数据流的过滤根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃
package com.wudl.core;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFilter * @Description TODO 流的过滤 * @Date 2020/11/5 10:26 */public class TransformFilter { /** * 函数中Filter 中过滤 * 过滤:根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃 * 返回值:DataStream */ public static void main(String[] args) throws Exception { //1.获取上下文的环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.设置并行度 env.setParallelism(1); //3.获取数据流 DataStreamSourceSourceDs = env.socketTextStream("10.204.125.140", 8899); //4. 过滤数据流 DataStream filter = SourceDs.filter(new FilterFunction () { @Override public boolean filter(String value) throws Exception { String[] split = value.split(","); return split[1].length() > 3; } }); filter.print(); env.execute(); }}
感谢你能够认真阅读完这篇文章,希望小编分享的"Flink的函数有哪些"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!
数据
元素
数据流
函数
消费
条件
篇文章
个体
多个
整体
参数
表达式
规则
上下
上下文
代码
价值
兴趣
同时
实例
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
河南有几个公司的服务器虚拟主机
服务器新添加的硬盘
网络安全事件责任追究的方式包括
湘潭工商数据库
软件开发技术的另一种称呼
服务器每天自动重起一次
勤哲excel服务器闪退
网络安全岗公务员
重点人力资源数据库
苹果服务器通信失败
网络安全绘画品
好的系统软件开发
办公软件开发价格
中国初中数学标准测试数据库
厦门财务软件开发怎么样
使用甲骨文数据库
网络安全法知识征文500字
将自己电脑作为服务器
物流部门如何利用数据库完成作业
挖媒网络技术
统计系统网络安全工作总结
有关于网络安全的语句
网络安全岗公务员
江苏服务器维修哪家好
营口新世纪软件开发有限公司
专升本数据库原理及应用
信息网络安全主题班会活动方案
全国地区服务器
弱电网络技术资料
推特腾讯数据库