
How to Use Transformations in Apache Flink

Published on 2024-11-22

This article introduces how to use transformations in Apache Flink. Transformations are something many people have questions about in day-to-day work, so this post walks through the common operators with simple, practical examples. Hopefully it helps clear things up; follow along and try them out!

Map Function

Scala

Create a new object:

import org.apache.flink.api.scala._

object DataSetTransformationApp {

  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment
    mapFunction(environment)
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  }
}

The data source here is a list of the integers 1 through 10. Map works like this: given a data set with N elements, it transforms each element one by one:

data.map { x => x.toInt }

In other words, it is like y = f(x): each input x is mapped to an output f(x).

// add 1 to every element in data
data.map((x: Int) => x + 1).print()

This adds 1 to every element.

A shorter form:

Since the lambda takes a single parameter whose type can be inferred, it can be written directly as:

data.map((x) => x + 1).print()

Even more concisely:

data.map(x => x + 1).print()

Or, using the placeholder syntax:

data.map(_ + 1).print()

Output:

2
3
4
5
6
7
8
9
10
11
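To make the y = f(x) analogy concrete, the lambda can also be pulled out into a named function; a minimal sketch (the name plusOne is just for illustration):

def plusOne(x: Int): Int = x + 1   // f: maps each input x to x + 1
data.map(plusOne _).print()        // same result as data.map(_ + 1)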

Java

public static void main(String[] args) throws Exception {
    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    mapFunction(executionEnvironment);
}

public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i + "");
    }
    DataSource<String> data = executionEnvironment.fromCollection(list);
    data.map(new MapFunction<String, Integer>() {
        public Integer map(String input) {
            return Integer.parseInt(input) + 1;
        }
    }).print();
}

Because the List we defined holds String elements, the function is parameterized as MapFunction<String, Integer>: the first type parameter is the input type and the second is the output type, which here is Integer.
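For comparison, the Scala API infers both types from the lambda itself, so no type parameters need to be spelled out; a minimal sketch of the same String-to-Integer map:

val strings = env.fromCollection(List("1", "2", "3"))  // DataSet[String]
strings.map(s => s.toInt + 1).print()                  // result type Int is inferred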

Filter Function

Add 1 to every element, then keep only the elements greater than 5.

Scala

def filterFunction(env: ExecutionEnvironment): Unit = {
  val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  data.map(_ + 1).filter(_ > 5).print()
}

filter only returns the records that satisfy the predicate.
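Any boolean predicate works the same way; for example, keeping only the even numbers (a quick sketch on the same data set):

data.filter(_ % 2 == 0).print()   // keeps 2, 4, 6, 8, 10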

Java

public static void filterFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i);
    }
    DataSource<Integer> data = env.fromCollection(list);
    data.map(new MapFunction<Integer, Integer>() {
        public Integer map(Integer input) {
            return input + 1;
        }
    }).filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer input) throws Exception {
            return input > 5;
        }
    }).print();
}

MapPartition Function

What is the difference between map and mapPartition?

Requirement: the DataSource contains 100 elements, and the results need to be stored in a database.

With a plain map, the implementation looks like this:

// the DataSource has 100 elements; store the results in a database
def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
  val students = new ListBuffer[String]
  for (i <- 1 to 100) {
    students.append("Student" + i)
  }
  val data = env.fromCollection(students)
  data.map(x => {
    // every element has to be written to the database, so a connection must be obtained first
    val connection = DBUtils.getConnection()
    println(connection + " ... ")
    // TODO: save the data to the DB
    DBUtils.returnConnection(connection)
  }).print()
}

Running this prints 100 DBUtils.getConnection() requests, one per element. As the data volume grows, obtaining a connection for every single element is clearly impractical.

This is what mapPartition is for: it transforms the data of an entire partition at once, i.e. the function is invoked once per partition.

First, set the number of partitions:

val data = env.fromCollection(students).setParallelism(4)

This sets the parallelism to 4, i.e. 4 partitions. Then process the data with mapPartition:

data.mapPartition(x => {
  val connection = DBUtils.getConnection()
  println(connection + " ... ")
  // TODO: save the data to the DB
  DBUtils.returnConnection(connection)
  x
}).print()

Now there are only 4 connection requests, one connection per partition.
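If each record also needs to be transformed rather than just passed through, the iterator should be materialized while the connection is still open; a minimal sketch (the toUpperCase call just stands in for the real per-record work):

data.mapPartition(partition => {
  val connection = DBUtils.getConnection()             // one connection per partition
  val results = partition.map(_.toUpperCase).toList    // force evaluation before returning the connection
  // TODO: write `results` to the DB in one batch
  DBUtils.returnConnection(connection)
  results
}).print()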

Java

public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 100; i++) {
        list.add("student:" + i);
    }
    DataSource<String> data = env.fromCollection(list);
    /*data.map(new MapFunction<String, String>() {
        @Override
        public String map(String input) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
            return input;
        }
    }).print();*/
    data.mapPartition(new MapPartitionFunction<String, String>() {
        @Override
        public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
        }
    }).print();
}

first groupBy sortGroup

Scala

first takes the first few records, groupBy groups the data set, and sortGroup sorts the records inside each group.

def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[(Int, String)]()
  info.append((1, "hadoop"))
  info.append((1, "spark"))
  info.append((1, "flink"))
  info.append((2, "java"))
  info.append((2, "springboot"))
  info.append((3, "linux"))
  info.append((4, "vue"))
  val data = env.fromCollection(info)

  data.first(3).print()
  // output:
  // (1,hadoop)
  // (1,spark)
  // (1,flink)

  data.groupBy(0).first(2).print() // group by the first field, take the first two records of each group
  // (3,linux)
  // (1,hadoop)
  // (1,spark)
  // (2,java)
  // (2,springboot)
  // (4,vue)

  data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print() // group by the first field, sort each group by the second field ascending, then take the first two records
  // output:
  // (3,linux)
  // (1,flink)
  // (1,hadoop)
  // (2,java)
  // (2,springboot)
  // (4,vue)
}
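Order.DESCENDING works the same way when the largest values of each group are wanted instead; a quick sketch:

// group by the first field, sort each group by the second field descending, take the first two
data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()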

Java

public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = new ArrayList<>();
    info.add(new Tuple2<>(1, "hadoop"));
    info.add(new Tuple2<>(1, "spark"));
    info.add(new Tuple2<>(1, "flink"));
    info.add(new Tuple2<>(2, "java"));
    info.add(new Tuple2<>(2, "springboot"));
    info.add(new Tuple2<>(3, "linux"));
    info.add(new Tuple2<>(4, "vue"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.first(3).print();
    data.groupBy(0).first(2).print();
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
}

FlatMap Function

flatMap takes one element and produces zero, one, or more elements.

Scala

def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).print()
}

Output:

hadoop
spark
hadoop
flink
flink
flink

flatMap splits each element on the comma, turning one element into several.

The classic example, word count:

data.flatMap(_.split(",")).map((_,1)).groupBy(0).sum(1).print()

Each element is split on the comma, each resulting word is mapped to a (word, 1) pair, the pairs are grouped by the first field, and the second field is summed within each group.

The output is:

(hadoop,2)
(flink,3)
(spark,1)
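The same chain can also be broken into named steps to make each transformation explicit; a sketch over the same data:

val words  = data.flatMap(_.split(","))   // DataSet[String]: one word per element
val pairs  = words.map((_, 1))            // DataSet[(String, Int)]: (word, 1)
val counts = pairs.groupBy(0).sum(1)      // group by the word, sum the counts
counts.print()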

Java

The same classic word count example in Java:

public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // emit each word downstream
                out.collect(split);
            }
        }
    }).map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            return new Tuple2<>(value, 1);
        }
    }).groupBy(0).sum(1).print();
}

Distinct

Deduplication.

Scala

def distinctFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).distinct().print()
}

This deduplicates the elements. Output:

hadoop
flink
spark
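On tuple data sets, distinct can also be restricted to particular fields. A sketch, assuming the DataSet API's field-position overload of distinct:

val pairs = env.fromCollection(List((1, "hadoop"), (1, "spark"), (2, "flink")))
pairs.distinct(0).print()   // one record per distinct value of field 0, e.g. (1,hadoop) and (2,flink)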

Java

public static void distinctFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // emit each word downstream
                out.collect(split);
            }
        }
    }).distinct().print();
}

Join

Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.

result = input1.join(input2)
               .where(0)       // key of the first input (tuple field 0)
               .equalTo(1);    // key of the second input (tuple field 1)

This joins field 0 of the first tuple input, input1, with field 1 of the second tuple input, input2.

def joinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]() // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))
  val info2 = ListBuffer[(Int, String)]() // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "广州"))
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.join(data2).where(0).equalTo(0).apply((first, second) => {
    (first._1, first._2, second._2)
  }).print()
}

Output:

(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)

Java

public static void joinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));
    List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "广州"));
    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
    data1.join(data2).where(0).equalTo(0)
         .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    }).print();
}

The first two type parameters of the JoinFunction, Tuple2<Integer, String> and Tuple2<Integer, String>, are the element types of the two inputs; the third, Tuple3<Integer, String, String>, is the output type.

OuterJoin

The join above is an inner join. An outer join, by contrast, keeps unmatched records and comes in three flavors on two data sets: left outer join, right outer join, and full outer join.

def outJoinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]() // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))
  val info2 = ListBuffer[(Int, String)]() // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "广州"))
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
    if (second == null) {
      (first._1, first._2, "-")
    } else {
      (first._1, first._2, second._2)
    }
  }).print() // left outer join: every record from the left side is kept
}

In a left outer join, a record on the left that has no match on the right must be handled explicitly (second is null); otherwise a NullPointerException is thrown. Output:

(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-)

Right outer join:

data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)
  } else {
    (first._1, first._2, second._2)
  }
}).print()

Right outer join output:

(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)

Full outer join:

data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)       // no match on the left side
  } else if (second == null) {
    (first._1, first._2, "-")         // no match on the right side
  } else {
    (first._1, first._2, second._2)
  }
}).print()

Output:

(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)
(4,java,-)

Java

Left outer join:

public static void outjoinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));
    List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "广州"));
    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
    data1.leftOuterJoin(data2).where(0).equalTo(0)
         .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            if (second == null) {
                return new Tuple3<>(first.f0, first.f1, "-");
            }
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    }).print();
}

Right outer join:

data1.rightOuterJoin(data2).where(0).equalTo(0)
     .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
    @Override
    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
        if (first == null) {
            return new Tuple3<>(second.f0, "-", second.f1);
        }
        return new Tuple3<>(first.f0, first.f1, second.f1);
    }
}).print();

Full outer join:

data1.fullOuterJoin(data2).where(0).equalTo(0)
     .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
    @Override
    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
        if (first == null) {
            return new Tuple3<>(second.f0, "-", second.f1);
        } else if (second == null) {
            return new Tuple3<>(first.f0, first.f1, "-");
        }
        return new Tuple3<>(first.f0, first.f1, second.f1);
    }
}).print();

Cross Function

Scala

cross computes the Cartesian product: every element on the left is paired with every element on the right.

def crossFunction(env: ExecutionEnvironment): Unit = {
  val info1 = List("乔峰", "慕容复")
  val info2 = List(3, 1, 0)
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.cross(data2).print()
}

Output:

(乔峰,3)
(乔峰,1)
(乔峰,0)
(慕容复,3)
(慕容复,1)
(慕容复,0)
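When the raw pair is not the desired result, the crossed elements can also be combined directly, similar to join; a minimal sketch, assuming the Scala CrossDataSet supports apply in the same way:

data1.cross(data2).apply((name, score) => (name, score * 10)).print()
// e.g. (乔峰,30), (乔峰,10), (乔峰,0), ...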

Java

public static void crossFunction(ExecutionEnvironment env) throws Exception {
    List<String> info1 = new ArrayList<>();
    info1.add("乔峰");
    info1.add("慕容复");
    List<String> info2 = new ArrayList<>();
    info2.add("3");
    info2.add("1");
    info2.add("0");
    DataSource<String> data1 = env.fromCollection(info1);
    DataSource<String> data2 = env.fromCollection(info2);
    data1.cross(data2).print();
}

That concludes this look at how to use transformations in Apache Flink; hopefully it has answered the common questions. Pairing the theory with hands-on practice is the best way to learn, so go try these operators out. For more related content, keep an eye on the site for future articles.
