Integrating Spark Streaming with Kafka

Project architecture

Log data ----> Flume -----> Kafka --------> Spark Streaming ----------> MySQL/Redis/HBase


Prerequisites

  • Install ZooKeeper
  • Install Flume
  • Install Kafka
  • Hadoop configured for high availability

(1) Collect data into Kafka with Flume

Start Kafka:

nohup kafka-server-start.sh \
  /application/kafka_2.11-1.1.0/config/server.properties \
  1>/home/hadoop/logs/kafka_std.log \
  2>/home/hadoop/logs/kafka_err.log &

Create a Kafka topic that does not exist yet:

kafka-topics.sh \
  --create \
  --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka \
  --replication-factor 3 \
  --partitions 3 \
  --topic zy-flume-kafka

Check that the topic was created:

kafka-topics.sh \
  --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka \
  --describe \
  --topic zy-flume-kafka


Configure the Flume collection plan

Tier 1: exec-avro.conf

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

# define sources
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /application/flume-1.8.0-bin/data/sample.log

# define channels
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100

# define sink
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 3212

# bind source and sink to channel
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1

Tier 2: avro-kafka.conf

agent2.sources = r2
agent2.channels = c2
agent2.sinks = k2

# define sources
agent2.sources.r2.type = avro
agent2.sources.r2.bind = hadoop02
agent2.sources.r2.port = 3212

# define channels
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100

# define sink
agent2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent2.sinks.k2.brokerList = hadoop01:9092,hadoop02:9092,hadoop03:9092
agent2.sinks.k2.topic = zy-flume-kafka
agent2.sinks.k2.batchSize = 4
agent2.sinks.k2.requiredAcks = 1

# bind source and sink to channel
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2

Start Flume
On hadoop02:

flume-ng agent \
  --conf /application/flume-1.8.0-bin/conf/ \
  --name agent2 \
  --conf-file /application/flume-1.8.0-bin/flume_sh/avro-kafka.conf \
  -Dflume.root.logger=DEBUG,console

On hadoop01:

flume-ng agent \
  --conf /application/flume-1.8.0-bin/conf/ \
  --name agent1 \
  --conf-file /application/flume-1.8.0-bin/flume_sh/exec-avro.conf \
  -Dflume.root.logger=DEBUG,console

Note: always start the second tier (agent2) before the first tier (agent1); agent1's avro sink can only connect once agent2's avro source is listening on hadoop02:3212.


Test
Start a Kafka console consumer:

kafka-console-consumer.sh \
  --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
  --from-beginning \
  --topic zy-flume-kafka

Append data to the monitored file:

tail -10 sample.temp >> sample.log

Watch the Kafka consumer: the data is consumed.

(2) Read and process the Kafka data with Spark Streaming

 Spark Streaming can be integrated with Kafka in two ways:
   - receiver + checkpoint
   - direct + ZooKeeper

1) receiver + checkpoint

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Read data from Kafka with the receiver-based approach.
  */
object _01SparkKafkaReceiverOps {
    def main(args: Array[String]): Unit = {
        // Check that the expected number of arguments was passed in,
        // e.g. 2 hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka first zy-flume-kafka
        if (args == null || args.length < 4) {
            println(
                """
                  |Parameter Errors! Usage:
                  |batchInterval        : batch interval
                  |zkQuorum             : zookeeper url
                  |groupId              : consumer group id
                  |topic                : topic to read
                """.stripMargin)
            System.exit(-1)
        }
        // Read the arguments
        val Array(batchInterval, zkQuorum, groupId, topic) = args
        // 1. Build the entry point
        val conf: SparkConf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("_01SparkKafkaReceiverOps")
        val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))
        /** 2. Read data with a receiver
          * @param ssc
          * @param zkQuorum
          * @param groupId
          * @param topics
          * @param storageLevel  default: StorageLevel.MEMORY_AND_DISK_SER_2
          * @return DStream of (Kafka message key, Kafka message value)
          */
        val topics = topic.split("\\s+").map((_, 3)).toMap
        val message: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
        // 3. Print the data
        message.print()
        // 4. Start the job
        ssc.start()
        ssc.awaitTermination()
    }
}

Notes (receiver + checkpoint):
 - The partitions of a Kafka topic have nothing to do with the RDD partitions generated by Spark Streaming; raising the partition count passed to KafkaUtils.createStream only increases the number of threads inside a single receiver, it does not increase Spark's parallelism.
 - You can create multiple Kafka input DStreams with different groups and topics, so that several receivers ingest data in parallel (a sketch follows this list).
 - If a fault-tolerant store such as HDFS is enabled and the write-ahead log is turned on, the received data has already been replicated into the log.
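
A minimal sketch of the last two points, reusing the cluster addresses, group id "first" and topic from above; the object name, the number of receivers (3) and the HDFS checkpoint path are illustrative assumptions, not part of the original project:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReceiverUnionSketch {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            // local[4]: three cores for the three receivers plus at least one for processing
            .setMaster("local[4]")
            .setAppName("ReceiverUnionSketch")
            // turn on the write-ahead log so received data is persisted to the checkpoint directory
            .set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(conf, Seconds(2))
        // checkpoint directory on HDFS, required by the write-ahead log (path is an assumption)
        ssc.checkpoint("hdfs://hadoop01:9000/spark/checkpoint")

        val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka"
        val groupId  = "first"
        val topics   = Map("zy-flume-kafka" -> 1)

        // several receiver-based input DStreams, received in parallel and unioned into one stream;
        // with the WAL enabled, single-copy serialized storage is enough (the log already replicates the data)
        val streams = (1 to 3).map { _ =>
            KafkaUtils.createStream(ssc, zkQuorum, groupId, topics, StorageLevel.MEMORY_AND_DISK_SER)
        }
        val unioned = ssc.union(streams)
        unioned.count().print()

        ssc.start()
        ssc.awaitTermination()
    }
}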

2) direct + ZooKeeper

Code

package com.zy.streaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Keep the consumed offsets in ZooKeeper so that when Spark Streaming is restarted
  * after a crash it resumes from the correct offsets instead of consuming from the beginning.
  */
object SparkStreamingDriverHAOps {
    // znode under which the offsets are stored
    val zkTopicOffsetPath = "/offset"
    // ZooKeeper client (Curator)
    val client: CuratorFramework = {
        val client = CuratorFrameworkFactory.builder()
            .connectString("hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka")
            .namespace("2019_1_7")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build()
        client.start()
        client
    }

    def main(args: Array[String]): Unit = {
        // Silence the logs
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        // e.g. 2 direct zy-flume-kafka
        if (args == null || args.length < 3) {
            println(
                """
                  |Parameter Errors! Usage:
                  |batchInterval        : batch interval
                  |groupId              : consumer group id
                  |topic                : topic to read
                """.stripMargin)
            System.exit(-1)
        }
        // Read the arguments
        val Array(batchInterval, groupId, topic) = args
        // 1. Build the entry point
        val conf: SparkConf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("SparkStreamingDriverHAOps")
        val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))
        // Kafka connection parameters
        val kafkaParams = Map(
            "bootstrap.servers" -> "hadoop01:9092,hadoop02:9092,hadoop03:9092", // cluster entry point
            "auto.offset.reset" -> "smallest" // where to start when no offset is stored
        )
        // 2. Create the Kafka message stream
        val message: DStream[(String, String)] = createMessage(topic, groupId, ssc, kafkaParams)
        // 3. Business logic. This example only shows how to integrate Kafka with
        //    Spark Streaming, so no real processing is done here.
        message.foreachRDD(rdd => {
            if (!rdd.isEmpty()) {
                println(
                    """
                      |####################>_<####################
                    """.stripMargin + rdd.count())
            }
            // Update the offsets
            storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)
        })
        // 4. Start the job
        ssc.start()
        ssc.awaitTermination()
    }

    /**
      * Create the Kafka message stream.
      * Two cases:
      *  1. First run: no offsets can be read from ZooKeeper.
      *  2. Later runs: the offsets are read from ZooKeeper.
      */
    def createMessage(topic: String, groupId: String, ssc: StreamingContext, kafkaParams: Map[String, String]): InputDStream[(String, String)] = {
        // Get the offsets and find out whether this is the first run
        val (fromOffsets, flag) = getFromOffsets(topic, groupId)
        var message: InputDStream[(String, String)] = null
        // Build the Kafka message stream
        if (flag) { // flag is true: use the per-partition offsets read from ZooKeeper
            /**
              * recordClass: Class[R],
              * kafkaParams: JMap[String, String],
              * fromOffsets: JMap[TopicAndPartition, JLong],
              * messageHandler: JFunction[MessageAndMetadata[K, V], R]
              */
            val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
            message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
        } else { // flag is false: first read
            /**
              * createDirectStream[
              * String,        key type
              * String,        value type
              * StringDecoder, key decoder type
              * StringDecoder] value decoder type
              */
            message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic.split("\\s+").toSet)
        }
        message
    }

    // Read the stored offset of every partition of the topic
    def getFromOffsets(topic: String, groupId: String): (Map[TopicAndPartition, Long], Boolean) = {
        // Path under which the offsets are stored
        val zkPath = s"${zkTopicOffsetPath}/${topic}/${groupId}"
        // Create the path if it does not exist yet
        ensureZKPathExists(zkPath)
        // Read the offsets of all partitions
        import scala.collection.JavaConversions._
        val offsets = for { p <- client.getChildren.forPath(zkPath) } yield {
            val offset = new String(client.getData.forPath(s"${zkPath}/${p}")).toLong
            (new TopicAndPartition(topic, p.toInt), offset)
        }
        // Empty means this is the first read and there is no offset information yet
        if (offsets.isEmpty) {
            (offsets.toMap, false)
        } else {
            (offsets.toMap, true)
        }
    }

    def storeOffsets(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
        for (offsetRange <- offsetRanges) {
            val partition = offsetRange.partition
            val topic = offsetRange.topic
            // Offset up to which this batch has been consumed
            val offset = offsetRange.untilOffset
            // znode in which the offset of this partition is stored
            val path = s"${zkTopicOffsetPath}/${topic}/${groupId}/${partition}"
            // Create the znode if it does not exist yet
            ensureZKPathExists(path)
            client.setData().forPath(path, ("" + offset).getBytes())
        }
    }

    def ensureZKPathExists(zkPath: String) = {
        // Create the path if it does not exist
        if (client.checkExists().forPath(zkPath) == null) {
            // Create missing parent znodes as well
            client.create().creatingParentsIfNeeded().forPath(zkPath)
        }
    }
}

Notes (direct + ZooKeeper):
 - There is no need to create multiple input Kafka streams and union them. With directStream, Spark Streaming creates as many RDD partitions as there are Kafka partitions, all of which read from Kafka in parallel; Kafka partitions and RDD partitions map one to one.
 - The direct approach has no receiver, so no write-ahead log is needed; it is enough that Kafka retains the data long enough.
 - It gives exactly-once consumption semantics (the offsets are kept in ZooKeeper). The sketch below adds the output stage that the example above leaves out.
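
To complete the architecture at the top (Kafka -> Spark Streaming -> MySQL), here is a minimal sketch of writing the per-batch count to MySQL inside the same foreachRDD before the offsets are stored, assuming it lives in the same package as the code above; the object name, JDBC URL, credentials and the table batch_stats(batch_time, cnt) are assumptions, not part of the original project:

package com.zy.streaming

import java.sql.DriverManager

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.HasOffsetRanges

object MysqlSinkSketch {
    // Hypothetical connection settings; replace with the real ones.
    val jdbcUrl  = "jdbc:mysql://hadoop03:3306/streaming"
    val jdbcUser = "spark"
    val jdbcPass = "spark"

    /** Write one summary row per batch into the assumed table batch_stats(batch_time, cnt). */
    def process(message: DStream[(String, String)], groupId: String): Unit = {
        message.foreachRDD((rdd, time) => {
            if (!rdd.isEmpty()) {
                val cnt = rdd.count()
                // The connection is opened on the driver because only one row per batch is written;
                // per-record output would use rdd.foreachPartition and open connections on the executors.
                val conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPass)
                try {
                    val stmt = conn.prepareStatement("INSERT INTO batch_stats(batch_time, cnt) VALUES (?, ?)")
                    stmt.setLong(1, time.milliseconds)
                    stmt.setLong(2, cnt)
                    stmt.executeUpdate()
                    stmt.close()
                } finally {
                    conn.close()
                }
            }
            // Store the offsets only after the output has succeeded, reusing storeOffsets from above.
            SparkStreamingDriverHAOps.storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupId)
        })
    }
}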
