Qianjia Information Network (千家信息网)

What DataStream programming looks like in Apache Flink

Published: 2024-11-11 · Author: Qianjia Information Network editor

This article walks through DataStream programming in Apache Flink, focusing on how data sources are created. We hope you find it useful.

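A data source can be created with StreamExecutionEnvironment.addSource(sourceFunction). Flink also ships some built-in sources for convenience, such as readTextFile(path) and readFile(). You can also write your own source: implement the SourceFunction interface (such a source cannot run in parallel), or implement the ParallelSourceFunction interface or extend RichParallelSourceFunction, both of which can run in parallel.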

Getting started

We start with a simple example: create a DataStreamSourceApp.

Scala

import org.apache.flink.streaming.api.scala._

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    socketFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    // reads newline-delimited text from the server started with `nc -lk 9999`
    val data = env.socketTextStream("192.168.152.45", 9999)
    data.print()
  }
}

This job reads data from a socket, so we first need to start a server on 192.168.152.45:

nc -lk 9999
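To see what the socket source consumes, the following stdlib-only Java sketch (not Flink code; SocketLinesDemo and roundTrip are invented names) starts a local ServerSocket that stands in for `nc -lk 9999` and reads the connection line by line. Roughly speaking, socketTextStream does the same thing: by default it splits the incoming characters on '\n' and emits each line as one String record.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Stdlib-only illustration, not Flink code: a local ServerSocket plays the role of `nc -lk 9999`,
// and the client side turns the byte stream into one record per '\n'-terminated line.
class SocketLinesDemo {

    static List<String> roundTrip(List<String> typedLines) {
        try (ServerSocket server = new ServerSocket(0)) { // port 0 = any free local port (demo assumption)
            // "nc" side: accept a single connection, write each line, then close it
            Thread ncStandIn = new Thread(() -> {
                try (Socket conn = server.accept();
                     PrintWriter out = new PrintWriter(new OutputStreamWriter(
                             conn.getOutputStream(), StandardCharsets.UTF_8), true)) {
                    typedLines.forEach(out::println);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            ncStandIn.start();

            // "source" side: connect and read records until the server closes the connection
            List<String> records = new ArrayList<>();
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), server.getLocalPort());
                 BufferedReader in = new BufferedReader(new InputStreamReader(
                         socket.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    records.add(line);
                }
            }
            ncStandIn.join();
            return records;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(List.of("apache", "flink", "spark"))); // [apache, flink, spark]
    }
}
```

Each typed line arrives as a separate record, which is why the three words typed into nc are printed as three separate lines by the Flink job.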

Then run DataStreamSourceApp and type the following on the server:

iie4bu@swarm-manager:~$ nc -lk 9999
apache
flink
spark

The console then prints:

3> apache
4> flink
1> spark

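The leading 3, 4 and 1 are not the parallelism itself: each one is the 1-based index of the print-sink subtask that printed the record. The prefix appears because the sink runs with a parallelism greater than 1 (for a local run this typically defaults to the number of CPU cores). The sink's parallelism can be set with setParallelism: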

data.print().setParallelism(1)
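The prefix rule and the 3, 4, 1 ordering above can be reproduced with a small simulation. This is a stdlib-only Java model under stated assumptions, not Flink's implementation: it assumes that records leaving the parallelism-1 socket source are spread round-robin over the print subtasks (as far as we know, Flink's default when two connected operators have different parallelism), starting from an arbitrary subtask, and that print() adds "index + 1> " only when the sink runs with parallelism greater than 1. PrintPrefixDemo and its methods are invented names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Flink's classes) of two behaviours seen above:
// 1) records from a parallelism-1 source reach a wider print sink round-robin, and
// 2) print() prefixes "<subtask index + 1>> " only when the sink has parallelism > 1.
class PrintPrefixDemo {

    // Empty prefix when the sink runs as a single instance
    static String prefix(int subtaskIndex, int sinkParallelism) {
        return sinkParallelism > 1 ? (subtaskIndex + 1) + "> " : "";
    }

    // Round-robin distribution starting at 'startChannel' (chosen at random in a real job)
    static List<String> printRoundRobin(List<String> records, int sinkParallelism, int startChannel) {
        List<String> lines = new ArrayList<>();
        int channel = startChannel;
        for (String record : records) {
            lines.add(prefix(channel, sinkParallelism) + record);
            channel = (channel + 1) % sinkParallelism;
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> input = List.of("apache", "flink", "spark");
        System.out.println(printRoundRobin(input, 4, 2)); // [3> apache, 4> flink, 1> spark]
        System.out.println(printRoundRobin(input, 1, 0)); // [apache, flink, spark]
    }
}
```

With a sink parallelism of 1, as set by data.print().setParallelism(1), the model prints the bare values, matching the unprefixed outputs shown later in the article.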

Java

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JavaDataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        socketFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void socketFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<String> data = executionEnvironment.socketTextStream("192.168.152.45", 9999);
        data.print().setParallelism(1);
    }
}

Adding a custom data source

Scala

Implementing the SourceFunction interface

A source built this way cannot run in parallel.

Create a custom data source:

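import org.apache.flink.streaming.api.functions.source.SourceFunction

class CustomNonParallelSourceFunction extends SourceFunction[Long] {
  var count = 1L
  // cancel() runs on a different thread than run(), so the flag is volatile
  @volatile var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}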

The source starts with count = 1L. Its run method emits count once per second and increments it, and the loop ends once cancel is called. It is used like this:

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // socketFunction(env)
    nonParallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomNonParallelSourceFunction())
    data.print()
  }

The console keeps printing the increasing count value.
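The run/cancel contract can be tried out without Flink. The Java sketch below is a stand-in, not the Flink API: a plain List replaces SourceContext, the sleep is shortened to 1 ms, and CancellableCounterDemo, CounterSource and runFor are invented names. It also shows why the flag should be volatile: Flink calls cancel() from a different thread than the one executing run(), and without volatile the loop is not guaranteed to observe the change.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Stand-in for the SourceFunction contract, no Flink dependency: run() emits until cancel() is called.
class CancellableCounterDemo {

    static final class CounterSource {
        private volatile boolean isRunning = true; // written by cancel(), read by run() on another thread
        private long count = 1L;

        // 'emit' plays the role of ctx.collect(...)
        void run(List<Long> emit) throws InterruptedException {
            while (isRunning) {
                emit.add(count);
                count += 1;
                Thread.sleep(1); // the article sleeps 1000 ms; shortened so the demo ends quickly
            }
        }

        void cancel() {
            isRunning = false;
        }
    }

    // Runs the source on its own thread for about 'millis' ms, cancels it and returns what it emitted.
    static List<Long> runFor(long millis) {
        CounterSource source = new CounterSource();
        List<Long> emitted = new CopyOnWriteArrayList<>();
        Thread sourceThread = new Thread(() -> {
            try {
                source.run(emitted);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sourceThread.start();
        try {
            Thread.sleep(millis);
            source.cancel();     // comparable to Flink cancelling the running source
            sourceThread.join(); // run() sees the flag and returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(runFor(50)); // consecutive values starting at 1; the length depends on timing
    }
}
```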

The parallelism of this source cannot be changed: the only accepted value is 1. Setting it to 3, for example:

val data=env.addSource(new CustomNonParallelSourceFunction()).setParallelism(3)

makes the program fail with the following error:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
        at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
        at com.vincent.course05.DataStreamSourceApp$.nonParallelSourceFunction(DataStreamSourceApp.scala:16)
        at com.vincent.course05.DataStreamSourceApp$.main(DataStreamSourceApp.scala:11)
        at com.vincent.course05.DataStreamSourceApp.main(DataStreamSourceApp.scala)
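The exception is raised in DataStreamSource.setParallelism. As far as we can tell from Flink 1.x, addSource records whether the function is an instance of ParallelSourceFunction, and setParallelism throws when that flag is false and the requested value is not 1. The stdlib-only Java sketch below mirrors that check with hypothetical stand-in types (ParallelCheckDemo and SourceHandle are invented names); it is a model of the behaviour, not Flink's code.

```java
// Simplified mirror of the check behind "is not a parallel source" (stand-in types, no Flink dependency).
class ParallelCheckDemo {

    interface SourceFunction<T> { }                                   // stand-in for Flink's SourceFunction
    interface ParallelSourceFunction<T> extends SourceFunction<T> { } // marker: "may run in parallel"

    static final class SourceHandle {
        private final int id;
        private final boolean isParallel;
        private int parallelism = 1;

        SourceHandle(int id, SourceFunction<?> function) {
            this.id = id;
            // decided once, from the type of the user function
            this.isParallel = function instanceof ParallelSourceFunction;
        }

        SourceHandle setParallelism(int parallelism) {
            if (parallelism != 1 && !isParallel) {
                throw new IllegalArgumentException("Source: " + id + " is not a parallel source");
            }
            this.parallelism = parallelism;
            return this;
        }

        int parallelism() {
            return parallelism;
        }
    }

    public static void main(String[] args) {
        SourceFunction<Long> nonParallel = new SourceFunction<Long>() { };
        ParallelSourceFunction<Long> parallel = new ParallelSourceFunction<Long>() { };
        System.out.println(new SourceHandle(2, parallel).setParallelism(3).parallelism()); // 3
        try {
            new SourceHandle(1, nonParallel).setParallelism(3);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Source: 1 is not a parallel source
        }
    }
}
```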

Implementing the ParallelSourceFunction interface

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class CustomParallelSourceFunction extends ParallelSourceFunction[Long] {
  @volatile var isRunning = true
  var count = 1L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

It behaves the same way as the previous source. The main method:

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // socketFunction(env)
    // nonParallelSourceFunction(env)
    parallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomParallelSourceFunction()).setParallelism(3)
    data.print()
  }

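This time the parallelism can be set to 3. Each of the three source instances keeps its own count, so every value is printed three times: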

2> 1
1> 1
2> 1
2> 2
3> 2
3> 2
3> 3
4> 3
4> 3
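Each value appears three times because Flink creates a separate instance of the source for every parallel subtask, and each instance has its own count field starting at 1. The stdlib-only Java sketch below models that with three independent counter objects (IndependentCountersDemo is an invented name); the round-by-round interleaving is a simplification, since in a real job the order across subtasks is not deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Model only, no Flink classes: one counter object per parallel subtask, each starting at 1.
class IndependentCountersDemo {

    static final class CounterInstance {
        private long count = 1L; // per-instance state, not shared between subtasks

        long next() {
            return count++;
        }
    }

    // Emits 'rounds' values from each of 'parallelism' instances, interleaved round by round.
    static List<Long> emit(int parallelism, int rounds) {
        List<CounterInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new CounterInstance()); // a fresh copy per subtask
        }
        List<Long> out = new ArrayList<>();
        for (int r = 0; r < rounds; r++) {
            for (CounterInstance instance : instances) {
                out.add(instance.next());
            }
        }
        return out;
    }

    // How many times each value was emitted, in ascending order of value.
    static Map<Long, Integer> histogram(List<Long> values) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long v : values) {
            counts.merge(v, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> emitted = emit(3, 3);
        System.out.println(emitted);            // [1, 1, 1, 2, 2, 2, 3, 3, 3]
        System.out.println(histogram(emitted)); // {1=3, 2=3, 3=3}
    }
}
```

The same reasoning explains the Java outputs later in the article, where a source parallelism of 2 prints every value twice.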

Extending RichParallelSourceFunction

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {
  @volatile var isRunning = true
  var count = 1L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // socketFunction(env)
    // nonParallelSourceFunction(env)
    // parallelSourceFunction(env)
    richParallelSourceFunction(env)
    env.execute("DataStreamSourceApp")
  }

  def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.addSource(new CustomRichParallelSourceFunction()).setParallelism(3)
    data.print()
  }
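The Rich variant above behaves exactly like the plain parallel one because it uses nothing Rich-specific. What RichParallelSourceFunction adds is the rich-function lifecycle (open() and close()) and access to the runtime context, for example getRuntimeContext().getIndexOfThisSubtask() and getNumberOfParallelSubtasks() in Flink 1.x. As a hypothetical illustration of why that can matter, the stdlib-only Java sketch below simulates a modification the article does not make: each subtask starts at its own index + 1 and steps by the number of subtasks, so the instances no longer emit duplicates. The subtask index and count are passed in by hand here, and StridedCounterDemo is an invented name.

```java
import java.util.ArrayList;
import java.util.List;

// Simulation only, no Flink classes: in a real RichParallelSourceFunction the two constructor
// arguments would come from the runtime context of the running subtask.
class StridedCounterDemo {

    static final class StridedInstance {
        private final int step;   // total number of parallel subtasks
        private long nextValue;   // next value this subtask will emit

        StridedInstance(int subtaskIndex, int numSubtasks) {
            this.nextValue = subtaskIndex + 1L;
            this.step = numSubtasks;
        }

        long next() {
            long value = nextValue;
            nextValue += step;
            return value;
        }
    }

    // Emits 'rounds' values from each of 'parallelism' instances, interleaved round by round.
    static List<Long> emit(int parallelism, int rounds) {
        List<StridedInstance> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new StridedInstance(i, parallelism));
        }
        List<Long> out = new ArrayList<>();
        for (int r = 0; r < rounds; r++) {
            for (StridedInstance instance : instances) {
                out.add(instance.next());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(3, 3)); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```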

Java

Implementing the SourceFunction interface

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class JavaCustomNonParallelSourceFunction implements SourceFunction<Long> {
    // written by cancel() on another thread, hence volatile
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(environment);
        nonParallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void nonParallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction());
        data.print().setParallelism(1);
    }

Setting a parallelism on this source:

        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction()).setParallelism(2);

fails with the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
        at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
        at com.vincent.course05.JavaDataStreamSourceApp.nonParallelSourceFunction(JavaDataStreamSourceApp.java:16)
        at com.vincent.course05.JavaDataStreamSourceApp.main(JavaDataStreamSourceApp.java:10)

Implementing the ParallelSourceFunction interface

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class JavaCustomParallelSourceFunction implements ParallelSourceFunction<Long> {
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(environment);
        // nonParallelSourceFunction(environment);
        parallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void parallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomParallelSourceFunction()).setParallelism(2);
        data.print().setParallelism(1);
    }

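This time the parallelism can be set (the source uses 2). Because the print sink runs with parallelism 1, the lines carry no prefix, and each value appears twice, once per source instance: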

1
1
2
2
3
3
4
4
5
5

Extending the abstract class RichParallelSourceFunction

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class JavaCustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketFunction(environment);
        // nonParallelSourceFunction(environment);
        // parallelSourceFunction(environment);
        richParallelSourceFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void richParallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomRichParallelSourceFunction()).setParallelism(2);
        data.print().setParallelism(1);
    }

Output (again with a source parallelism of 2):

1
1
2
2
3
3
4
4
5
5
6
6

Relationship between SourceFunction, ParallelSourceFunction and RichParallelSourceFunction
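The diagram that originally accompanied this heading is not available. As a rough substitute, the stdlib-only Java sketch below mirrors the relationships as we understand them in Flink 1.x, using stand-in types with the same names but simplified signatures (in Flink, run() takes a SourceContext and open() takes a configuration or context argument): ParallelSourceFunction is a marker sub-interface of SourceFunction, and RichParallelSourceFunction is an abstract class that extends AbstractRichFunction and implements ParallelSourceFunction. SourceHierarchyDemo, the three concrete classes and the describe helper are invented for illustration.

```java
import java.util.List;

// Stand-in types named after Flink's (simplified signatures, no Flink dependency).
class SourceHierarchyDemo {

    interface SourceFunction<T> {
        void run(List<T> ctx) throws Exception; // Flink passes a SourceContext<T> instead of a List
        void cancel();
    }

    // Marker sub-interface: implementing it is what allows parallelism > 1
    interface ParallelSourceFunction<T> extends SourceFunction<T> { }

    // Lifecycle hooks of a "rich" function (the runtime context is omitted in this sketch)
    interface RichFunction {
        void open() throws Exception;
        void close() throws Exception;
    }

    abstract static class AbstractRichFunction implements RichFunction {
        @Override public void open() throws Exception { }
        @Override public void close() throws Exception { }
    }

    // Rich + parallel; subclasses still implement run() and cancel()
    abstract static class RichParallelSourceFunction<T> extends AbstractRichFunction
            implements ParallelSourceFunction<T> { }

    // Minimal concrete sources mirroring the three custom sources in the article
    static class NonParallelSource implements SourceFunction<Long> {
        @Override public void run(List<Long> ctx) { ctx.add(1L); }
        @Override public void cancel() { }
    }

    static class ParallelSource implements ParallelSourceFunction<Long> {
        @Override public void run(List<Long> ctx) { ctx.add(1L); }
        @Override public void cancel() { }
    }

    static class RichParallelSource extends RichParallelSourceFunction<Long> {
        @Override public void run(List<Long> ctx) { ctx.add(1L); }
        @Override public void cancel() { }
    }

    static String describe(Object function) {
        boolean parallel = function instanceof ParallelSourceFunction;
        boolean rich = function instanceof RichFunction;
        return (parallel ? "parallel" : "non-parallel") + (rich ? ", rich" : "");
    }

    public static void main(String[] args) {
        System.out.println(describe(new NonParallelSource()));  // non-parallel
        System.out.println(describe(new ParallelSource()));     // parallel
        System.out.println(describe(new RichParallelSource())); // parallel, rich
    }
}
```

The instanceof checks reflect the design: "parallel" is signalled purely by the marker interface, and "rich" is layered on through a separate class hierarchy, which is why RichParallelSourceFunction is an abstract class rather than another interface.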

That covers DataStream sources in Apache Flink. If you have run into similar questions, the analysis above may help. For more on related topics, follow the industry news channel.
