
How to implement a Spark job


Many readers find the topics in "How to implement a Spark job" hard to follow, so this article summarizes them below. The content is detailed and the steps are clear, so it should be a useful reference; hopefully you will get something out of reading it. Let's take a look.

Send the data in sample.log to Kafka and process it with Spark Streaming, changing each record to the following format:

commandid | houseid | gathertime | srcip | destip | srcport | destport | domainname | proxytype | proxyip | proxytype | title | content | url | logid

and then send the result to another Kafka queue.

Requirements:
1. sample.log => read the file and send its data to a Kafka queue
2. Pull the data from the Kafka queue (the 0.10 integration does not manage offsets) and change the record format
3. Send the processed data to another Kafka queue

Analysis:
1. Use the Redis utility class from the course to manage offsets
2. Read the log data and send it to topic1
3. Consume the topic, change the field delimiter to a vertical bar, and send the result to topic2
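
For concreteness, here is a minimal REPL-style sketch of the required format change. The sample field values are made up; only the comma-to-pipe reshaping and the 15-field layout come from the assignment.

// Hypothetical 15-field comma-separated record, as it might appear in sample.log
val raw = "10001,2001,1595123456,10.0.0.1,10.0.0.2,8080,443,www.example.com,1,10.0.0.3,1,title,content,http://www.example.com/x,9001"

// Split on commas and rejoin with vertical bars to get the target format
val formatted = raw.split(",").mkString("|")
// formatted: 10001|2001|1595123456|10.0.0.1|10.0.0.2|8080|443|www.example.com|1|10.0.0.3|1|title|content|http://www.example.com/x|9001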

1. OffsetsWithRedisUtils

package home.one

import java.util

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.mutable

object OffsetsWithRedisUtils {
  // Redis connection parameters
  private val redisHost = "linux123"
  private val redisPort = 6379

  // Redis connection pool
  private val config = new JedisPoolConfig
  // Maximum number of idle connections
  config.setMaxIdle(5)
  // Maximum number of connections
  config.setMaxTotal(10)

  private val pool = new JedisPool(config, redisHost, redisPort, 10000)

  private def getRedisConnection: Jedis = pool.getResource

  private val topicPrefix = "kafka:topic"

  // Key layout: kafka:topic:TopicName:groupid
  private def getKey(topic: String, groupid: String) = s"$topicPrefix:$topic:$groupid"

  // Fetch the offsets stored under each key
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection
    val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
      val key = getKey(topic, groupId)

      import scala.collection.JavaConverters._
      // Convert the Java map returned by Redis into a Scala map; the hash layout is {partition: offset}
      jedis.hgetAll(key)
        .asScala
        .map { case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong }
    }
    // Return the connection to the pool
    jedis.close()

    offsets.flatten.toMap
  }

  // Save offsets to Redis
  def saveOffsetsToRedis(offsets: Array[OffsetRange], groupId: String): Unit = {
    // Get a connection
    val jedis: Jedis = getRedisConnection

    // Organize the data by topic
    offsets.map { range => (range.topic, (range.partition.toString, range.untilOffset.toString)) }
      .groupBy(_._1)
      .foreach { case (topic, buffer) =>
        val key: String = getKey(topic, groupId)

        import scala.collection.JavaConverters._
        // Convert the Scala map back into a Java map before writing it to Redis
        val maps: util.Map[String, String] = buffer.map(_._2).toMap.asJava

        // Save the data
        jedis.hmset(key, maps)
      }

    jedis.close()
  }
}
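
For reference, a minimal usage sketch of this utility follows. The demo object, group id, and offset values are hypothetical; the two helper methods are the ones defined above, and OffsetRange comes from the spark-streaming-kafka-0-10 integration.

package home.one

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

object OffsetsUtilsDemo {
  def main(args: Array[String]): Unit = {
    val topics = Array("mytopic1")
    val groupId = "mygroup1"

    // Read whatever offsets are currently stored in Redis for this topic and group
    val saved: Map[TopicPartition, Long] = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupId)
    saved.foreach { case (tp, offset) => println(s"${tp.topic}-${tp.partition} -> $offset") }

    // Pretend a batch consumed partition 0 from offset 0 to 100 and persist that position
    val ranges = Array(OffsetRange("mytopic1", 0, 0L, 100L))
    OffsetsWithRedisUtils.saveOffsetsToRedis(ranges, groupId)
  }
}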
2. KafkaProducer

package home.one

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object KafkaProducer {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Read the sample.log file
    val lines: RDD[String] = sc.textFile("data/sample.log")

    // Kafka producer parameters
    val prop = new Properties()
    // Kafka broker address
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    // Key and value serializers
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    // Send the lines to mytopic1
    lines.foreachPartition { iter =>
      // Create one KafkaProducer per partition
      val producer = new KafkaProducer[String, String](prop)

      iter.foreach { line =>
        // Wrap the line in a record
        val record = new ProducerRecord[String, String]("mytopic1", line)
        // Send it
        producer.send(record)
      }
      producer.close()
    }
  }
}
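
As an optional check that is not part of the original assignment, a plain Kafka consumer can read a few records back from mytopic1 to confirm the producer job worked. The broker address and topic come from the code above; the group id and poll timeout below are arbitrary.

package home.one

import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._

object CheckTopic1 {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "check-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
    // Start from the beginning of the topic if this group has no committed offsets
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("mytopic1"))

    // Poll once and print whatever came back
    val records = consumer.poll(5000L)
    records.asScala.foreach(record => println(record.value()))
    consumer.close()
  }
}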

3. HomeOne

package home.one

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HomeOne {
  val log = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Topic to consume
    val topics: Array[String] = Array("mytopic1")
    val groupid = "mygroup1"

    // Kafka consumer parameters
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupid)

    // Get the starting offsets from Redis
    val fromOffsets = OffsetsWithRedisUtils.getOffsetsFromRedis(topics, groupid)

    // Create the DStream
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      // Read from Kafka, starting at the saved offsets
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
    )

    // Transform the data and send it to the other topic
    dstream.foreachRDD { rdd =>
      if (!rdd.isEmpty) {
        // Get the offsets consumed in this batch
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // Process the data and send it to topic2
        rdd.foreachPartition(process)

        // Save the offsets to Redis
        OffsetsWithRedisUtils.saveOffsetsToRedis(offsetRanges, groupid)
      }
    }

    // Start the job
    ssc.start()
    // Keep it running
    ssc.awaitTermination()
  }

  // Send the processed data to topic2
  def process(iter: Iterator[ConsumerRecord[String, String]]) = {
    iter.map(line => parse(line.value))
      .filter(!_.isEmpty)
      .foreach(line => sendMsg2Topic(line, "mytopic2"))
  }

  // Send a message with a Kafka producer
  def sendMsg2Topic(msg: String, topic: String): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    val record = new ProducerRecord[String, String](topic, msg)
    producer.send(record)
  }

  // Change the record format: comma-separated becomes pipe-separated
  def parse(text: String): String = {
    try {
      val arr = text.replace("<<>>", "").split(",")
      if (arr.length != 15) return ""
      arr.mkString("|")
    } catch {
      case e: Exception =>
        log.error("Failed to parse record!", e)
        ""
    }
  }

  // Kafka consumer configuration
  def getKafkaConsumerParameters(groupid: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "linux121:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }

  // Kafka producer configuration
  def getKafkaProducerParameters(): Properties = {
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux121:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop
  }
}
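
One possible refinement, not in the original code: sendMsg2Topic creates a new KafkaProducer for every record, while the producer job above already shows the cheaper pattern of one producer per partition. Below is a sketch of a per-partition variant of process, to be placed inside the HomeOne object; it reuses parse and getKafkaProducerParameters from above.

  // Variant of process(): build one producer per partition and reuse it for all records,
  // then close it when the partition is done
  def processPerPartition(iter: Iterator[ConsumerRecord[String, String]]): Unit = {
    val producer = new KafkaProducer[String, String](getKafkaProducerParameters())
    iter.map(record => parse(record.value))
      .filter(_.nonEmpty)
      .foreach(line => producer.send(new ProducerRecord[String, String]("mytopic2", line)))
    producer.close()
  }

It would be wired in by replacing rdd.foreachPartition(process) with rdd.foreachPartition(processPerPartition).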

2. GraphX

/*
Assume the airport data is:
1, "SFO"
2, "ORD"
3, "DFW"
The routes between airports and their distances are:
1, 2, 1800
2, 3, 800
3, 1, 1400
Use GraphX to:
- list all vertices
- list all edges
- list all triplets
- count the vertices
- count the edges
- find how many routes are longer than 1000, and which ones they are
- sort all routes by distance in descending order and print the result
*/

Code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

object TwoHome {
  def main(args: Array[String]): Unit = {
    // Initialization
    val conf = new SparkConf().setAppName(this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("warn")

    // Source data
    val vertexArray: Array[(Long, String)] = Array((1L, "SFO"), (2L, "ORD"), (3L, "DFW"))
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(1L, 2L, 1800),
      Edge(2L, 3L, 800),
      Edge(3L, 1L, 1400)
    )

    // Build the vertex RDD and edge RDD
    val vertexRDD: RDD[(VertexId, String)] = sc.makeRDD(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)

    // Build the graph
    val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)

    // All vertices
    println("All vertices:")
    graph.vertices.foreach(println)

    // All edges
    println("All edges:")
    graph.edges.foreach(println)

    // All triplets
    println("All triplets:")
    graph.triplets.foreach(println)

    // Vertex count
    val vertexCnt = graph.vertices.count()
    println(s"Vertex count: $vertexCnt")

    // Edge count
    val edgeCnt = graph.edges.count()
    println(s"Edge count: $edgeCnt")

    // Routes longer than 1000: how many, and which ones
    val longEdges = graph.edges.filter(_.attr > 1000)
    println(s"Number of routes longer than 1000: ${longEdges.count()}")
    println("Routes longer than 1000:")
    longEdges.foreach(println)

    // Sort all routes by distance in descending order
    println("All routes sorted by distance, descending:")
    graph.edges.sortBy(-_.attr).collect().foreach(println)
  }
}
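
As a small extension beyond the assignment, the triplets view can print the sorted routes with airport names instead of vertex ids; this snippet assumes the graph value built above.

    // Readable output such as "SFO -> ORD : 1800", sorted by distance in descending order
    graph.triplets
      .sortBy(-_.attr)
      .collect()
      .foreach(t => println(s"${t.srcAttr} -> ${t.dstAttr} : ${t.attr}"))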

Run results

That is all for "How to implement a Spark job". Hopefully this walkthrough has given you a clear picture; for more related content, follow the industry news channel.
