千家信息网

spark streaming测试之四设置窗口大小接收数据

发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,测试思路:首先,使用网络数据发送程序发送数据;然后,运行spark程序;观察效果。说明:1. 这里也需要设置检查点目录2. 这里有四个参数:前两个分别是监听的端口和每隔多少毫秒接收一次数据;第三个参数
千家信息网最后更新 2025年01月22日spark streaming测试之四设置窗口大小接收数据

测试思路:

首先,使用网络数据发送程序发送数据;

然后,运行spark程序;

观察效果。

说明:

1. 这里也需要设置检查点目录

2. 这里有四个参数:

前两个分别是监听的端口和每隔多少毫秒接收一次数据;

第三个参数是接收前多少毫秒的数据;(详细请参见window具体含义)

第四个参数是每隔多少毫秒接收一次数据。


sparkStreamingimport org.apache.log4j.{LoggerLevel}import org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming.{SecondsStreamingContext}import org.apache.spark.{SparkContextSparkConf}import org.apache.spark.streaming.StreamingContext._WindowWordCount {  def main(args: Array[]){    Logger.().setLevel(Level.)    Logger.().setLevel(Level.)    conf = SparkConf().setAppName().setMaster()    sc = SparkContext(conf)    ssc = StreamingContext(sc())    ssc.checkpoint()    val lines = ssc.socketTextStream(args(0),args(1).toInt,      StorageLevel.MEMORY_ONLY_SER)    words = lines.flatMap(_.split())    //windows operator    val wordCounts = words.map(x=>(x,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),      Seconds(args(2).toInt),Seconds(args(3).toInt))    wordCounts.print()    ssc.start()    ssc.awaitTermination()  }}
0