
What Are the Ways to Use Flink Technology


This article introduces the basic ways of using Flink through a series of hands-on examples in Java and Scala, covering batch and stream processing, data sources, and common operators.

First, generate the Flink quickstart sample project:

mvn archetype:generate                               \
    -DarchetypeGroupId=org.apache.flink              \
    -DarchetypeArtifactId=flink-quickstart-java      \
    -DarchetypeVersion=1.7.2                         \
    -DarchetypeCatalog=local

Batch processing: reading from a file

Create a file named hello.txt with the following content:

hello world welcome
hello welcome

Count the word frequencies:

public class BatchJavaApp {
    public static void main(String[] args) throws Exception {
        String input = "/Users/admin/Downloads/flink/data/hello.txt";
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> text = env.readTextFile(input);
        text.print();
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }).groupBy(0).sum(1).print();
    }
}

Run result (logs omitted):

hello welcome
hello world welcome
(world,1)
(hello,2)
(welcome,2)

Plain Java implementation

File-reading utility class:

public class FileOperation {
    /**
     * Read the contents of the file named filename and put all the words it contains into words
     * @param filename
     * @param words
     * @return
     */
    public static boolean readFile(String filename, List<String> words) {
        if (filename == null || words == null) {
            System.out.println("filename or words is null");
            return false;
        }
        Scanner scanner;
        try {
            File file = new File(filename);
            if (file.exists()) {
                FileInputStream fis = new FileInputStream(file);
                scanner = new Scanner(new BufferedInputStream(fis), "UTF-8");
                scanner.useLocale(Locale.ENGLISH);
            } else {
                return false;
            }
        } catch (FileNotFoundException e) {
            System.out.println("Cannot open " + filename);
            return false;
        }
        // Simple tokenization
        if (scanner.hasNextLine()) {
            String contents = scanner.useDelimiter("\\A").next();
            int start = firstCharacterIndex(contents, 0);
            for (int i = start + 1; i <= contents.length(); ) {
                if (i == contents.length() || !Character.isLetter(contents.charAt(i))) {
                    String word = contents.substring(start, i).toLowerCase();
                    words.add(word);
                    start = firstCharacterIndex(contents, i);
                    i = start + 1;
                } else {
                    i++;
                }
            }
        }
        return true;
    }

    private static int firstCharacterIndex(String s, int start) {
        for (int i = start; i < s.length(); i++) {
            if (Character.isLetter(s.charAt(i))) {
                return i;
            }
        }
        return s.length();
    }
}

public class BatchJavaOnly {
    public static void main(String[] args) {
        String input = "/Users/admin/Downloads/flink/data/hello.txt";
        List<String> list = new ArrayList<>();
        FileOperation.readFile(input, list);
        System.out.println(list);
        Map<String, Integer> map = new HashMap<>();
        list.forEach(token -> {
            if (map.containsKey(token)) {
                map.put(token, map.get(token) + 1);
            } else {
                map.put(token, 1);
            }
        });
        map.entrySet().stream().map(entry -> new Tuple2<>(entry.getKey(), entry.getValue()))
                .forEach(System.out::println);
    }
}

Run result:

[hello, world, welcome, hello, welcome]
(world,1)
(hello,2)
(welcome,2)

Scala code

Generate the Scala quickstart sample project:

mvn archetype:generate                               \
    -DarchetypeGroupId=org.apache.flink              \
    -DarchetypeArtifactId=flink-quickstart-scala     \
    -DarchetypeVersion=1.7.2                         \
    -DarchetypeCatalog=local

After installing the Scala plugin and configuring the Scala SDK:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object BatchScalaApp {
  def main(args: Array[String]): Unit = {
    val input = "/Users/admin/Downloads/flink/data/hello.txt"
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(input)
    text.flatMap(_.toLowerCase.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .print()
  }
}

Run result (logs omitted):

(world,1)
(hello,2)
(welcome,2)

For Scala basics, see the earlier posts on getting started with Scala and Scala object-oriented programming.

Stream processing: reading from a network socket

public class StreamingJavaApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new Tuple2<>(token, 1));
                }
            }
        }).keyBy(0).timeWindow(Time.seconds(5))
                .sum(1).print();
        env.execute("StreamingJavaApp");
    }
}

Before running, open the port:

nc -lk 9999

Run the code, then type a a c d b c e e f a into the nc session.

Run result (logs omitted; the n> prefix is the index of the parallel subtask that printed the record):

1> (e,2)
9> (d,1)
11> (a,3)
3> (b,1)
4> (f,1)
8> (c,2)

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object StreamScalaApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("127.0.0.1", 9999)
    text.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
      .print()
      .setParallelism(1)
    env.execute("StreamScalaApp")
  }
}

Run result (logs omitted):

(c,2)
(b,1)
(d,1)
(f,1)
(e,2)
(a,3)

Now let's replace the tuple with an entity class.

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).keyBy("word").timeWindow(Time.seconds(5))
                .sum("count").print();
        env.execute("StreamingJavaApp");
    }
}

Run result:

4> StreamObjJavaApp.WordCount(word=f, count=1)
11> StreamObjJavaApp.WordCount(word=a, count=3)
8> StreamObjJavaApp.WordCount(word=c, count=2)
1> StreamObjJavaApp.WordCount(word=e, count=2)
9> StreamObjJavaApp.WordCount(word=d, count=1)
3> StreamObjJavaApp.WordCount(word=b, count=1)

Of course, we can also write it like this:

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new FlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}

The argument to keyBy is the functional interface KeySelector:

@Public
@FunctionalInterface
public interface KeySelector<IN, KEY> extends Function, Serializable {
   KEY getKey(IN value) throws Exception;
}
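
For illustration, a minimal sketch (not from the original example) of what keyBy(WordCount::getWord) expands to when the KeySelector is written out as a standalone class, assuming the Lombok-generated getWord() getter on the WordCount POJO above:

import org.apache.flink.api.java.functions.KeySelector;

// Equivalent to the method reference WordCount::getWord:
// the returned value becomes the key that keyBy groups the stream on.
public class WordKeySelector implements KeySelector<StreamObjJavaApp.WordCount, String> {
    @Override
    public String getKey(StreamObjJavaApp.WordCount value) throws Exception {
        return value.getWord();
    }
}

It could then be passed as keyBy(new WordKeySelector()) with the same effect as the method reference.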

The lambda form of flatMap is more cumbersome than the anonymous-class form, because the generic type information is erased from the lambda and Flink needs an explicit .returns(...) type hint:

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap((FlatMapFunction<String, WordCount>) (value, collector) -> {
            String[] tokens = value.toLowerCase().split(" ");
            for (String token : tokens) {
                collector.collect(new WordCount(token, 1));
            }
        }).returns(WordCount.class)
                .keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}

flatMap can also take the RichFlatMapFunction abstract class:

public class StreamObjJavaApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class WordCount {
        private String word;
        private int count;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> text = env.socketTextStream("127.0.0.1", 9999);
        text.flatMap(new RichFlatMapFunction<String, WordCount>() {
            @Override
            public void flatMap(String value, Collector<WordCount> collector) throws Exception {
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    collector.collect(new WordCount(token, 1));
                }
            }
        }).keyBy(WordCount::getWord).timeWindow(Time.seconds(5))
                .sum("count").print().setParallelism(1);
        env.execute("StreamingJavaApp");
    }
}
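
The "rich" variant is mainly useful when the function needs lifecycle hooks or access to the runtime context, which the example above does not show. A minimal sketch of that, not taken from the original article and reusing the WordCount POJO defined above:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Sketch: a RichFlatMapFunction that uses open()/close() and the runtime context.
public class RichTokenizer extends RichFlatMapFunction<String, StreamObjJavaApp.WordCount> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel subtask before any record is processed,
        // e.g. to set up a connection or inspect subtask information.
        System.out.println("open() on subtask " + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public void flatMap(String value, Collector<StreamObjJavaApp.WordCount> collector) throws Exception {
        for (String token : value.toLowerCase().split(" ")) {
            collector.collect(new StreamObjJavaApp.WordCount(token, 1));
        }
    }

    @Override
    public void close() throws Exception {
        // Called once when the subtask shuts down, e.g. to release resources.
    }
}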

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._

object StreamObjScalaApp {
  case class WordCount(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("127.0.0.1", 9999)
    text.flatMap(_.split(" "))
      .map(WordCount(_, 1))
      .keyBy("word")
      .timeWindow(Time.seconds(5))
      .sum("count")
      .print()
      .setParallelism(1)
    env.execute("StreamScalaApp")
  }
}

Run result (logs omitted):

WordCount(b,1)
WordCount(d,1)
WordCount(e,2)
WordCount(f,1)
WordCount(a,3)
WordCount(c,2)

Data sources

Reading data from a collection

public class DataSetDataSourceApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        fromCollection(env);
    }

    public static void fromCollection(ExecutionEnvironment env) throws Exception {
        List<Integer> list = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            list.add(i);
        }
        env.fromCollection(list).print();
    }
}

Run result (logs omitted):

1
2
3
4
5
6
7
8
9
10

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetDataSourceApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    fromCollection(env)
  }

  def fromCollection(env: ExecutionEnvironment): Unit = {
    val data = 1 to 10
    env.fromCollection(data).print()
  }
}

Run result (logs omitted):

1
2
3
4
5
6
7
8
9
10

Reading data from a file

public class DataSetDataSourceApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        fromCollection(env);
        textFile(env);
    }

    public static void textFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/hello.txt";
        env.readTextFile(filePath).print();
    }

    // fromCollection(...) unchanged from the previous example
}

Run result (logs omitted):

hello welcome
hello world welcome

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetDataSourceApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    fromCollection(env)
    textFile(env)
  }

  def textFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/hello.txt"
    env.readTextFile(filePath).print()
  }

  // fromCollection(...) unchanged from the previous example
}

Run result (logs omitted):

hello welcome
hello world welcome

Reading data from a CSV file

Add a people.csv file under the data directory with the following content:

name,age,job
Jorge,30,Developer
Bob,32,Developer

public class DataSetDataSourceApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        fromCollection(env);
//        textFile(env);
        csvFile(env);
    }

    public static void csvFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/people.csv";
        env.readCsvFile(filePath).ignoreFirstLine()
                .types(String.class, Integer.class, String.class)
                .print();
    }

    // textFile(...) and fromCollection(...) unchanged from the previous examples
}

Run result (logs omitted):

(Bob,32,Developer)
(Jorge,30,Developer)

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetDataSourceApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    fromCollection(env)
//    textFile(env)
    csvFile(env)
  }

  def csvFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/people.csv"
    env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
  }

  // textFile(...) and fromCollection(...) unchanged from the previous examples
}

Run result (logs omitted):

(Jorge,30,Developer)
(Bob,32,Developer)

Putting the results into an entity class

public class DataSetDataSourceApp {
    @AllArgsConstructor
    @Data
    @ToString
    @NoArgsConstructor
    public static class Case {
        private String name;
        private Integer age;
        private String job;
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        fromCollection(env);
//        textFile(env);
        csvFile(env);
    }

    public static void csvFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/people.csv";
//        env.readCsvFile(filePath).ignoreFirstLine()
//                .types(String.class, Integer.class, String.class)
//                .print();
        env.readCsvFile(filePath).ignoreFirstLine()
                .pojoType(Case.class, "name", "age", "job")
                .print();
    }

    // textFile(...) and fromCollection(...) unchanged from the previous examples
}

Run result (logs omitted):

DataSetDataSourceApp.Case(name=Bob, age=32, job=Developer)
DataSetDataSourceApp.Case(name=Jorge, age=30, job=Developer)

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetDataSourceApp {
  case class Case(name: String, age: Int, job: String)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    fromCollection(env)
//    textFile(env)
    csvFile(env)
  }

  def csvFile(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/people.csv"
//    env.readCsvFile[(String, Int, String)](filePath, ignoreFirstLine = true).print()
    env.readCsvFile[Case](filePath, ignoreFirstLine = true, includedFields = Array(0, 1, 2))
      .print()
  }

  // textFile(...) and fromCollection(...) unchanged from the previous examples
}

Run result (logs omitted):

Case(Bob,32,Developer)
Case(Jorge,30,Developer)

Reading folders recursively

Create two folders, 1 and 2, under the data directory and copy hello.txt into each of them.

public class DataSetDataSourceApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        fromCollection(env);
//        textFile(env);
//        csvFile(env);
        readRecursiveFile(env);
    }

    public static void readRecursiveFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data";
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);
        env.readTextFile(filePath).withParameters(parameters)
                .print();
    }

    // Case, csvFile(...), textFile(...) and fromCollection(...) unchanged from the previous examples
}

Run result (logs omitted):

hello world welcome
hello world welcome
hello welcome
Jorge,30,Developer
name,age,job
hello world welcome
hello welcome
hello welcome
Bob,32,Developer

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object DataSetDataSourceApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    fromCollection(env)
//    textFile(env)
//    csvFile(env)
    readRecursiveFiles(env)
  }

  def readRecursiveFiles(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data"
    val parameters = new Configuration
    parameters.setBoolean("recursive.file.enumeration", true)
    env.readTextFile(filePath).withParameters(parameters).print()
  }

  // Case, csvFile(...), textFile(...) and fromCollection(...) unchanged from the previous examples
}

Run result (logs omitted):

hello world welcome
hello world welcome
hello welcome
Jorge,30,Developer
name,age,job
hello world welcome
hello welcome
hello welcome
Bob,32,Developer

Reading compressed files

Create a folder 3 under the data directory and gzip hello.txt:

gzip hello.txt

This produces a new file, hello.txt.gz; move that file into folder 3.

public class DataSetDataSourceApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        fromCollection(env);
//        textFile(env);
//        csvFile(env);
//        readRecursiveFile(env);
        readCompresssionFile(env);
    }

    public static void readCompresssionFile(ExecutionEnvironment env) throws Exception {
        String filePath = "/Users/admin/Downloads/flink/data/3";
        env.readTextFile(filePath).print();
    }

    // Case, readRecursiveFile(...), csvFile(...), textFile(...) and fromCollection(...)
    // unchanged from the previous examples
}

Run result:

hello world welcome
hello welcome

The compression formats Flink supports are: .deflate, .gz, .gzip, .bz2, and .xz.
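
Flink picks the decompression codec from the file extension, so a single compressed file can also be read directly by its path. A minimal sketch, assuming the hello.txt.gz path from the example above:

import org.apache.flink.api.java.ExecutionEnvironment;

public class ReadGzipApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // The .gz extension tells Flink to decompress the file transparently,
        // so it can be read just like a plain text file.
        env.readTextFile("/Users/admin/Downloads/flink/data/3/hello.txt.gz").print();
    }
}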

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

object DataSetDataSourceApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    fromCollection(env)
//    textFile(env)
//    csvFile(env)
//    readRecursiveFiles(env)
    readCompressionFiles(env)
  }

  def readCompressionFiles(env: ExecutionEnvironment): Unit = {
    val filePath = "/Users/admin/Downloads/flink/data/3"
    env.readTextFile(filePath).print()
  }

  // Case, readRecursiveFiles(...), csvFile(...), textFile(...) and fromCollection(...)
  // unchanged from the previous examples
}

Run result:

hello world welcome
hello welcome

Operators

The map operator

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        mapFunction(env);
    }

    public static void mapFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).print();
    }
}

Run result (logs omitted):

2
3
4
5
6
7
8
9
10
11

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    mapFunction(env)
  }

  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).print()
  }
}

Run result (logs omitted):

2
3
4
5
6
7
8
9
10
11

The filter operator

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        mapFunction(env);
        filterFunction(env);
    }

    public static void filterFunction(ExecutionEnvironment env) throws Exception {
        DataSource<Integer> data = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        data.map(x -> x + 1).filter(x -> x > 5).print();
    }

    // mapFunction(...) unchanged from the previous example
}

Run result (logs omitted):

6
7
8
9
10
11

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
    filterFunction(env)
  }

  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    data.map(_ + 1).filter(_ > 5).print()
  }

  // mapFunction(...) unchanged from the previous example
}

Run result (logs omitted):

6
7
8
9
10
11

The mapPartition operator

It returns one result per partition, i.e. according to the parallelism, rather than one per element.

A utility class that simulates getting a database connection:

public class DBUntils {
    public static int getConnection() {
        return new Random().nextInt(10);
    }

    public static void returnConnection(int connection) {
    }
}

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        mapFunction(env);
//        filterFunction(env);
        mapPartitionFunction(env);
    }

    public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {
        List<String> students = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            students.add("student: " + i);
        }
        DataSource<String> data = env.fromCollection(students).setParallelism(4);
        // map runs once per element, i.e. once for each of the 100 students
        data.map(student -> {
            int connection = DBUntils.getConnection();
            // TODO database operations
            DBUntils.returnConnection(connection);
            return connection;
        }).print();
        System.out.println("-----------");
        // mapPartition runs once per partition, i.e. according to the parallelism
        data.mapPartition((MapPartitionFunction<String, Integer>) (student, collector) -> {
            int connection = DBUntils.getConnection();
            // TODO database operations
            DBUntils.returnConnection(connection);
            collector.collect(connection);
        }).returns(Integer.class).print();
    }

    // filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result (logs omitted; the dots stand for many more numbers above them, 100 in total above the dashed line):

....
5
4
0
3
-----------
5
5
0
3

Scala code

import scala.util.Random

object DBUntils {
  def getConnection(): Int = {
    new Random().nextInt(10)
  }

  def returnConnection(connection: Int): Unit = {
  }
}

import com.guanjian.flink.scala.until.DBUntils
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
//    filterFunction(env)
    mapPartitionFunction(env)
  }

  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students).setParallelism(4)
    data.map(student => {
      val connection = DBUntils.getConnection()
      // TODO database operations
      DBUntils.returnConnection(connection)
      connection
    }).print()
    println("-----------")
    data.mapPartition((student, collector: Collector[Int]) => {
      val connection = DBUntils.getConnection()
      // TODO database operations
      DBUntils.returnConnection(connection)
      collector.collect(connection)
    }).print()
  }

  // filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result:

....
5
4
0
3
-----------
5
5
0
3

The first operator

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        mapFunction(env);
//        filterFunction(env);
//        mapPartitionFunction(env);
        firstFunction(env);
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.first(3).print();
    }

    // mapPartitionFunction(...), filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result (logs omitted):

(1,Hadoop)
(1,Spark)
(1,Flink)

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
//    filterFunction(env)
//    mapPartitionFunction(env)
    firstFunction(env)
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.first(3).print()
  }

  // mapPartitionFunction(...), filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result (logs omitted):

(1,Hadoop)
(1,Spark)
(1,Flink)

Take the first two records of each group:

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        mapFunction(env);
//        filterFunction(env);
//        mapPartitionFunction(env);
        firstFunction(env);
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.groupBy(0).first(2).print();
    }

    // mapPartitionFunction(...), filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result (logs omitted):

(3,Linux)
(1,Hadoop)
(1,Spark)
(4,VUE)
(2,Java)
(2,Spring boot)

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
//    filterFunction(env)
//    mapPartitionFunction(env)
    firstFunction(env)
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).first(2).print()
  }

  // mapPartitionFunction(...), filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result (logs omitted):

(3,Linux)
(1,Hadoop)
(1,Spark)
(4,VUE)
(2,Java)
(2,Spring boot)

Group, sort each group in ascending order, and take the first two records:

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        mapFunction(env);
//        filterFunction(env);
//        mapPartitionFunction(env);
        firstFunction(env);
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print();
    }

    // mapPartitionFunction(...), filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result (logs omitted):

(3,Linux)
(1,Flink)
(1,Hadoop)
(4,VUE)
(2,Java)
(2,Spring boot)

Scala code

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
//    filterFunction(env)
//    mapPartitionFunction(env)
    firstFunction(env)
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  }

  // mapPartitionFunction(...), filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result (logs omitted):

(3,Linux)
(1,Flink)
(1,Hadoop)
(4,VUE)
(2,Java)
(2,Spring boot)

Group, sort each group in descending order, and take the first two records:

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        mapFunction(env);
//        filterFunction(env);
//        mapPartitionFunction(env);
        firstFunction(env);
    }

    public static void firstFunction(ExecutionEnvironment env) throws Exception {
        List<Tuple2<Integer, String>> info = Arrays.asList(new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(1, "Flink"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"),
                new Tuple2<>(3, "Linux"),
                new Tuple2<>(4, "VUE"));
        DataSource<Tuple2<Integer, String>> data = env.fromCollection(info);
        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();
    }

    // mapPartitionFunction(...), filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result (logs omitted):

(3,Linux)
(1,Spark)
(1,Hadoop)
(4,VUE)
(2,Spring boot)
(2,Java)

Scala code

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
//    filterFunction(env)
//    mapPartitionFunction(env)
    firstFunction(env)
  }

  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = List((1, "Hadoop"), (1, "Spark"), (1, "Flink"), (2, "Java"),
      (2, "Spring boot"), (3, "Linux"), (4, "VUE"))
    val data = env.fromCollection(info)
    data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print()
  }

  // mapPartitionFunction(...), filterFunction(...) and mapFunction(...) unchanged from the previous examples
}

Run result (logs omitted):

(3,Linux)
(1,Spark)
(1,Hadoop)
(4,VUE)
(2,Spring boot)
(2,Java)

The flatMap operator

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        mapFunction(env);
//        filterFunction(env);
//        mapPartitionFunction(env);
//        firstFunction(env);
        flatMapFunction(env);
    }

    public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class).print();
    }

    // firstFunction(...), mapPartitionFunction(...), filterFunction(...) and mapFunction(...)
    // unchanged from the previous examples
}

Run result (logs omitted):

hadoop
spark
hadoop
flink
flink
flink

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
//    filterFunction(env)
//    mapPartitionFunction(env)
//    firstFunction(env)
    flatMapFunction(env)
  }

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).print()
  }

  // firstFunction(...), mapPartitionFunction(...), filterFunction(...) and mapFunction(...)
  // unchanged from the previous examples
}

Run result (logs omitted):

hadoop
spark
hadoop
flink
flink
flink

Of course, Scala also supports the same style as Java:

def flatMapFunction(env: ExecutionEnvironment): Unit = {
  val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
  val data = env.fromCollection(info)
  data.flatMap((value, collector: Collector[String]) => {
    val tokens = value.split(",")
    tokens.foreach(collector.collect(_))
  }).print()
}

Counting the words:

public class DataSetTransformationApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//        mapFunction(env);
//        filterFunction(env);
//        mapPartitionFunction(env);
//        firstFunction(env);
        flatMapFunction(env);
    }

    public static void flatMapFunction(ExecutionEnvironment env) throws Exception {
        List<String> info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");
        DataSource<String> data = env.fromCollection(info);
        data.flatMap((FlatMapFunction<String, String>) (value, collector) -> {
            String[] tokens = value.split(",");
            Stream.of(tokens).forEach(collector::collect);
        }).returns(String.class)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return new Tuple2<>(value, 1);
                    }
                })
                .groupBy(0).sum(1).print();
    }

    // firstFunction(...), mapPartitionFunction(...), filterFunction(...) and mapFunction(...)
    // unchanged from the previous examples
}

Run result (logs omitted):

(hadoop,2)
(flink,3)
(spark,1)

Scala code

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetTransformationApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
//    filterFunction(env)
//    mapPartitionFunction(env)
//    firstFunction(env)
    flatMapFunction(env)
  }

  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = List("hadoop,spark", "hadoop,flink", "flink,flink")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).map((_, 1)).groupBy(0)
      .sum(1).print()
  }

  // firstFunction(...), mapPartitionFunction(...), filterFunction(...) and mapFunction(...)
  // unchanged from the previous examples
}

Run result (logs omitted):

(hadoop,2)
(flink,3)
(spark,1)

distinct算子

public class DataSetTransformationApp {public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//        mapFunction(env);//        filterFunction(env);//        mapPartitionFunction(env);//        firstFunction(env);//        flatMapFunction(env);        distinctFunction(env);    }public static void distinctFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .distinct().print();    }public static void flatMapFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .map(new MapFunction>() {@Override                    public Tuple2 map(String value) throws Exception {return new Tuple2<>(value,1);                    }                })            .groupBy(0).sum(1).print();    }public static void firstFunction(ExecutionEnvironment env) throws Exception {        List> info = Arrays.asList(new Tuple2<>(1,"Hadoop"),                new Tuple2<>(1,"Spark"),                new Tuple2<>(1,"Flink"),                new Tuple2<>(2,"Java"),                new Tuple2<>(2,"Spring boot"),                new Tuple2<>(3,"Linux"),                new Tuple2<>(4,"VUE"));        DataSource> data = env.fromCollection(info);        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();    }public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {        List students = new ArrayList<>();        for (int i = 0; i < 100; i++) {            students.add("student: " + i);        }        DataSource data = env.fromCollection(students).setParallelism(4);        //此处会按照students的数量进行转换        data.map(student -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            return connection;        }).print();        System.out.println("-----------");        //此处会按照并行度的数量进行转换        data.mapPartition((MapPartitionFunction)(student, collector) -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            collector.collect(connection);        }).returns(Integer.class).print();    }public static void filterFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).filter(x -> x > 5).print();    }public static void mapFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).print();    }}

运行结果(省略日志)

hadoop
flink
spark

Scala代码

import com.guanjian.flink.scala.until.DBUntilsimport org.apache.flink.api.common.operators.Orderimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBufferobject DataSetTransformationApp {  def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment//    mapFunction(env)//    filterFunction(env)//    mapPartitionFunction(env)//    firstFunction(env)//    flatMapFunction(env)    distinctFunction(env)  }  def distinctFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).distinct().print()  }  def flatMapFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).map((_,1)).groupBy(0)      .sum(1).print()  }  def firstFunction(env: ExecutionEnvironment): Unit = {val info = List((1,"Hadoop"),(1,"Spark"),(1,"Flink"),(2,"Java"),      (2,"Spring boot"),(3,"Linux"),(4,"VUE"))val data = env.fromCollection(info)    data.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()  }  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {val students = new ListBuffer[String]for(i <- 1 to 100) {      students.append("student: " + i)    }val data = env.fromCollection(students).setParallelism(4)    data.map(student => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      connection    }).print()println("-----------")    data.mapPartition((student,collector: Collector[Int]) => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      collector.collect(connection)    }).print()  }  def filterFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).filter(_ > 5).print()  }  def mapFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).print()  }}

运行结果(省略日志)

hadoop
flink
spark
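补充一点:除了对普通元素去重,distinct 还可以按 Tuple 的指定字段去重。下面是一个最小示意(类名和数据为示例自拟,仅作参考):

public class DistinctByFieldApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Tuple2<Integer, String>> info = Arrays.asList(new Tuple2<>(1, "Hadoop"),
                new Tuple2<>(1, "Spark"),
                new Tuple2<>(2, "Java"),
                new Tuple2<>(2, "Spring boot"));
        //distinct(0)表示按Tuple的第0个字段去重,每个key只保留一条记录
        env.fromCollection(info).distinct(0).print();
    }
}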

join算子

public class DataSetTransformationApp {public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//        mapFunction(env);//        filterFunction(env);//        mapPartitionFunction(env);//        firstFunction(env);//        flatMapFunction(env);//        distinctFunction(env);        joinFunction(env);    }public static void joinFunction(ExecutionEnvironment env) throws Exception {        List> info1 = Arrays.asList(new Tuple2<>(1,"PK哥"),                new Tuple2<>(2,"J哥"),                new Tuple2<>(3,"小队长"),                new Tuple2<>(4,"天空蓝"));        List> info2 = Arrays.asList(new Tuple2<>(1,"北京"),                new Tuple2<>(2,"上海"),                new Tuple2<>(3,"成都"),                new Tuple2<>(4,"杭州"));        DataSource> data1 = env.fromCollection(info1);        DataSource> data2 = env.fromCollection(info2);        data1.join(data2).where(0).equalTo(0)                .with(new JoinFunction, Tuple2, Tuple3>() {@Override                    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {return new Tuple3<>(first.getField(0),first.getField(1),second.getField(1));                    }                }).print();    }public static void distinctFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .distinct().print();    }public static void flatMapFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .map(new MapFunction>() {@Override                    public Tuple2 map(String value) throws Exception {return new Tuple2<>(value,1);                    }                })            .groupBy(0).sum(1).print();    }public static void firstFunction(ExecutionEnvironment env) throws Exception {        List> info = Arrays.asList(new Tuple2<>(1,"Hadoop"),                new Tuple2<>(1,"Spark"),                new Tuple2<>(1,"Flink"),                new Tuple2<>(2,"Java"),                new Tuple2<>(2,"Spring boot"),                new Tuple2<>(3,"Linux"),                new Tuple2<>(4,"VUE"));        DataSource> data = env.fromCollection(info);        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();    }public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {        List students = new ArrayList<>();        for (int i = 0; i < 100; i++) {            students.add("student: " + i);        }        DataSource data = env.fromCollection(students).setParallelism(4);        //此处会按照students的数量进行转换        data.map(student -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            return connection;        }).print();        System.out.println("-----------");        //此处会按照并行度的数量进行转换        data.mapPartition((MapPartitionFunction)(student, collector) -> {int connection = DBUntils.getConnection();         
   //TODO 数据库操作            DBUntils.returnConnection(connection);            collector.collect(connection);        }).returns(Integer.class).print();    }public static void filterFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).filter(x -> x > 5).print();    }public static void mapFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).print();    }}

运行结果(省略日志)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,杭州)
(2,J哥,上海)

Scala代码

import com.guanjian.flink.scala.until.DBUntilsimport org.apache.flink.api.common.operators.Orderimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBufferobject DataSetTransformationApp {  def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment//    mapFunction(env)//    filterFunction(env)//    mapPartitionFunction(env)//    firstFunction(env)//    flatMapFunction(env)//    distinctFunction(env)    joinFunction(env)  }  def joinFunction(env: ExecutionEnvironment): Unit = {val info1 = List((1,"PK哥"),(2,"J哥"),(3,"小队长"),(4,"天空蓝"))val info2 = List((1,"北京"),(2,"上海"),(3,"成都"),(4,"杭州"))val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)    data1.join(data2).where(0).equalTo(0).apply((first,second) =>      (first._1,first._2,second._2)    ).print()  }  def distinctFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).distinct().print()  }  def flatMapFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).map((_,1)).groupBy(0)      .sum(1).print()  }  def firstFunction(env: ExecutionEnvironment): Unit = {val info = List((1,"Hadoop"),(1,"Spark"),(1,"Flink"),(2,"Java"),      (2,"Spring boot"),(3,"Linux"),(4,"VUE"))val data = env.fromCollection(info)    data.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()  }  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {val students = new ListBuffer[String]for(i <- 1 to 100) {      students.append("student: " + i)    }val data = env.fromCollection(students).setParallelism(4)    data.map(student => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      connection    }).print()println("-----------")    data.mapPartition((student,collector: Collector[Int]) => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      collector.collect(connection)    }).print()  }  def filterFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).filter(_ > 5).print()  }  def mapFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).print()  }}

运行结果(省略日志)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,杭州)
(2,J哥,上海)
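顺带一提,如果只是想从两侧各取若干字段拼成结果,也可以不写 JoinFunction,直接用投影的方式。下面是一个补充的最小示意(类名和数据为示例自拟):

public class JoinProjectionApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Tuple2<Integer, String>> data1 = env.fromCollection(Arrays.asList(
                new Tuple2<>(1, "PK哥"), new Tuple2<>(2, "J哥")));
        DataSource<Tuple2<Integer, String>> data2 = env.fromCollection(Arrays.asList(
                new Tuple2<>(1, "北京"), new Tuple2<>(2, "上海")));
        //projectFirst取左侧的字段,projectSecond取右侧的字段,结果仍是Tuple
        data1.join(data2).where(0).equalTo(0)
                .projectFirst(0, 1)
                .projectSecond(1)
                .print();
    }
}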

outJoin算子

左连接

public class DataSetTransformationApp {public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//        mapFunction(env);//        filterFunction(env);//        mapPartitionFunction(env);//        firstFunction(env);//        flatMapFunction(env);//        distinctFunction(env);//        joinFunction(env);        outJoinFunction(env);    }public static void outJoinFunction(ExecutionEnvironment env) throws Exception {        List> info1 = Arrays.asList(new Tuple2<>(1,"PK哥"),                new Tuple2<>(2,"J哥"),                new Tuple2<>(3,"小队长"),                new Tuple2<>(4,"天空蓝"));        List> info2 = Arrays.asList(new Tuple2<>(1,"北京"),                new Tuple2<>(2,"上海"),                new Tuple2<>(3,"成都"),                new Tuple2<>(5,"杭州"));        DataSource> data1 = env.fromCollection(info1);        DataSource> data2 = env.fromCollection(info2);        data1.leftOuterJoin(data2).where(0).equalTo(0)                .with(new JoinFunction, Tuple2, Tuple3>() {@Override                    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {if (second == null) {return new Tuple3<>(first.getField(0),first.getField(1),"-");                        }return new Tuple3<>(first.getField(0),first.getField(1),second.getField(1));                    }                }).print();    }public static void joinFunction(ExecutionEnvironment env) throws Exception {        List> info1 = Arrays.asList(new Tuple2<>(1,"PK哥"),                new Tuple2<>(2,"J哥"),                new Tuple2<>(3,"小队长"),                new Tuple2<>(4,"天空蓝"));        List> info2 = Arrays.asList(new Tuple2<>(1,"北京"),                new Tuple2<>(2,"上海"),                new Tuple2<>(3,"成都"),                new Tuple2<>(4,"杭州"));        DataSource> data1 = env.fromCollection(info1);        DataSource> data2 = env.fromCollection(info2);        data1.join(data2).where(0).equalTo(0)                .with(new JoinFunction, Tuple2, Tuple3>() {@Override                    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {return new Tuple3<>(first.getField(0),first.getField(1),second.getField(1));                    }                }).print();    }public static void distinctFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .distinct().print();    }public static void flatMapFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .map(new MapFunction>() {@Override                    public Tuple2 map(String value) throws Exception {return new Tuple2<>(value,1);                    }                })            .groupBy(0).sum(1).print();    }public static void firstFunction(ExecutionEnvironment env) throws Exception {        List> info = Arrays.asList(new Tuple2<>(1,"Hadoop"),                new Tuple2<>(1,"Spark"),                new 
Tuple2<>(1,"Flink"),                new Tuple2<>(2,"Java"),                new Tuple2<>(2,"Spring boot"),                new Tuple2<>(3,"Linux"),                new Tuple2<>(4,"VUE"));        DataSource> data = env.fromCollection(info);        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();    }public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {        List students = new ArrayList<>();        for (int i = 0; i < 100; i++) {            students.add("student: " + i);        }        DataSource data = env.fromCollection(students).setParallelism(4);        //此处会按照students的数量进行转换        data.map(student -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            return connection;        }).print();        System.out.println("-----------");        //此处会按照并行度的数量进行转换        data.mapPartition((MapPartitionFunction)(student, collector) -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            collector.collect(connection);        }).returns(Integer.class).print();    }public static void filterFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).filter(x -> x > 5).print();    }public static void mapFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).print();    }}

运行结果(省略日志)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,-)
(2,J哥,上海)

Scala代码

import com.guanjian.flink.scala.until.DBUntilsimport org.apache.flink.api.common.operators.Orderimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBufferobject DataSetTransformationApp {  def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment//    mapFunction(env)//    filterFunction(env)//    mapPartitionFunction(env)//    firstFunction(env)//    flatMapFunction(env)//    distinctFunction(env)    joinFunction(env)  }  def joinFunction(env: ExecutionEnvironment): Unit = {val info1 = List((1,"PK哥"),(2,"J哥"),(3,"小队长"),(4,"天空蓝"))val info2 = List((1,"北京"),(2,"上海"),(3,"成都"),(5,"杭州"))val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first,second) => {      if (second == null) {        (first._1,first._2,"-")      }else {        (first._1,first._2,second._2)      }    }).print()  }  def distinctFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).distinct().print()  }  def flatMapFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).map((_,1)).groupBy(0)      .sum(1).print()  }  def firstFunction(env: ExecutionEnvironment): Unit = {val info = List((1,"Hadoop"),(1,"Spark"),(1,"Flink"),(2,"Java"),      (2,"Spring boot"),(3,"Linux"),(4,"VUE"))val data = env.fromCollection(info)    data.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()  }  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {val students = new ListBuffer[String]for(i <- 1 to 100) {      students.append("student: " + i)    }val data = env.fromCollection(students).setParallelism(4)    data.map(student => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      connection    }).print()println("-----------")    data.mapPartition((student,collector: Collector[Int]) => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      collector.collect(connection)    }).print()  }  def filterFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).filter(_ > 5).print()  }  def mapFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).print()  }}

运行结果(省略日志)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,-)
(2,J哥,上海)

右连接

public class DataSetTransformationApp {public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//        mapFunction(env);//        filterFunction(env);//        mapPartitionFunction(env);//        firstFunction(env);//        flatMapFunction(env);//        distinctFunction(env);//        joinFunction(env);        outJoinFunction(env);    }public static void outJoinFunction(ExecutionEnvironment env) throws Exception {        List> info1 = Arrays.asList(new Tuple2<>(1,"PK哥"),                new Tuple2<>(2,"J哥"),                new Tuple2<>(3,"小队长"),                new Tuple2<>(4,"天空蓝"));        List> info2 = Arrays.asList(new Tuple2<>(1,"北京"),                new Tuple2<>(2,"上海"),                new Tuple2<>(3,"成都"),                new Tuple2<>(5,"杭州"));        DataSource> data1 = env.fromCollection(info1);        DataSource> data2 = env.fromCollection(info2);        data1.rightOuterJoin(data2).where(0).equalTo(0)                .with(new JoinFunction, Tuple2, Tuple3>() {@Override                    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {if (first == null) {return new Tuple3<>(second.getField(0),"-",second.getField(1));                        }return new Tuple3<>(first.getField(0),first.getField(1),second.getField(1));                    }                }).print();    }public static void joinFunction(ExecutionEnvironment env) throws Exception {        List> info1 = Arrays.asList(new Tuple2<>(1,"PK哥"),                new Tuple2<>(2,"J哥"),                new Tuple2<>(3,"小队长"),                new Tuple2<>(4,"天空蓝"));        List> info2 = Arrays.asList(new Tuple2<>(1,"北京"),                new Tuple2<>(2,"上海"),                new Tuple2<>(3,"成都"),                new Tuple2<>(4,"杭州"));        DataSource> data1 = env.fromCollection(info1);        DataSource> data2 = env.fromCollection(info2);        data1.join(data2).where(0).equalTo(0)                .with(new JoinFunction, Tuple2, Tuple3>() {@Override                    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {return new Tuple3<>(first.getField(0),first.getField(1),second.getField(1));                    }                }).print();    }public static void distinctFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .distinct().print();    }public static void flatMapFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .map(new MapFunction>() {@Override                    public Tuple2 map(String value) throws Exception {return new Tuple2<>(value,1);                    }                })            .groupBy(0).sum(1).print();    }public static void firstFunction(ExecutionEnvironment env) throws Exception {        List> info = Arrays.asList(new Tuple2<>(1,"Hadoop"),                new Tuple2<>(1,"Spark"),                new 
Tuple2<>(1,"Flink"),                new Tuple2<>(2,"Java"),                new Tuple2<>(2,"Spring boot"),                new Tuple2<>(3,"Linux"),                new Tuple2<>(4,"VUE"));        DataSource> data = env.fromCollection(info);        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();    }public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {        List students = new ArrayList<>();        for (int i = 0; i < 100; i++) {            students.add("student: " + i);        }        DataSource data = env.fromCollection(students).setParallelism(4);        //此处会按照students的数量进行转换        data.map(student -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            return connection;        }).print();        System.out.println("-----------");        //此处会按照并行度的数量进行转换        data.mapPartition((MapPartitionFunction)(student, collector) -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            collector.collect(connection);        }).returns(Integer.class).print();    }public static void filterFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).filter(x -> x > 5).print();    }public static void mapFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).print();    }}

运行结果(省略日志)

(3,小队长,成都)
(1,PK哥,北京)
(5,-,杭州)
(2,J哥,上海)

Scala代码

import com.guanjian.flink.scala.until.DBUntilsimport org.apache.flink.api.common.operators.Orderimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBufferobject DataSetTransformationApp {  def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment//    mapFunction(env)//    filterFunction(env)//    mapPartitionFunction(env)//    firstFunction(env)//    flatMapFunction(env)//    distinctFunction(env)    joinFunction(env)  }  def joinFunction(env: ExecutionEnvironment): Unit = {val info1 = List((1,"PK哥"),(2,"J哥"),(3,"小队长"),(4,"天空蓝"))val info2 = List((1,"北京"),(2,"上海"),(3,"成都"),(5,"杭州"))val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)    data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first,second) => {      if (first == null) {        (second._1,"-",second._2)      }else {        (first._1,first._2,second._2)      }    }).print()  }  def distinctFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).distinct().print()  }  def flatMapFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).map((_,1)).groupBy(0)      .sum(1).print()  }  def firstFunction(env: ExecutionEnvironment): Unit = {val info = List((1,"Hadoop"),(1,"Spark"),(1,"Flink"),(2,"Java"),      (2,"Spring boot"),(3,"Linux"),(4,"VUE"))val data = env.fromCollection(info)    data.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()  }  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {val students = new ListBuffer[String]for(i <- 1 to 100) {      students.append("student: " + i)    }val data = env.fromCollection(students).setParallelism(4)    data.map(student => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      connection    }).print()println("-----------")    data.mapPartition((student,collector: Collector[Int]) => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      collector.collect(connection)    }).print()  }  def filterFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).filter(_ > 5).print()  }  def mapFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).print()  }}

运行结果(省略日志)

(3,小队长,成都)
(1,PK哥,北京)
(5,-,杭州)
(2,J哥,上海)

全外连接

public class DataSetTransformationApp {public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//        mapFunction(env);//        filterFunction(env);//        mapPartitionFunction(env);//        firstFunction(env);//        flatMapFunction(env);//        distinctFunction(env);//        joinFunction(env);        outJoinFunction(env);    }public static void outJoinFunction(ExecutionEnvironment env) throws Exception {        List> info1 = Arrays.asList(new Tuple2<>(1,"PK哥"),                new Tuple2<>(2,"J哥"),                new Tuple2<>(3,"小队长"),                new Tuple2<>(4,"天空蓝"));        List> info2 = Arrays.asList(new Tuple2<>(1,"北京"),                new Tuple2<>(2,"上海"),                new Tuple2<>(3,"成都"),                new Tuple2<>(5,"杭州"));        DataSource> data1 = env.fromCollection(info1);        DataSource> data2 = env.fromCollection(info2);        data1.fullOuterJoin(data2).where(0).equalTo(0)                .with(new JoinFunction, Tuple2, Tuple3>() {@Override                    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {if (first == null) {return new Tuple3<>(second.getField(0),"-",second.getField(1));                        }else if (second == null) {return new Tuple3<>(first.getField(0),first.getField(1),"-");                        }return new Tuple3<>(first.getField(0),first.getField(1),second.getField(1));                    }                }).print();    }public static void joinFunction(ExecutionEnvironment env) throws Exception {        List> info1 = Arrays.asList(new Tuple2<>(1,"PK哥"),                new Tuple2<>(2,"J哥"),                new Tuple2<>(3,"小队长"),                new Tuple2<>(4,"天空蓝"));        List> info2 = Arrays.asList(new Tuple2<>(1,"北京"),                new Tuple2<>(2,"上海"),                new Tuple2<>(3,"成都"),                new Tuple2<>(4,"杭州"));        DataSource> data1 = env.fromCollection(info1);        DataSource> data2 = env.fromCollection(info2);        data1.join(data2).where(0).equalTo(0)                .with(new JoinFunction, Tuple2, Tuple3>() {@Override                    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {return new Tuple3<>(first.getField(0),first.getField(1),second.getField(1));                    }                }).print();    }public static void distinctFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .distinct().print();    }public static void flatMapFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .map(new MapFunction>() {@Override                    public Tuple2 map(String value) throws Exception {return new Tuple2<>(value,1);                    }                })            .groupBy(0).sum(1).print();    }public static void firstFunction(ExecutionEnvironment env) throws Exception {        List> info = 
Arrays.asList(new Tuple2<>(1,"Hadoop"),                new Tuple2<>(1,"Spark"),                new Tuple2<>(1,"Flink"),                new Tuple2<>(2,"Java"),                new Tuple2<>(2,"Spring boot"),                new Tuple2<>(3,"Linux"),                new Tuple2<>(4,"VUE"));        DataSource> data = env.fromCollection(info);        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();    }public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {        List students = new ArrayList<>();        for (int i = 0; i < 100; i++) {            students.add("student: " + i);        }        DataSource data = env.fromCollection(students).setParallelism(4);        //此处会按照students的数量进行转换        data.map(student -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            return connection;        }).print();        System.out.println("-----------");        //此处会按照并行度的数量进行转换        data.mapPartition((MapPartitionFunction)(student, collector) -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            collector.collect(connection);        }).returns(Integer.class).print();    }public static void filterFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).filter(x -> x > 5).print();    }public static void mapFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).print();    }}

运行结果(省略日志)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,-)
(5,-,杭州)
(2,J哥,上海)

Scala代码

import com.guanjian.flink.scala.until.DBUntilsimport org.apache.flink.api.common.operators.Orderimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBufferobject DataSetTransformationApp {  def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment//    mapFunction(env)//    filterFunction(env)//    mapPartitionFunction(env)//    firstFunction(env)//    flatMapFunction(env)//    distinctFunction(env)    joinFunction(env)  }  def joinFunction(env: ExecutionEnvironment): Unit = {val info1 = List((1,"PK哥"),(2,"J哥"),(3,"小队长"),(4,"天空蓝"))val info2 = List((1,"北京"),(2,"上海"),(3,"成都"),(5,"杭州"))val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)    data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first,second) => {      if (first == null) {        (second._1,"-",second._2)      }else if (second == null) {        (first._1,first._2,"-")      }else {        (first._1,first._2,second._2)      }    }).print()  }  def distinctFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).distinct().print()  }  def flatMapFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).map((_,1)).groupBy(0)      .sum(1).print()  }  def firstFunction(env: ExecutionEnvironment): Unit = {val info = List((1,"Hadoop"),(1,"Spark"),(1,"Flink"),(2,"Java"),      (2,"Spring boot"),(3,"Linux"),(4,"VUE"))val data = env.fromCollection(info)    data.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()  }  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {val students = new ListBuffer[String]for(i <- 1 to 100) {      students.append("student: " + i)    }val data = env.fromCollection(students).setParallelism(4)    data.map(student => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      connection    }).print()println("-----------")    data.mapPartition((student,collector: Collector[Int]) => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      collector.collect(connection)    }).print()  }  def filterFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).filter(_ > 5).print()  }  def mapFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).print()  }}

运行结果(省略日志)

(3,小队长,成都)
(1,PK哥,北京)
(4,天空蓝,-)
(5,-,杭州)
(2,J哥,上海)

cross算子

笛卡尔积

public class DataSetTransformationApp {public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();//        mapFunction(env);//        filterFunction(env);//        mapPartitionFunction(env);//        firstFunction(env);//        flatMapFunction(env);//        distinctFunction(env);//        joinFunction(env);//        outJoinFunction(env);        crossFunction(env);    }public static void crossFunction(ExecutionEnvironment env) throws Exception {        List info1 = Arrays.asList("曼联","曼城");        List info2 = Arrays.asList(3,1,0);        DataSource data1 = env.fromCollection(info1);        DataSource data2 = env.fromCollection(info2);        data1.cross(data2).print();    }public static void outJoinFunction(ExecutionEnvironment env) throws Exception {        List> info1 = Arrays.asList(new Tuple2<>(1,"PK哥"),                new Tuple2<>(2,"J哥"),                new Tuple2<>(3,"小队长"),                new Tuple2<>(4,"天空蓝"));        List> info2 = Arrays.asList(new Tuple2<>(1,"北京"),                new Tuple2<>(2,"上海"),                new Tuple2<>(3,"成都"),                new Tuple2<>(5,"杭州"));        DataSource> data1 = env.fromCollection(info1);        DataSource> data2 = env.fromCollection(info2);        data1.fullOuterJoin(data2).where(0).equalTo(0)                .with(new JoinFunction, Tuple2, Tuple3>() {@Override                    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {if (first == null) {return new Tuple3<>(second.getField(0),"-",second.getField(1));                        }else if (second == null) {return new Tuple3<>(first.getField(0),first.getField(1),"-");                        }return new Tuple3<>(first.getField(0),first.getField(1),second.getField(1));                    }                }).print();    }public static void joinFunction(ExecutionEnvironment env) throws Exception {        List> info1 = Arrays.asList(new Tuple2<>(1,"PK哥"),                new Tuple2<>(2,"J哥"),                new Tuple2<>(3,"小队长"),                new Tuple2<>(4,"天空蓝"));        List> info2 = Arrays.asList(new Tuple2<>(1,"北京"),                new Tuple2<>(2,"上海"),                new Tuple2<>(3,"成都"),                new Tuple2<>(4,"杭州"));        DataSource> data1 = env.fromCollection(info1);        DataSource> data2 = env.fromCollection(info2);        data1.join(data2).where(0).equalTo(0)                .with(new JoinFunction, Tuple2, Tuple3>() {@Override                    public Tuple3 join(Tuple2 first, Tuple2 second) throws Exception {return new Tuple3<>(first.getField(0),first.getField(1),second.getField(1));                    }                }).print();    }public static void distinctFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        }).returns(String.class)                .distinct().print();    }public static void flatMapFunction(ExecutionEnvironment env) throws Exception {        List info = Arrays.asList("hadoop,spark", "hadoop,flink", "flink,flink");        DataSource data = env.fromCollection(info);        data.flatMap((FlatMapFunction) (value, collector) -> {            String tokens[] = value.split(",");            Stream.of(tokens).forEach(collector::collect);        
}).returns(String.class)                .map(new MapFunction>() {@Override                    public Tuple2 map(String value) throws Exception {return new Tuple2<>(value,1);                    }                })            .groupBy(0).sum(1).print();    }public static void firstFunction(ExecutionEnvironment env) throws Exception {        List> info = Arrays.asList(new Tuple2<>(1,"Hadoop"),                new Tuple2<>(1,"Spark"),                new Tuple2<>(1,"Flink"),                new Tuple2<>(2,"Java"),                new Tuple2<>(2,"Spring boot"),                new Tuple2<>(3,"Linux"),                new Tuple2<>(4,"VUE"));        DataSource> data = env.fromCollection(info);        data.groupBy(0).sortGroup(1, Order.DESCENDING).first(2).print();    }public static void mapPartitionFunction(ExecutionEnvironment env) throws Exception {        List students = new ArrayList<>();        for (int i = 0; i < 100; i++) {            students.add("student: " + i);        }        DataSource data = env.fromCollection(students).setParallelism(4);        //此处会按照students的数量进行转换        data.map(student -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            return connection;        }).print();        System.out.println("-----------");        //此处会按照并行度的数量进行转换        data.mapPartition((MapPartitionFunction)(student, collector) -> {int connection = DBUntils.getConnection();            //TODO 数据库操作            DBUntils.returnConnection(connection);            collector.collect(connection);        }).returns(Integer.class).print();    }public static void filterFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).filter(x -> x > 5).print();    }public static void mapFunction(ExecutionEnvironment env) throws Exception {        DataSource data = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7,8,9,10));        data.map(x -> x + 1).print();    }}

运行结果(省略日志)

(曼联,3)
(曼联,1)
(曼联,0)
(曼城,3)
(曼城,1)
(曼城,0)

Scala代码

import com.guanjian.flink.scala.until.DBUntilsimport org.apache.flink.api.common.operators.Orderimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBufferobject DataSetTransformationApp {  def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment//    mapFunction(env)//    filterFunction(env)//    mapPartitionFunction(env)//    firstFunction(env)//    flatMapFunction(env)//    distinctFunction(env)//    joinFunction(env)    crossFunction(env)  }  def crossFunction(env: ExecutionEnvironment): Unit = {val info1 = List("曼联","曼城")val info2 = List(3,1,0)val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)    data1.cross(data2).print()  }  def joinFunction(env: ExecutionEnvironment): Unit = {val info1 = List((1,"PK哥"),(2,"J哥"),(3,"小队长"),(4,"天空蓝"))val info2 = List((1,"北京"),(2,"上海"),(3,"成都"),(5,"杭州"))val data1 = env.fromCollection(info1)val data2 = env.fromCollection(info2)    data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first,second) => {      if (first == null) {        (second._1,"-",second._2)      }else if (second == null) {        (first._1,first._2,"-")      }else {        (first._1,first._2,second._2)      }    }).print()  }  def distinctFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).distinct().print()  }  def flatMapFunction(env: ExecutionEnvironment): Unit = {val info = List("hadoop,spark","hadoop,flink","flink,flink")val data = env.fromCollection(info)    data.flatMap(_.split(",")).map((_,1)).groupBy(0)      .sum(1).print()  }  def firstFunction(env: ExecutionEnvironment): Unit = {val info = List((1,"Hadoop"),(1,"Spark"),(1,"Flink"),(2,"Java"),      (2,"Spring boot"),(3,"Linux"),(4,"VUE"))val data = env.fromCollection(info)    data.groupBy(0).sortGroup(1,Order.DESCENDING).first(2).print()  }  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {val students = new ListBuffer[String]for(i <- 1 to 100) {      students.append("student: " + i)    }val data = env.fromCollection(students).setParallelism(4)    data.map(student => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      connection    }).print()println("-----------")    data.mapPartition((student,collector: Collector[Int]) => {      val connection = DBUntils.getConnection()      //TODO 数据库操作      DBUntils.returnConnection(connection)      collector.collect(connection)    }).print()  }  def filterFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).filter(_ > 5).print()  }  def mapFunction(env: ExecutionEnvironment): Unit = {val data = env.fromCollection(List(1,2,3,4,5,6,7,8,9,10))    data.map(_ + 1).print()  }}

运行结果(省略日志)

(曼联,3)
(曼联,1)
(曼联,0)
(曼城,3)
(曼城,1)
(曼城,0)
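如果不想直接得到二元组,也可以通过 with(CrossFunction) 自定义笛卡尔积中每一对元素的输出形式。下面是一个补充的最小示意(类名为示例自拟):

public class CrossWithFunctionApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data1 = env.fromCollection(Arrays.asList("曼联", "曼城"));
        DataSource<Integer> data2 = env.fromCollection(Arrays.asList(3, 1, 0));
        //CrossFunction会对笛卡尔积中的每一对元素调用一次cross方法
        data1.cross(data2)
                .with(new CrossFunction<String, Integer, String>() {
                    @Override
                    public String cross(String team, Integer score) throws Exception {
                        return team + " -> " + score + "分";
                    }
                })
                .print();
    }
}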

Sink(输出)

我们在flink文件夹下面新增一个sink-out的文件夹,此时文件夹为空

输出成文本文件

public class DataSetSinkApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataSource<Integer> text = env.fromCollection(data);
        String filePath = "/Users/admin/Downloads/flink/sink-out/sinkjava/";
        text.writeAsText(filePath);
        env.execute("DataSetSinkApp");
    }
}

运行结果

进入sink-out文件夹,可以看到生成了sinkjava输出文件,内容为1到10这10个数字

Scala代码

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object DataSetSinkApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = 1 to 10
    val text = env.fromCollection(data)
    val filePath = "/Users/admin/Downloads/flink/sink-out/sinkscala/"
    text.writeAsText(filePath)
    env.execute("DataSetSinkApp")
  }
}

运行结果与Java版本相同,sink-out文件夹下生成了sinkscala输出文件

如果此时再次运行代码就会报错,因为输出文件已经存在。若要覆盖已有的输出文件,需要在writeAsText中指定写入模式,调整后的代码如下

public class DataSetSinkApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataSource<Integer> text = env.fromCollection(data);
        String filePath = "/Users/admin/Downloads/flink/sink-out/sinkjava/";
        text.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE);
        env.execute("DataSetSinkApp");
    }
}

Scala代码

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object DataSetSinkApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = 1 to 10
    val text = env.fromCollection(data)
    val filePath = "/Users/admin/Downloads/flink/sink-out/sinkscala/"
    text.writeAsText(filePath, WriteMode.OVERWRITE)
    env.execute("DataSetSinkApp")
  }
}

增加并行度,输出多个文件

public class DataSetSinkApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            data.add(i);
        }
        DataSource<Integer> text = env.fromCollection(data);
        String filePath = "/Users/admin/Downloads/flink/sink-out/sinkjava/";
        text.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(4);
        env.execute("DataSetSinkApp");
    }
}

运行结果

此时我们可以看到sinkjava变成了一个文件夹,而该文件夹下面有4个文件

Scala代码

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object DataSetSinkApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = 1 to 10
    val text = env.fromCollection(data)
    val filePath = "/Users/admin/Downloads/flink/sink-out/sinkscala/"
    text.writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(4)
    env.execute("DataSetSinkApp")
  }
}

运行结果与Java版本相同,sinkscala也变成了一个包含4个文件的文件夹
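反过来,如果希望始终只输出一个文件,可以把sink的并行度显式设为1。以Java版本为例,只需把writeAsText这一行改成如下写法(补充示意):

        //只把写文件这一步的并行度设为1,上游算子仍可并行
        text.writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(1);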

计数器

public class CounterApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm");
        String filePath = "/Users/admin/Downloads/flink/sink-out/sink-java-counter/";
        data.map(new RichMapFunction<String, String>() {
            LongCounter counter = new LongCounter();

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //注册累加器,可以跨并行子任务汇总计数
                getRuntimeContext().addAccumulator("ele-counts-java", counter);
            }

            @Override
            public String map(String value) throws Exception {
                counter.add(1);
                return value;
            }
        }).writeAsText(filePath, FileSystem.WriteMode.OVERWRITE).setParallelism(4);
        JobExecutionResult jobResult = env.execute("CounterApp");
        Long num = jobResult.getAccumulatorResult("ele-counts-java");
        System.out.println("num: " + num);
    }
}

运行结果(省略日志)

num: 5

Scala代码

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode

object CounterApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
    val filePath = "/Users/admin/Downloads/flink/sink-out/sink-scala-counter/"
    data.map(new RichMapFunction[String, String]() {
      val counter = new LongCounter

      override def open(parameters: Configuration): Unit = {
        getRuntimeContext.addAccumulator("ele-counts-scala", counter)
      }

      override def map(value: String) = {
        counter.add(1)
        value
      }
    }).writeAsText(filePath, WriteMode.OVERWRITE).setParallelism(4)
    val jobResult = env.execute("CounterApp")
    val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
    println("num: " + num)
  }
}

运行结果(省略日志)

num: 5

分布式缓存

public class DistriutedCacheApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        String filePath = "/Users/admin/Downloads/flink/data/hello.txt";
        //注册分布式缓存文件,之后可以在Rich函数中通过名称获取
        env.registerCachedFile(filePath, "pk-java-dc");
        DataSource<String> data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm");
        data.map(new RichMapFunction<String, String>() {
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                File dcFile = getRuntimeContext().getDistributedCache().getFile("pk-java-dc");
                List<String> lines = FileUtils.readLines(dcFile);
                lines.forEach(System.out::println);
            }

            @Override
            public String map(String value) throws Exception {
                return value;
            }
        }).print();
    }
}

运行结果(省略日志)

hello world welcome
hello welcome
hadoop
spark
flink
pyspark
storm

Scala代码

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

import scala.collection.JavaConverters._

object DistriutedCacheApp {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val filePath = "/Users/admin/Downloads/flink/data/hello.txt"
    env.registerCachedFile(filePath, "pk-scala-dc")
    val data = env.fromElements("hadoop", "spark", "flink", "pyspark", "storm")
    data.map(new RichMapFunction[String, String] {
      override def open(parameters: Configuration): Unit = {
        val dcFile = getRuntimeContext.getDistributedCache.getFile("pk-scala-dc")
        val lines = FileUtils.readLines(dcFile)
        lines.asScala.foreach(println(_))
      }

      override def map(value: String) = {
        value
      }
    }).print()
  }
}

运行结果(省略日志)

hello world welcome
hello welcome
hadoop
spark
flink
pyspark
storm

流处理

socket

public class DataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        socketFunction(env);
        env.execute("DataStreamSourceApp");
    }

    public static void socketFunction(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        data.print().setParallelism(1);
    }
}

运行前先在终端执行以下命令,启动一个socket服务

nc -lk 9999

程序启动后,在nc的控制台中逐行输入文本,Flink程序会实时打印接收到的内容

运行结果(省略日志)

hello world welcome
hello welcome
hadoop
spark
flink
pyspark
storm

Scala代码

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    socketFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("127.0.0.1", 9999)
    data.print().setParallelism(1)
  }
}

运行结果(省略日志)

hello world welcome
hello welcome
hadoop
spark
flink
pyspark
storm

自定义数据源

不可并行数据源

public class CustomNonParallelSourceFunction implements SourceFunction<Long> {
    private boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
public class DataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(env);
        nonParallelSourceFunction(env);
        env.execute("DataStreamSourceApp");
    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        data.print().setParallelism(1);
    }

    public static void socketFunction(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        data.print().setParallelism(1);
    }
}

运行结果(省略日志,每隔1秒打印一次)

1
2
3
4
5
6
...

因为该数据源不可并行,如果把它的并行度调大就会报错,例如

public class DataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(env);
        nonParallelSourceFunction(env);
        env.execute("DataStreamSourceApp");
    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction())
                .setParallelism(2);
        data.print().setParallelism(1);
    }

    public static void socketFunction(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        data.print().setParallelism(1);
    }
}

结果报错

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
        at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
        at com.guanjian.flink.java.test.DataStreamSourceApp.nonParallelSourceFunction(DataStreamSourceApp.java:17)
        at com.guanjian.flink.java.test.DataStreamSourceApp.main(DataStreamSourceApp.java:11)

Scala代码

import org.apache.flink.streaming.api.functions.source.SourceFunction

class CustomNonParallelSourceFunction extends SourceFunction[Long] {
  private var isRunning = true
  private var count = 1L

  override def cancel(): Unit = {
    isRunning = false
  }

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }
}
import com.guanjian.flink.scala.until.CustomNonParallelSourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    socketFunction(env)
    nonParallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.print().setParallelism(1)
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("127.0.0.1", 9999)
    data.print().setParallelism(1)
  }
}

运行结果(省略日志,每隔1秒打印一次)

1
2
3
4
5
6
...
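需要说明的是,不可并行的数据源只是限制了源算子本身的并行度为1,其后的算子仍然可以设置更高的并行度。下面是一个补充的示意写法(在上面的 DataStreamSourceApp 中改写 nonParallelSourceFunction 方法):

    public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        //源的并行度固定为1,但后面的map可以并行执行
        data.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                return value * 2;
            }
        }).setParallelism(2)
          .print().setParallelism(1);
    }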

可并行数据源

public class CustomParallelSourceFunction implements ParallelSourceFunction<Long> {
    private boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
public class DataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        socketFunction(env);
//        nonParallelSourceFunction(env);
        parallelSourceFunction(env);
        env.execute("DataStreamSourceApp");
    }

    public static void parallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomParallelSourceFunction())
                .setParallelism(2);
        data.print().setParallelism(1);
    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {
        DataStreamSource<Long> data = env.addSource(new CustomNonParallelSourceFunction());
        data.print().setParallelism(1);
    }

    public static void socketFunction(StreamExecutionEnvironment env) {
        DataStreamSource<String> data = env.socketTextStream("127.0.0.1", 9999);
        data.print().setParallelism(1);
    }
}

运行结果(省略日志,每隔1秒打印一次,每次打印两条)

1
1
2
2
3
3
4
4
5
5
...

Scala代码

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class CustomParallelSourceFunction extends ParallelSourceFunction[Long] {
  private var isRunning = true
  private var count = 1L

  override def cancel(): Unit = {
    isRunning = false
  }

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }
}
import com.guanjian.flink.scala.until.{CustomNonParallelSourceFunction, CustomParallelSourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
//    socketFunction(env)
//    nonParallelSourceFunction(env)
    parallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomParallelSourceFunction)
      .setParallelism(2)
    data.print().setParallelism(1)
  }

  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction)
    data.print().setParallelism(1)
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("127.0.0.1", 9999)
    data.print().setParallelism(1)
  }
}

运行结果(省略日志,每隔1秒打印一次,每次打印两条)

1
1
2
2
3
3
4
4
5
5
...

增强数据源

public class CustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {
    private boolean isRunning = true;
    private long count = 1;

    /**
     * 可以在这里面实现一些其他需求的代码
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
public class DataStreamSourceApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        socketFunction(env);//        nonParallelSourceFunction(env);//        parallelSourceFunction(env);        richParallelSourceFunction(env);        env.execute("DataStreamSourceApp");    }public static void richParallelSourceFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new CustomRichParallelSourceFunction())                .setParallelism(2);        data.print().setParallelism(1);    }public static void parallelSourceFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new CustomParallelSourceFunction())                .setParallelism(2);        data.print().setParallelism(1);    }public static void nonParallelSourceFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new CustomNonParallelSourceFunction());        data.print().setParallelism(1);    }public static void socketFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.socketTextStream("127.0.0.1", 9999);        data.print().setParallelism(1);    }}

运行结果与可并行数据源相同

Scala代码

import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {  private var isRunning = true  private var count = 1L  override def open(parameters: Configuration): Unit = {  }  override def close(): Unit = {  }  override def cancel(): Unit = {isRunning = false  }  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {while (isRunning) {      ctx.collect(count)      count += 1      Thread.sleep(1000)    }  }}
import com.guanjian.flink.scala.until.{CustomNonParallelSourceFunction, CustomParallelSourceFunction, CustomRichParallelSourceFunction}import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object DataStreamSourceApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//    socketFunction(env)//    nonParallelSourceFunction(env)//    parallelSourceFunction(env)    richParallelSourceFunction(env)    env.execute("DataStreamSourceApp")  }  def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomRichParallelSourceFunction)      .setParallelism(2)    data.print().setParallelism(1)  }  def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomParallelSourceFunction)      .setParallelism(2)    data.print().setParallelism(1)  }  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomNonParallelSourceFunction)    data.print().setParallelism(1)  }  def socketFunction(env: StreamExecutionEnvironment): Unit = {val data = env.socketTextStream("127.0.0.1",9999)    data.print().setParallelism(1)  }}

运行结果与可并行数据源相同

流算子

map和filter

public class DataStreamTransformationApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        filterFunction(env);        env.execute("DataStreamTransformationApp");    }public static void filterFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new CustomNonParallelSourceFunction());        data.map(x -> {            System.out.println("接收到: " + x);            return x;        }).filter(x -> x % 2 == 0).print().setParallelism(1);    }}

运行结果(省略日志)

接收到: 1
接收到: 2
2
接收到: 3
接收到: 4
4
接收到: 5
接收到: 6
6
...

Scala代码

import com.guanjian.flink.scala.until.CustomNonParallelSourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object DataStreamTransformationApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    filterFunction(env)    env.execute("DataStreamTransformationApp")  }  def filterFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomNonParallelSourceFunction)    data.map(x => {      println("接收到: " + x)      x    }).filter(_ % 2 == 0).print().setParallelism(1)  }}

运行结果(省略日志)

接收到: 1
接收到: 2
2
接收到: 3
接收到: 4
4
接收到: 5
接收到: 6
6
...

union

public class DataStreamTransformationApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        filterFunction(env);        unionFunction(env);        env.execute("DataStreamTransformationApp");    }public static void unionFunction(StreamExecutionEnvironment env) {        DataStreamSource data1 = env.addSource(new CustomNonParallelSourceFunction());        DataStreamSource data2 = env.addSource(new CustomNonParallelSourceFunction());        data1.union(data2).print().setParallelism(1);    }public static void filterFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new CustomNonParallelSourceFunction());        data.map(x -> {            System.out.println("接收到: " + x);            return x;        }).filter(x -> x % 2 == 0).print().setParallelism(1);    }}

运行结果(省略日志)

1
1
2
2
3
3
4
4
5
5
...

Scala代码

import com.guanjian.flink.scala.until.CustomNonParallelSourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object DataStreamTransformationApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//    filterFunction(env)    unionFunction(env)    env.execute("DataStreamTransformationApp")  }  def unionFunction(env: StreamExecutionEnvironment): Unit = {val data1 = env.addSource(new CustomNonParallelSourceFunction)val data2 = env.addSource(new CustomNonParallelSourceFunction)    data1.union(data2).print().setParallelism(1)  }  def filterFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomNonParallelSourceFunction)    data.map(x => {      println("接收到: " + x)      x    }).filter(_ % 2 == 0).print().setParallelism(1)  }}

运行结果(省略日志)

1
1
2
2
3
3
4
4
5
5
...

split和select

将一个流拆成多个流以及挑选其中一个流

public class DataStreamTransformationApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        filterFunction(env);//        unionFunction(env);        splitSelectFunction(env);        env.execute("DataStreamTransformationApp");    }public static void splitSelectFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new CustomNonParallelSourceFunction());        SplitStream splits = data.split(value -> {            List list = new ArrayList<>();            if (value % 2 == 0) {                list.add("偶数");            } else {                list.add("奇数");            }return list;        });        splits.select("奇数").print().setParallelism(1);    }public static void unionFunction(StreamExecutionEnvironment env) {        DataStreamSource data1 = env.addSource(new CustomNonParallelSourceFunction());        DataStreamSource data2 = env.addSource(new CustomNonParallelSourceFunction());        data1.union(data2).print().setParallelism(1);    }public static void filterFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new CustomNonParallelSourceFunction());        data.map(x -> {            System.out.println("接收到: " + x);            return x;        }).filter(x -> x % 2 == 0).print().setParallelism(1);    }}

运行结果(省略日志)

1
3
5
7
9
11
...

Scala代码

import com.guanjian.flink.scala.until.CustomNonParallelSourceFunctionimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._import org.apache.flink.streaming.api.collector.selector.OutputSelectorimport java.util.ArrayListimport java.lang.Iterableobject DataStreamTransformationApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//    filterFunction(env)//    unionFunction(env)    splitSelectFunction(env)    env.execute("DataStreamTransformationApp")  }  def splitSelectFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomNonParallelSourceFunction)val splits = data.split(new OutputSelector[Long] {      override def select(value: Long): Iterable[String] = {val list = new ArrayList[String]if (value % 2 == 0) {          list.add("偶数")        } else {          list.add("奇数")        }        list      }    })    splits.select("奇数").print().setParallelism(1)  }  def unionFunction(env: StreamExecutionEnvironment): Unit = {val data1 = env.addSource(new CustomNonParallelSourceFunction)val data2 = env.addSource(new CustomNonParallelSourceFunction)    data1.union(data2).print().setParallelism(1)  }  def filterFunction(env: StreamExecutionEnvironment): Unit = {val data = env.addSource(new CustomNonParallelSourceFunction)    data.map(x => {      println("接收到: " + x)      x    }).filter(_ % 2 == 0).print().setParallelism(1)  }}

运行结果(省略日志)

1
3
5
7
9
11
...

这里需要说明的是split已经被设置为不推荐使用的方法

@deprecated
def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))

因为OutputSelector函数式接口的返回类型为一个Java专属类型,对于Scala是不友好的,所以Scala这里也无法使用lambda表达式
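
既然 split 已被标记为不推荐使用,一个常见的替代写法是旁路输出(Side Output),即用 ProcessFunction 配合 OutputTag 来拆流。下面是一个最小的 Java 示意(类名 SideOutputSplitApp 为假设,数据源沿用上文的 CustomNonParallelSourceFunction,并假设它每秒发出一个 Long),仅作思路参考:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplitApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // OutputTag 需要写成匿名子类,才能保留泛型信息
        final OutputTag<Long> oddTag = new OutputTag<Long>("奇数") {};

        SingleOutputStreamOperator<Long> mainStream = env
                .addSource(new CustomNonParallelSourceFunction())
                .process(new ProcessFunction<Long, Long>() {
                    @Override
                    public void processElement(Long value, Context ctx, Collector<Long> out) {
                        if (value % 2 == 0) {
                            out.collect(value);          // 偶数留在主流
                        } else {
                            ctx.output(oddTag, value);   // 奇数发往旁路输出
                        }
                    }
                });

        // 取出奇数流打印,效果上等同于 split(...).select("奇数")
        mainStream.getSideOutput(oddTag).print().setParallelism(1);

        env.execute("SideOutputSplitApp");
    }
}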

当然select也可以选取多个流

public class DataStreamTransformationApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//        filterFunction(env);//        unionFunction(env);        splitSelectFunction(env);        env.execute("DataStreamTransformationApp");    }public static void splitSelectFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new CustomNonParallelSourceFunction());        SplitStream splits = data.split(value -> {            List list = new ArrayList<>();            if (value % 2 == 0) {                list.add("偶数");            } else {                list.add("奇数");            }return list;        });        splits.select("奇数","偶数").print().setParallelism(1);    }public static void unionFunction(StreamExecutionEnvironment env) {        DataStreamSource data1 = env.addSource(new CustomNonParallelSourceFunction());        DataStreamSource data2 = env.addSource(new CustomNonParallelSourceFunction());        data1.union(data2).print().setParallelism(1);    }public static void filterFunction(StreamExecutionEnvironment env) {        DataStreamSource data = env.addSource(new CustomNonParallelSourceFunction());        data.map(x -> {            System.out.println("接收到: " + x);            return x;        }).filter(x -> x % 2 == 0).print().setParallelism(1);    }}

运行结果(省略日志)

1
2
3
4
5
6
...

Scala代码修改是一样的,这里就不重复了

流Sink

自定义Sink

将socket中的数据传入mysql中

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Student {
    private int id;
    private String name;
    private int age;
}
public class SinkToMySQL extends RichSinkFunction {private Connection connection;    private PreparedStatement pstmt;    private Connection getConnection() {        Connection conn = null;        try {            Class.forName("com.mysql.cj.jdbc.Driver");            String url = "jdbc:mysql://127.0.0.1:3306/flink";            conn = DriverManager.getConnection(url,"root","abcd123");        }catch (Exception e) {            e.printStackTrace();        }return conn;    }@Override    public void open(Configuration parameters) throws Exception {super.open(parameters);        connection = getConnection();        String sql = "insert into student(id,name,age) values (?,?,?)";        pstmt = connection.prepareStatement(sql);    }@Override    public void invoke(Student value) throws Exception {        System.out.println("invoke--------");        pstmt.setInt(1,value.getId());        pstmt.setString(2,value.getName());        pstmt.setInt(3,value.getAge());        pstmt.executeUpdate();    }@Override    public void close() throws Exception {super.close();        if (pstmt != null) {pstmt.close();        }if (connection != null) {connection.close();        }    }}
public class CustomSinkToMySQL {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStreamSource source = env.socketTextStream("127.0.0.1", 9999);        SingleOutputStreamOperator studentStream = source.map(value -> {            String[] splits = value.split(",");            Student stu = new Student(Integer.parseInt(splits[0]), splits[1], Integer.parseInt(splits[2]));            return stu;        }).returns(Student.class);        studentStream.addSink(new SinkToMySQL());        env.execute("CustomSinkToMySQL");    }}

代码执行前执行

nc -lk 9999

执行代码后输入

执行结果

Scala代码

class Student(var id: Int,var name: String,var age: Int) {}
import java.sql.{Connection, DriverManager, PreparedStatement}import com.guanjian.flink.scala.test.Studentimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.functions.sink.RichSinkFunctionclass SinkToMySQL extends RichSinkFunction[Student] {  var connection: Connection = null  var pstmt: PreparedStatement = null  def getConnection:Connection = {var conn: Connection = null    Class.forName("com.mysql.cj.jdbc.Driver")val url = "jdbc:mysql://127.0.0.1:3306/flink"    conn = DriverManager.getConnection(url, "root", "abcd123")    conn  }  override def open(parameters: Configuration): Unit = {connection = getConnectionval sql = "insert into student(id,name,age) values (?,?,?)"    pstmt = connection.prepareStatement(sql)  }  override def invoke(value: Student): Unit = {println("invoke--------")pstmt.setInt(1,value.id)pstmt.setString(2,value.name)pstmt.setInt(3,value.age)pstmt.executeUpdate()  }  override def close(): Unit = {if (pstmt != null) {      pstmt.close()    }if (connection != null) {      connection.close()    }  }}
import com.guanjian.flink.scala.until.SinkToMySQLimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.api.scala._object CustomSinkToMySQL {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    val source = env.socketTextStream("127.0.0.1",9999)val studentStream = source.map(value => {      val splits = value.split(",")      val stu = new Student(splits(0).toInt, splits(1), splits(2).toInt)      stu    })    studentStream.addSink(new SinkToMySQL)    env.execute("CustomSinkToMySQL")  }}

控制台输入

运行结果

Table API以及SQL

要使用flink的Table API,Java工程需要添加Scala依赖库

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>

在data目录下添加一个sales.csv文件,文件内容如下

transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,3,505.0
113,3,3,510.0
114,4,4,600.0
115,1,2,500.0
116,1,2,500.0
117,1,2,500.0
118,1,2,600.0
119,2,3,400.0
120,1,2,500.0
121,1,4,500.0
122,1,2,500.0
123,1,4,500.0
124,1,2,600.0
import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;import lombok.ToString;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.table.api.Table;import org.apache.flink.table.api.java.BatchTableEnvironment;import org.apache.flink.types.Row;public class TableSQLAPI {@Data    @ToString    @AllArgsConstructor    @NoArgsConstructor    public static class SalesLog {private String transactionId;        private String customerId;        private String itemId;        private Double amountPaid;    }public static void main(String[] args) throws Exception {        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);        String filePath = "/Users/admin/Downloads/flink/data/sales.csv";        DataSource csv = env.readCsvFile(filePath).ignoreFirstLine()                .pojoType(SalesLog.class, "transactionId", "customerId",                        "itemId", "amountPaid");        Table sales = tableEnv.fromDataSet(csv);        tableEnv.registerTable("sales",sales);        Table resultTable = tableEnv.sqlQuery("select customerId,sum(amountPaid) money from sales " +"group by customerId");        DataSet result = tableEnv.toDataSet(resultTable, Row.class);        result.print();    }}

运行结果(省略日志)

3,510.0
4,600.0
1,4800.0
2,905.0

Scala代码

Scala项目同样要放入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
import org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.table.api.TableEnvironmentimport org.apache.flink.types.Rowimport org.apache.flink.api.scala._object TableSQLAPI {  case class SalesLog(transactionId: String,customerId: String,itemId: String,amountPaid: Double)  def main(args: Array[String]): Unit = {val env = ExecutionEnvironment.getExecutionEnvironment    val tableEnv = TableEnvironment.getTableEnvironment(env)val filePath = "/Users/admin/Downloads/flink/data/sales.csv"    val csv = env.readCsvFile[SalesLog](filePath,ignoreFirstLine = true,includedFields = Array(0,1,2,3))val sales = tableEnv.fromDataSet(csv)    tableEnv.registerTable("sales",sales)val resultTable = tableEnv.sqlQuery("select customerId,sum(amountPaid) money from sales " +      "group by customerId")    tableEnv.toDataSet[Row](resultTable).print()  }}

运行结果(省略日志)

3,510.0
4,600.0
1,4800.0
2,905.0

时间和窗口

Flink中有三个时间是比较重要的,包括事件时间(Event Time),处理时间(Processing Time),进入Flink系统的时间(Ingestion Time)

通常我们都是使用事件时间来作为基准。

设置时间的代码

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

事件时间通常以时间戳的形式会包含在传入的数据中的一个字段,通过提取,来决定窗口什么时候来执行。
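
顺带补充一个最简单的抽取方式:如果数据本身是按事件时间有序到达的,可以直接用 AscendingTimestampExtractor 从字段里取时间戳,不需要自己维护水印。下面是一个 Java 示意(类名、端口与输入格式均为假设,输入形如 "key,毫秒时间戳");处理乱序数据的完整写法见后面的 WindowsApp 例子:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class AscendingTimestampApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.socketTextStream("127.0.0.1", 9999)
                // 输入格式假设为 "key,毫秒时间戳"
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] splits = value.split(",");
                        return new Tuple2<>(splits[0], Long.parseLong(splits[1]));
                    }
                })
                // 数据有序时,直接从第二个字段抽取事件时间即可
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                        return element.f1;
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(3))
                .sum(1)
                .print()
                .setParallelism(1);

        env.execute("AscendingTimestampApp");
    }
}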

窗口(Windows)主要用于流处理(无限流),把流数据按照时间段或者大小拆成一个个的数据桶。窗口分为两种:一种是根据key来统计的(Keyed),一种是不分key的全局窗口(Non-Keyed)。它的处理过程如下

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

窗口触发可以有两种条件,比如达到了一定的数据量,或者水印(watermark)满足了条件。watermark是一种衡量Event Time进展的机制。

watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

我们从socket接收数据,经过map后立刻抽取timestamp并生成watermark,之后应用window,观察watermark和event time如何变化,最终导致window被触发。

public class WindowsApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        DataStreamSource input = env.socketTextStream("127.0.0.1", 9999);        //将数据流(key,时间戳组成的字符串)转换成元组        SingleOutputStreamOperator> inputMap = input.map(new MapFunction>() {@Override            public Tuple2 map(String value) throws Exception {                String[] splits = value.split(",");                return new Tuple2<>(splits[0], Long.parseLong(splits[1]));            }        });        //提取时间戳,生成水印        SingleOutputStreamOperator> watermarks = inputMap.setParallelism(1).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks>() {private Long currentMaxTimestamp = 0L;            //最大允许的乱序时间为10秒            private Long maxOutOfOrderness = 10000L;            private Watermark watermark;            private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");            @Nullable            @Override            public Watermark getCurrentWatermark() {watermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);                return watermark;            }@Override            public long extractTimestamp(Tuple2 element, long previousElementTimestamp) {                Long timestamp = element.getField(1);                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);                System.out.println("timestamp:" + element.getField(0) + "," +                        element.getField(1) + "|" + format.format(new Date((Long)element.getField(1))) +"," + currentMaxTimestamp + "|" + format.format(new Date(currentMaxTimestamp)) +"," + watermark.toString() + "|" + format.format(new Date(watermark.getTimestamp())));                return timestamp;            }        });        //根据水印的条件,来执行我们需要的方法        //如果水印条件不满足,该方法是永远不会执行的        watermarks.keyBy(x -> (String)x.getField(0)).timeWindow(Time.seconds(3))                .apply(new WindowFunction, Tuple6, String, TimeWindow>() {@Override                    public void apply(String key, TimeWindow window, Iterable> input, Collector> out) throws Exception {                        List> list = (List) input;                        //将乱序进行有序整理                        List collect = list.stream().map(x -> (Long)x.getField(1)).sorted().collect(Collectors.toList());                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");                        out.collect(new Tuple6<>(key,list.size(),                                format.format(collect.get(0)),format.format(collect.get(collect.size() - 1)),                                format.format(window.getStart()),format.format(window.getEnd())));                    }                }).print().setParallelism(1);        env.execute("WindowsApp");    }}

在控制台执行nc -lk 9999后,运行我们的程序,控制台输入

000001,1461756862000

打印

timestamp:000001,1461756862000|2016-04-27 19:34:22.000,1461756862000|2016-04-27 19:34:22.000,Watermark @ -10000|1970-01-01 07:59:50.000

由该执行结果watermark = -10000(此时currentMaxTimestamp还是初始值0,水印 = 0 - 10000 = -10000),我们可以看出,水印是先获取的,然后才执行时间戳的提取。

控制台继续输入

000001,1461756866000

打印

timestamp:000001,1461756866000|2016-04-27 19:34:26.000,1461756866000|2016-04-27 19:34:26.000,Watermark @ 1461756852000|2016-04-27 19:34:12.000

由于水印是先获取的,则此时的水印1461756852000|2016-04-27 19:34:12.000是第一次输入所产生的。

控制台继续输入

000001,1461756872000

打印

timestamp:000001,1461756872000|2016-04-27 19:34:32.000,1461756872000|2016-04-27 19:34:32.000,Watermark @ 1461756856000|2016-04-27 19:34:16.000

此时我们的时间戳来到了32秒,比第一个数据的时间多出了10秒。但水印只推进到32 - 10 = 22秒,仍小于第一条数据所在窗口 [19:34:21, 19:34:24) 的结束时间,所以窗口没有触发。

控制台继续输入

000001,1461756873000

打印

timestamp:000001,1461756873000|2016-04-27 19:34:33.000,1461756873000|2016-04-27 19:34:33.000,Watermark @ 1461756862000|2016-04-27 19:34:22.000

此时我们的时间戳来到了33秒,比第一个数据的时间多出了11秒。水印推进到33 - 10 = 23秒,依然小于窗口结束时间19:34:24,所以此时依然没有触发Windows窗体执行代码。

控制台继续输入

000001,1461756874000

打印

timestamp:000001,1461756874000|2016-04-27 19:34:34.000,1461756874000|2016-04-27 19:34:34.000,Watermark @ 1461756863000|2016-04-27 19:34:23.000(000001,1,2016-04-27 19:34:22.000,2016-04-27 19:34:22.000,2016-04-27 19:34:21.000,2016-04-27 19:34:24.000)

此时水印推进到34 - 10 = 24秒,达到了第一条数据所在窗口 [19:34:21, 19:34:24) 的结束时间,触发了Windows窗体执行代码,输出了一个六元组。

控制台继续输入

000001,1461756876000

打印

timestamp:000001,1461756876000|2016-04-27 19:34:36.000,1461756876000|2016-04-27 19:34:36.000,Watermark @ 1461756864000|2016-04-27 19:34:24.000

此时我们可以看到,该水印(2016-04-27 19:34:24.000)是由上一条数据产生的,刚好达到了上一条输出所对应窗口的结束时间(数据时间为2016-04-27 19:34:22.000,窗口为2016-04-27 19:34:21.000到2016-04-27 19:34:24.000),因此触发了Windows执行代码。

则触发条件为

  1. watermark时间 >= window_end_time

  2. 在[window_start_time,window_end_time)中有数据存在
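
为了便于理解,可以把这个触发判断写成下面这段简化的 Java 示意(仅为等价逻辑的示意,并非 Flink 的 EventTimeTrigger 源码):

public class WindowTriggerSketch {

    // watermark 达到窗口结束时间,且窗口内有数据,才会触发计算
    static boolean shouldFire(long watermark, long windowEndTime, int elementsInWindow) {
        return watermark >= windowEndTime && elementsInWindow > 0;
    }

    public static void main(String[] args) {
        // 对应上文的例子:第一条数据落入窗口 [19:34:21.000, 19:34:24.000),窗口结束时间为 1461756864000
        long windowEnd = 1461756864000L;
        System.out.println(shouldFire(1461756862000L, windowEnd, 1)); // 水印 19:34:22 -> false,不触发
        System.out.println(shouldFire(1461756864000L, windowEnd, 1)); // 水印 19:34:24 -> true,触发
    }
}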

Scala代码

import java.text.SimpleDateFormatimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarksimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala.function.WindowFunctionimport org.apache.flink.streaming.api.watermark.Watermarkimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collectorimport org.apache.flink.api.scala._object WindowsApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val input = env.socketTextStream("127.0.0.1",9999)val inputMap = input.map(f => {      val splits = f.split(",")      (splits(0), splits(1).toLong)    })val watermarks = inputMap.setParallelism(1).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {      var currentMaxTimestamp = 0L      var maxOutofOrderness = 10000L      var watermark: Watermark = null      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")      override def getCurrentWatermark: Watermark = {watermark = new Watermark(currentMaxTimestamp - maxOutofOrderness)watermark      }      override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {val timestamp = element._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)println("timestamp:" + element._1 + "," + element._2 + "|" + format.format(element._2) +          "," + currentMaxTimestamp + "|" + format.format(currentMaxTimestamp) +          "," + watermark.toString + "|" + format.format(watermark.getTimestamp))        timestamp      }    })    watermarks.keyBy(_._1).timeWindow(Time.seconds(3))      .apply(new WindowFunction[(String,Long),(String,Int,String,String,String,String),String,TimeWindow] {override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Int, String, String, String, String)]): Unit = {          val list = input.toList.sortBy(_._2)          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")          out.collect((key,input.size,format.format(list.head._2),format.format(list.last._2),format.format(window.getStart),format.format(window.getEnd)))        }      }).print().setParallelism(1)    env.execute("WindowsApp")  }}

滚动窗口和滑动窗口

滚动窗口就是一个不重叠的时间分片,落入到该时间分片的数据都会被该窗口计算。上面的例子就是一个滚动窗口

代码中可以写成

.window(TumblingEventTimeWindows.of(Time.seconds(5)))

也可以简写成

.timeWindow(Time.seconds(5))

滑动窗口是一个可以重叠的时间分片,同样的数据可以落入不同的窗口,不同的窗口都会计算落入自己时间分片的数据。

代码可以写成

.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

或者简写成

.timeWindow(Time.seconds(10),Time.seconds(5))
public class SliderWindowsApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStreamSource text = env.socketTextStream("127.0.0.1", 9999);        text.flatMap(new FlatMapFunction>() {@Override            public void flatMap(String value, Collector> out) throws Exception {                String[] splits = value.split(",");                Stream.of(splits).forEach(token -> out.collect(new Tuple2<>(token,1)));            }        }).keyBy(0).timeWindow(Time.seconds(10),Time.seconds(5))                .sum(1).print().setParallelism(1);        env.execute("SliderWindowsApp");    }}

控制台输入

a,b,c,d,e,f
a,b,c,d,e,f
a,b,c,d,e,f

运行结果

(d,3)
(a,3)
(e,3)
(f,3)
(c,3)
(b,3)
(c,3)
(f,3)
(b,3)
(d,3)
(e,3)
(a,3)

从结果我们可以看到,由于窗口长度是10秒、滑动步长是5秒,同一条数据会落入两个窗口,因此被运算了两次

Scala代码

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.api.scala._object SliderWindowsApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    val text = env.socketTextStream("127.0.0.1",9999)    text.flatMap(_.split(","))      .map((_,1))      .keyBy(0)      .timeWindow(Time.seconds(10),Time.seconds(5))      .sum(1)      .print()      .setParallelism(1)    env.execute("SliderWindowsApp")  }}
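
下面用一段 Java 示意计算一条数据会落入哪些滑动窗口,可以直观看出同一条数据为什么会被算两次(分配思路与 SlidingEventTimeWindows 一致,但只是示意,时间戳为假设值):

public class SlidingWindowAssignSketch {
    public static void main(String[] args) {
        long size = 10_000L;      // 窗口长度 10 秒
        long slide = 5_000L;      // 滑动步长 5 秒
        long timestamp = 12_000L; // 假设某条数据的时间戳为第 12 秒

        // 先算出包含该时间戳的最后一个窗口的起点,再往前每次回退一个滑动步长
        long lastStart = timestamp - (timestamp + slide) % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            System.out.println("落入窗口 [" + start + ", " + (start + size) + ")");
        }
        // 输出两个窗口:[10000, 20000) 和 [5000, 15000)
    }
}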

Windows Functions

ReduceFunction

这是一个增量函数,即它不会把时间窗口内的所有数据统一处理,只会一条一条处理

public class ReduceWindowsApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStreamSource text = env.socketTextStream("127.0.0.1", 9999);        text.flatMap((FlatMapFunction)(f, collector) -> {            String[] splits = f.split(",");            Stream.of(splits).forEach(collector::collect);        }).returns(String.class)                .map(new MapFunction>() {@Override                    public Tuple2 map(String value) throws Exception {return new Tuple2<>(1,Integer.parseInt(value));                    }                })                .keyBy(0)                .timeWindow(Time.seconds(5))                .reduce((x,y) -> new Tuple2<>(x.getField(0),(int)x.getField(1) + (int)y.getField(1)))                .print().setParallelism(1);        env.execute("ReduceWindowsApp");    }}

控制台输入

1,2,3,4,5
7,8,9

运行结果

(1,15)
(1,24)

Scala代码

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.api.scala._object ReduceWindowsApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    val text = env.socketTextStream("127.0.0.1",9999)    text.flatMap(_.split(","))      .map(x => (1,x.toInt))      .keyBy(0)      .timeWindow(Time.seconds(5))      .reduce((x,y) => (x._1,x._2 + y._2))      .print()      .setParallelism(1)    env.execute("ReduceWindowsApp")  }}

ProcessFunction

这是一个全量函数,即它会把一个时间窗口内的所有数据一起处理

public class ProcessWindowsApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStreamSource text = env.socketTextStream("127.0.0.1", 9999);        text.flatMap((FlatMapFunction)(f, collector) -> {            String[] splits = f.split(",");            Stream.of(splits).forEach(collector::collect);        }).returns(String.class)                .map(new MapFunction>() {@Override                    public Tuple2 map(String value) throws Exception {return new Tuple2<>(1,Integer.parseInt(value));                    }                })                .keyBy(0)                .timeWindow(Time.seconds(5))                .process(new ProcessWindowFunction, Tuple2, Tuple, TimeWindow>() {@Override                    public void process(Tuple tuple, Context context, Iterable> elements, Collector> out) throws Exception {                        List> list = (List) elements;                        out.collect(list.stream().reduce((x, y) -> new Tuple2<>(x.getField(0), (int) x.getField(1) + (int) y.getField(1)))                                .get());                    }                }).print().setParallelism(1);        env.execute("ProcessWindowsApp");    }}

控制台输入

1,2,3,4,5
7,8,9

运行结果

(1,39)

Scala代码

import org.apache.flink.api.java.tuple.Tupleimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.scala.function.ProcessWindowFunctionimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.windowing.windows.TimeWindowimport org.apache.flink.util.Collectorimport org.apache.flink.api.scala._object ProcessWindowsApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    val text = env.socketTextStream("127.0.0.1",9999)    text.flatMap(_.split(","))      .map(x => (1,x.toInt))      .keyBy(0)      .timeWindow(Time.seconds(5))        .process(new ProcessWindowFunction[(Int,Int),(Int,Int),Tuple,TimeWindow] {          override def process(key: Tuple, context: Context, elements: Iterable[(Int, Int)], out: Collector[(Int, Int)]): Unit = {val list = elements.toList            out.collect(list.reduce((x,y) => (x._1,x._2 + y._2)))          }        })      .print()      .setParallelism(1)    env.execute("ReduceWindowsApp")  }}

Connector

Flink提供了很多内置的数据源或者输出的连接Connector,当前包括的有

Apache Kafka (source/sink)
Apache Cassandra (sink)
Amazon Kinesis Streams (source/sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache NiFi (source/sink)
Twitter Streaming API (source)

HDFS Connector

这是一个把数据流输出到Hadoop HDFS分布式文件系统的连接,要使用该连接,需要添加以下依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-filesystem_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>

版本号根据自己实际情况来选择,我这里hadoop的版本号为

2.8.1

在data下新建一个hdfssink的文件夹,此时文件夹内容为空

public class FileSystemSinkApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStreamSource data = env.socketTextStream("127.0.0.1", 9999);        String filePath = "/Users/admin/Downloads/flink/data/hdfssink";        BucketingSink sink = new BucketingSink<>(filePath);        sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));        sink.setWriter(new StringWriter<>());//        sink.setBatchSize(1024 * 1024 * 400);        sink.setBatchRolloverInterval(20);        data.addSink(sink);        env.execute("FileSystemSinkApp");    }}

我这里并没有真正使用hadoop的hdfs,而是本地目录;hdfs的搭建可以参考Hadoop hdfs+Spark配置。在控制台随便输入

adf
dsdf
wfdgg

我们可以看到在hdfssink文件夹下面多了一个

2021-01-15--0627

的文件夹,进入该文件夹后可以看到3个文件

_part-4-0.pending _part-5-0.pending _part-6-0.pending

查看三个文件

(base) -bash-3.2$ cat _part-4-0.pending
adf

(base) -bash-3.2$ cat _part-5-0.pending
dsdf

(base) -bash-3.2$ cat _part-6-0.pending
wfdgg

BucketingSink其实是RichSinkFunction抽象类的子类,跟之前写的自定义Sink的SinkToMySQL是一样的。

Scala代码

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.fs.StringWriterimport org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}object FileSystemSinkApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    val data = env.socketTextStream("127.0.0.1",9999)val filePath = "/Users/admin/Downloads/flink/data/hdfssink"    val sink = new BucketingSink[String](filePath)    sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))    sink.setWriter(new StringWriter[String]())//    sink.setBatchSize(1024 * 1024 * 400)    sink.setBatchRolloverInterval(20)    data.addSink(sink)    env.execute("FileSystemSinkApp")  }}

Kafka Connector

要使用Kafka Connector,当然首先必须安装Kafka。先安装一个zookeeper 3.4.5,kafka 1.1.1

由于我的Kafka是安装在阿里云上面的,本地访问需要配置一下,在kafka的config目录下修改server.properties

advertised.listeners=PLAINTEXT://外网IP:9092
host.name=内网IP

同时阿里云需要开放9092端口

kafka启动

 ./kafka-server-start.sh -daemon ../config/server.properties

创建topic

 ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic pktest

此时我们进入zookeeper可以看到该topic

[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[pktest, __consumer_offsets]

查看topic

./kafka-topics.sh --list --zookeeper localhost:2181

该命令返回的结果为

[bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
pktest

启动生产者

./kafka-console-producer.sh --broker-list localhost:9092 --topic pktest

但由于我们是在阿里云上面启动,则启动生产者需要更改为

./kafka-console-producer.sh --broker-list 外网ip:9092 --topic pktest

启动消费者

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic pktest

但由于我们是在阿里云上面启动,则启动消费者需要更改为

./kafka-console-consumer.sh --bootstrap-server 外网ip:9092 --topic pktest

此时我们在生产者窗口输入,消费者窗口这边就会获取

Kafka作为Source代码,添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>
public class KafkaConnectorConsumerApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(4000);        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);        env.getCheckpointConfig().setCheckpointTimeout(10000);        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        String topic = "pktest";        Properties properties = new Properties();        properties.setProperty("bootstrap.servers","外网ip:9092");        properties.setProperty("group.id","test");        DataStreamSource data = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties));        data.print().setParallelism(1);        env.execute("KafkaConnectorConsumerApp");    }}

运行结果

服务器输入

[bin]# ./kafka-console-producer.sh --broker-list 外网ip:9092 --topic pktest
>sdfa

打印

sdfa

Scala代码

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerimport org.apache.flink.api.scala._import org.apache.flink.streaming.api.CheckpointingModeobject KafkaConnectorConsumerApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    env.enableCheckpointing(4000)    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)    env.getCheckpointConfig.setCheckpointTimeout(10000)    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)val topic = "pktest"    val properties = new Properties    properties.setProperty("bootstrap.servers", "外网ip:9092")    properties.setProperty("group.id","test")val data = env.addSource(new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, properties))    data.print().setParallelism(1)    env.execute("KafkaConnectorConsumerApp")  }}

Kafka作为Sink

public class KafkaConnectorProducerApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.enableCheckpointing(4000);        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);        env.getCheckpointConfig().setCheckpointTimeout(10000);        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        DataStreamSource data = env.socketTextStream("127.0.0.1", 9999);        String topic = "pktest";        Properties properties = new Properties();        properties.setProperty("bootstrap.servers","外网ip:9092");        FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(topic,                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),properties,                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);        data.addSink(kafkaSink);        env.execute("KafkaConnectorProducerApp");    }}

控制台输入

sdfaedffe

服务器打印

[bin]# ./kafka-console-consumer.sh --bootstrap-server 外网ip:9092 --topic pktest
sdfaedffe

Scala代码

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.CheckpointingModeimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerimport org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapperobject KafkaConnectorProducerApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    env.enableCheckpointing(4000)    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)    env.getCheckpointConfig.setCheckpointTimeout(10000)    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)val data = env.socketTextStream("127.0.0.1",9999)val topic = "pktest"    val properties = new Properties    properties.setProperty("bootstrap.servers", "外网ip:9092")val kafkaSink = new FlinkKafkaProducer[String](topic,      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),properties,      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)    data.addSink(kafkaSink)    env.execute("KafkaConnectorProducerApp")  }}

部署

单机部署

下载地址:https://archive.apache.org/dist/flink/flink-1.7.2/flink-1.7.2-bin-hadoop28-scala_2.11.tgz

由于我这里用的是1.7.2(当然你可以使用其他版本),下载解压缩后,进入bin目录,执行

./start-cluster.sh

进入web界面 http://外网ip:8081/

提交一个测试用例

nc -lk 9000

退出bin目录,返回上级目录,执行

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

此时在web界面中可以看到这个运行的任务

点RUNNING按钮见到的如下

虽然我们在nc中敲入一些字符,比如

a       f       g       r       a       d
a       f       g       r       a       d
a       f       g       r       a       d

但并没有打印的地方,我们查看结果需要在log目录下查看

[log]# cat flink-root-taskexecutor-0-iZ7xvi8yoh0wspvk6rjp7cZ.out
a : 6
d : 3
r : 3
g : 3
f : 3
 : 90

上传我们自己的jar包

上传之前,修改一下我们需要运行的main方法的类

<mainClass>com.guanjian.flink.java.test.StreamingJavaApp</mainClass>

由于我们代码的端口为9999,执行

nc -lk 9999

上传后执行(上传至flink的新建test目录下)

./bin/flink run test/flink-train-java-1.0.jar

nc下输入

a d e g a d g f

在log下执行

cat flink-root-taskexecutor-0-iZ7xvi8yoh0wspvk6rjp7cZ.out

可以看到除了之前的记录,多出了几条新的记录

a : 6
d : 3
r : 3
g : 3
f : 3
 : 90
(a,2)
(f,1)
(g,2)
(e,1)
(d,2)

Yarn集群部署

要进行Yarn集群部署,得要先安装Hadoop,我这里Hadoop的版本为2.8.1

进入Hadoop安装目录下的etc/hadoop文件夹

首先依然是hadoop-env.sh的配置,需要配置一下JAVA_HOME

export JAVA_HOME=/home/soft/jdk1.8.0_161

core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://host1:8020</value>
    </property>
</configuration>

hdfs-site.xml

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>/opt/hadoop2/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/opt/hadoop2/tmp/dfs/data</value>
    </property>
</configuration>

此时需要新建这两个目录

mkdir -p /opt/hadoop2/tmp/dfs/name
mkdir -p /opt/hadoop2/tmp/dfs/data

yarn-site.xml

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>host1</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>

mapred-site.xml

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

启动后,可以看到50070的hdfs页面以及8088的Yarn页面

在进行Flink的Yarn部署前需要配置HADOOP_HOME,此处包括JAVA_HOME

vim /etc/profile

JAVA_HOME=/home/soft/jdk1.8.0_161
HADOOP_HOME=/home/soft/hadoop-2.8.1
JRE_HOME=${JAVA_HOME}/jre
CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin
PATH=${JAVA_HOME}/bin:${HADOOP_HOME}/bin:$PATH
export JAVA_HOME HADOOP_HOME PATH

保存后,source /etc/profile

第一种Yarn部署

在flink的bin目录下

./yarn-session.sh -n 1 -jm 1024m -tm 1024m

-n : taskManager的数量

-jm: jobManager的内存

-tm: taskManager的内存

此时在Yarn的Web页面(8088端口)可以看到

在我们的访问机上的/etc/hosts配置好host1的IP地址后,点击ApplicationMaster进入Flink的管理页面

提交代码任务

上传一份文件到hdfs的根目录

hdfs dfs -put LICENSE-2.0.txt /

提交代码任务,在flink的bin目录下

./flink run ../examples/batch/WordCount.jar -input hdfs://host1:8020/LICENSE-2.0.txt -output hdfs://host1:8020/wordcount-result.txt

运算完成后,查看hdfs的文件

[bin]# hdfs dfs -ls /
Found 4 items
-rw-r--r--   3 root supergroup      11358 2021-01-24 14:47 /LICENSE-2.0.txt
drwxr-xr-x   - root supergroup          0 2021-01-24 09:43 /abcd
drwxr-xr-x   - root supergroup          0 2021-01-24 14:17 /user
-rw-r--r--   3 root supergroup       4499 2021-01-24 15:06 /wordcount-result.txt

在Flink的页面也可以看到

第二种Yarn部署

要进行第二种Yarn部署,我们需要先停掉第一种方式启动的Yarn Session

yarn application -kill application_1611471412139_0001

在flink的bin目录下

./flink run -m yarn-cluster -yn 1 ../examples/batch/WordCount.jar

-m : yarn集群,yarn-cluster为常量

-yn: taskManager的数量

此时在Yarn的Web界面也可以看到

提交我们自己的任务,将代码socket的IP改成host1

public class StreamingJavaApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        DataStreamSource text = env.socketTextStream("host1",9999);        text.flatMap(new FlatMapFunction>() {@Override            public void flatMap(String value, Collector> collector) throws Exception {                String[] tokens = value.toLowerCase().split(" ");                for (String token : tokens) {                    collector.collect(new Tuple2<>(token,1));                }            }        }).keyBy(0).timeWindow(Time.seconds(5))                .sum(1).print();        env.execute("StreamingJavaApp");    }}

启动nc

nc -lk 9999

./flink run -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

输入

[~]# nc -lk 9999
a v d t e a d e f
a v d t e a d e f
a v d t e a d e f
a v d t e a d e f

查看结果

在Flink的Web界面上

State

  1. State是指某一个具体的Task/Operator的状态

  2. State数据默认存放在JVM中

  3. 分类:Keyed State & Operator State
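
其中Keyed State的各种用法在下面的小节里有完整例子;Operator State本文没有单独展开,这里先补一个最小的 Java 示意(类名、缓冲大小均为假设):自定义 Sink 实现 CheckpointedFunction 接口,把内存缓冲放进算子状态,checkpoint 时写入、恢复时读回。

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSinkSketch implements SinkFunction<Integer>, CheckpointedFunction {

    private transient ListState<Integer> checkpointedState;
    private final List<Integer> buffer = new ArrayList<>();

    @Override
    public void invoke(Integer value) {
        buffer.add(value);
        if (buffer.size() >= 10) {
            // 这里假设攒够 10 条才写出到外部系统,示意中仅清空缓冲
            buffer.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // checkpoint 时把缓冲区内容放入算子状态
        checkpointedState.clear();
        for (Integer element : buffer) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered-elements", Types.INT));
        // 从 checkpoint 恢复时,把状态读回内存缓冲
        if (context.isRestored()) {
            for (Integer element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }
}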

Keyed State

/** * 从一组数据中,每两个数据统计一次平均值 */public class KeyedStateApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.fromCollection(Arrays.asList(new Tuple2<>(1,3),                new Tuple2<>(1,5),                new Tuple2<>(1,7),                new Tuple2<>(1,4),                new Tuple2<>(1,2)))                .keyBy(ele -> ele.getField(0))                .flatMap(new RichFlatMapFunction, Tuple2>() {private transient ValueState> state;                    @Override                    public void open(Configuration parameters) throws Exception {super.open(parameters);                        state = getRuntimeContext().getState(new ValueStateDescriptor<>("avg",                                TypeInformation.of(new TypeHint>() {                                })));                    }@Override                    public void flatMap(Tuple2 value, Collector> out) throws Exception {                        Tuple2 tmpState = state.value();                        Tuple2 currentState = tmpState == null ? Tuple2.of(0,0) : tmpState;                        Tuple2 newState = new Tuple2<>((int) currentState.getField(0) + 1,(int) currentState.getField(1) + (int) value.getField(1));                        state.update(newState);                        if ((int) newState.getField(0) >= 2) {                            out.collect(new Tuple2<>(value.getField(0),(int) newState.getField(1) / (int) newState.getField(0)));                            state.clear();                        }                    }                }).print().setParallelism(1);        env.execute("KeyedStateApp");    }}

运行结果

(1,4)
(1,5)

Scala代码

import org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}import org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorimport org.apache.flink.api.scala.createTypeInformationobject KeyedStateApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))      .keyBy(_._1)      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int)] {var state: ValueState[(Int,Int)] = _override def open(parameters: Configuration): Unit = {          state = getRuntimeContext.getState(new ValueStateDescriptor[(Int, Int)]("avg",            createTypeInformation[(Int,Int)]))        }override def flatMap(value: (Int, Int), out: Collector[(Int, Int)]) = {          val tmpState = state.value()          val currentState = if (tmpState != null) {            tmpState          } else {            (0,0)          }          val newState = (currentState._1 + 1,currentState._2 + value._2)          state.update(newState)          if (newState._1 >= 2) {            out.collect((value._1,newState._2 / newState._1))state.clear()          }        }      }).print().setParallelism(1)    env.execute("KeyedStateApp")  }}

运行结果

(1,4)
(1,5)

Reducing State

/** * 统计数据条数,并加总 */public class ReducingStateApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.fromCollection(Arrays.asList(new Tuple2<>(1,3),                new Tuple2<>(1,5),                new Tuple2<>(1,7),                new Tuple2<>(1,4),                new Tuple2<>(1,2)))                .keyBy(ele -> ele.getField(0))                .flatMap(new RichFlatMapFunction, Tuple2>() {private transient ReducingState> state;                    @Override                    public void open(Configuration parameters) throws Exception {super.open(parameters);                        state = getRuntimeContext().getReducingState(new ReducingStateDescriptor<>("sum",                                new ReduceFunction>() {@Override                                    public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {                                        Tuple2 tuple2 = new Tuple2<>((int) value1.getField(0) + 1,                                                (int) value1.getField(1) + (int) value2.getField(1));                                        return tuple2;                                    }                                }, TypeInformation.of(new TypeHint>() {})));                    }@Override                    public void flatMap(Tuple2 value, Collector> out) throws Exception {                        Tuple2 tuple2 = new Tuple2<>(value.getField(0),                                value.getField(1));                        state.add(tuple2);                        out.collect(new Tuple2<>(state.get().getField(0),state.get().getField(1)));                    }                }).print().setParallelism(1);        env.execute("ReducingStateApp");    }}

运行结果

(2,8)
(3,15)
(4,19)
(5,21)

Scala代码

import org.apache.flink.api.common.functions.{ReduceFunction, RichFlatMapFunction}import org.apache.flink.api.common.state.{ReducingState, ReducingStateDescriptor}import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorobject ReducingStateApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))      .keyBy(_._1)      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int)] {var state: ReducingState[(Int,Int)] = _override def open(parameters: Configuration): Unit = {          state = getRuntimeContext.getReducingState(new ReducingStateDescriptor[(Int, Int)]("sum",            new ReduceFunction[(Int, Int)] {              override def reduce(value1: (Int, Int), value2: (Int, Int)): (Int, Int) = {                (value1._1 + 1,value1._2 + value2._2)              }            },            createTypeInformation[(Int,Int)]))        }override def flatMap(value: (Int, Int), out: Collector[(Int, Int)]) = {          val tuple2 = (value._1,value._2)          state.add(tuple2)          out.collect((state.get()._1,state.get()._2))        }      }).print().setParallelism(1)    env.execute("ReducingStateApp")  }}

运行结果

(2,8)
(3,15)
(4,19)
(5,21)

List State

/** * 获取每一条所在的位置 */public class ListStateApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.fromCollection(Arrays.asList(new Tuple2<>(1,3),                new Tuple2<>(1,5),                new Tuple2<>(1,7),                new Tuple2<>(1,4),                new Tuple2<>(1,2)))                .keyBy(ele -> ele.getField(0))                .flatMap(new RichFlatMapFunction, Tuple3>() {private transient ListState state;                    @Override                    public void open(Configuration parameters) throws Exception {super.open(parameters);                        state = getRuntimeContext().getListState(new ListStateDescriptor<>("list",                                TypeInformation.of(new TypeHint() {})));                    }@Override                    public void flatMap(Tuple2 value, Collector> out) throws Exception {state.add(value.getField(0));                        Iterator iterator = state.get().iterator();                        Integer l = 0;                        while (iterator.hasNext()) {                            l += iterator.next();                        }                        Tuple3 tuple3 = new Tuple3<>(value.getField(0),value.getField(1),l);                        out.collect(tuple3);                    }                }).print().setParallelism(1);        env.execute("ListStateApp");    }}

运行结果

(1,3,1)
(1,5,2)
(1,7,3)
(1,4,4)
(1,2,5)

Scala代码

import org.apache.flink.api.common.functions.RichFlatMapFunctionimport org.apache.flink.api.common.state.{ListState, ListStateDescriptor}import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorobject ListStateApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))      .keyBy(_._1)      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int,Int)] {var state: ListState[Int] = _override def open(parameters: Configuration): Unit = {          state = getRuntimeContext.getListState(new ListStateDescriptor[Int]("list",            createTypeInformation[Int]));        }override def flatMap(value: (Int, Int), out: Collector[(Int, Int, Int)]) = {          state.add(value._1)          val iterator = state.get().iterator()          var l: Int = 0          while (iterator.hasNext) {            l += iterator.next()          }          val tuple3 = (value._1,value._2,l)          out.collect(tuple3)        }      }).print().setParallelism(1)    env.execute("ListStateApp")  }}

运行结果

(1,3,1)
(1,5,2)
(1,7,3)
(1,4,4)
(1,2,5)

Fold State

/** * 从某个初始值开始统计条数 */public class FoldStateApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.fromCollection(Arrays.asList(new Tuple2<>(1,3),                new Tuple2<>(1,5),                new Tuple2<>(1,7),                new Tuple2<>(1,4),                new Tuple2<>(1,2)))                .keyBy(ele -> ele.getField(0))                .flatMap(new RichFlatMapFunction, Tuple3>() {private transient FoldingState state;                    @Override                    public void open(Configuration parameters) throws Exception {super.open(parameters);                        state = getRuntimeContext().getFoldingState(new FoldingStateDescriptor("fold",                                1, (accumulator, value) -> accumulator + value,                                TypeInformation.of(new TypeHint() {})                        ));                    }@Override                    public void flatMap(Tuple2 value, Collector> out) throws Exception {state.add(value.getField(0));                        out.collect(new Tuple3<>(value.getField(0),value.getField(1),state.get()));                    }                }).print().setParallelism(1);        env.execute("FoldStateApp");    }}

运行结果

(1,3,2)
(1,5,3)
(1,7,4)
(1,4,5)
(1,2,6)

Scala代码

import org.apache.flink.api.common.functions.{FoldFunction, RichFlatMapFunction}import org.apache.flink.api.common.state.{FoldingState, FoldingStateDescriptor}import org.apache.flink.api.scala.createTypeInformationimport org.apache.flink.configuration.Configurationimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.util.Collectorobject FoldStateApp {  def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))      .keyBy(_._1)      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int,Int)] {var state: FoldingState[Int,Int] = _override def open(parameters: Configuration): Unit = {          state = getRuntimeContext.getFoldingState(new FoldingStateDescriptor[Int,Int]("fold",            1,new FoldFunction[Int,Int] {              override def fold(accumulator: Int, value: Int) = {                accumulator + value              }            }, createTypeInformation[Int]))        }override def flatMap(value: (Int, Int), out: Collector[(Int, Int, Int)]) = {          state.add(value._1)          out.collect((value._1,value._2,state.get()))        }      }).print().setParallelism(1)    env.execute("FoldStateApp")  }}

运行结果

(1,3,2)
(1,5,3)
(1,7,4)
(1,4,5)
(1,2,6)

Map State

/** * 将每一条的数据加上上一条的数据,第一条保持自身 */public class MapStateApp {public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.fromCollection(Arrays.asList(new Tuple2<>(1,3),                new Tuple2<>(1,5),                new Tuple2<>(1,7),                new Tuple2<>(1,4),                new Tuple2<>(1,2)))                .keyBy(ele -> ele.getField(0))                .flatMap(new RichFlatMapFunction, Tuple2>() {private transient MapState state;                    @Override                    public void open(Configuration parameters) throws Exception {super.open(parameters);                        state = getRuntimeContext().getMapState(new MapStateDescriptor<>("map",                                TypeInformation.of(new TypeHint() {}),                                TypeInformation.of(new TypeHint() {})));                    }@Override                    public void flatMap(Tuple2 value, Collector> out) throws Exception {                        Integer tmp = state.get(value.getField(0));                        Integer current = tmp == null ? 0 : tmp;                        state.put(value.getField(0),value.getField(1));                        Tuple2 tuple2 = new Tuple2<>(value.getField(0),                                current + (int) value.getField(1));                        out.collect(tuple2);                    }                }).print().setParallelism(1);        env.execute("MapStateApp");    }}

Output

(1,3)
(1,8)
(1,12)
(1,11)
(1,6)

Scala code

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
import org.apache.flink.api.scala.createTypeInformation

object MapStateApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))
      .keyBy(_._1)
      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int)] {
        var state: MapState[Int,Int] = _

        override def open(parameters: Configuration): Unit = {
          state = getRuntimeContext.getMapState(new MapStateDescriptor[Int,Int]("map",
            createTypeInformation[Int], createTypeInformation[Int]))
        }

        override def flatMap(value: (Int, Int), out: Collector[(Int, Int)]) = {
          val tmp: Int = state.get(value._1)
          val current: Int = if (tmp == null) {
            0
          } else {
            tmp
          }
          state.put(value._1, value._2)
          val tuple2 = (value._1, current + value._2)
          out.collect(tuple2)
        }
      }).print().setParallelism(1)
    env.execute("MapStateApp")
  }
}

Output

(1,3)
(1,8)
(1,12)
(1,11)
(1,6)

Aggregating State

/**
 * Compute the running average of each record together with all the previous records
 */
public class AggregatingStateApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromCollection(Arrays.asList(new Tuple2<>(1,3),
                new Tuple2<>(1,5),
                new Tuple2<>(1,7),
                new Tuple2<>(1,4),
                new Tuple2<>(1,2)))
                .keyBy(ele -> ele.getField(0))
                .flatMap(new RichFlatMapFunction<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
                    private transient AggregatingState<Integer,Integer> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        state = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<>("agg",
                                new AggregateFunction<Integer, Tuple2<Integer,Integer>, Integer>() {
                                    @Override
                                    public Tuple2<Integer,Integer> createAccumulator() {
                                        return new Tuple2<>(0,0);
                                    }

                                    @Override
                                    public Tuple2<Integer,Integer> add(Integer value, Tuple2<Integer,Integer> accumulator) {
                                        return new Tuple2<>((int) accumulator.getField(0) + value,
                                                (int) accumulator.getField(1) + 1);
                                    }

                                    @Override
                                    public Integer getResult(Tuple2<Integer,Integer> accumulator) {
                                        return (int) accumulator.getField(0) / (int) accumulator.getField(1);
                                    }

                                    @Override
                                    public Tuple2<Integer,Integer> merge(Tuple2<Integer,Integer> a, Tuple2<Integer,Integer> b) {
                                        return new Tuple2<>((int) a.getField(0) + (int) b.getField(0),
                                                (int) a.getField(1) + (int) b.getField(1));
                                    }
                                }, TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>() {})));
                    }

                    @Override
                    public void flatMap(Tuple2<Integer,Integer> value, Collector<Tuple2<Integer,Integer>> out) throws Exception {
                        state.add(value.getField(1));
                        Tuple2<Integer,Integer> tuple2 = new Tuple2<>(value.getField(0),
                                state.get());
                        out.collect(tuple2);
                    }
                }).print().setParallelism(1);
        env.execute("AggregatingStateApp");
    }
}

Output

(1,3)
(1,4)
(1,5)
(1,4)
(1,4)

Scala code

import org.apache.flink.api.common.functions.{AggregateFunction, RichFlatMapFunction}
import org.apache.flink.api.common.state.{AggregatingState, AggregatingStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector

object AggregatingStateApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List((1,3),(1,5),(1,7),(1,4),(1,2)))
      .keyBy(_._1)
      .flatMap(new RichFlatMapFunction[(Int,Int),(Int,Int)] {
        var state: AggregatingState[Int,Int] = _

        override def open(parameters: Configuration): Unit = {
          state = getRuntimeContext.getAggregatingState(new AggregatingStateDescriptor[Int,(Int,Int),Int]("agg",
            new AggregateFunction[Int,(Int,Int),Int] {
              override def add(value: Int, accumulator: (Int, Int)) = {
                (accumulator._1 + value, accumulator._2 + 1)
              }

              override def createAccumulator() = {
                (0,0)
              }

              override def getResult(accumulator: (Int, Int)) = {
                accumulator._1 / accumulator._2
              }

              override def merge(a: (Int, Int), b: (Int, Int)) = {
                (a._1 + b._1, a._2 + b._2)
              }
            }, createTypeInformation[(Int,Int)]))
        }

        override def flatMap(value: (Int, Int), out: Collector[(Int, Int)]) = {
          state.add(value._2)
          val tuple2 = (value._1, state.get())
          out.collect(tuple2)
        }
      }).print().setParallelism(1)
    env.execute("AggregatingStateApp")
  }
}

Output

(1,3)
(1,4)
(1,5)
(1,4)
(1,4)

Checkpoint Mechanism

Every operator in Flink can be stateful. To make that state fault-tolerant and to persist it, Flink provides the Checkpoint mechanism. A checkpoint can restore both the state and the position consumed in the stream, giving the application a way to run as if no failure had occurred.

By default, checkpointing is disabled and has to be enabled manually.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing, triggering a checkpoint every 4 seconds
env.enableCheckpointing(4000);
// Set the checkpointing mode. EXACTLY_ONCE is the default and fits most applications;
// the other option is CheckpointingMode.AT_LEAST_ONCE, generally used in ultra-low-latency scenarios
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Set the checkpoint timeout, here 10 seconds
env.getCheckpointConfig().setCheckpointTimeout(10000);
// Set the number of concurrent checkpoints, which can be 1 or more
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
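
CheckpointConfig exposes a few more knobs as well. Below is a minimal sketch of two of them, based on the Flink 1.7 API; the concrete values are examples and not part of the original walkthrough:

// Require at least 2 seconds between the end of one checkpoint and the start of the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
// Fail the task if a checkpoint itself fails (this mirrors the default behavior)
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);

With checkpointing enabled, the following example shows the effect:
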
public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        DataStreamSource<String> stream = env.socketTextStream("127.0.0.1", 9999);
        stream.map(x -> {
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

Normally, if we have not started nc -lk 9999, the program dies immediately. But with checkpointing enabled here, even though port 9999 is not open, the job keeps trying to connect to it instead of dying, because the default retry count that comes with checkpointing is Integer.MAX_VALUE. So we keep seeing logs like this:

java.net.ConnectException: Connection refused (Connection refused)
        at java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction.run(SocketTextStreamFunction.java:96)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)

Scala code

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    val stream = env.socketTextStream("127.0.0.1", 9999)
    stream.map(x => {
      if (x.contains("pk")) {
        throw new RuntimeException("出bug了...")
      } else {
        x
      }
    }).print().setParallelism(1)
    env.execute("CheckpointApp")
  }
}

Restart Strategies

As we just saw, if no restart strategy is configured, checkpointing comes with a default one: Integer.MAX_VALUE attempts with a 1-second delay. If we only want to restart twice, we have to configure the restart strategy ourselves, either in Flink's configuration file or in code.

For example, edit flink-conf.yaml in Flink's conf directory:

restart-strategy.fixed-delay.attempts: 2
restart-strategy.fixed-delay.delay: 5 s

This restarts the job 2 times after a failure, with a 5-second delay between attempts.

Setting it in code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));

Note that to use a restart strategy, the Checkpoint mechanism must be enabled; otherwise it has no effect.

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));
        DataStreamSource<String> stream = env.socketTextStream("127.0.0.1", 9999);
        stream.map(x -> {
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

With nc -lk 9999 running, start the program again. When we type pk twice in the console, the program throws an exception each time:

java.lang.RuntimeException: 出bug了...
        at com.guanjian.flink.java.test.CheckpointApp.lambda$main$95f17bfa$1(CheckpointApp.java:18)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)

but it does not die. Only when we type pk a third time does the program fail for good.

Scala code

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)))
    val stream = env.socketTextStream("127.0.0.1", 9999)
    stream.map(x => {
      if (x.contains("pk")) {
        throw new RuntimeException("出bug了...")
      } else {
        x
      }
    }).print().setParallelism(1)
    env.execute("CheckpointApp")
  }
}
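
Besides the fixed-delay strategy used above, RestartStrategies also offers a failure-rate based strategy. A minimal sketch, with example values that are not part of the original walkthrough:

// Allow at most 3 failures within a 5-minute window, waiting 10 seconds between restarts;
// if the failure rate is exceeded, the job fails permanently
env.setRestartStrategy(RestartStrategies.failureRateRestart(
        3,
        Time.of(5, TimeUnit.MINUTES),
        Time.of(10, TimeUnit.SECONDS)));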

StateBackend

By default, checkpointed state is stored in memory, so once the program dies and is restarted, all previous state is lost. Say, for example, that we had previously typed the following into nc:

a,a,a

Taking the earlier CheckpointApp and modifying it slightly:

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));
        DataStreamSource<String> stream = env.socketTextStream("127.0.0.1", 9999);
        stream.map(x -> {
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                String[] splits = value.split(",");
                Stream.of(splits).forEach(token -> out.collect(new Tuple2<>(token, 1)));
            }
        }).keyBy(0).sum(1)
                .print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

The output is:

(a,1)
(a,2)
(a,3)

So far so good. But if the program dies and we start it again and feed it the same input, we simply get the same result again.

If that is not what we want, and what we would rather see is

(a,4)
(a,5)
(a,6)

that is, keep the results from before the crash and continue accumulating on top of them, we can do the following:

public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // Externalized checkpoints: keep checkpoint data outside of memory, retained on cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Store state as files on HDFS
        env.setStateBackend(new FsStateBackend("hdfs://172.18.114.236:8020/backend"));
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));
        DataStreamSource<String> stream = env.socketTextStream("host1", 9999);
        stream.map(x -> {
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                String[] splits = value.split(",");
                Stream.of(splits).forEach(token -> out.collect(new Tuple2<>(token, 1)));
            }
        }).keyBy(0).sum(1)
                .print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

Adjust the main class to run in the pom:

<mainClass>com.guanjian.flink.java.test.CheckpointApp</mainClass>

Package the job and upload it to the test directory under Flink on the server.

Edit flink-conf.yaml in Flink's conf directory and add the following:

state.backend: filesystem
state.checkpoints.dir: hdfs://172.18.114.236:8020/backend
state.savepoints.dir: hdfs://172.18.114.236:8020/backend

Create the backend directory in HDFS:

hdfs dfs -mkdir /backend

Restart Flink and start

nc -lk 9999

The first submission is done the same way as before:

./flink run -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Type the same input as before:

a,a,a

Now stop the submitted Flink job; in HDFS you will find a directory whose name is a long string of digits.

Then start the program again, but this time a little differently:

./flink run -s hdfs://172.18.114.236:8020/backend/4db93b564e17b3806230f7c2d053121e/chk-5 -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Then continue typing into nc:

a,a,a

The result now matches our expectation.

Scala code

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setStateBackend(new FsStateBackend("hdfs://172.18.114.236:8020/backend"))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)))
    val stream = env.socketTextStream("host1", 9999)
    stream.map(x => {
      if (x.contains("pk")) {
        throw new RuntimeException("出bug了...")
      } else {
        x
      }
    }).flatMap(_.split(","))
      .map((_,1))
      .keyBy(0)
      .sum(1)
      .print().setParallelism(1)
    env.execute("CheckpointApp")
  }
}

RocksDBStateBackend

To use the RocksDB backend, first add the dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
public class CheckpointApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        // Externalized checkpoints: keep checkpoint data outside of memory, retained on cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Store state in RocksDB and flush it to HDFS (the second argument enables incremental checkpoints)
        env.setStateBackend(new RocksDBStateBackend("hdfs://172.18.114.236:8020/backend/rocksDB", true));
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)));
        DataStreamSource<String> stream = env.socketTextStream("host1", 9999);
        stream.map(x -> {
            if (x.contains("pk")) {
                throw new RuntimeException("出bug了...");
            } else {
                return x;
            }
        }).flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String,Integer>> out) throws Exception {
                String[] splits = value.split(",");
                Stream.of(splits).forEach(token -> out.collect(new Tuple2<>(token, 1)));
            }
        }).keyBy(0).sum(1)
                .print().setParallelism(1);
        env.execute("CheckpointApp");
    }
}

Package the job and upload it to the test directory under Flink on the server.

Create the HDFS directory:

hdfs dfs -mkdir /backend/rocksDB

Configure Flink's flink-conf.yaml, changing and adding the following:

state.backend: rocksdb
state.checkpoints.dir: hdfs://172.18.114.236:8020/backend/rocksDB
state.savepoints.dir: hdfs://172.18.114.236:8020/backend/rocksDB
state.backend.incremental: true
state.backend.rocksdb.checkpoint.transfer.thread.num: 1
state.backend.rocksdb.localdir: /raid/db/flink/checkpoints
state.backend.rocksdb.timer-service.factory: HEAP

Restart Flink and run

nc -lk 9999

The first submission is done the same way as before:

./flink run -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Type the same input as before:

a,a,a

Now stop the submitted Flink job; in HDFS you will again find a directory whose name is a long string of digits.

On one of the cluster machines (only one of them, and not necessarily the machine the job was submitted from), you can see RocksDB's local data files.

The RocksDB backend writes the data there first and then flushes it to HDFS.
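
The local directory (and the HDFS checkpoint path) can also be configured in code rather than in flink-conf.yaml. A minimal sketch, assuming the same example paths as above:

// Sketch only: configure the RocksDB backend programmatically
RocksDBStateBackend backend =
        new RocksDBStateBackend("hdfs://172.18.114.236:8020/backend/rocksDB", true);
// Local working directory for RocksDB before data is flushed to HDFS
backend.setDbStoragePath("/raid/db/flink/checkpoints");
env.setStateBackend(backend);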

Start the program again:

./flink run -s hdfs://172.18.114.236:8020/backend/rocksDB/6277c8adfba91c72baa384a0d23581d9/chk-64 -m yarn-cluster -yn 1 ../test/flink-train-java-1.0.jar

Then type

a,a,a

The result we observe is the same as before.

Scala code

import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig

object CheckpointApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000)
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env.setStateBackend(new RocksDBStateBackend("hdfs://172.18.114.236:8020/backend/rocksDB", true))
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.of(5, TimeUnit.SECONDS)))
    val stream = env.socketTextStream("host1", 9999)
    stream.map(x => {
      if (x.contains("pk")) {
        throw new RuntimeException("出bug了...")
      } else {
        x
      }
    }).flatMap(_.split(","))
      .map((_,1))
      .keyBy(0)
      .sum(1)
      .print().setParallelism(1)
    env.execute("CheckpointApp")
  }
}

Monitoring and Tuning

HistoryServer

The HistoryServer is used to view information about jobs that have already finished running.

Edit flink-conf.yaml in Flink's conf directory and add the following:

jobmanager.archive.fs.dir: hdfs://172.18.114.236:8020/completed-jobs/
historyserver.web.address: 0.0.0.0
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://172.18.114.236:8020/completed-jobs/
historyserver.archive.fs.refresh-interval: 10000

In the bin directory, run:

./historyserver.sh start

Open the server's public IP on port 8082 in a browser and you will see a web UI (it is empty the first time you open it; in my case a job had already been run, which is why it shows content).

As usual, we run a job; after it finishes, its information shows up there.

The information archived for the job can also be seen in HDFS.

This information is also exposed through a REST API that returns JSON, for example:
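
As a rough illustration, and assuming the HistoryServer is reachable on port 8082 as configured above (the host, port and exact endpoint depend on your deployment; /jobs/overview is one of the endpoints served by the HistoryServer), the archived job overview could be fetched like this:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class HistoryServerRestDemo {
    public static void main(String[] args) throws Exception {
        // Assumes the HistoryServer web UI is reachable locally on port 8082 as configured above
        URL url = new URL("http://127.0.0.1:8082/jobs/overview");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            StringBuilder json = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                json.append(line);
            }
            // Prints the archived jobs as a JSON document
            System.out.println(json);
        } finally {
            conn.disconnect();
        }
    }
}

Opening the same URL in a browser should return the same JSON document.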

"Flink技术的使用方法有哪些"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
