python如何实现对kafka的基本操作
发表于:2025-02-02 作者:千家信息网编辑
千家信息网最后更新 2025年02月02日,这篇文章主要为大家展示了"python如何实现对kafka的基本操作",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"python如何实现对kafka的基本操
千家信息网最后更新 2025年02月02日python如何实现对kafka的基本操作
这篇文章主要为大家展示了"python如何实现对kafka的基本操作",内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下"python如何实现对kafka的基本操作"这篇文章吧。
-- coding:utf-8 --
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time
bootstrap_servers = []
class OperateKafka:
def init(self,bootstrap_servers,topic):
self.bootstrap_servers = bootstrap_servers
self.topic = topic
"""生产者"""def produce(self): producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers) for i in range(4): msg = "msg%d" %i producer.send(self.topic,key=str(i),value=msg) producer.close()"""一个消费者消费一个topic"""def consume(self): #consumer = KafkaConsumer(self.topic,auto_offset_reset='earliest',group_id="testgroup",bootstrap_servers=self.bootstrap_servers) consumer = KafkaConsumer(self.topic,bootstrap_servers=self.bootstrap_servers) print consumer.partitions_for_topic(self.topic) #获取test主题的分区信息print consumer.topics() #获取主题列表print consumer.subscription() #获取当前消费者订阅的主题print consumer.assignment() #获取当前消费者topic、分区信息print consumer.beginning_offsets(consumer.assignment()) #获取当前消费者可消费的偏移量consumer.seek(TopicPartition(topic=self.topic, partition=0), 1) #重置偏移量,从第1个偏移量消费 for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic,message.partition,message.offset, message.key,message.value))"""一个消费者订阅多个topic """def consume2(self): consumer = KafkaConsumer(bootstrap_servers=['192.168.124.201:9092'])consumer.subscribe(topics=('TEST','TEST2')) #订阅要消费的主题print consumer.topics()print consumer.position(TopicPartition(topic='TEST', partition=0)) #获取当前主题的最新偏移量for message in consumer: print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value))"""消费者(手动拉取消息)"""def consume3(self): consumer = KafkaConsumer(group_id="mygroup",max_poll_records=3,bootstrap_servers=['192.168.124.201:9092'])consumer.subscribe(topics=('TEST','TEST2'))while True: message = consumer.poll(timeout_ms=5) #从kafka获取消息 if message: print message time.sleep(1)
def main():
bootstrap_servers = ['192.168.124.201:9092']
topic = "TEST"
operateKafka = OperateKafka(bootstrap_servers,topic)
operateKafka.produce()
#operateKafka.consume()
#operateKafka.consume2()
operateKafka.consume3()
main()
以上是"python如何实现对kafka的基本操作"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
消费
消费者
主题
偏移
基本操作
内容
篇文章
订阅
信息
学习
帮助
多个
手动
易懂
更多
条理
消息
生产者
知识
编带
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
数据库开发怎么学
我的世界怎么得到服务器模组
数据库安全有哪些保障
销售软件开发成本
大学生网络安全比赛
网络安全小剧本8分钟搞笑
embl数据库登录网址
视频服务器软件 推荐
数据库不同用户之间同步数据
提升个人网络安全意识ppt
美国服务器虚拟主机
数据库用户密码如何设计
杭州猫科网络技术有限公司
网页打不开无法与服务器连接
毒网络安全手抄报
衡水鸿洋软件开发有限公司
nas个人网址服务器
软件连接服务器
证券公司 软件开发 意义
火狐代理服务器拒绝连接
易保网络技术工资
移动游戏进不去电信服务器
加强节日期间网络安全防护
我的世界服务器的ip
最牛数据库
铜陵系统软件开发要多少钱
湖北常规软件开发要多少钱
深圳豹变网络技术
迅捷网络安全吗
做网络安全收入