千家信息网

Spark Streaming笔记整理(三):DS的transformation与output操作

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,[TOC]DStream的各种transformationTransformation Meaningmap(func) 对DStream中的各个元素进行func函数操作,然后返回一个新的DSt
千家信息网最后更新 2025年01月24日Spark Streaming笔记整理(三):DS的transformation与output操作

[TOC]


DStream的各种transformation

Transformation  Meaningmap(func)   对DStream中的各个元素进行func函数操作,然后返回一个新的DStream.flatMap(func)   与map方法类似,只不过各个输入项可以被输出为零个或多个输出项filter(func)    过滤出所有函数func返回值为true的DStream元素并返回一个新的DStreamrepartition(numPartitions)  增加或减少DStream中的分区数,从而改变DStream的并行度union(otherStream)  将源DStream和输入参数为otherDStream的元素合并,并返回一个新的DStream.count()     通过对DStreaim中的各个RDD中的元素进行计数,然后返回只有一个元素的RDD构成的DStreamreduce(func)    对源DStream中的各个RDD中的元素利用func进行聚合操作,然后返回只有一个元素的RDD构成的新的DStream.countByValue()  对于元素类型为K的DStream,返回一个元素为(K,Long)键值对形式的新的DStream,Long对应的值为源DStream中各个RDD的key出现的次数reduceByKey(func, [numTasks])   利用func函数对源DStream中的key进行聚合操作,然后返回新的(K,V)对构成的DStreamjoin(otherStream, [numTasks])   输入为(K,V)、(K,W)类型的DStream,返回一个新的(K,(V,W)类型的DStreamcogroup(otherStream, [numTasks])    输入为(K,V)、(K,W)类型的DStream,返回一个新的 (K, Seq[V], Seq[W]) 元组类型的DStreamtransform(func)     通过RDD-to-RDD函数作用于源码DStream中的各个RDD,可以是任意的RDD操作,从而返回一个新的RDDupdateStateByKey(func)  根据于key的前置状态和key的新值,对key进行更新,返回一个新状态的DstreamWindow 函数: 

可以看到很多都是在RDD中已经有的transformation算子操作,所以这里只关注transform、updateStateByKey和window函数

transformation之transform操作

DStream transform

1、transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。

2、DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行join。

案例

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.rdd.RDDimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}/**  * 使用Transformation之transform来完成在线黑名单过滤  * 需求:  *     将日志数据中来自于ip["27.19.74.143", "110.52.250.126"]实时过滤掉  * 数据格式  *     27.19.74.143##2016-05-30 17:38:20##GET /static/image/common/faq.gif HTTP/1.1##200##1127  */object _06SparkStreamingTransformOps {    def main(args: Array[String]): Unit = {        if (args == null || args.length < 2) {            System.err.println(                """Parameter Errors! Usage:                    |hostname: 监听的网络socket的主机名或ip地址                  |port:    监听的网络socket的端口                """.stripMargin)            System.exit(-1)        }        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)        val conf = new SparkConf()            .setAppName(_01SparkStreamingNetWorkOps.getClass.getSimpleName)            .setMaster("local[2]")        val ssc = new StreamingContext(conf, Seconds(2))        val hostname = args(0).trim        val port = args(1).trim.toInt        //黑名单数据        val blacklist = List(("27.19.74.143", true), ("110.52.250.126", true))//        val blacklist = List("27.19.74.143", "110.52.250.126")        val blacklistRDD:RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blacklist)        val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)        // 如果用到一个DStream和rdd进行操作,无法使用dstream直接操作,只能使用transform来进行操作        val filteredDStream:DStream[String] = linesDStream.transform(rdd => {            val ip2InfoRDD:RDD[(String, String)] = rdd.map{line => {                (line.split("##")(0), line)            }}            /** A(M) B(N)两张表:              * across join              *     交叉连接,没有on条件的连接,会产生笛卡尔积(M*N条记录) 不能用              * inner join              *     等值连接,取A表和B表的交集,也就是获取在A和B中都有的数据,没有的剔除掉 不能用              * left outer join              *     外链接:最常用就是左外连接(将左表中所有的数据保留,右表中能够对应上的数据正常显示,在右表中对应不上,显示为null)              *         可以通过非空判断是左外连接达到inner join的结果              */            val joinedInfoRDD:RDD[(String, (String, Option[Boolean]))] = ip2InfoRDD.leftOuterJoin(blacklistRDD)            joinedInfoRDD.filter{case (ip, (line, joined)) => {                joined == None            }}//执行过滤操作                .map{case (ip, (line, joined)) => line}        })        filteredDStream.print()        ssc.start()        ssc.awaitTermination()        ssc.stop()  // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身    }}

nc中产生数据:

[uplooking@uplooking01 ~]$ nc -lk 489327.19.74.143##2016-05-30 17:38:20##GET /data/attachment/common/c8/common_2_verify_icon.png HTTP/1.1##200##582110.52.250.126##2016-05-30 17:38:20##GET /static/js/logging.js?y7a HTTP/1.1##200##6038.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

输出结果如下:

-------------------------------------------Time: 1526006084000 ms-------------------------------------------8.35.201.144##2016-05-30 17:38:20##GET /uc_server/avatar.php?uid=29331&size=middle HTTP/1.1##301##-

transformation之updateStateByKey操作

概述

1、Spark Streaming的updateStateByKey可以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加。

2、 updateStateByKey 解释

以DStream中的数据进行按key做reduce操作,然后对各个批次的数据进行累加在有新的数据信息进入或更新时,可以让用户保持想要的任何状。使用这个功能需要完成两步:

1) 定义状态:可以是任意数据类型

2) 定义状态更新函数:用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。对于有状态操作,要不断的把当前和历史的时间切片的RDD累加计算,随着时间的流失,计算的数据规模会变得越来越大

3、要思考的是如果数据量很大的时候,或者对性能的要求极为苛刻的情况下,可以考虑将数据放在Redis或者tachyon或者ignite上

4、注意,updateStateByKey操作,要求必须开启Checkpoint机制。

案例

Scala版

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}/**  * 状态函数updateStateByKey  *     更新key的状态(就是key对应的value)  *  * 通常的作用,计算某个key截止到当前位置的状态  *     统计截止到目前为止的word对应count  * 要想完成截止到目前为止的操作,必须将历史的数据和当前最新的数据累计起来,所以需要一个地方来存放历史数据  * 这个地方就是checkpoint目录  *  */object _07SparkStreamingUpdateStateByKeyOps {    def main(args: Array[String]): Unit = {        if (args == null || args.length < 2) {            System.err.println(                """Parameter Errors! Usage:                    |hostname: 监听的网络socket的主机名或ip地址                  |port:    监听的网络socket的端口                """.stripMargin)            System.exit(-1)        }        val hostname = args(0).trim        val port = args(1).trim.toInt        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)        val conf = new SparkConf()            .setAppName(_07SparkStreamingUpdateStateByKeyOps.getClass.getSimpleName)            .setMaster("local[2]")        val ssc = new StreamingContext(conf, Seconds(2))        ssc.checkpoint("hdfs://ns1/checkpoint/streaming/usb")        // 接收到的当前批次的数据        val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)        // 这是记录下来的当前批次的数据        val rbkDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)        val usbDStream:DStream[(String, Int)]  = rbkDStream.updateStateByKey(updateFunc)        usbDStream.print()        ssc.start()        ssc.awaitTermination()        ssc.stop()  // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身    }    /**      * @param seq 当前批次的key对应的数据      * @param history 历史key对应的数据,可能有可能没有      * @return      */    def updateFunc(seq: Seq[Int], history: Option[Int]): Option[Int] = {        var sum = seq.sum        if(history.isDefined) {            sum += history.get        }        Option[Int](sum)    }}

nc产生数据:

[uplooking@uplooking01 ~]$ nc -lk 4893hello hellohello you hello he hello me

输出结果如下:

-------------------------------------------Time: 1526009358000 ms-------------------------------------------(hello,2)18/05/11 11:29:18 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009338000: -------------------------------------------Time: 1526009360000 ms-------------------------------------------(hello,5)(me,1)(you,1)(he,1)18/05/11 11:29:20 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in hdfs://ns1/checkpoint/streaming/usb/receivedBlockMetadata older than 1526009340000: -------------------------------------------Time: 1526009362000 ms-------------------------------------------(hello,5)(me,1)(you,1)(he,1)
Java版

用法略有不同,主要是 状态更新函数的写法上有区别,如下:

package cn.xpleaf.bigdata.spark.java.streaming.p1;import com.google.common.base.Optional;import org.apache.log4j.Level;import org.apache.log4j.Logger;import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import scala.Tuple2;import java.util.Arrays;import java.util.List;public class _02SparkStreamingUpdateStateByKeyOps {    public static void main(String[] args) {        if(args == null || args.length < 2) {            System.err.println("Parameter Errors! Usage:  ");            System.exit(-1);        }        Logger.getLogger("org.apache.spark").setLevel(Level.OFF);        SparkConf conf = new SparkConf()                .setAppName(_02SparkStreamingUpdateStateByKeyOps.class.getSimpleName())                .setMaster("local[2]");        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(2));        jsc.checkpoint("hdfs://ns1/checkpoint/streaming/usb");        String hostname = args[0].trim();        int port = Integer.valueOf(args[1].trim());        JavaReceiverInputDStream lineDStream = jsc.socketTextStream(hostname, port);//默认的持久化级别:MEMORY_AND_DISK_SER_2        JavaDStream wordsDStream = lineDStream.flatMap(new FlatMapFunction() {            @Override            public Iterable call(String line) throws Exception {                return Arrays.asList(line.split(" "));            }        });        JavaPairDStream pairsDStream = wordsDStream.mapToPair(word -> {            return new Tuple2(word, 1);        });        JavaPairDStream rbkDStream = pairsDStream.reduceByKey(new Function2() {            @Override            public Integer call(Integer v1, Integer v2) throws Exception {                return v1 + v2;            }        });        // 做历史的累计操作        JavaPairDStream usbDStream = rbkDStream.updateStateByKey(new Function2, Optional, Optional>() {            @Override            public Optional call(List current, Optional history) throws Exception {                int sum = 0;                for (int i : current) {                    sum += i;                }                if (history.isPresent()) {                    sum += history.get();                }                return Optional.of(sum);            }        });        usbDStream.print();        jsc.start();//启动流式计算        jsc.awaitTermination();//等待执行结束        jsc.close();    }}

transformation之window操作

DStream window 滑动窗口

Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

1.红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

2.这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

所以基于窗口的操作,需要指定2个参数:

window length - The duration of the window (3 in the figure)slide interval - The interval at which the window-based operation is performed (2 in the figure). 1.窗口大小,个人感觉是一段时间内数据的容器。2.滑动间隔,就是我们可以理解的cron表达式吧。举个例子吧:还是以最著名的wordcount举例,每隔10秒,统计一下过去30秒过来的数据。// Reduce last 30 seconds of data, every 10 seconds  val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

DSstream window滑动容器功能

window 对每个滑动窗口的数据执行自定义的计算countByWindow 对每个滑动窗口的数据执行count操作reduceByWindow 对每个滑动窗口的数据执行reduce操作reduceByKeyAndWindow 对每个滑动窗口的数据执行reduceByKey操作countByValueAndWindow 对每个滑动窗口的数据执行countByValue操作

案例

测试代码如下:

package cn.xpleaf.bigdata.spark.scala.streaming.p1import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}import org.apache.spark.streaming.{Seconds, StreamingContext}/**  *窗口函数window  *   每隔多长时间(滑动频率slideDuration)统计过去多长时间(窗口长度windowDuration)中的数据  * 需要注意的就是窗口长度和滑动频率  * windowDuration = M*batchInterval,    slideDuration = N*batchInterval  */object _08SparkStreamingWindowOps {    def main(args: Array[String]): Unit = {        if (args == null || args.length < 2) {            System.err.println(                """Parameter Errors! Usage:                    |hostname: 监听的网络socket的主机名或ip地址                  |port:    监听的网络socket的端口                """.stripMargin)            System.exit(-1)        }        val hostname = args(0).trim        val port = args(1).trim.toInt        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)        val conf = new SparkConf()            .setAppName(_08SparkStreamingWindowOps.getClass.getSimpleName)            .setMaster("local[2]")        val ssc = new StreamingContext(conf, Seconds(2))        // 接收到的当前批次的数据        val linesDStream:ReceiverInputDStream[String] = ssc.socketTextStream(hostname, port)        val pairsDStream:DStream[(String, Int)] =linesDStream.flatMap(_.split(" ")).map((_, 1))        // 每隔4s,统计过去6s中产生的数据        val retDStream:DStream[(String, Int)] = pairsDStream.reduceByKeyAndWindow(_+_, windowDuration = Seconds(6), slideDuration = Seconds(4))        retDStream.print()        ssc.start()        ssc.awaitTermination()        ssc.stop()  // stop中的boolean参数,设置为true,关闭该ssc对应的SparkContext,默认为false,只关闭自身    }}

nc产生数据:

[uplooking@uplooking01 ~]$ nc -lk 4893hello youhello hehello mehello youhello he

输出结果如下:

-------------------------------------------Time: 1526016316000 ms-------------------------------------------(hello,4)(me,1)(you,2)(he,1)-------------------------------------------Time: 1526016320000 ms-------------------------------------------(hello,5)(me,1)(you,2)(he,2)-------------------------------------------Time: 1526016324000 ms-------------------------------------------

DStream的output操作以及foreachRDD

DStream output操作

1、print 打印每个batch中的前10个元素,主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。

2、saveAsTextFile(prefix, [suffix]) 将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix]

3、saveAsObjectFile 同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。

4、saveAsHadoopFile 同上,将数据保存到Hadoop文件中

5、foreachRDD 最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。

DStream foreachRDD详解

相关内容其实在Spark开发调优中已经有相关的说明。

通常在foreachRDD中,都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储。

误区一:在RDD的foreach操作外部,创建Connection

这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象,实际上一般是不支持序列化的,也就无法被传输。

dstream.foreachRDD { rdd =>  val connection = createNewConnection()   rdd.foreach { record => connection.send(record)  }}
误区二:在RDD的foreach操作内部,创建Connection

这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。而通常来说,Connection的创建,是很消耗性能的。

dstream.foreachRDD { rdd =>  rdd.foreach { record =>    val connection = createNewConnection()    connection.send(record)    connection.close()  }}
DStream foreachRDD合理使用

合理方式一:使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象,这样就相当于是,为RDD的每个partition创建一个Connection对象,节省资源的多了。

dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = createNewConnection()    partitionOfRecords.foreach(record => connection.send(record))    connection.close()  }}

合理方式二:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。这样的话,甚至在多个RDD的partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。

dstream.foreachRDD { rdd =>  rdd.foreachPartition { partitionOfRecords =>    val connection = ConnectionPool.getConnection()    partitionOfRecords.foreach(record => connection.send(record))    ConnectionPool.returnConnection(connection)    }}
foreachRDD 与foreachPartition实现实战

需要注意的是:

(1)、你最好使用forEachPartition函数来遍历RDD,并且在每台Work上面创建数据库的connection。

(2)、如果你的数据库并发受限,可以通过控制数据的分区来减少并发。

(3)、在插入MySQL的时候最好使用批量插入。

(4),确保你写入的数据库过程能够处理失败,因为你插入数据库的过程可能会经过网络,这可能导致数据插入数据库失败。

(5)、不建议将你的RDD数据写入到MySQL等关系型数据库中。

这部分内容其实可以参考开发调优部分的案例,只是那里并没有foreachRDD,因为其并没有使用DStream,但是原理是一样的,因为最终都是针对RDD来进行操作的。

数据 函数 状态 元素 时间 参数 就是 数据库 网络 对象 批次 类型 更新 监听 历史 方式 输入 文件 案例 结果 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 数据库面试15句 数据库查询所有人的年龄 一汽红旗软件开发中心工作水平 邯郸办公系统软件开发哪家可靠 在数据库中sc是什么表 如何打开服务器管理其 重庆计算机网络技术专升本院校 浪潮服务器服务商在哪个地方 电脑账户数据库密码 忘了 无锡软件开发费用 不同数据库的表如何关联查询 湖南烟草网络安全信息 2022西安软件开发工资 服务器上的光驱怎么装 国内低调科技公司互联网公司 绍兴三维管理软件开发 深圳的软件开发培训机构 医院互联网科技公司 做好网络安全和信息保密工作 动态云服务器 崇明区重型网络技术销售价格 用户网络安全服务需求 南通佳录互联网科技 江苏省网络安全短信 登录游戏时服务器连接失败怎么办 网络安全高三学生怎么做演讲稿 江门互联网软件开发是什么 融创互联网科技有限公司 网络安全罚款金额 世界互联网科技发源地成语
0