千家信息网

Flink开发如何批处理应用程序

发表于:2024-10-19 作者:千家信息网编辑
千家信息网最后更新 2024年10月19日,Flink开发如何批处理应用程序,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。需求词频统计,即给一个文件,统计文件中每个
千家信息网最后更新 2024年10月19日Flink开发如何批处理应用程序

Flink开发如何批处理应用程序,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

需求

词频统计,即给一个文件,统计文件中每个单词出现的次数,分隔符是\t。这个文件内容如下:

hello    world    welcomehello    welcome

统计结果直接打印在控制台。生产环境下一般Sink到目的地。

使用Flink + java实现需求

环境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

创建项目

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local

groupId: com.vincent artifactId: springboot-flink-train version:1.0 这样就创建了一个项目,使用Idea导入这个项目,项目结构如下:

里面有两个自动为我们准备好的java类。

开发步骤

第一步:创建批处理上下文环境

// set up the batch execution environmentfinal ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

第二步:读取数据

env.readTextFile(textPath);

第三步:transform operations,例如 filter() flatMap() join() coGroup(),这是开发的核心所在,一般就是业务逻辑

第四步:execute program

具体操作

第一步:读取数据

hello welcome

第二步:每一行的数据按照指定的分隔符拆分

hellowelcome

第三步:为每一个单词赋上次数为1

(hello,1)(welcome,1)

第四步:合并操作

代码实现

/** * 使用Java API来开发Flink的批处理应用程序 */public class BatchWCJavaApp {    public static void main(String[] args) throws Exception {        String input = "E:/test/input/test.txt";        // step1: 获取运行环境        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();        // step2: 读取数据        DataSource text = env.readTextFile(input);        // step3: transform        // FlatMapFunction表示进来一个String, 转换成一个类型        text.flatMap(new FlatMapFunction>() {            /**             *             * @param value 就是一行一行的字符串             * @param out 转换成(单词,次数)             * @throws Exception             */            @Override            public void flatMap(String value, Collector> out) throws Exception {                String[] tokens = value.toLowerCase().split("\t");                for(String token: tokens) {                    if(token.length() > 0) {                        out.collect(new Tuple2(token, 1));                    }                }            }        }).groupBy(0).sum(1).print();    }}

运行结果

(world,1)(hello,2)(welcome,2)

使用Flink + scala实现需求

环境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

创建项目,跟使用java方式是一样的

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.8.1 -DarchetypeCatalog=local

groupId: com.vincent artifactId: springboot-flink-train-scala version:1.0 这样就创建了一个项目,使用Idea导入这个项目:

接下来的开发步骤与使用java实现的开发步骤是一样的:这里给出

代码实现

import org.apache.flink.api.scala.ExecutionEnvironment/**  * 使用Scala开发Flink的批处理应用程序  */object BatchWCScalaApp {  def main(args: Array[String]): Unit = {    val input = "E:/test/input/test.txt"    val env = ExecutionEnvironment.getExecutionEnvironment    val text = env.readTextFile(input)    // 引入隐式转换    import org.apache.flink.api.scala._    text.flatMap(_.toLowerCase.split("\t"))      .filter(_.nonEmpty)      .map((_, 1))      .groupBy(0)      .sum(1)      .print()  }}

Java与Scala实现方式对比

算子与简洁性

也就是transform部分虽然原理是一样的,但是实现的方式不一样,scala更加简洁

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

0