Kafka+SparkStream+Hive的项目实现方法是什么
发表于:2025-02-05 作者:千家信息网编辑
千家信息网最后更新 2025年02月05日,本篇内容主要讲解"Kafka+SparkStream+Hive的项目实现方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Kafka+SparkSt
千家信息网最后更新 2025年02月05日Kafka+SparkStream+Hive的项目实现方法是什么
本篇内容主要讲解"Kafka+SparkStream+Hive的项目实现方法是什么",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"Kafka+SparkStream+Hive的项目实现方法是什么"吧!
目前的项目中需要将kafka队列的数据实时存到hive表中。
import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.types.{StringType, StructField, StructType}import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}import org.apache.spark.streaming.dstream.{DStream, InputDStream}import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistentimport org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
def main(args: Array[String]): Unit = { // val conf = new SparkConf() // conf.setMaster("local") // conf.setAppName("SparkStreamingOnKafkaDirect") val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate() val ssc = new StreamingContext(spark.sparkContext, Durations.seconds(3)) //设置日志级别 ssc.sparkContext.setLogLevel("Error") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "MyGroupId", // /** * 当没有初始的offset,或者当前的offset不存在,如何处理数据 * earliest :自动重置偏移量为最小偏移量 * latest:自动重置偏移量为最大偏移量【默认】 * none:没有找到以前的offset,抛出异常 */ "auto.offset.reset" -> "earliest", /** * 当设置 enable.auto.commit为false时,不会自动向kafka中保存消费者offset.需要异步的处理完数据之后手动提交 */ "enable.auto.commit" -> (false: java.lang.Boolean) //默认是true ) //设置Kafka的topic val topics = Array("test") //创建与Kafka的连接,接收数据 /*这里接收到数据的样子 2019-09-26 1569487411604 1235 497 Kafka Register 2019-09-26 1569487411604 1235 497 Kafka Register 2019-09-26 1569487414838 390 778 Flink View */ val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, // Subscribe[String, String](topics, kafkaParams) ) //对接收到的数据进行处理,打印出来接收到的key跟value,最后放回的是valueval transStrem: DStream[String] = stream.map(record => { val key_value = (record.key, record.value) println("receive message key = " + key_value._1) println("receive message value = " + key_value._2) key_value._2 }) //这里用了一下动态创建的Schema val structType: StructType = StructType(List[StructField]( StructField("Date_", StringType, nullable = true), StructField("Timestamp_", StringType, nullable = true), StructField("UserID", StringType, nullable = true), StructField("PageID", StringType, nullable = true), StructField("Channel", StringType, nullable = true), StructField("Action", StringType, nullable = true) )) //因为foreachRDD可以拿到封装到DStream中的rdd,可以对里面的rdd进行, /*代码解释: 先从foreach中拿到一条数据,,在函数map中对接收来的数据用 "\n" 进行切分,放到Row中,用的是动态创建Schema,因为我们需要再将数据存储到hive中,所以需要Schema。 因为map是transformance算子,所以用rdd.count()触发一下 spark.createDataFrame:创建一个DataFrame,因为要注册一个临时表,必须用到DataFrame frame.createOrReplaceTempView("t1"):注册临时表 spark.sql("use spark"):使用 hive 的 spark 库 result.write.mode(SaveMode.Append).saveAsTable("test_kafka"):将数据放到 test_kafka 中 */ transStrem.foreachRDD(one => { val rdd: RDD[Row] = one.map({ a => val arr = a.toString.split("\t") Row(arr(0).toString, arr(1).toString, arr(2).toString, arr(3).toString, arr(4).toString, arr(5).toString) }) rdd.count() val frame: DataFrame = spark.createDataFrame(rdd, structType) // println(" Scheme: "+frame.printSchema()) frame.createOrReplaceTempView("t1") // spark.sql("select * from t1").show() spark.sql("use spark") spark.sql("select * from t1"). write.mode(SaveMode.Append).saveAsTable("test_kafka") } ) /** * 以上业务处理完成之后,异步的提交消费者offset,这里将 enable.auto.commit 设置成false,就是使用kafka 自己来管理消费者offset * 注意这里,获取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset时,必须从 源头读取过来的 stream中获取,不能从经过stream转换之后的DStream中获取。 */ stream.foreachRDD { rdd => val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // some time later, after outputs have completed stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) } ssc.start() ssc.awaitTermination() ssc.stop() }
到此,相信大家对"Kafka+SparkStream+Hive的项目实现方法是什么"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
数据
方法
项目
偏移
消费者
处理
消费
内容
动态
学习
实用
更深
最大
最小
业务
代码
兴趣
函数
动向
实时
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
服务器可以创建角色吗
软件开发湖南哪一所专科比较好
表格中怎么添加数据库的内容
服务器如何查询cpu
软件开发有轻松点的工作吗
db2命令行连接数据库
网络代理服务器ip
白猫计划软件开发
软件开发 输入输出
汉字数据库
北京市高考数据库
网络安全提供商有哪些公司
云数据库插入多条记录
360杀毒有服务器版么
黄冈租车软件开发
浪潮 服务器
国家网络安全日是什么时候
手机三国杀服务器
徐州运营软件开发商家
软件开发应该交几个点的税
目前软件开发语言有哪些
内蒙算力服务器租赁报价单
数据库主机%什么意思
纸箱软件开发维护
深圳乐天网络技术有限公司
数据库表字段重命名
求职信网络技术范文300字
天津途至臻网络技术有限公司
江门电玩城游戏软件开发公司
铜川软件开发报价