千家信息网

如何实现Apache Flink中Flink数据流转换

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,本篇文章给大家分享的是有关如何实现Apache Flink中Flink数据流转换,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。Opera
千家信息网最后更新 2025年02月06日如何实现Apache Flink中Flink数据流转换

本篇文章给大家分享的是有关如何实现Apache Flink中Flink数据流转换,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

Operators操作转换一个或多个DataStream到一个新的DataStream 。

filter function

Scala

object DataStreamTransformationApp {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    filterFunction(env)    env.execute("DataStreamTransformationApp")  }  def filterFunction(env: StreamExecutionEnvironment): Unit = {    val data=env.addSource(new CustomNonParallelSourceFunction)    data.map(x=>{      println("received:" + x)      x    }).filter(_%2 == 0).print().setParallelism(1)  }}

数据源选择之前的任意一个数据源即可。

这里的map中没有做任何实质性的操作,filter中将所有的数都对2取模操作,打印结果如下:

received:1received:22received:3received:44received:5received:66received:7received:88

说明map中得到的所有的数据,而在filter中进行了过滤操作。

Java

    public static void filterFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new JavaCustomParallelSourceFunction());        data.setParallelism(1).map(new MapFunction() {            @Override            public Long map(Long value) throws Exception {                System.out.println("received:"+value);                return value;            }        }).filter(new FilterFunction() {            @Override            public boolean filter(Long value) throws Exception {                return value % 2==0;            }        }).print().setParallelism(1);    }

需要先使用data.setParallelism(1)然后再进行map操作,否则会输出多次。因为我们用的是JavaCustomParallelSourceFunction(),而当我们使用JavaCustomNonParallelSourceFunction时,默认就是并行度1,可以不用设置。

Union Function

Scala

  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment//    filterFunction(env)    unionFunction(env)    env.execute("DataStreamTransformationApp")  }  def unionFunction(env: StreamExecutionEnvironment): Unit = {    val data01 = env.addSource(new CustomNonParallelSourceFunction)    val data02 = env.addSource(new CustomNonParallelSourceFunction)    data01.union(data02).print().setParallelism(1)  }

Union操作将两个数据集综合起来,可以一同处理,上面打印输出如下:

11223344

Java

    public static void main(String[] args) throws Exception {        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//        filterFunction(environment);        unionFunction(environment);        environment.execute("JavaDataStreamTransformationApp");    }    public static void unionFunction(StreamExecutionEnvironment env) {        DataStreamSource data1 = env.addSource(new JavaCustomNonParallelSourceFunction());        DataStreamSource data2 = env.addSource(new JavaCustomNonParallelSourceFunction());        data1.union(data2).print().setParallelism(1);    }

Split Select Function

Scala

split可以将一个流拆成多个流,select可以从多个流中进行选择处理的流。

def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {    val data = env.addSource(new CustomNonParallelSourceFunction)    val split = data.split(new OutputSelector[Long] {      override def select(value: Long): lang.Iterable[String] = {        val list = new util.ArrayList[String]()        if (value % 2 == 0) {          list.add("even")        } else {          list.add("odd")        }        list      }    })    split.select("odd","even").print().setParallelism(1)  }

可以根据选择的名称来处理数据。

Java

public static void splitSelectFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new JavaCustomNonParallelSourceFunction());        SplitStream split = data.split(new OutputSelector() {            @Override            public Iterable select(Long value) {                List output = new ArrayList<>();                if (value % 2 == 0) {                    output.add("odd");                } else {                    output.add("even");                }                return output;            }        });        split.select("odd").print().setParallelism(1);    }

以上就是如何实现Apache Flink中Flink数据流转换,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0