千家信息网

Flink开发怎样进行实时处理应用程序

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,本篇文章为大家展示了Flink开发怎样进行实时处理应用程序,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。使用Flink + java实现需求环境JDK:1.8
千家信息网最后更新 2025年01月24日Flink开发怎样进行实时处理应用程序

本篇文章为大家展示了Flink开发怎样进行实时处理应用程序,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。

使用Flink + java实现需求

环境

JDK:1.8

Maven:3.6.1(最低Maven 3.0.4

使用上一节中的springboot-flink-train项目

开发步骤

第一步:创建流处理上下文环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

第二步:读取数据,使用socket流方式读取数据

DataStreamSource text = env.socketTextStream("192.168.152.45", 9999);

第三步:transform

        text.flatMap(new FlatMapFunction>() {            @Override            public void flatMap(String value, Collector> out) throws Exception {                String[] tokens = value.toLowerCase().split(",");                for(String token: tokens) {                    if(token.length() > 0) {                        out.collect(new Tuple2(token, 1));                    }                }            }        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();

这里我们使用逗号分隔,然后跟批处理不同的是,这里使用keyBy(0),而不是groupBy(0)。timewindow表示每隔多久执行一次。

第四步:执行

env.execute("StreamingWCJavaApp");

整体代码如下:

/** * 使用Java API来开发Flink的实时处理应用程序 * wc统计的数据源自socket */public class StreamingWCJava02App {    public static void main(String[] args) throws Exception {        // 获取参数        int port;        try{            ParameterTool tool = ParameterTool.fromArgs(args);            port = tool.getInt("port");        } catch (Exception e) {            System.out.println("端口未设置, 使用默认端口9999");            port = 9999;        }        // step1: 获取流处理上下文环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // step2: 读取数据        DataStreamSource text = env.socketTextStream("192.168.152.45", port);        // step3: transform        text.flatMap(new FlatMapFunction>() {            @Override            public void flatMap(String value, Collector> out) throws Exception {                String[] tokens = value.toLowerCase().split(",");                for(String token: tokens) {                    if(token.length() > 0) {                        out.collect(new Tuple2(token, 1));                    }                }            }        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print();        env.execute("StreamingWCJavaApp");    }}

运行

首先在192.168.152.45上运行命令

nc -l 9999

然后在运行main方法。在192.168.152.45的nc上输入

abc,def,abc,ddd

在idea控制台输出如下:

4> (abc,2)1> (def,1)4> (ddd,1)

这个前面的"4>"表示并行度。我们可以设置setParallelism(1)来忽略这个问题。如下所示:

        text.flatMap(new FlatMapFunction>() {            @Override            public void flatMap(String value, Collector> out) throws Exception {                String[] tokens = value.toLowerCase().split(",");                for(String token: tokens) {                    if(token.length() > 0) {                        out.collect(new Tuple2(token, 1));                    }                }            }        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

这样控制台的打印结果如下:

(abc,2)(ddd,1)(def,1)

这样一个简单的demo就成功了!

重构代码

上面的代码中localhost与port需要用参数传递进来。

代码如下:

        // 获取参数        int port;        try{            ParameterTool tool = ParameterTool.fromArgs(args);            port = tool.getInt("port");        } catch (Exception e) {            System.out.println("端口未设置, 使用默认端口9999");            port = 9999;        }

使用Flink提供的ParameterTool来接收参数。

我们在运行时就可以指定参数列表了,其中的key必须以"-"或者"--"开头。

在运行时,配置参数:

这样运行就可以从外界传递参数了

使用Flink + Scala实现需求

接下来使用Scala方式实现,在项目springboot-flink-train-scala中新建StreamingWCScalaApp,内容如下:

/**  * 使用Scala开发Flink的实时处理应用程序  */object StreamingWCScalaApp {  def main(args: Array[String]): Unit = {    val env = StreamExecutionEnvironment.getExecutionEnvironment    // 引入隐式转换    import org.apache.flink.api.scala._    val text = env.socketTextStream("192.168.152.45", 9999)    text.flatMap(_.split(","))        .map((_,1))        .keyBy(0)        .timeWindow(Time.seconds(5))        .sum(1)        .print()        .setParallelism(1)    env.execute("StreamingWCScalaApp");  }}

这种方式比java实现更加简洁。

上述内容就是Flink开发怎样进行实时处理应用程序,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注行业资讯频道。

0