千家信息网

spark读取kafka数据流

发表于:2025-01-24 作者:千家信息网编辑
千家信息网最后更新 2025年01月24日,spark读取kafka数据流提供了两种方式createDstream和createDirectStream。两者区别如下:1、KafkaUtils.createDstream构造函数为KafkaUt
千家信息网最后更新 2025年01月24日spark读取kafka数据流

spark读取kafka数据流提供了两种方式createDstream和createDirectStream。

两者区别如下:

1、KafkaUtils.createDstream

构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )
使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在Spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上
A、创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量
B、对于不同的group和topic可以使用多个receivers创建不同的DStream
C、如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)

2.KafkaUtils.createDirectStream

区别Receiver接收数据,这种方式定期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api
优点:
A、 简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。
B、高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中
C、恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具


public void adclick(){

SparkConf conf = new SparkConf()

.setAppName("")

.setMaster("");

JavaStreamingContext jssc = new JavaStreamingContext(conf,Durations.seconds(10));

jssc.checkpoint("");

Map kafkaParams = new HashMap();

kafkaParams.put("metadata.broker.list", ConfigurationManager.getProperty("metadata.broker.list"));

String kafkaTopics = ConfigurationManager.getProperty("kafkaTopics");

String[] kafkaTopicsSplits = kafkaTopics.split(",");

Set tops = new HashSet();

for(String xx:kafkaTopicsSplits){

tops.add(xx);

}

JavaPairInputDStream adRealTimeDStream = KafkaUtils.

createDirectStream(

jssc,

String.class,

String.class,

StringDecoder.class,

StringDecoder.class,

kafkaParams,

tops);

jssc.start();

jssc.awaitTermination();

jssc.close();

}


数据 偏移 方式 处理 消费 不同 一致 多个 消费者 高层 高层次 存储 数据流 个数 主体 优点 会创 传统 函数 可能性 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 一些新兴的互联网科技 虹口区通用软件开发零售价格 青岛定制软件开发推荐 数据库中如何定义数组 coc 分服务器 奥比岛手游服务器问题 广州恒通互联网科技 软件开发项目立项记录 计算机网络技术所应对的职业 网络安全行政执法的重要性 济南移动软件开发公司 电话软件开发具体地址 中央网络安全与信息化提出了 微擎修改数据库账号 光遇全物品服务器连接失败怎么办 同花顺数据库财报数据 关于部队网络安全的案例分析 南京信息网络技术创新服务 软件开发qq怎么群发 论述数据库技术 涉密软件开发资质公司 服务器架设传奇卡顿 医院信息系统软件开发流程 怒火一刀通用服务器下载 维普期刊全文数据库本地 文献数据库o l 青海网络安全大赛田晓程 巩义市力恒网络技术服务有限公司 能干吗 开展网络安全活动的通知 大学网络安全与信息化工作总结
0