Zookeeper接口kazoo的示例分析
发表于:2025-01-18 作者:千家信息网编辑
千家信息网最后更新 2025年01月18日,小编给大家分享一下Zookeeper接口kazoo的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!具体介绍如下。z
千家信息网最后更新 2025年01月18日Zookeeper接口kazoo的示例分析
小编给大家分享一下Zookeeper接口kazoo的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!
具体介绍如下。
zookeeper的开发接口以前主要以java和c为主,随着python项目越来越多的使用zookeeper作为分布式集群实现,python的zookeeper接口也出现了很多,现在主流的纯python的zookeeper接口是kazoo。因此如何使用kazoo开发基于python的分布式程序是必须掌握的。
1.安装kazoo
yum install python-pippip install kazoo
安装过程中会出现一些python依赖包未安装的情况,安装即可。
2.运行kazoo基础例子kazoo_basic.py
import timefrom kazoo.client import KazooClientfrom kazoo.client import KazooStatedef main(): zk=KazooClient(hosts='127.0.0.1:2182') zk.start() @zk.add_listener def my_listener(state): if state == KazooState.LOST: print("LOST") elif state == KazooState.SUSPENDED: print("SUSPENDED") else: print("Connected") #Creating Nodes # Ensure a path, create if necessary zk.ensure_path("/my/favorite") # Create a node with data zk.create("/my/favorite/node", b"") zk.create("/my/favorite/node/a", b"A") #Reading Data # Determine if a node exists if zk.exists("/my/favorite"): print("/my/favorite is existed") @zk.ChildrenWatch("/my/favorite/node") def watch_children(children): print("Children are now: %s" % children) # Above function called immediately, and from then on @zk.DataWatch("/my/favorite/node") def watch_node(data, stat): print("Version: %s, data: %s" % (stat.version, data.decode("utf-8"))) # Print the version of a node and its data data, stat = zk.get("/my/favorite/node") print("Version: %s, data: %s" % (stat.version, data.decode("utf-8"))) # List the children children = zk.get_children("/my/favorite/node") print("There are %s children with names %s" % (len(children), children)) #Updating Data zk.set("/my/favorite", b"some data") #Deleting Nodes zk.delete("/my/favorite/node/a") #Transactions transaction = zk.transaction() transaction.check('/my/favorite/node', version=-1) transaction.create('/my/favorite/node/b', b"B") results = transaction.commit() print ("Transaction results is %s" % results) zk.delete("/my/favorite/node/b") zk.delete("/my", recursive=True) time.sleep(2) zk.stop()if __name__ == "__main__": try: main() except Exception, ex: print "Ocurred Exception: %s" % str(ex) quit()
运行结果:
Children are now: [u'a']Version: 0, data: Version: 0, data: There are 1 children with names [u'a']Children are now: []Transaction results is [True, u'/my/favorite/node/b']Children are now: [u'b']Children are now: []No handlers could be found for logger "kazoo.recipe.watchers"LOST
以上程序运行了基本kazoo接口命令,包括创建删除加watcher等操作,通过调试并对比zookeeper服务节点znode目录结构的变化,就可以理解具体的操作结果。
3.运行通过kazoo实现的分布式锁程序kazoo_lock.py
import logging, os, timefrom kazoo.client import KazooClientfrom kazoo.client import KazooStatefrom kazoo.recipe.lock import Lockclass ZooKeeperLock(): def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1): self.hosts = hosts self.id_str = id_str self.zk_client = None self.timeout = timeout self.logger = logger self.name = lock_name self.lock_handle = None self.create_lock() def create_lock(self): try: self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout) self.zk_client.start(timeout=self.timeout) except Exception, ex: self.init_ret = False self.err_str = "Create KazooClient failed! Exception: %s" % str(ex) logging.error(self.err_str) return try: lock_path = os.path.join("/", "locks", self.name) self.lock_handle = Lock(self.zk_client, lock_path) except Exception, ex: self.init_ret = False self.err_str = "Create lock failed! Exception: %s" % str(ex) logging.error(self.err_str) return def destroy_lock(self): #self.release() if self.zk_client != None: self.zk_client.stop() self.zk_client = None def acquire(self, blocking=True, timeout=None): if self.lock_handle == None: return None try: return self.lock_handle.acquire(blocking=blocking, timeout=timeout) except Exception, ex: self.err_str = "Acquire lock failed! Exception: %s" % str(ex) logging.error(self.err_str) return None def release(self): if self.lock_handle == None: return None return self.lock_handle.release() def __del__(self): self.destroy_lock()def main(): logger = logging.getLogger() logger.setLevel(logging.INFO) sh = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s') sh.setFormatter(formatter) logger.addHandler(sh) zookeeper_hosts = "127.0.0.1:2182" lock_name = "test" lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger) ret = lock.acquire() if not ret: logging.info("Can't get lock! Ret: %s", ret) return logging.info("Get lock! Do something! Sleep 10 secs!") for i in range(1, 11): time.sleep(1) print str(i) lock.release()if __name__ == "__main__": try: main() except Exception, ex: print "Ocurred Exception: %s" % str(ex) quit()
将该测试文件copy到多个服务器,同时运行,就可以看到分布式锁的效果了。
以上是"Zookeeper接口kazoo的示例分析"这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注行业资讯频道!
接口
运行
分布式
程序
篇文章
示例
分析
内容
结果
utf-8
开发
服务
不怎么
主流
例子
同时
命令
基础
多个
大部分
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
菏泽平台软件开发哪家好
闵行区管理软件开发内容
南京问鼎网络技术
互联网科技改变智慧门诊
重庆服务器电源制造商
金山区品牌软件开发厂家价格
战网提示我们从服务器检索
网站源代码含数据库
读计算机可以软件开发
建立公司小程序营销数据库申请
r星服务器更新慢
服装库存软件开发
数据库应用技术试验心得
网络技术子健
上海夺畅网络技术有限公
网络安全3个部分
公司从事软件开发
北京网络技术代理商
局域网数据库服务器连接
目前网络安全吗
网络安全授权范围
南邮ip网络技术基础试卷
信息网络安全和保密培训会议
甘肃省酒泉市做软件开发找工作
数据库如何做测试
我的世界服务器池cpu
两会网络安全工作总结
网络安全教育宣传手抄报竖版
刀箱服务器
嘉定区互联网软件开发质量保证