How to Use Transformations in Apache Flink
This article walks through the commonly used transformations of Flink's DataSet API, with examples in both Scala and Java.
Map Function
Scala
Create a new object:
import org.apache.flink.api.scala._

object DataSetTransformationApp {

  def main(args: Array[String]): Unit = {
    val environment = ExecutionEnvironment.getExecutionEnvironment
    mapFunction(environment)
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    // Data source: a List holding 1 to 10
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  }
}
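The data source here is a List holding 1 to 10. map works as follows: if the data set contains N elements, map transforms each of them individually, so the result again contains N elements:
data.map { x => x.toInt }
In other words, y = f(x) for every element x.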
// Add 1 to every element of data
data.map((x: Int) => x + 1).print()
This adds 1 to each element.
Shorter form: the function takes a single parameter whose type the compiler can infer, so the type annotation can be dropped:
data.map((x) => x + 1).print()
Shorter still, without the parentheses:
data.map(x => x + 1).print()
Shortest, using Scala's placeholder syntax:
data.map(_ + 1).print()
Output:
2
3
4
5
6
7
8
9
10
11
Java
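import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

// Illustrative class name; the later Java methods go in the same class
public class JavaDataSetTransformationApp {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        mapFunction(executionEnvironment);
    }

    public static void mapFunction(ExecutionEnvironment executionEnvironment) throws Exception {
        // The strings "1" to "10"
        List<String> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i + "");
        }
        DataSource<String> data = executionEnvironment.fromCollection(list);
        // Parse each String to an Integer and add 1
        data.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String input) {
                return Integer.parseInt(input) + 1;
            }
        }).print();
    }
}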
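Because the List holds Strings, the input type of the MapFunction is String; since map returns Integer.parseInt(input) + 1, the output type is Integer, giving MapFunction<String, Integer>.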
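The idea that map applies y = f(x) to every element, and that its type parameters describe the input and output types, can be sketched in plain Java without Flink. This is only a local illustration on a java.util.List under that assumption; the class MapSketch and its map helper are hypothetical names, not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Plain-Java illustration of map semantics (not Flink): y = f(x) for every element.
final class MapSketch {

    // Applies f to each element; the output has exactly one element per input element.
    static <T, R> List<R> map(List<T> input, Function<T, R> f) {
        List<R> out = new ArrayList<>(input.size());
        for (T x : input) {
            out.add(f.apply(x));
        }
        return out;
    }

    public static void main(String[] args) {
        // Same input as the Java example: the strings "1" to "10"
        List<String> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(String.valueOf(i));
        }
        // String in, Integer out, like MapFunction<String, Integer>
        List<Integer> result = map(data, s -> Integer.parseInt(s) + 1);
        System.out.println(result); // [2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    }
}
```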
Filter Function
Add 1 to each element, then keep only the elements greater than 5.
Scala
def filterFunction(env: ExecutionEnvironment): Unit = {
  val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
  // Add 1, then keep only the values greater than 5
  data.map(_ + 1).filter(_ > 5).print()
}
filter returns only the records that satisfy the condition.
Java
// Additional import: org.apache.flink.api.common.functions.FilterFunction
public static void filterFunction(ExecutionEnvironment env) throws Exception {
    List<Integer> list = new ArrayList<>();
    for (int i = 1; i <= 10; i++) {
        list.add(i);
    }
    DataSource<Integer> data = env.fromCollection(list);
    data.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer input) {
            return input + 1;
        }
    }).filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer input) throws Exception {
            return input > 5;
        }
    }).print();
}
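The map-then-filter chain can be mirrored with java.util.stream to show that filter never changes elements, it only drops those failing the predicate. This is a plain-Java illustration, not Flink; FilterSketch and plusOneThenFilter are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Plain-Java illustration (not Flink) of map followed by filter.
final class FilterSketch {

    // Add 1 to each element, then keep only the elements greater than 5.
    static List<Integer> plusOneThenFilter(List<Integer> input) {
        return input.stream()
                .map(x -> x + 1)      // map: exactly one output per input
                .filter(x -> x > 5)   // filter: drops elements failing the predicate
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 10).boxed().collect(Collectors.toList());
        System.out.println(plusOneThenFilter(data)); // [6, 7, 8, 9, 10, 11]
    }
}
```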
MapPartition Function
What is the difference between map and mapPartition?
Scenario: a DataSource holds 100 elements, and the results must be stored in a database.
With map, the implementation looks like this:
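import scala.collection.mutable.ListBuffer

// The DataSource holds 100 elements; store the results in a database.
// DBUtils is the author's helper class (not shown) that hands out and takes back connections.
def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
  val students = new ListBuffer[String]
  for (i <- 1 to 100) {
    students.append("Student" + i)
  }
  val data = env.fromCollection(students)
  data.map(x => {
    // Storing each element requires getting a connection first
    val connection = DBUtils.getConnection()
    println(connection + " ... ")
    // TODO .... save the data to the DB
    DBUtils.returnConnection(connection)
    x
  }).print()
}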
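Running this calls DBUtils.getConnection() 100 times, once per element. As the data volume grows, acquiring a connection for every single element is clearly impractical.
That is what mapPartition is for: it transforms all the data of a partition at once, so the function is called once per partition rather than once per element.
So first set the number of partitions: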
val data = env.fromCollection(students).setParallelism(4)
This sets 4 partitions, i.e. a parallelism of 4. Then process the data with mapPartition:
data.mapPartition(x => {
  // One connection per partition, shared by all of its elements
  val connection = DBUtils.getConnection()
  println(connection + " ... ")
  // TODO .... save the data to the DB
  DBUtils.returnConnection(connection)
  x
}).print()
This results in only 4 connection requests: each partition gets one connection.
Java
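// Additional imports: org.apache.flink.api.common.functions.MapPartitionFunction, org.apache.flink.util.Collector
public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
    List<String> list = new ArrayList<>();
    for (int i = 1; i <= 100; i++) {
        list.add("student:" + i);
    }
    DataSource<String> data = env.fromCollection(list);
    /*data.map(new MapFunction<String, String>() {
        @Override
        public String map(String input) throws Exception {
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
            return input;
        }
    }).print();*/
    data.mapPartition(new MapPartitionFunction<String, String>() {
        @Override
        public void mapPartition(Iterable<String> values, Collector<String> out) throws Exception {
            // Mirrors the Scala version: one connection per partition, then pass every element through
            String connection = DBUtils.getConnection();
            System.out.println("connection = [" + connection + "]");
            DBUtils.returnConnection(connection);
            for (String value : values) {
                out.collect(value);
            }
        }
    }).print();
}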
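The connection-count argument can be checked with a small plain-Java simulation. It assumes a hypothetical openConnection() that only counts calls (standing in for DBUtils.getConnection()) and a round-robin split into partitions chosen purely for illustration; Flink decides the real data distribution itself.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation (not Flink): connections opened per element vs. per partition.
final class MapPartitionSketch {

    static int connectionsOpened = 0;

    // Hypothetical stand-in for DBUtils.getConnection(); it only counts calls.
    static String openConnection() {
        connectionsOpened++;
        return "conn-" + connectionsOpened;
    }

    // Round-robin split into n partitions (illustrative only).
    static <T> List<List<T>> partition(List<T> input, int n) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            parts.get(i % n).add(input.get(i));
        }
        return parts;
    }

    // map style: one connection per element.
    static int connectionsWithMap(List<String> input) {
        connectionsOpened = 0;
        for (String record : input) {
            String conn = openConnection();
            // ... save record using conn ...
        }
        return connectionsOpened;
    }

    // mapPartition style: one connection per partition, reused for all its records.
    static int connectionsWithMapPartition(List<String> input, int parallelism) {
        connectionsOpened = 0;
        for (List<String> part : partition(input, parallelism)) {
            String conn = openConnection();
            for (String record : part) {
                // ... save record using conn ...
            }
        }
        return connectionsOpened;
    }

    public static void main(String[] args) {
        List<String> students = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            students.add("Student" + i);
        }
        System.out.println("map:          " + connectionsWithMap(students));             // 100
        System.out.println("mapPartition: " + connectionsWithMapPartition(students, 4)); // 4
    }
}
```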
first groupBy sortGroup
Scala
first returns the first n elements, groupBy groups the data, and sortGroup sorts the elements within each group.
// Requires: import org.apache.flink.api.common.operators.Order
def firstFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[(Int, String)]()
  info.append((1, "hadoop"))
  info.append((1, "spark"))
  info.append((1, "flink"))
  info.append((2, "java"))
  info.append((2, "springboot"))
  info.append((3, "linux"))
  info.append((4, "vue"))
  val data = env.fromCollection(info)

  // Take the first 3 elements
  data.first(3).print()
  // Output:
  // (1,hadoop)
  // (1,spark)
  // (1,flink)

  // Group by the first field and take the first 2 elements of each group
  data.groupBy(0).first(2).print()
  // (3,linux)
  // (1,hadoop)
  // (1,spark)
  // (2,java)
  // (2,springboot)
  // (4,vue)

  // Group by the first field, sort each group by the second field in ascending order, then take the first 2
  data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  // Output:
  // (3,linux)
  // (1,flink)
  // (1,hadoop)
  // (2,java)
  // (2,springboot)
  // (4,vue)
}
Java
// Additional imports: org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.api.common.operators.Order
public static void firstFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info = new ArrayList<>();
    info.add(new Tuple2<>(1, "hadoop"));
    info.add(new Tuple2<>(1, "spark"));
    info.add(new Tuple2<>(1, "flink"));
    info.add(new Tuple2<>(2, "java"));
    info.add(new Tuple2<>(2, "springboot"));
    info.add(new Tuple2<>(3, "linux"));
    info.add(new Tuple2<>(4, "vue"));
    DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
    data.first(3).print();
    data.groupBy(0).first(2).print();
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
}
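What groupBy(0).sortGroup(1, Order.ASCENDING).first(2) computes can be reproduced in plain Java: group by key, sort each group, keep the first n. This is a local sketch assuming the tuples are modelled as Map.Entry pairs; GroupSortFirstSketch is a hypothetical name, and the LinkedHashMap only keeps the printout readable, whereas Flink's group order is not guaranteed.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (not Flink) of groupBy(0).sortGroup(1, ASCENDING).first(n).
final class GroupSortFirstSketch {

    static Map<Integer, List<String>> groupSortFirst(List<Map.Entry<Integer, String>> data, int n) {
        // groupBy(0): collect the second field per key, keeping first-seen key order
        Map<Integer, List<String>> groups = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> e : data) {
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        // sortGroup(1, ASCENDING) followed by first(n), applied inside every group
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<String>> g : groups.entrySet()) {
            List<String> values = new ArrayList<>(g.getValue());
            values.sort(Comparator.naturalOrder());
            result.put(g.getKey(), new ArrayList<>(values.subList(0, Math.min(n, values.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> info = List.of(
                Map.entry(1, "hadoop"), Map.entry(1, "spark"), Map.entry(1, "flink"),
                Map.entry(2, "java"), Map.entry(2, "springboot"),
                Map.entry(3, "linux"), Map.entry(4, "vue"));
        System.out.println(groupSortFirst(info, 2));
        // {1=[flink, hadoop], 2=[java, springboot], 3=[linux], 4=[vue]}
    }
}
```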
FlatMap Function
Takes one element and produces zero, one, or more elements.
Scala
def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).print()
}
Output:
hadoop
spark
hadoop
flink
flink
flink
flatMap splits each element on commas, turning one element into several.
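The one-to-many behaviour of flatMap can be shown with two nested loops in plain Java, where adding to a list plays the role of out.collect(...). This is a local sketch, not Flink; FlatMapSketch and splitAll are hypothetical names, and skipping empty pieces is an extra step added here only to show that an input may emit nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not Flink) of flatMap: one input may produce 0, 1 or many outputs.
final class FlatMapSketch {

    static List<String> splitAll(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(",")) {
                // Extra step for illustration: empty pieces are skipped,
                // so an input such as "" emits no elements at all
                if (!word.isEmpty()) {
                    out.add(word); // plays the role of out.collect(word)
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> info = List.of("hadoop,spark", "hadoop,flink", "flink,flink");
        System.out.println(splitAll(info));         // [hadoop, spark, hadoop, flink, flink, flink]
        System.out.println(splitAll(List.of(""))); // []
    }
}
```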
A classic example is word count:
data.flatMap(_.split(",")).map((_,1)).groupBy(0).sum(1).print()
Split each element on commas, map every word to (word, 1), group by the first field, and sum the second field.
Output:
(hadoop,2)
(flink,3)
(spark,1)
Java
The same word-count example in Java:
// Additional imports: org.apache.flink.api.common.functions.FlatMapFunction, org.apache.flink.util.Collector,
// org.apache.flink.api.java.tuple.Tuple2
public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // Emit each word downstream
                out.collect(split);
            }
        }
    }).map(new MapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(String value) throws Exception {
            return new Tuple2<>(value, 1);
        }
    }).groupBy(0).sum(1).print();
}
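The whole flatMap, map((word, 1)), groupBy(0), sum(1) pipeline collapses into a single Map.merge call per word when run locally. This plain-Java sketch is only meant to check the expected counts; WordCountSketch is a hypothetical name, and the TreeMap is used just for a sorted printout, since Flink gives no ordering guarantee.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (not Flink) of flatMap -> (word, 1) -> groupBy(0) -> sum(1).
final class WordCountSketch {

    static Map<String, Integer> wordCount(List<String> lines) {
        // TreeMap only for a stable, sorted printout
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(",")) {    // flatMap: split into words
                counts.merge(word, 1, Integer::sum); // (word, 1), grouped by word, summed
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> info = List.of("hadoop,spark", "hadoop,flink", "flink,flink");
        System.out.println(wordCount(info)); // {flink=3, hadoop=2, spark=1}
    }
}
```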
Distinct
Removes duplicate elements.
Scala
def distinctFunction(env: ExecutionEnvironment): Unit = {
  val info = ListBuffer[String]()
  info.append("hadoop,spark")
  info.append("hadoop,flink")
  info.append("flink,flink")
  val data = env.fromCollection(info)
  data.flatMap(_.split(",")).distinct().print()
}
This removes the duplicate words. Output:
hadoop
flink
spark
Java
public static void distinctFunction(ExecutionEnvironment env) throws Exception {
    List<String> info = new ArrayList<>();
    info.add("hadoop,spark");
    info.add("hadoop,flink");
    info.add("flink,flink");
    DataSource<String> data = env.fromCollection(info);
    data.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String input, Collector<String> out) throws Exception {
            String[] splits = input.split(",");
            for (String split : splits) {
                // Emit each word downstream
                out.collect(split);
            }
        }
    }).distinct().print();
}
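Locally, the effect of distinct() on the split words matches collecting them into a LinkedHashSet. This is a plain-Java sketch, not Flink's implementation; DistinctSketch is a hypothetical name, and the first-seen order it keeps is a property of LinkedHashSet, not something Flink promises (the Flink output above is in a different order).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration (not Flink) of flatMap followed by distinct().
final class DistinctSketch {

    static List<String> distinctWords(List<String> lines) {
        // LinkedHashSet drops duplicates and keeps first-seen order
        Set<String> seen = new LinkedHashSet<>();
        for (String line : lines) {
            for (String word : line.split(",")) {
                seen.add(word);
            }
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<String> info = List.of("hadoop,spark", "hadoop,flink", "flink,flink");
        System.out.println(distinctWords(info)); // [hadoop, spark, flink]
    }
}
```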
Join
Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element, or a FlatJoinFunction to turn the pair of elements into arbitrarily many (including none) elements. See the keys section to learn how to define join keys.
result = input1.join(input2)
               .where(0)       // key of the first input (tuple field 0)
               .equalTo(1);    // key of the second input (tuple field 1)
This joins input1 with input2 wherever field 0 of a tuple in input1 equals field 1 of a tuple in input2.
Scala
def joinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]() // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))
  val info2 = ListBuffer[(Int, String)]() // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "广州"))
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  // Join on field 0 of both inputs and combine each matching pair into (id, name, city)
  data1.join(data2).where(0).equalTo(0).apply((first, second) => {
    (first._1, first._2, second._2)
  }).print()
}
Output:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
Java
// Additional imports: org.apache.flink.api.common.functions.JoinFunction,
// org.apache.flink.api.java.tuple.Tuple2, org.apache.flink.api.java.tuple.Tuple3
public static void joinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));
    List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "广州"));
    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
    data1.join(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    }).print();
}
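The semantics of an inner equi-join ("all pairs of elements that are equal on their keys") can be checked with a small hash join in plain Java: index one side by key, then probe it with the other. This is a local sketch, not Flink's join strategy; JoinSketch is a hypothetical name, tuples are modelled as Map.Entry pairs, and the city names are romanized here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java hash-join illustration (not Flink) of join(...).where(0).equalTo(0).
final class JoinSketch {

    static List<String> innerJoin(List<Map.Entry<Integer, String>> left,
                                  List<Map.Entry<Integer, String>> right) {
        // Build phase: index the right side by key (a key may have several rows)
        Map<Integer, List<String>> index = new HashMap<>();
        for (Map.Entry<Integer, String> r : right) {
            index.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        // Probe phase: pair every left row with every right row that has the same key
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> l : left) {
            for (String city : index.getOrDefault(l.getKey(), List.of())) {
                out.add("(" + l.getKey() + "," + l.getValue() + "," + city + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> names = List.of(
                Map.entry(1, "hadoop"), Map.entry(2, "spark"), Map.entry(3, "flink"), Map.entry(4, "java"));
        List<Map.Entry<Integer, String>> cities = List.of(
                Map.entry(1, "Beijing"), Map.entry(2, "Shanghai"), Map.entry(3, "Shenzhen"), Map.entry(5, "Guangzhou"));
        System.out.println(innerJoin(names, cities));
        // [(1,hadoop,Beijing), (2,spark,Shanghai), (3,flink,Shenzhen)]
    }
}
```

Ids 4 and 5 exist on only one side, so an inner join drops them; the outer joins below keep them.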
OuterJoin
The join above is an inner join. OuterJoin performs outer joins on two data sets: left outer, right outer, and full outer joins.
Scala
def outJoinFunction(env: ExecutionEnvironment): Unit = {
  val info1 = ListBuffer[(Int, String)]() // id, name
  info1.append((1, "hadoop"))
  info1.append((2, "spark"))
  info1.append((3, "flink"))
  info1.append((4, "java"))
  val info2 = ListBuffer[(Int, String)]() // id, city
  info2.append((1, "北京"))
  info2.append((2, "上海"))
  info2.append((3, "深圳"))
  info2.append((5, "广州"))
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  // Left outer join: every element of the left side appears in the output
  data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
    if (second == null) {
      (first._1, first._2, "-")
    } else {
      (first._1, first._2, second._2)
    }
  }).print()
}
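In a left outer join, a left element with no match on the right is passed in with a null second argument. This case must be handled, otherwise a NullPointerException is thrown. Output:
(3,flink,深圳)
(1,hadoop,北京)
(2,spark,上海)
(4,java,-)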
Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)
  } else {
    (first._1, first._2, second._2)
  }
}).print()
Output of the right outer join:
(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)
Full outer join (either side can be null, so both cases are handled):
data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
  if (first == null) {
    (second._1, "-", second._2)
  } else if (second == null) {
    (first._1, first._2, "-")
  } else {
    (first._1, first._2, second._2)
  }
}).print()
Output:
(3,flink,深圳)
(1,hadoop,北京)
(5,-,广州)
(2,spark,上海)
(4,java,-)
Java
Left outer join:
public static void outjoinFunction(ExecutionEnvironment env) throws Exception {
    List<Tuple2<Integer, String>> info1 = new ArrayList<>(); // id, name
    info1.add(new Tuple2<>(1, "hadoop"));
    info1.add(new Tuple2<>(2, "spark"));
    info1.add(new Tuple2<>(3, "flink"));
    info1.add(new Tuple2<>(4, "java"));
    List<Tuple2<Integer, String>> info2 = new ArrayList<>(); // id, city
    info2.add(new Tuple2<>(1, "北京"));
    info2.add(new Tuple2<>(2, "上海"));
    info2.add(new Tuple2<>(3, "深圳"));
    info2.add(new Tuple2<>(5, "广州"));
    DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(info1);
    DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(info2);
    data1.leftOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
        @Override
        public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
            // second is null when the left row has no match on the right
            if (second == null) {
                return new Tuple3<>(first.f0, first.f1, "-");
            }
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    }).print();
}
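Right outer join:
data1.rightOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
    @Override
    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
        // first is null when the right row has no match on the left
        if (first == null) {
            return new Tuple3<>(second.f0, "-", second.f1);
        }
        return new Tuple3<>(first.f0, first.f1, second.f1);
    }
}).print();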
Full outer join:
data1.fullOuterJoin(data2).where(0).equalTo(0).with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple3<Integer, String, String>>() {
    @Override
    public Tuple3<Integer, String, String> join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
        // Either side may be null in a full outer join
        if (first == null) {
            return new Tuple3<>(second.f0, "-", second.f1);
        } else if (second == null) {
            return new Tuple3<>(first.f0, first.f1, "-");
        }
        return new Tuple3<>(first.f0, first.f1, second.f1);
    }
}).print();
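The three outer-join variants differ only in which unmatched rows survive. The plain-Java sketch below reproduces that on the article's data, rendering a missing partner as "-" just like the null checks above. It is a local illustration, not Flink's implementation; OuterJoinSketch and its helpers are hypothetical names, and the city names are romanized.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (not Flink) of left, right and full outer joins on the id field.
final class OuterJoinSketch {

    // Index rows by key; a key may map to several values.
    static Map<Integer, List<String>> index(List<Map.Entry<Integer, String>> rows) {
        Map<Integer, List<String>> idx = new HashMap<>();
        for (Map.Entry<Integer, String> r : rows) {
            idx.computeIfAbsent(r.getKey(), k -> new ArrayList<>()).add(r.getValue());
        }
        return idx;
    }

    static String row(int id, String name, String city) {
        return "(" + id + "," + name + "," + city + ")";
    }

    // Every left row appears; unmatched ones get "-" for the right side.
    static List<String> leftOuterJoin(List<Map.Entry<Integer, String>> left,
                                      List<Map.Entry<Integer, String>> right) {
        Map<Integer, List<String>> idx = index(right);
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> l : left) {
            List<String> matches = idx.get(l.getKey());
            if (matches == null) {
                out.add(row(l.getKey(), l.getValue(), "-"));
            } else {
                for (String city : matches) {
                    out.add(row(l.getKey(), l.getValue(), city));
                }
            }
        }
        return out;
    }

    // Every right row appears; unmatched ones get "-" for the left side.
    static List<String> rightOuterJoin(List<Map.Entry<Integer, String>> left,
                                       List<Map.Entry<Integer, String>> right) {
        Map<Integer, List<String>> idx = index(left);
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, String> r : right) {
            List<String> matches = idx.get(r.getKey());
            if (matches == null) {
                out.add(row(r.getKey(), "-", r.getValue()));
            } else {
                for (String name : matches) {
                    out.add(row(r.getKey(), name, r.getValue()));
                }
            }
        }
        return out;
    }

    // Left outer join plus the right rows whose key never occurs on the left.
    static List<String> fullOuterJoin(List<Map.Entry<Integer, String>> left,
                                      List<Map.Entry<Integer, String>> right) {
        List<String> out = leftOuterJoin(left, right);
        Map<Integer, List<String>> leftIdx = index(left);
        for (Map.Entry<Integer, String> r : right) {
            if (!leftIdx.containsKey(r.getKey())) {
                out.add(row(r.getKey(), "-", r.getValue()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> names = List.of(
                Map.entry(1, "hadoop"), Map.entry(2, "spark"), Map.entry(3, "flink"), Map.entry(4, "java"));
        List<Map.Entry<Integer, String>> cities = List.of(
                Map.entry(1, "Beijing"), Map.entry(2, "Shanghai"), Map.entry(3, "Shenzhen"), Map.entry(5, "Guangzhou"));
        System.out.println(leftOuterJoin(names, cities));  // [..., (4,java,-)]
        System.out.println(rightOuterJoin(names, cities)); // [..., (5,-,Guangzhou)]
        System.out.println(fullOuterJoin(names, cities));  // [..., (4,java,-), (5,-,Guangzhou)]
    }
}
```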
Cross Function
Scala
Cartesian product: every element on the left is paired with every element on the right.
def crossFunction(env: ExecutionEnvironment): Unit = {
  val info1 = List("乔峰", "慕容复")
  val info2 = List(3, 1, 0)
  val data1 = env.fromCollection(info1)
  val data2 = env.fromCollection(info2)
  data1.cross(data2).print()
}
Output:
(乔峰,3)
(乔峰,1)
(乔峰,0)
(慕容复,3)
(慕容复,1)
(慕容复,0)
Java
public static void crossFunction(ExecutionEnvironment env) throws Exception {
    List<String> info1 = new ArrayList<>();
    info1.add("乔峰");
    info1.add("慕容复");
    List<String> info2 = new ArrayList<>();
    info2.add("3");
    info2.add("1");
    info2.add("0");
    DataSource<String> data1 = env.fromCollection(info1);
    DataSource<String> data2 = env.fromCollection(info2);
    // Cartesian product: every element of data1 paired with every element of data2
    data1.cross(data2).print();
}
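cross produces |left| × |right| pairs, here 2 × 3 = 6, which is why it gets expensive quickly on large inputs. A plain-Java nested loop shows the same result; CrossSketch is a hypothetical name, and the two names are romanized from the example above.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (not Flink) of cross: the Cartesian product of two inputs.
final class CrossSketch {

    static List<String> cross(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>(left.size() * right.size());
        for (String l : left) {
            for (String r : right) {
                out.add("(" + l + "," + r + ")"); // every left element with every right element
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> heroes = List.of("Qiao Feng", "Murong Fu");
        List<String> numbers = List.of("3", "1", "0");
        System.out.println(cross(heroes, numbers));
        // [(Qiao Feng,3), (Qiao Feng,1), (Qiao Feng,0), (Murong Fu,3), (Murong Fu,1), (Murong Fu,0)]
    }
}
```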