Flink的函数有哪些
发表于:2024-11-18 作者:千家信息网编辑
千家信息网最后更新 2024年11月18日,这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1. Map: 将数据流中的数据进行一个转化,形
千家信息网最后更新 2024年11月18日Flink的函数有哪些
这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
1. Map: 将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素
具体代码实现
package com.wudl.core;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName WordMap * @Description TODO map 算子实例 * @Date 2020/10/29 10:15 */public class WordMap { /** * @param args * Map 函数的用法 * 映射:将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素 *参数: Lambda 表达式或者,new MapFunction实现类 * 返回值:DataStream */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setMaxParallelism(1); env.socketTextStream("10.204.125.140", 8899) .map(new MapFunction() { @Override public String map(String s) throws Exception { String[] split = s.split(","); return split[0] + "---" + split[1]; } }).print(); env.execute(); }}
2. FlatMap:
将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素
package com.wudl.core;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;import java.util.List;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFlatMap * @Description TODO FlatMap * * FlatMap: 是一种扁平的映射,将数据流中的整体拆分成为一个个的个体使用, 消费后的元素产生零到多个元素 * * * * @Author wudl * @Date 2020/10/29 10:46 * * * 函数 FlatMap * 将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素 * 参数: lambda 表达式或者是FlatFunction的实现类 * 返回值:DataStream * * * */public class TransformFlatMap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);// DataStreamSource> listDs = env.fromCollection(Arrays.asList(// Arrays.asList(1, 2, 3),// Arrays.asList(3, 4, 5),// Arrays.asList(8,9,0)// ));// listDs.flatMap(new FlatMapFunction
, Integer>() {// @Override// public void flatMap(List
list, Collector collector) throws Exception {//// for (Integer number : list) {// collector.collect(number + 100);// }//// }// }).print(); DataStreamSource strDs = env.socketTextStream("10.204.125.140", 8899); strDs.flatMap(new FlatMapFunction () { @Override public void flatMap(String s, Collector collector) throws Exception { String[] split = s.split(","); collector.collect(split[0]+split[1]); } }).print(); env.execute(); }}
第三种:Filter 对数据流的过滤根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃
package com.wudl.core;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFilter * @Description TODO 流的过滤 * @Date 2020/11/5 10:26 */public class TransformFilter { /** * 函数中Filter 中过滤 * 过滤:根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃 * 返回值:DataStream */ public static void main(String[] args) throws Exception { //1.获取上下文的环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.设置并行度 env.setParallelism(1); //3.获取数据流 DataStreamSourceSourceDs = env.socketTextStream("10.204.125.140", 8899); //4. 过滤数据流 DataStream filter = SourceDs.filter(new FilterFunction () { @Override public boolean filter(String value) throws Exception { String[] split = value.split(","); return split[1].length() > 3; } }); filter.print(); env.execute(); }}
感谢你能够认真阅读完这篇文章,希望小编分享的"Flink的函数有哪些"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!
数据
元素
数据流
函数
消费
条件
篇文章
个体
多个
整体
参数
表达式
规则
上下
上下文
代码
价值
兴趣
同时
实例
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
北京品质软件开发服务以客为尊
康皱免费领商城软件开发
国能e购 软件开发
门头沟区网络技术信息采购
java调用数据库连接
神通数据库的数据编码
便宜软件开发公司
网络安全知识答题答案在哪
温州文成县优秀软件开发品牌
四川雅安软件开发外包公司
葡萄酒数据库
河南超频服务器销售
韩金网络技术有限公司
网络安全 郭剑云
眼视光技术和计算机网络技术
软件开发 无形资产 额度
vb没有数据库控件
深圳智能软件开发定制
2021网络技术发展
分析型数据库和事务型数据库
数据库题目举例
什么叫计算机网络安全
数据库不用布尔类型
网络安全人为
邯郸应用软件开发服务
网络安全涉及的行业分工
临夏州网络安全倡议
web服务器网页独立网址
微信有服务器保存用户信息吗
连接星火数据库失败