spark作业怎么实现
发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,这篇"spark作业怎么实现"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇"spark
千家信息网最后更新 2025年02月03日spark作业怎么实现
这篇"spark作业怎么实现"文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇"spark作业怎么实现"文章吧。
将sample.log的数据发送到Kafka中,经过Spark Streaming处理,将数据格式变为以下形式:commandid | houseid | gathertime | srcip | destip |srcport| destport | domainname | proxytype | proxyip | proxytype | title | content | url | logid在发送到kafka的另一个队列中要求:1、sample.log => 读文件,将数据发送到kafka队列中2、从kafka队列中获取数据(0.10 接口不管理offset),变更数据格式3、处理后的数据在发送到kafka另一个队列中分析1 使用课程中的redis工具类管理offset2 读取日志数据发送数据到topic13 消费主题,将数据的分割方式修改为竖线分割,再次发送到topic2
1.OffsetsWithRedisUtils
package home.oneimport java.utilimport org.apache.kafka.common.TopicPartitionimport org.apache.spark.streaming.kafka010.OffsetRangeimport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}import scala.collection.mutableobject OffsetsWithRedisUtils { // 定义Redis参数 privateval redisHost = "linux123" privateval redisPort = 6379 // 获取Redis的连接 privateval config = new JedisPoolConfig // 最大空闲数 config.setMaxIdle(5) // 最大连接数 config.setMaxTotal(10) privateval pool = new JedisPool(config, redisHost, redisPort, 10000) private def getRedisConnection: Jedis = pool.getResource privateval topicPrefix = "kafka:topic" // Key:kafka:topic:TopicName:groupid private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid" // 根据 key 获取offsets def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = { val jedis: Jedis = getRedisConnection val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic => val key = getKey(topic, groupId) import scala.collection.JavaConverters._ // 将获取到的redis数据由Java的map转换为scala的map,数据格式为{key:[{partition,offset}]} jedis.hgetAll(key) .asScala .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong } } // 归还资源 jedis.close() offsets.flatten.toMap } // 将offsets保存到Redis中 def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = { // 获取连接 val jedis: Jedis = getRedisConnection // 组织数据 offsets.map{range => (range.topic, (range.partition.toString, range.untilOffset.toString))} .groupBy(_._1) .foreach{case (topic, buffer) => val key: String = getKey(topic, groupId) import scala.collection.JavaConverters._ // 同样将scala的map转换为Java的map存入redis中 val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava // 保存数据 jedis.hmset(key, maps) } jedis.close() }}
KafkaProducer
package home.oneimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import org.apache.kafka.common.serialization.StringSerializerimport org.apache.log4j.{Level, Logger}import org.apache.spark.rdd.RDDimport org.apache.spark.{SparkConf, SparkContext}object KafkaProducer { def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]") val sc = new SparkContext(conf) // 读取sample.log文件数据 val lines: RDD[String] = sc.textFile("data/sample.log") // 定义 kafka producer参数 val prop = new Properties() // kafka的访问地址 prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092") // key和value的序列化方式 prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) // 将读取到的数据发送到mytopic1 lines.foreachPartition{iter => // 初始化KafkaProducer val producer = new KafkaProducer[String, String](prop) iter.foreach{line => // 封装数据 val record = new ProducerRecord[String, String]("mytopic1", line) // 发送数据 producer.send(record) } producer.close() } }}
3.HomeOne
package home.oneimport java.util.Propertiesimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}import org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{Seconds, StreamingContext}object HomeOne { val log = Logger.getLogger(this.getClass) def main(args: Array[String]): Unit = { Logger.getLogger("org").setLevel(Level.ERROR) val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) // 需要消费的topic val topics: Array[String] = Array("mytopic1") val groupid = "mygroup1" // 定义kafka相关参数 val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid) // 从Redis获取offset val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid) // 创建DStream val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, // 从kafka中读取数据 ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets) ) // 转换后的数据发送到另一个topic dstream.foreachRDD { rdd => if (!rdd.isEmpty) { // 获取消费偏移量 val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // 处理数据发送到topic2 rdd.foreachPartition(process) // 将offset保存到Redis OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid) } } // 启动作业 ssc.start() // 持续执行 ssc.awaitTermination() } // 将处理后的数据发送到topic2 def process(iter: Iterator[ConsumerRecord[String, String]]) = { iter.map(line => parse(line.value)) .filter(!_.isEmpty) .foreach(line => sendMsg2Topic(line, "mytopic2")) } // 调用kafka生产者发送消息 def sendMsg2Topic(msg: String, topic: String): Unit = { val producer = new KafkaProducer[String, String](getKafkaProducerParameters()) val record = new ProducerRecord[String, String](topic, msg) producer.send(record) } // 修改数据格式,将逗号分隔变成竖线分割 def parse(text: String): String = { try { val arr = text.replace("<<>>", "").split(",") if (arr.length != 15) return "" arr.mkString("|") } catch { case e: Exception => log.error("解析数据出错!", e) "" } } // 定义kafka消费者的配置信息 def getKafkaConsumerParameters(groupid: String): Map[String, Object] = { Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG -> groupid, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean), ) } // 定义生产者的kafka配置 def getKafkaProducerParameters(): Properties = { val prop = new Properties() prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092") prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) prop }}
2
/*假设机场的数据如下:1, "SFO"2, "ORD"3, "DFW"机场两两之间的航线及距离如下:1, 2,18002, 3, 8003, 1, 1400用 GraphX 完成以下需求:求所有的顶点求所有的边求所有的triplets求顶点数求边数求机场距离大于1000的有几个,有哪些按所有机场之间的距离排序(降序),输出结果 */
代码:
import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.graphx.{Edge, Graph, VertexId}import org.apache.spark.rdd.RDDobject TwoHome { def main(args: Array[String]): Unit = { // 初始化 val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]") val sc = new SparkContext(conf) sc.setLogLevel("warn") //初始化数据 val vertexArray: Array[(Long, String)] = Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW")) val edgeArray: Array[Edge[Int]] = Array( Edge(1L, 2L, 1800), Edge(2L, 3L, 800), Edge(3L, 1L, 1400) ) //构造vertexRDD和edgeRDD val vertexRDD: RDD[(VertexId, String)] = sc.makeRDD(vertexArray) val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray) //构造图 val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD) //所有的顶点 println("所有顶点:") graph.vertices.foreach(println) //所有的边 println("所有边:") graph.edges.foreach(println) //所有的triplets println("所有三元组信息:") graph.triplets.foreach(println) //求顶点数 val vertexCnt = graph.vertices.count() println(s"总顶点数:$vertexCnt") //求边数 val edgeCnt = graph.edges.count() println(s"总边数:$edgeCnt") //机场距离大于1000的 println("机场距离大于1000的边信息:") graph.edges.filter(_.attr > 1000).foreach(println) //按所有机场之间的距离排序(降序) println("降序排列所有机场之间距离") graph.edges.sortBy(-_.attr).collect().foreach(println) }}
运行结果
以上就是关于"spark作业怎么实现"这篇文章的内容,相信大家都有了一定的了解,希望小编分享的内容对大家有帮助,若想了解更多相关的知识内容,请关注行业资讯频道。
数据
机场
内容
作业
之间
格式
队列
处理
消费
信息
参数
点数
顶点
最大
文件
文章
方式
生产者
知识
竖线
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
瓷都起名软件开发
hak5网络安全
数据库300字总结
公司信息网网络安全
精臣b21无法连接服务器
数据库组成关键字的属性值
个人网络安全十大防范点
数据库默认值设置什么
科技互联网评说
网页数据库服务器连接超时
暗网网站会被服务器
脸脸会网络技术有限公司上市
数据库外键在哪
网络安全管理制度和规则
网络安全知识手抄报的字
梦幻西游所有服务器开区时间
软件开发很难嘛
灵犀科技互联网理财提现
怎么重启网站服务器
浙江网络技术转让优点
软件开发流程建设
全国中小学生网络安全知识
软件开发师培训费用
Java将变量输入数据库
db2使用命令删除数据库
2网络安全警示案例
计算机技术网络安全专业
网络安全要先学什么
数据库用哪个关键字查询数据
魔兽世界 最新服务器