千家信息网

zookeeper分布式锁实现的方法是什么

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇内容介绍了"zookeeper分布式锁实现的方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所
千家信息网最后更新 2025年02月03日zookeeper分布式锁实现的方法是什么

本篇内容介绍了"zookeeper分布式锁实现的方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一。为何使用分布式锁?
当应用服务器数量超过1台,对相同数据的访问可能造成访问冲突(特别是写冲突)。单纯使用关系数据库比如MYSQL的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,最大的缺陷就是可用性降低(性能差)。对于GLEASY这种满足大规模并发访问请求的应用来说,使用数据库事务来实现数据库就有些捉襟见肘了。另外对于一些不依赖数据库的应用,比如分布式文件系统,为了保证同一文件在大量读写操作情况下的正确性,必须引入分布式锁来约束对同一文件的并发操作。

二。对分布式锁的要求
1.高性能(分布式锁不能成为系统的性能瓶颈)
2.避免死锁(拿到锁的结点挂掉不会导致其它结点永远无法继续)
3.支持锁重入

三。方案1,基于zookeeper的分布式锁

/*** DistributedLockUtil.java* 分布式锁工厂类,所有分布式请求都由该工厂类负责**/public class DistributedLockUtil {        private static Object schemeLock = new Object();        private static Object mutexLock = new Object();        private static Map mutexLockMap = new ConcurrentHashMap();           private String schema;        private Map cache  = new ConcurrentHashMap();                private static Map instances = new ConcurrentHashMap();        public static DistributedLockUtil getInstance(String schema){                DistributedLockUtil u = instances.get(schema);                if(u==null){                        synchronized(schemeLock){                                u = instances.get(schema);                                if(u == null){                                        u = new DistributedLockUtil(schema);                                        instances.put(schema, u);                                }                        }                }                return u;        }                private DistributedLockUtil(String schema){                this.schema = schema;        }                private Object getMutex(String key){                Object mx = mutexLockMap.get(key);                if(mx == null){                        synchronized(mutexLock){                                mx = mutexLockMap.get(key);                                if(mx==null){                                        mx = new Object();                                        mutexLockMap.put(key,mx);                                }                        }                }                return mx;        }                private DistributedReentrantLock getLock(String key){                DistributedReentrantLock lock = cache.get(key);                if(lock == null){                        synchronized(getMutex(key)){                                lock = cache.get(key);                                if(lock == null){                                        lock = new DistributedReentrantLock(key,schema);                                        cache.put(key, lock);                                }                        }                }                return lock;        }                public void reset(){                for(String s : cache.keySet()){                        getLock(s).unlock();                }        }                /**         * 尝试加锁         * 如果当前线程已经拥有该锁的话,直接返回false,表示不用再次加锁,此时不应该再调用unlock进行解锁         *          * @param key         * @return         * @throws InterruptedException         * @throws KeeperException         */        public LockStat lock(String key) throws InterruptedException, KeeperException{                if(getLock(key).isOwner()){                        return LockStat.NONEED;                }                getLock(key).lock();                return LockStat.SUCCESS;        }                public void clearLock(String key) throws InterruptedException, KeeperException{                synchronized(getMutex(key)){                        DistributedReentrantLock l = cache.get(key);                        l.clear();                        cache.remove(key);                }        }                       public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{                unlock(key,stat,false);        }        public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{                if(stat == null) return;                if(LockStat.SUCCESS.equals(stat)){                        DistributedReentrantLock lock =  getLock(key);                        boolean hasWaiter = lock.unlock();                        if(!hasWaiter && !keepalive){                                synchronized(getMutex(key)){                                        lock.clear();                                        cache.remove(key);                                }                        }                }        }                public static enum LockStat{                NONEED,                SUCCESS        }}



/***DistributedReentrantLock.java*本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销*/public class DistributedReentrantLock {        private static final Logger logger =  Logger.getLogger(DistributedReentrantLock.class);    private ReentrantLock reentrantLock = new ReentrantLock();    private WriteLock writeLock;    private long timeout = 3*60*1000;        private final Object mutex = new Object();    private String dir;    private String schema;        private final ExitListener exitListener = new ExitListener(){                @Override                public void execute() {                        initWriteLock();                }        };                private synchronized void initWriteLock(){                logger.debug("初始化writeLock");                writeLock = new WriteLock(dir,new LockListener(){                        @Override                        public void lockAcquired() {                                synchronized(mutex){                                        mutex.notify();                                }                        }                        @Override                        public void lockReleased() {                        }                                },schema);                                if(writeLock != null && writeLock.zk != null){                        writeLock.zk.addExitListener(exitListener);                }                                synchronized(mutex){                        mutex.notify();                }        }            public DistributedReentrantLock(String dir,String schema) {             this.dir = dir;            this.schema = schema;            initWriteLock();    }    public void lock(long timeout) throws InterruptedException, KeeperException {        reentrantLock.lock();//多线程竞争时,先拿到第一层锁        try{                boolean res = writeLock.trylock();                if(!res){                        synchronized(mutex){                                        mutex.wait(timeout);                                }                        if(writeLock == null || !writeLock.isOwner()){                                throw new InterruptedException("锁超时");                        }                }        }catch(InterruptedException e){                reentrantLock.unlock();                throw e;        }catch(KeeperException e){                reentrantLock.unlock();                throw e;        }    }        public void lock() throws InterruptedException, KeeperException {            lock(timeout);    }    public void destroy()  throws KeeperException {            writeLock.unlock();    }        public boolean unlock(){            if(!isOwner()) return false;        try{                writeLock.unlock();                reentrantLock.unlock();//多线程竞争时,释放最外层锁        }catch(RuntimeException e){                reentrantLock.unlock();//多线程竞争时,释放最外层锁                throw e;        }                return reentrantLock.hasQueuedThreads();    }    public boolean isOwner() {        return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner();    }        public void clear() {                writeLock.clear();        }}



/***WriteLock.java*基于zk的锁实现*一个最简单的场景如下:*1.结点A请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号1*2.结点B请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号2*3.结点A获取所有结点ID,判断出来自己是最小结点号,于是获得锁*4.结点B获取所有结点ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点A)变更事件*5.结点A拿到锁,处理业务,处理完,释放锁(删除自己)*6.结点B收到结点A变更事件,判断出来自己已经是最小结点号,于是获得锁。*/public class WriteLock extends ZkPrimative {   private static final Logger LOG =  Logger.getLogger(WriteLock.class);    private final String dir;    private String id;    private LockNode idName;    private String ownerId;    private String lastChildId;    private byte[] data = {0x12, 0x34};    private LockListener callback;        public WriteLock(String dir,String schema) {        super(schema,true);        this.dir = dir;    }        public WriteLock(String dir,LockListener callback,String schema) {            this(dir,schema);        this.callback = callback;    }    public LockListener getLockListener() {        return this.callback;    }        public void setLockListener(LockListener callback) {        this.callback = callback;    }    public synchronized void unlock() throws RuntimeException {            if(zk == null || zk.isClosed()){                    return;            }        if (id != null) {            try {                     zk.delete(id, -1);               } catch (InterruptedException e) {                LOG.warn("Caught: " + e, e);                //set that we have been interrupted.               Thread.currentThread().interrupt();            } catch (KeeperException.NoNodeException e) {                // do nothing            } catch (KeeperException e) {                LOG.warn("Caught: " + e, e);                throw (RuntimeException) new RuntimeException(e.getMessage()).                    initCause(e);            }finally {                if (callback != null) {                    callback.lockReleased();                }                id = null;            }        }    }        private class LockWatcher implements Watcher {        public void process(WatchedEvent event) {            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +                     event.getState() + " type " + event.getType());            try {                trylock();            } catch (Exception e) {                LOG.warn("Failed to acquire lock: " + e, e);            }        }    }        private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)         throws KeeperException, InterruptedException {        List names = zookeeper.getChildren(dir, false);        for (String name : names) {            if (name.startsWith(prefix)) {                id = dir + "/" + name;                if (LOG.isDebugEnabled()) {                    LOG.debug("Found id created last time: " + id);                }                break;            }        }        if (id == null) {            id = zookeeper.create(dir + "/" + prefix, data,                     acl, EPHEMERAL_SEQUENTIAL);            if (LOG.isDebugEnabled()) {                LOG.debug("Created id: " + id);            }        }    }        public void clear() {                if(zk == null || zk.isClosed()){                    return;            }                try {                        zk.delete(dir, -1);                } catch (Exception e) {                         LOG.error("clear error: " + e,e);                }         }            public synchronized boolean trylock() throws KeeperException, InterruptedException {            if(zk == null){                    LOG.info("zk 是空");                    return false;            }        if (zk.isClosed()) {                LOG.info("zk 已经关闭");            return false;        }        ensurePathExists(dir);                LOG.debug("id:"+id);        do {            if (id == null) {                long sessionId = zk.getSessionId();                String prefix = "x-" + sessionId + "-";                idName = new LockNode(id);                LOG.debug("idName:"+idName);            }            if (id != null) {                List names = zk.getChildren(dir, false);                if (names.isEmpty()) {                    LOG.warn("No children in: " + dir + " when we've just " +                    "created one! Lets recreate it...");                    id = null;                } else {                    SortedSet sortedNames = new TreeSet();                    for (String name : names) {                        sortedNames.add(new LockNode(dir + "/" + name));                    }                    ownerId = sortedNames.first().getName();                    LOG.debug("all:"+sortedNames);                    SortedSet lessThanMe = sortedNames.headSet(idName);                    LOG.debug("less than me:"+lessThanMe);                    if (!lessThanMe.isEmpty()) {                            LockNode lastChildName = lessThanMe.last();                        lastChildId = lastChildName.getName();                        if (LOG.isDebugEnabled()) {                            LOG.debug("watching less than me node: " + lastChildId);                        }                        Stat stat = zk.exists(lastChildId, new LockWatcher());                        if (stat != null) {                            return Boolean.FALSE;                        } else {                            LOG.warn("Could not find the" +                                            " stats for less than me: " + lastChildName.getName());                        }                    } else {                        if (isOwner()) {                            if (callback != null) {                                callback.lockAcquired();                            }                            return Boolean.TRUE;                        }                    }                }            }        }        while (id == null);        return Boolean.FALSE;    }    public String getDir() {        return dir;    }    public boolean isOwner() {        return id != null && ownerId != null && id.equals(ownerId);    }    public String getId() {       return this.id;    }}

使用本方案实现的分布式锁,可以很好地解决锁重入的问题,而且使用会话结点来避免死锁;性能方面,根据笔者自测结果,加锁解锁各一次算是一个操作,本方案实现的分布式锁,TPS大概为2000-3000,性能比较一般

"zookeeper分布式锁实现的方法是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

结点 分布式 数据 线程 性能 数据库 应用 最小 文件 竞争 方法 最大 事件 事务 内容 外层 工厂 情况 更多 知识 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 建立数据库基本操作csdn 网易的服务器安全问题 pi无法与服务器建立安全连接 常州营销网络技术哪家好 世界清算银行数据库 服务器硬盘指示灯红色 安装一个数据库保存在哪里 简述个人网络安全及隐患 日照智能养老软件开发专业制作 山西潮流软件开发服务价格优惠 枸杞网络技术科技 2021年网络安全知识讲座 自定义服务器丧尸模式 黑色沙漠 数据库文件 腾讯互联网科技心得 网易版服务器怎么招募管理员 计算机网络技术前途怎么样 标准拉曼光谱数据库 校外线上培训网络安全 网络安全是安全生产的内容吗 网络安全人才现状 数据库中多对多触发器怎么写 北京网络安全大会参展单位 河南濮阳联通云服务器云空间 广州正规网络技术开发展示 合肥软件开发培训达内 mac数据库设计工具 查询话费余额显示服务器无法连接 容器环境下 服务器的信息收集 甘肃人社认证服务器故障
0