千家信息网

SparkStreaming消费kafka数据

发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取、过滤、转换,然后存储到HDFS中。实例代码package com.fwmagic.testimpo
千家信息网最后更新 2025年02月23日SparkStreaming消费kafka数据

概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取、过滤、转换,然后存储到HDFS中。

实例代码

package com.fwmagic.testimport com.alibaba.fastjson.{JSON, JSONException}import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.{SaveMode, SparkSession}import org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds, StreamingContext}import org.slf4j.LoggerFactory/**  * created by fwmagic  */object RealtimeEtl {  privateval logger = LoggerFactory.getLogger(PVUV.getClass)  def main(args: Array[String]): Unit = {    System.setProperty("HADOOP_USER_NAME", "hadoop")    val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]")    val spark = SparkSession.builder().config(conf).getOrCreate()    val streamContext = new StreamingContext(spark.sparkContext, Seconds(5))    //直连方式相当于跟kafka的Topic至直接连接    //"auto.offset.reset:earliest(每次重启重新开始消费),latest(重启时会从最新的offset开始读取)    val kafkaParams = Map[String, Object](      "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092",      "key.deserializer" -> classOf[StringDeserializer],      "value.deserializer" -> classOf[StringDeserializer],      "group.id" -> "fwmagic",      "auto.offset.reset" -> "latest",      "enable.auto.commit" -> (false: java.lang.Boolean)    )    val topics = Array("access")    val kafkaDStream = KafkaUtils.createDirectStream[String, String](      streamContext,      LocationStrategies.PreferConsistent,      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)    )    //如果使用SparkStream和Kafka直连方式整合,生成的kafkaDStream必须调用foreachRDD    kafkaDStream.foreachRDD(kafkaRDD => {      if (!kafkaRDD.isEmpty()) {        //获取当前批次的RDD的偏移量        val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges        //拿出kafka中的数据        val lines = kafkaRDD.map(_.value())        //将lines字符串转换成json对象        val logBeanRDD = lines.map(line => {          var logBean: LogBean = null          try {            logBean = JSON.parseObject(line, classOf[LogBean])          } catch {            case e: JSONException => {              //logger记录              logger.error("json解析错误!line:" + line, e)            }          }          logBean        })        //过滤        val filteredRDD = logBeanRDD.filter(_ != null)        //将RDD转化成DataFrame,因为RDD中装的是case class        import spark.implicits._        val df = filteredRDD.toDF()        df.show()        //将数据写到hdfs中:hdfs://hd1:9000/360        df.repartition(1).write.mode(SaveMode.Append).parquet(args(0))        //提交当前批次的偏移量,偏移量最后写入kafka        kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)      }    })    //启动    streamContext.start()    streamContext.awaitTermination()    streamContext.stop()  }}case class LogBean(time:String,                   longitude:Double,                   latitude:Double,                   openid:String,                   page:String,                   evnet_type:Int)

依赖环境(pom.xml)

    4.0.0    com.fwmagic.360    fwmagic-360    1.0            1.8        1.8        2.11.7        2.2.2        2.7.7        UTF-8                                    org.scala-lang            scala-library            ${scala.version}                                    org.apache.spark            spark-core_2.11            ${spark.version}                                    org.apache.spark            spark-sql_2.11            ${spark.version}                                    org.apache.spark            spark-streaming_2.11            ${spark.version}                            org.apache.spark            spark-streaming-kafka-0-10_2.11            ${spark.version}                                    org.apache.hadoop            hadoop-client            ${hadoop.version}                                    org.apache.hadoop            hadoop-client            ${hadoop.version}                            com.alibaba            fastjson            1.2.39                                                                                        net.alchim31.maven                    scala-maven-plugin                    3.2.2                                                                    org.apache.maven.plugins                    maven-compiler-plugin                    3.5.1                                                                        net.alchim31.maven                scala-maven-plugin                                                            scala-compile-first                        process-resources                                                    add-source                            compile                                                                                        scala-test-compile                        process-test-resources                                                    testCompile                                                                                                    org.apache.maven.plugins                maven-compiler-plugin                                                            compile                                                    compile                                                                                                                org.apache.maven.plugins                maven-shade-plugin                2.4.3                                                            package                                                    shade                                                                                                                                                *:*                                                                            META-INF/*.SF                                        META-INF/*.DSA                                        META-INF/*.RSA                                                                                                                                                                                    
数据 偏移 消费 例子 批次 方式 中装 代码 功能 字符 字符串 实例 实时 对象 概要 消息 环境 错误 UTF-8 存储 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 为什么投屏连接服务器失败 数据库 开源库 xshow软件服务器配置 淄博市公安局网络安全保卫分局 服务器cpu哪款好 公安部网络安全保卫局张俊兵 深圳智能巡检软件开发多少钱 raid服务器红灯和绿灯区别 区块链软件开发公司排名 女生投华为网络技术工程师 服务器远程管理密码重置 魔兽世界怀旧服如何更换服务器 对校园网网络安全的问题 网络安全知识 灯谜 网络安全工程师的硬性要求 网络安全沈鑫剡课后习题答案 阿里云服务器学生购买 没有源代码没有数据库能用吗 csmar数据库是国泰安么 亳州学院网络安全宣传活动 一般企业网络安全工程师 怎么用mysql附加数据库 资阳市公安局网络安全副局长 常用的网络技术安全分为 宽城区网络技术服务质量保证 陕西服务器机柜哪里买 虹口区网络安全资质申请费用 ios是软件开发吗 hp服务器管理口密码 服务器操作系统linux占比
0