SparkStreaming消费kafka数据
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取、过滤、转换,然后存储到HDFS中。实例代码package com.fwmagic.testimpo
千家信息网最后更新 2025年01月23日SparkStreaming消费kafka数据
概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取、过滤、转换,然后存储到HDFS中。
实例代码
package com.fwmagic.testimport com.alibaba.fastjson.{JSON, JSONException}import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.{SaveMode, SparkSession}import org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds, StreamingContext}import org.slf4j.LoggerFactory/** * created by fwmagic */object RealtimeEtl { privateval logger = LoggerFactory.getLogger(PVUV.getClass) def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "hadoop") val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]") val spark = SparkSession.builder().config(conf).getOrCreate() val streamContext = new StreamingContext(spark.sparkContext, Seconds(5)) //直连方式相当于跟kafka的Topic至直接连接 //"auto.offset.reset:earliest(每次重启重新开始消费),latest(重启时会从最新的offset开始读取) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "fwmagic", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("access") val kafkaDStream = KafkaUtils.createDirectStream[String, String]( streamContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) //如果使用SparkStream和Kafka直连方式整合,生成的kafkaDStream必须调用foreachRDD kafkaDStream.foreachRDD(kafkaRDD => { if (!kafkaRDD.isEmpty()) { //获取当前批次的RDD的偏移量 val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges //拿出kafka中的数据 val lines = kafkaRDD.map(_.value()) //将lines字符串转换成json对象 val logBeanRDD = lines.map(line => { var logBean: LogBean = null try { logBean = JSON.parseObject(line, classOf[LogBean]) } catch { case e: JSONException => { //logger记录 logger.error("json解析错误!line:" + line, e) } } logBean }) //过滤 val filteredRDD = logBeanRDD.filter(_ != null) //将RDD转化成DataFrame,因为RDD中装的是case class import spark.implicits._ val df = filteredRDD.toDF() df.show() //将数据写到hdfs中:hdfs://hd1:9000/360 df.repartition(1).write.mode(SaveMode.Append).parquet(args(0)) //提交当前批次的偏移量,偏移量最后写入kafka kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } }) //启动 streamContext.start() streamContext.awaitTermination() streamContext.stop() }}case class LogBean(time:String, longitude:Double, latitude:Double, openid:String, page:String, evnet_type:Int)
依赖环境(pom.xml)
4.0.0 com.fwmagic.360 fwmagic-360 1.0 1.8 1.8 2.11.7 2.2.2 2.7.7 UTF-8 org.scala-lang scala-library ${scala.version} org.apache.spark spark-core_2.11 ${spark.version} org.apache.spark spark-sql_2.11 ${spark.version} org.apache.spark spark-streaming_2.11 ${spark.version} org.apache.spark spark-streaming-kafka-0-10_2.11 ${spark.version} org.apache.hadoop hadoop-client ${hadoop.version} org.apache.hadoop hadoop-client ${hadoop.version} com.alibaba fastjson 1.2.39 net.alchim31.maven scala-maven-plugin 3.2.2 org.apache.maven.plugins maven-compiler-plugin 3.5.1 net.alchim31.maven scala-maven-plugin scala-compile-first process-resources add-source compile scala-test-compile process-test-resources testCompile org.apache.maven.plugins maven-compiler-plugin compile compile org.apache.maven.plugins maven-shade-plugin 2.4.3 package shade *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA
数据
偏移
消费
例子
批次
方式
中装
代码
功能
字符
字符串
实例
实时
对象
概要
消息
环境
错误
UTF-8
存储
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
吉屋网络技术怎么样
网络安全项目经理日常工作
杭州的网络安全
百分百网络技术有限公司
网络安全宣传活动周征文
西安爱易网络技术待遇
网络安全基础考题
sql销售管理数据库设计
随机抽样数据库
魔雀软件开发者简介
达梦数据库怎么添加关键字
手机上传照片服务器python
艾欧里亚服务器在哪
王者荣耀换服务器后还能开黑吗
c 中对数据库选取一列
病毒删除数据库
数据库为null还是为
软件开发市场前景分析
教育软件开发公司的名字
hp服务器怎么做阵列
轻量应用服务器部署博客
国内哪些邮箱服务器提供商
中科曙光服务器性价比
上海生产管理软件开发商
常见的保证网络安全的方法
北仑计算机软件开发外包
dell 服务器 黑屏
黄炜珈网络安全
学生软件开发公司
力控 连接网络数据库