千家信息网

Flink的函数有哪些

发表于:2024-11-18 作者:千家信息网编辑
千家信息网最后更新 2024年11月18日,这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。1. Map: 将数据流中的数据进行一个转化,形
千家信息网最后更新 2024年11月18日Flink的函数有哪些

这篇文章主要介绍了Flink的函数有哪些,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

1. Map: 将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素

具体代码实现

package com.wudl.core;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName WordMap * @Description TODO map 算子实例 * @Date 2020/10/29 10:15 */public class WordMap {    /**     * @param args     * Map 函数的用法     * 映射:将数据流中的数据进行一个转化,形成一个新的数据流,消费一个元素,并且产生一个元素     *参数: Lambda 表达式或者,new MapFunction实现类     * 返回值:DataStream     */    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setMaxParallelism(1);        env.socketTextStream("10.204.125.140", 8899)                .map(new MapFunction() {                    @Override                    public String map(String s) throws Exception {                        String[] split = s.split(",");                        return split[0] + "---" + split[1];                    }                }).print();        env.execute();    }}

2. FlatMap:

将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素
package com.wudl.core;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;import java.util.Arrays;import java.util.List;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFlatMap * @Description TODO FlatMap * * FlatMap: 是一种扁平的映射,将数据流中的整体拆分成为一个个的个体使用, 消费后的元素产生零到多个元素 * * * * @Author wudl * @Date 2020/10/29 10:46 * * * 函数 FlatMap * 将数据流中的整体拆分成一个 一个 的个体使用, 消费一个元素并产生零到多个元素 * 参数: lambda 表达式或者是FlatFunction的实现类 * 返回值:DataStream * * * */public class TransformFlatMap {    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);//        DataStreamSource> listDs = env.fromCollection(Arrays.asList(//                Arrays.asList(1, 2, 3),//                Arrays.asList(3, 4, 5),//                Arrays.asList(8,9,0)//        ));//        listDs.flatMap(new FlatMapFunction, Integer>() {//            @Override//            public void flatMap(List list, Collector collector) throws Exception {////                for (Integer number : list) {//                    collector.collect(number + 100);//                }////            }//        }).print();        DataStreamSource strDs = env.socketTextStream("10.204.125.140", 8899);        strDs.flatMap(new FlatMapFunction() {            @Override            public void flatMap(String s, Collector collector) throws Exception {                String[] split = s.split(",");                collector.collect(split[0]+split[1]);            }        }).print();        env.execute();    }}

第三种:Filter 对数据流的过滤根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false) 将丢弃

package com.wudl.core;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/** * @version v1.0 * @ProjectName Flinklearning * @ClassName TransformFilter * @Description TODO 流的过滤 * @Date 2020/11/5 10:26 */public class TransformFilter {    /**     * 函数中Filter 中过滤     * 过滤:根据指定的规则将满足条件的(true) 的数据保留, 不瞒住条件的(false)  将丢弃     * 返回值:DataStream     */    public static void main(String[] args) throws Exception {        //1.获取上下文的环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        //2.设置并行度        env.setParallelism(1);        //3.获取数据流        DataStreamSource SourceDs = env.socketTextStream("10.204.125.140", 8899);        //4. 过滤数据流        DataStream filter = SourceDs.filter(new FilterFunction() {            @Override            public boolean filter(String value) throws Exception {                String[] split = value.split(",");                return split[1].length() > 3;            }        });        filter.print();        env.execute();    }}

感谢你能够认真阅读完这篇文章,希望小编分享的"Flink的函数有哪些"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

0