zookeeper分布式锁实现的方法是什么
发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,本篇内容介绍了"zookeeper分布式锁实现的方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所
千家信息网最后更新 2024年12月13日zookeeper分布式锁实现的方法是什么
本篇内容介绍了"zookeeper分布式锁实现的方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
一。为何使用分布式锁?
当应用服务器数量超过1台,对相同数据的访问可能造成访问冲突(特别是写冲突)。单纯使用关系数据库比如MYSQL的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,最大的缺陷就是可用性降低(性能差)。对于GLEASY这种满足大规模并发访问请求的应用来说,使用数据库事务来实现数据库就有些捉襟见肘了。另外对于一些不依赖数据库的应用,比如分布式文件系统,为了保证同一文件在大量读写操作情况下的正确性,必须引入分布式锁来约束对同一文件的并发操作。
二。对分布式锁的要求
1.高性能(分布式锁不能成为系统的性能瓶颈)
2.避免死锁(拿到锁的结点挂掉不会导致其它结点永远无法继续)
3.支持锁重入
三。方案1,基于zookeeper的分布式锁
/*** DistributedLockUtil.java* 分布式锁工厂类,所有分布式请求都由该工厂类负责**/public class DistributedLockUtil { private static Object schemeLock = new Object(); private static Object mutexLock = new Object(); private static MapmutexLockMap = new ConcurrentHashMap(); private String schema; private Map cache = new ConcurrentHashMap (); private static Map instances = new ConcurrentHashMap(); public static DistributedLockUtil getInstance(String schema){ DistributedLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ u = new DistributedLockUtil(schema); instances.put(schema, u); } } } return u; } private DistributedLockUtil(String schema){ this.schema = schema; } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private DistributedReentrantLock getLock(String key){ DistributedReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new DistributedReentrantLock(key,schema); cache.put(key, lock); } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s).unlock(); } } /** * 尝试加锁 * 如果当前线程已经拥有该锁的话,直接返回false,表示不用再次加锁,此时不应该再调用unlock进行解锁 * * @param key * @return * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) throws InterruptedException, KeeperException{ if(getLock(key).isOwner()){ return LockStat.NONEED; } getLock(key).lock(); return LockStat.SUCCESS; } public void clearLock(String key) throws InterruptedException, KeeperException{ synchronized(getMutex(key)){ DistributedReentrantLock l = cache.get(key); l.clear(); cache.remove(key); } } public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{ unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ DistributedReentrantLock lock = getLock(key); boolean hasWaiter = lock.unlock(); if(!hasWaiter && !keepalive){ synchronized(getMutex(key)){ lock.clear(); cache.remove(key); } } } } public static enum LockStat{ NONEED, SUCCESS }}
/***DistributedReentrantLock.java*本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销*/public class DistributedReentrantLock { private static final Logger logger = Logger.getLogger(DistributedReentrantLock.class); private ReentrantLock reentrantLock = new ReentrantLock(); private WriteLock writeLock; private long timeout = 3*60*1000; private final Object mutex = new Object(); private String dir; private String schema; private final ExitListener exitListener = new ExitListener(){ @Override public void execute() { initWriteLock(); } }; private synchronized void initWriteLock(){ logger.debug("初始化writeLock"); writeLock = new WriteLock(dir,new LockListener(){ @Override public void lockAcquired() { synchronized(mutex){ mutex.notify(); } } @Override public void lockReleased() { } },schema); if(writeLock != null && writeLock.zk != null){ writeLock.zk.addExitListener(exitListener); } synchronized(mutex){ mutex.notify(); } } public DistributedReentrantLock(String dir,String schema) { this.dir = dir; this.schema = schema; initWriteLock(); } public void lock(long timeout) throws InterruptedException, KeeperException { reentrantLock.lock();//多线程竞争时,先拿到第一层锁 try{ boolean res = writeLock.trylock(); if(!res){ synchronized(mutex){ mutex.wait(timeout); } if(writeLock == null || !writeLock.isOwner()){ throw new InterruptedException("锁超时"); } } }catch(InterruptedException e){ reentrantLock.unlock(); throw e; }catch(KeeperException e){ reentrantLock.unlock(); throw e; } } public void lock() throws InterruptedException, KeeperException { lock(timeout); } public void destroy() throws KeeperException { writeLock.unlock(); } public boolean unlock(){ if(!isOwner()) return false; try{ writeLock.unlock(); reentrantLock.unlock();//多线程竞争时,释放最外层锁 }catch(RuntimeException e){ reentrantLock.unlock();//多线程竞争时,释放最外层锁 throw e; } return reentrantLock.hasQueuedThreads(); } public boolean isOwner() { return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner(); } public void clear() { writeLock.clear(); }}
/***WriteLock.java*基于zk的锁实现*一个最简单的场景如下:*1.结点A请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号1*2.结点B请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号2*3.结点A获取所有结点ID,判断出来自己是最小结点号,于是获得锁*4.结点B获取所有结点ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点A)变更事件*5.结点A拿到锁,处理业务,处理完,释放锁(删除自己)*6.结点B收到结点A变更事件,判断出来自己已经是最小结点号,于是获得锁。*/public class WriteLock extends ZkPrimative { private static final Logger LOG = Logger.getLogger(WriteLock.class); private final String dir; private String id; private LockNode idName; private String ownerId; private String lastChildId; private byte[] data = {0x12, 0x34}; private LockListener callback; public WriteLock(String dir,String schema) { super(schema,true); this.dir = dir; } public WriteLock(String dir,LockListener callback,String schema) { this(dir,schema); this.callback = callback; } public LockListener getLockListener() { return this.callback; } public void setLockListener(LockListener callback) { this.callback = callback; } public synchronized void unlock() throws RuntimeException { if(zk == null || zk.isClosed()){ return; } if (id != null) { try { zk.delete(id, -1); } catch (InterruptedException e) { LOG.warn("Caught: " + e, e); //set that we have been interrupted. Thread.currentThread().interrupt(); } catch (KeeperException.NoNodeException e) { // do nothing } catch (KeeperException e) { LOG.warn("Caught: " + e, e); throw (RuntimeException) new RuntimeException(e.getMessage()). initCause(e); }finally { if (callback != null) { callback.lockReleased(); } id = null; } } } private class LockWatcher implements Watcher { public void process(WatchedEvent event) { LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + event.getState() + " type " + event.getType()); try { trylock(); } catch (Exception e) { LOG.warn("Failed to acquire lock: " + e, e); } } } private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) throws KeeperException, InterruptedException { Listnames = zookeeper.getChildren(dir, false); for (String name : names) { if (name.startsWith(prefix)) { id = dir + "/" + name; if (LOG.isDebugEnabled()) { LOG.debug("Found id created last time: " + id); } break; } } if (id == null) { id = zookeeper.create(dir + "/" + prefix, data, acl, EPHEMERAL_SEQUENTIAL); if (LOG.isDebugEnabled()) { LOG.debug("Created id: " + id); } } } public void clear() { if(zk == null || zk.isClosed()){ return; } try { zk.delete(dir, -1); } catch (Exception e) { LOG.error("clear error: " + e,e); } } public synchronized boolean trylock() throws KeeperException, InterruptedException { if(zk == null){ LOG.info("zk 是空"); return false; } if (zk.isClosed()) { LOG.info("zk 已经关闭"); return false; } ensurePathExists(dir); LOG.debug("id:"+id); do { if (id == null) { long sessionId = zk.getSessionId(); String prefix = "x-" + sessionId + "-"; idName = new LockNode(id); LOG.debug("idName:"+idName); } if (id != null) { List names = zk.getChildren(dir, false); if (names.isEmpty()) { LOG.warn("No children in: " + dir + " when we've just " + "created one! Lets recreate it..."); id = null; } else { SortedSet sortedNames = new TreeSet (); for (String name : names) { sortedNames.add(new LockNode(dir + "/" + name)); } ownerId = sortedNames.first().getName(); LOG.debug("all:"+sortedNames); SortedSet lessThanMe = sortedNames.headSet(idName); LOG.debug("less than me:"+lessThanMe); if (!lessThanMe.isEmpty()) { LockNode lastChildName = lessThanMe.last(); lastChildId = lastChildName.getName(); if (LOG.isDebugEnabled()) { LOG.debug("watching less than me node: " + lastChildId); } Stat stat = zk.exists(lastChildId, new LockWatcher()); if (stat != null) { return Boolean.FALSE; } else { LOG.warn("Could not find the" + " stats for less than me: " + lastChildName.getName()); } } else { if (isOwner()) { if (callback != null) { callback.lockAcquired(); } return Boolean.TRUE; } } } } } while (id == null); return Boolean.FALSE; } public String getDir() { return dir; } public boolean isOwner() { return id != null && ownerId != null && id.equals(ownerId); } public String getId() { return this.id; }}
使用本方案实现的分布式锁,可以很好地解决锁重入的问题,而且使用会话结点来避免死锁;性能方面,根据笔者自测结果,加锁解锁各一次算是一个操作,本方案实现的分布式锁,TPS大概为2000-3000,性能比较一般
"zookeeper分布式锁实现的方法是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
结点
分布式
数据
线程
性能
数据库
应用
最小
文件
竞争
方法
最大
事件
事务
内容
外层
工厂
情况
更多
知识
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
网络安全知识答题沈阳
济宁互联网养老软件开发公司
临夏县公安局网络安全
印度网络安全性
成都玖鉴互联网科技有限公司咋样
计算机网络技术人员证书作用
手边网络技术有限公司
第四界网络安全宣传周
江苏海航网络技术服务热线
web服务器修复方案
网络安全保卫大队徐炜事迹
网络技术是青春饭吗
数据库与系统工程师
58同城服务器机柜
公安信息网络安全管理含义
淘宝平台服务器是哪家
重庆服务器虚拟化迁移服务器
研究信息网络安全工作
航天开票服务器管理系统如何清卡
宾馆落实网络安全审计依据
中专计算机网络技术试卷
果然叼 服务器
数据库 满足安全 方便
正益互联网科技有限公司官网
软件开发与芯片的关系
点读笔云服务器登录失败
数据库应用及其特点
青浦区数据软件开发质量保障
服务器最大带宽
网络安全小卫士小知识