千家信息网

Dstream的创建方法

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,这篇文章主要介绍"Dstream的创建方法",在日常操作中,相信很多人在Dstream的创建方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Dstream的创建方法"
千家信息网最后更新 2025年01月23日Dstream的创建方法

这篇文章主要介绍"Dstream的创建方法",在日常操作中,相信很多人在Dstream的创建方法问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Dstream的创建方法"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

1. RDD队列(了解)

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

案例

object SparkStreaming02_RDDQueue {  def main(args: Array[String]): Unit = {    //创建配置文件对象    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")    //创建SparkStreaming上下文环境对象    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))    //创建队列,里面放的是RDD    val rddQueue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()    //从队列中采集数据,获取DS    val queueDS: InputDStream[Int] = ssc.queueStream(rddQueue,false)    //处理采集到的数据    val resDS: DStream[(Int, Int)] = queueDS.map((_,1)).reduceByKey(_+_)    //打印结果    resDS.print()    //启动采集器    ssc.start()    //循环创建RDD,并将创建的RDD放到队列里    for( i <- 1 to 5){      rddQueue.enqueue(ssc.sparkContext.makeRDD(6 to 10))      Thread.sleep(2000)    }    ssc.awaitTermination()  }}

2. 自定义数据源(某些场景需要自定义)

需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。

用一个案例来说明

/**  * Author: Felix  * Date: 2020/5/20  * Desc: 通过自定义数据源方式创建DStream  *     模拟从指定的网络端口获取数据  */object SparkStreaming03_CustomerReceiver {  def main(args: Array[String]): Unit = {    //创建配置文件对象    val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")    //创建SparkStreaming上下文环境对象    val ssc: StreamingContext = new StreamingContext(conf,Seconds(3))    //通过自定义数据源创建Dstream    val myDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("hadoop202",9999))    //扁平化    val flatMapDS: DStream[String] = myDS.flatMap(_.split(" "))    //结构转换  进行计数    val mapDS: DStream[(String, Int)] = flatMapDS.map((_,1))    //聚合    val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_+_)    //打印输出    reduceDS.print    ssc.start()    ssc.awaitTermination()  }}//Receiver[T]  泛型表示的是 读取的数据类型class MyReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){  private var socket: Socket = _  // 真正的处理接收数据的逻辑  def receive() {    try {      //创建连接      socket = new Socket(host,port)      //根据连接对象获取输入流      val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream,StandardCharsets.UTF_8))      //定义一个变量,用于接收读取到的一行数据      var input:String = null      while((input = reader.readLine())!= null){        store(input)      }    } catch {      case e: ConnectException =>        restart(s"Error connecting to $host:$port", e)        return    } finally {      onStop()    }  }  override def onStart(): Unit = {    new Thread("Socket Receiver") {      setDaemon(true)      override def run() { receive() }    }.start()  }  override def onStop(): Unit = {    synchronized {      if (socket != null) {        socket.close()        socket = null      }    }  }}

3. kafka数据源(重要)

1. 版本选型

2. Kafka 0-8 Receive模式

  1. 需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

  2. 导入依赖

    org.apache.spark    spark-streaming-kafka-0-8_2.11    2.1.1
  1. 编写代码 0-8Receive模式,offset维护在zk中,程序停止后,继续生产数据,再次启动程序,仍然可以继续消费。可通过get /consumers/bigdata/offsets/主题名/分区号 查看

object Spark04_ReceiverAPI {  def main(args: Array[String]): Unit = {    //1.创建SparkConf    val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")    //2.创建StreamingContext    val ssc = new StreamingContext(sparkConf, Seconds(3))    //3.使用ReceiverAPI读取Kafka数据创建DStream    val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc,      "hadoop202:2181,hadoop203:2181,hadoop204:2181",      "bigdata",      //v表示的主题的分区数      Map("mybak" -> 2))    //4.计算WordCount并打印        new KafkaProducer[String,String]().send(new ProducerRecord[]())    val lineDStream: DStream[String] = kafkaDStream.map(_._2)    val word: DStream[String] = lineDStream.flatMap(_.split(" "))    val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))    val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)    wordToCountDStream.print()    //5.开启任务    ssc.start()    ssc.awaitTermination()  }}

3. Kafka 0-8 Direct模式

  1. 需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

  2. 导入依赖

    org.apache.spark    spark-streaming-kafka-0-8_2.11    2.1.1
  1. 编写代码(自动维护offset1)

offset维护在checkpoint中,但是获取StreamingContext的方式需要改变,目前这种方式会丢失消息

object Spark05_DirectAPI_Auto01 {  def main(args: Array[String]): Unit = {    //1.创建SparkConf    val sparkConf: SparkConf = new SparkConf().setAppName("Spark05_DirectAPI_Auto01").setMaster("local[*]")    //2.创建StreamingContext    val ssc = new StreamingContext(sparkConf, Seconds(3))    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")    //3.准备Kafka参数    val kafkaParams: Map[String, String] = Map[String, String](      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"    )    //4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,      kafkaParams,      Set("mybak"))    //5.计算WordCount并打印    kafkaDStream.map(_._2)      .flatMap(_.split(" "))      .map((_, 1))      .reduceByKey(_ + _)      .print()    //6.开启任务    ssc.start()    ssc.awaitTermination()  }}
  1. 编写代码(自动维护offset2)

offset维护在checkpoint中,获取StreamingContext为getActiveOrCreate

这种方式缺点:

  • checkpoint小文件过多

  • checkpoint记录最后一次时间戳,再次启动的时候会把间隔时间的周期再执行一次

object Spark06_DirectAPI_Auto02 {  def main(args: Array[String]): Unit = {    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("D:\\dev\\workspace\\my-bak\\spark-bak\\cp", () => getStreamingContext)    ssc.start()    ssc.awaitTermination()  }  def getStreamingContext: StreamingContext = {    //1.创建SparkConf    val sparkConf: SparkConf = new SparkConf().setAppName("DirectAPI_Auto01").setMaster("local[*]")        //2.创建StreamingContext    val ssc = new StreamingContext(sparkConf, Seconds(3))    ssc.checkpoint("D:\\dev\\workspace\\my-bak\\spark-bak\\cp")        //3.准备Kafka参数    val kafkaParams: Map[String, String] = Map[String, String](      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"    )        //4.使用DirectAPI自动维护offset的方式读取Kafka数据创建DStream    val kafkaDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,      kafkaParams,      Set("mybak"))        //5.计算WordCount并打印    kafkaDStream.map(_._2)      .flatMap(_.split(" "))      .map((_, 1))      .reduceByKey(_ + _)      .print()        //6.返回结果    ssc  }}
  1. 编写代码(手动维护offset)

object Spark07_DirectAPI_Handler {  def main(args: Array[String]): Unit = {    //1.创建SparkConf    val conf: SparkConf = new SparkConf().setAppName("DirectAPI_Handler").setMaster("local[*]")    //2.创建StreamingContext    val ssc = new StreamingContext(conf, Seconds(3))    //3.创建Kafka参数    val kafkaParams: Map[String, String] = Map[String, String](      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop202:9092,hadoop203:9092,hadoop204:9092",      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"    )    //4.获取上一次消费的位置信息    val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](      TopicAndPartition("mybak", 0) -> 13L,      TopicAndPartition("mybak", 1) -> 10L    )    //5.使用DirectAPI手动维护offset的方式消费数据    val kafakDStream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](      ssc,      kafkaParams,      fromOffsets,      (m: MessageAndMetadata[String, String]) => m.message())    //6.定义空集合用于存放数据的offset    var offsetRanges = Array.empty[OffsetRange]    //7.将当前消费到的offset进行保存    kafakDStream.transform { rdd =>      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges      rdd    }.foreachRDD { rdd =>      for (o <- offsetRanges) {        println(s"${o.fromOffset}-${o.untilOffset}")      }    }    //8.开启任务    ssc.start()    ssc.awaitTermination()  }}

4. Kafka 0-10 Direct模式

  1. 需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

  2. 导入依赖,为了避免和0-8冲突,我们新建一个module演示

    org.apache.spark    spark-core_2.11    2.1.1    org.apache.spark    spark-streaming_2.11    2.1.1    org.apache.spark    spark-streaming-kafka-0-10_2.11    2.1.1

3)编写代码

object Spark01_DirectAPI010 {  def main(args: Array[String]): Unit = {    //1.创建SparkConf    val conf: SparkConf = new SparkConf().setAppName("DirectAPI010").setMaster("local[*]")    //2.创建StreamingContext    val ssc = new StreamingContext(conf, Seconds(3))    //3.构建Kafka参数    val kafkaParmas: Map[String, Object] = Map[String, Object](      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",      ConsumerConfig.GROUP_ID_CONFIG -> "bigdata191122",      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]    )    //4.消费Kafka数据创建流    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,      LocationStrategies.PreferConsistent,      ConsumerStrategies.Subscribe[String, String](Set("test"), kafkaParmas))    //5.计算WordCount并打印    kafkaDStream.map(_.value())      .flatMap(_.split(" "))      .map((_, 1))      .reduceByKey(_ + _)      .print()    //6.启动任务    ssc.start()    ssc.awaitTermination()  }}

5. 消费Kafka数据模式总结

  1. 0-8 ReceiverAPI:

  • 1)专门的Executor读取数据,速度不统一

  • 2)跨机器传输数据,WAL

  • 3)Executor读取数据通过多个线程的方式,想要增加并行度,则需要多个流union

  • 4)offset存储在Zookeeper中

  1. 0-8 DirectAPI:

  • 1)Executor读取数据并计算

  • 2)增加Executor个数来增加消费的并行度

  • 3)offset存储


    • a)CheckPoint(getActiveOrCreate方式创建StreamingContext)


    • b)手动维护(有事务的存储系统)


    • c)获取offset必须在第一个调用的算子中:offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  1. 0-10 DirectAPI:

  • 1)Executor读取数据并计算

  • 2)增加Executor个数来增加消费的并行度

  • 3)offset存储


    • i.a.__consumer_offsets系统主题中


    • ii.b.手动维护(有事务的存储系统)

到此,关于"Dstream的创建方法"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0