Flink的函数有哪些
发表于:2024-11-18 作者:千家信息网编辑
千家信息网最后更新 2024年11月18日,这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1. Map: 将数据流中的数据进行一个转化,形
千家信息网最后更新 2024年11月18日Flink的函数有哪些
这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
1. Map: 将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素
具体代码实现
package com.wudl.core;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName WordMap * @Description TODO map 算子实例 * @Date 2020/10/29 10:15 */public class WordMap { /** * @param args * Map 函数的用法 * 映射:将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素 *参数: Lambda 表达式或者,new MapFunction实现类 * 返回值:DataStream */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setMaxParallelism(1); env.socketTextStream("10.204.125.140", 8899) .map(new MapFunction() { @Override public String map(String s) throws Exception { String[] split = s.split(","); return split[0] + "---" + split[1]; } }).print(); env.execute(); }}
2. FlatMap:
将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素
package com.wudl.core;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;import java.util.List;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFlatMap * @Description TODO FlatMap * * FlatMap: 是一种扁平的映射,将数据流中的整体拆分成为一个个的个体使用, 消费后的元素产生零到多个元素 * * * * @Author wudl * @Date 2020/10/29 10:46 * * * 函数 FlatMap * 将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素 * 参数: lambda 表达式或者是FlatFunction的实现类 * 返回值:DataStream * * * */public class TransformFlatMap { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1);// DataStreamSource> listDs = env.fromCollection(Arrays.asList(// Arrays.asList(1, 2, 3),// Arrays.asList(3, 4, 5),// Arrays.asList(8,9,0)// ));// listDs.flatMap(new FlatMapFunction
, Integer>() {// @Override// public void flatMap(List
list, Collector collector) throws Exception {//// for (Integer number : list) {// collector.collect(number + 100);// }//// }// }).print(); DataStreamSource strDs = env.socketTextStream("10.204.125.140", 8899); strDs.flatMap(new FlatMapFunction () { @Override public void flatMap(String s, Collector collector) throws Exception { String[] split = s.split(","); collector.collect(split[0]+split[1]); } }).print(); env.execute(); }}
第三种:Filter 对数据流的过滤根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃
package com.wudl.core;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFilter * @Description TODO 流的过滤 * @Date 2020/11/5 10:26 */public class TransformFilter { /** * 函数中Filter 中过滤 * 过滤:根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃 * 返回值:DataStream */ public static void main(String[] args) throws Exception { //1.获取上下文的环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.设置并行度 env.setParallelism(1); //3.获取数据流 DataStreamSourceSourceDs = env.socketTextStream("10.204.125.140", 8899); //4. 过滤数据流 DataStream filter = SourceDs.filter(new FilterFunction () { @Override public boolean filter(String value) throws Exception { String[] split = value.split(","); return split[1].length() > 3; } }); filter.print(); env.execute(); }}
感谢你能够认真阅读完这篇文章,希望小编分享的"Flink的函数有哪些"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!
数据
元素
数据流
函数
消费
条件
篇文章
个体
多个
整体
参数
表达式
规则
上下
上下文
代码
价值
兴趣
同时
实例
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
outlook提示服务器正忙
局域网服务器密码忘了怎么办
手机网络安全秘钥在哪
服务器怎么降温
远程服务器ping不通
中国澳门超频服务器厂家供应
系统集成软件开发 职责
麒麟 数据库sql怎么写
数据库符号简单
上位机软件开发论文
网络安全公司的logo
华为网络安全合作公司
数据库执行器技术
运行软件开发的电脑
分层网络技术
我们的网络安全空间
网络安全课进校园手抄报简单
西湖龙井什么时候开的服务器
计算机网络技术全网广播地址
绝对演绎服务器名字
服务器中打印管理如何使用
威县网络安全
计算机网络安全检查报告
长沙畅为软件开发有限公司
网络安全法是在
数据库对加密数据的查询
mv服务器下载失败
传奇4如何搭建自己的服务器ip
怎样给网络安全设置密码
怎么察看数据库的字段长度