
Spark Streaming Consuming Kafka Data

Published: 2025-01-23 | Author: 千家信息网 editorial staff

Summary: This example shows Spark Streaming consuming Kafka messages. It extracts, filters, and transforms the data in real time, then stores the result in HDFS.

Example Code

package com.fwmagic.test

import com.alibaba.fastjson.{JSON, JSONException}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
  * created by fwmagic
  */
object RealtimeEtl {

  private val logger = LoggerFactory.getLogger(RealtimeEtl.getClass)

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop")

    val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val streamContext = new StreamingContext(spark.sparkContext, Seconds(5))

    // The direct approach connects straight to the Kafka topic.
    // auto.offset.reset decides where to start when no committed offset exists:
    // earliest (re-read from the beginning) or latest (start from the newest offset).
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fwmagic",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("access")

    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      streamContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // With the direct Spark Streaming/Kafka integration, the offset ranges are only
    // accessible on the original kafkaDStream, so process each batch via foreachRDD.
    kafkaDStream.foreachRDD(kafkaRDD => {
      if (!kafkaRDD.isEmpty()) {
        // Get the offset ranges for the current batch's RDD
        val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

        // Extract the message values from Kafka
        val lines = kafkaRDD.map(_.value())

        // Parse each JSON string into a LogBean
        val logBeanRDD = lines.map(line => {
          var logBean: LogBean = null
          try {
            logBean = JSON.parseObject(line, classOf[LogBean])
          } catch {
            case e: JSONException =>
              // Log the bad record and leave logBean null
              logger.error("JSON parse error! line: " + line, e)
          }
          logBean
        })

        // Filter out records that failed to parse
        val filteredRDD = logBeanRDD.filter(_ != null)

        // Convert the RDD to a DataFrame; this works because the RDD holds case class instances
        import spark.implicits._
        val df = filteredRDD.toDF()
        df.show()

        // Write the data to HDFS, e.g. hdfs://hd1:9000/360 (the path is passed as args(0))
        df.repartition(1).write.mode(SaveMode.Append).parquet(args(0))

        // Commit this batch's offsets; they are written back to Kafka
        kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    // Start the streaming job
    streamContext.start()
    streamContext.awaitTermination()
    streamContext.stop()
  }
}

case class LogBean(time: String,
                   longitude: Double,
                   latitude: Double,
                   openid: String,
                   page: String,
                   evnet_type: Int) // field name kept as-is ("evnet") to match the upstream JSON
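For reference, here is a minimal sketch of the message format the job assumes: each Kafka record on the "access" topic is one JSON object whose keys match the LogBean fields. The sample values below are hypothetical; only the field names come from the case class above.

import com.alibaba.fastjson.JSON

object ParseExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical record as it might arrive on the "access" topic
    val line = """{"time":"2018-06-01 12:00:00","longitude":116.3,"latitude":39.98,"openid":"o123456","page":"/index","evnet_type":1}"""

    // The same call the streaming job makes; a malformed line throws
    // JSONException, which the job logs and then drops via the null filter
    val bean = JSON.parseObject(line, classOf[LogBean])
    println(bean)
  }
}

Note that because commitAsync runs only after the parquet write succeeds, a crash between the write and the commit replays the batch on restart, so this pipeline delivers at-least-once rather than exactly-once semantics.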

Dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.fwmagic.360</groupId>
    <artifactId>fwmagic-360</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.7</scala.version>
        <spark.version>2.2.2</spark.version>
        <hadoop.version>2.7.7</hadoop.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.39</version>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
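To build and run the job, something like the following should work. The jar path follows from the artifactId and version above; the HDFS output directory (consumed as args(0)) is an assumption taken from the comment in the code:

mvn clean package
spark-submit --class com.fwmagic.test.RealtimeEtl target/fwmagic-360-1.0.jar hdfs://hd1:9000/360

Because setMaster("local[*]") is hard-coded in the SparkConf, no --master flag is needed here; for a real cluster you would remove setMaster from the code and pass --master on the command line instead.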