SparkStreaming消费kafka数据
发表于:2025-02-23 作者:千家信息网编辑
千家信息网最后更新 2025年02月23日,概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取、过滤、转换,然后存储到HDFS中。实例代码package com.fwmagic.testimpo
千家信息网最后更新 2025年02月23日SparkStreaming消费kafka数据
概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取、过滤、转换,然后存储到HDFS中。
实例代码
package com.fwmagic.testimport com.alibaba.fastjson.{JSON, JSONException}import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.{SaveMode, SparkSession}import org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds, StreamingContext}import org.slf4j.LoggerFactory/** * created by fwmagic */object RealtimeEtl { privateval logger = LoggerFactory.getLogger(PVUV.getClass) def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "hadoop") val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]") val spark = SparkSession.builder().config(conf).getOrCreate() val streamContext = new StreamingContext(spark.sparkContext, Seconds(5)) //直连方式相当于跟kafka的Topic至直接连接 //"auto.offset.reset:earliest(每次重启重新开始消费),latest(重启时会从最新的offset开始读取) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "fwmagic", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("access") val kafkaDStream = KafkaUtils.createDirectStream[String, String]( streamContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) //如果使用SparkStream和Kafka直连方式整合,生成的kafkaDStream必须调用foreachRDD kafkaDStream.foreachRDD(kafkaRDD => { if (!kafkaRDD.isEmpty()) { //获取当前批次的RDD的偏移量 val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges //拿出kafka中的数据 val lines = kafkaRDD.map(_.value()) //将lines字符串转换成json对象 val logBeanRDD = lines.map(line => { var logBean: LogBean = null try { logBean = JSON.parseObject(line, classOf[LogBean]) } catch { case e: JSONException => { //logger记录 logger.error("json解析错误!line:" + line, e) } } logBean }) //过滤 val filteredRDD = logBeanRDD.filter(_ != null) //将RDD转化成DataFrame,因为RDD中装的是case class import spark.implicits._ val df = filteredRDD.toDF() df.show() //将数据写到hdfs中:hdfs://hd1:9000/360 df.repartition(1).write.mode(SaveMode.Append).parquet(args(0)) //提交当前批次的偏移量,偏移量最后写入kafka kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } }) //启动 streamContext.start() streamContext.awaitTermination() streamContext.stop() }}case class LogBean(time:String, longitude:Double, latitude:Double, openid:String, page:String, evnet_type:Int)
依赖环境(pom.xml)
4.0.0 com.fwmagic.360 fwmagic-360 1.0 1.8 1.8 2.11.7 2.2.2 2.7.7 UTF-8 org.scala-lang scala-library ${scala.version} org.apache.spark spark-core_2.11 ${spark.version} org.apache.spark spark-sql_2.11 ${spark.version} org.apache.spark spark-streaming_2.11 ${spark.version} org.apache.spark spark-streaming-kafka-0-10_2.11 ${spark.version} org.apache.hadoop hadoop-client ${hadoop.version} org.apache.hadoop hadoop-client ${hadoop.version} com.alibaba fastjson 1.2.39 net.alchim31.maven scala-maven-plugin 3.2.2 org.apache.maven.plugins maven-compiler-plugin 3.5.1 net.alchim31.maven scala-maven-plugin scala-compile-first process-resources add-source compile scala-test-compile process-test-resources testCompile org.apache.maven.plugins maven-compiler-plugin compile compile org.apache.maven.plugins maven-shade-plugin 2.4.3 package shade *:* META-INF/*.SF META-INF/*.DSA META-INF/*.RSA
数据
偏移
消费
例子
批次
方式
中装
代码
功能
字符
字符串
实例
实时
对象
概要
消息
环境
错误
UTF-8
存储
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
为什么投屏连接服务器失败
数据库 开源库
xshow软件服务器配置
淄博市公安局网络安全保卫分局
服务器cpu哪款好
公安部网络安全保卫局张俊兵
深圳智能巡检软件开发多少钱
raid服务器红灯和绿灯区别
区块链软件开发公司排名
女生投华为网络技术工程师
服务器远程管理密码重置
魔兽世界怀旧服如何更换服务器
对校园网网络安全的问题
网络安全知识 灯谜
网络安全工程师的硬性要求
网络安全沈鑫剡课后习题答案
阿里云服务器学生购买
没有源代码没有数据库能用吗
csmar数据库是国泰安么
亳州学院网络安全宣传活动
一般企业网络安全工程师
怎么用mysql附加数据库
资阳市公安局网络安全副局长
常用的网络技术安全分为
宽城区网络技术服务质量保证
陕西服务器机柜哪里买
虹口区网络安全资质申请费用
ios是软件开发吗
hp服务器管理口密码
服务器操作系统linux占比