千家信息网

zookeeper分布式锁实现的方法是什么

发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,本篇内容介绍了"zookeeper分布式锁实现的方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所
千家信息网最后更新 2024年12月13日zookeeper分布式锁实现的方法是什么

本篇内容介绍了"zookeeper分布式锁实现的方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一。为何使用分布式锁?
当应用服务器数量超过1台,对相同数据的访问可能造成访问冲突(特别是写冲突)。单纯使用关系数据库比如MYSQL的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,最大的缺陷就是可用性降低(性能差)。对于GLEASY这种满足大规模并发访问请求的应用来说,使用数据库事务来实现数据库就有些捉襟见肘了。另外对于一些不依赖数据库的应用,比如分布式文件系统,为了保证同一文件在大量读写操作情况下的正确性,必须引入分布式锁来约束对同一文件的并发操作。

二。对分布式锁的要求
1.高性能(分布式锁不能成为系统的性能瓶颈)
2.避免死锁(拿到锁的结点挂掉不会导致其它结点永远无法继续)
3.支持锁重入

三。方案1,基于zookeeper的分布式锁

/*** DistributedLockUtil.java* 分布式锁工厂类,所有分布式请求都由该工厂类负责**/public class DistributedLockUtil {        private static Object schemeLock = new Object();        private static Object mutexLock = new Object();        private static Map mutexLockMap = new ConcurrentHashMap();           private String schema;        private Map cache  = new ConcurrentHashMap();                private static Map instances = new ConcurrentHashMap();        public static DistributedLockUtil getInstance(String schema){                DistributedLockUtil u = instances.get(schema);                if(u==null){                        synchronized(schemeLock){                                u = instances.get(schema);                                if(u == null){                                        u = new DistributedLockUtil(schema);                                        instances.put(schema, u);                                }                        }                }                return u;        }                private DistributedLockUtil(String schema){                this.schema = schema;        }                private Object getMutex(String key){                Object mx = mutexLockMap.get(key);                if(mx == null){                        synchronized(mutexLock){                                mx = mutexLockMap.get(key);                                if(mx==null){                                        mx = new Object();                                        mutexLockMap.put(key,mx);                                }                        }                }                return mx;        }                private DistributedReentrantLock getLock(String key){                DistributedReentrantLock lock = cache.get(key);                if(lock == null){                        synchronized(getMutex(key)){                                lock = cache.get(key);                                if(lock == null){                                        lock = new DistributedReentrantLock(key,schema);                                        cache.put(key, lock);                                }                        }                }                return lock;        }                public void reset(){                for(String s : cache.keySet()){                        getLock(s).unlock();                }        }                /**         * 尝试加锁         * 如果当前线程已经拥有该锁的话,直接返回false,表示不用再次加锁,此时不应该再调用unlock进行解锁         *          * @param key         * @return         * @throws InterruptedException         * @throws KeeperException         */        public LockStat lock(String key) throws InterruptedException, KeeperException{                if(getLock(key).isOwner()){                        return LockStat.NONEED;                }                getLock(key).lock();                return LockStat.SUCCESS;        }                public void clearLock(String key) throws InterruptedException, KeeperException{                synchronized(getMutex(key)){                        DistributedReentrantLock l = cache.get(key);                        l.clear();                        cache.remove(key);                }        }                       public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{                unlock(key,stat,false);        }        public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{                if(stat == null) return;                if(LockStat.SUCCESS.equals(stat)){                        DistributedReentrantLock lock =  getLock(key);                        boolean hasWaiter = lock.unlock();                        if(!hasWaiter && !keepalive){                                synchronized(getMutex(key)){                                        lock.clear();                                        cache.remove(key);                                }                        }                }        }                public static enum LockStat{                NONEED,                SUCCESS        }}



/***DistributedReentrantLock.java*本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销*/public class DistributedReentrantLock {        private static final Logger logger =  Logger.getLogger(DistributedReentrantLock.class);    private ReentrantLock reentrantLock = new ReentrantLock();    private WriteLock writeLock;    private long timeout = 3*60*1000;        private final Object mutex = new Object();    private String dir;    private String schema;        private final ExitListener exitListener = new ExitListener(){                @Override                public void execute() {                        initWriteLock();                }        };                private synchronized void initWriteLock(){                logger.debug("初始化writeLock");                writeLock = new WriteLock(dir,new LockListener(){                        @Override                        public void lockAcquired() {                                synchronized(mutex){                                        mutex.notify();                                }                        }                        @Override                        public void lockReleased() {                        }                                },schema);                                if(writeLock != null && writeLock.zk != null){                        writeLock.zk.addExitListener(exitListener);                }                                synchronized(mutex){                        mutex.notify();                }        }            public DistributedReentrantLock(String dir,String schema) {             this.dir = dir;            this.schema = schema;            initWriteLock();    }    public void lock(long timeout) throws InterruptedException, KeeperException {        reentrantLock.lock();//多线程竞争时,先拿到第一层锁        try{                boolean res = writeLock.trylock();                if(!res){                        synchronized(mutex){                                        mutex.wait(timeout);                                }                        if(writeLock == null || !writeLock.isOwner()){                                throw new InterruptedException("锁超时");                        }                }        }catch(InterruptedException e){                reentrantLock.unlock();                throw e;        }catch(KeeperException e){                reentrantLock.unlock();                throw e;        }    }        public void lock() throws InterruptedException, KeeperException {            lock(timeout);    }    public void destroy()  throws KeeperException {            writeLock.unlock();    }        public boolean unlock(){            if(!isOwner()) return false;        try{                writeLock.unlock();                reentrantLock.unlock();//多线程竞争时,释放最外层锁        }catch(RuntimeException e){                reentrantLock.unlock();//多线程竞争时,释放最外层锁                throw e;        }                return reentrantLock.hasQueuedThreads();    }    public boolean isOwner() {        return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner();    }        public void clear() {                writeLock.clear();        }}



/***WriteLock.java*基于zk的锁实现*一个最简单的场景如下:*1.结点A请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号1*2.结点B请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号2*3.结点A获取所有结点ID,判断出来自己是最小结点号,于是获得锁*4.结点B获取所有结点ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点A)变更事件*5.结点A拿到锁,处理业务,处理完,释放锁(删除自己)*6.结点B收到结点A变更事件,判断出来自己已经是最小结点号,于是获得锁。*/public class WriteLock extends ZkPrimative {   private static final Logger LOG =  Logger.getLogger(WriteLock.class);    private final String dir;    private String id;    private LockNode idName;    private String ownerId;    private String lastChildId;    private byte[] data = {0x12, 0x34};    private LockListener callback;        public WriteLock(String dir,String schema) {        super(schema,true);        this.dir = dir;    }        public WriteLock(String dir,LockListener callback,String schema) {            this(dir,schema);        this.callback = callback;    }    public LockListener getLockListener() {        return this.callback;    }        public void setLockListener(LockListener callback) {        this.callback = callback;    }    public synchronized void unlock() throws RuntimeException {            if(zk == null || zk.isClosed()){                    return;            }        if (id != null) {            try {                     zk.delete(id, -1);               } catch (InterruptedException e) {                LOG.warn("Caught: " + e, e);                //set that we have been interrupted.               Thread.currentThread().interrupt();            } catch (KeeperException.NoNodeException e) {                // do nothing            } catch (KeeperException e) {                LOG.warn("Caught: " + e, e);                throw (RuntimeException) new RuntimeException(e.getMessage()).                    initCause(e);            }finally {                if (callback != null) {                    callback.lockReleased();                }                id = null;            }        }    }        private class LockWatcher implements Watcher {        public void process(WatchedEvent event) {            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +                     event.getState() + " type " + event.getType());            try {                trylock();            } catch (Exception e) {                LOG.warn("Failed to acquire lock: " + e, e);            }        }    }        private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)         throws KeeperException, InterruptedException {        List names = zookeeper.getChildren(dir, false);        for (String name : names) {            if (name.startsWith(prefix)) {                id = dir + "/" + name;                if (LOG.isDebugEnabled()) {                    LOG.debug("Found id created last time: " + id);                }                break;            }        }        if (id == null) {            id = zookeeper.create(dir + "/" + prefix, data,                     acl, EPHEMERAL_SEQUENTIAL);            if (LOG.isDebugEnabled()) {                LOG.debug("Created id: " + id);            }        }    }        public void clear() {                if(zk == null || zk.isClosed()){                    return;            }                try {                        zk.delete(dir, -1);                } catch (Exception e) {                         LOG.error("clear error: " + e,e);                }         }            public synchronized boolean trylock() throws KeeperException, InterruptedException {            if(zk == null){                    LOG.info("zk 是空");                    return false;            }        if (zk.isClosed()) {                LOG.info("zk 已经关闭");            return false;        }        ensurePathExists(dir);                LOG.debug("id:"+id);        do {            if (id == null) {                long sessionId = zk.getSessionId();                String prefix = "x-" + sessionId + "-";                idName = new LockNode(id);                LOG.debug("idName:"+idName);            }            if (id != null) {                List names = zk.getChildren(dir, false);                if (names.isEmpty()) {                    LOG.warn("No children in: " + dir + " when we've just " +                    "created one! Lets recreate it...");                    id = null;                } else {                    SortedSet sortedNames = new TreeSet();                    for (String name : names) {                        sortedNames.add(new LockNode(dir + "/" + name));                    }                    ownerId = sortedNames.first().getName();                    LOG.debug("all:"+sortedNames);                    SortedSet lessThanMe = sortedNames.headSet(idName);                    LOG.debug("less than me:"+lessThanMe);                    if (!lessThanMe.isEmpty()) {                            LockNode lastChildName = lessThanMe.last();                        lastChildId = lastChildName.getName();                        if (LOG.isDebugEnabled()) {                            LOG.debug("watching less than me node: " + lastChildId);                        }                        Stat stat = zk.exists(lastChildId, new LockWatcher());                        if (stat != null) {                            return Boolean.FALSE;                        } else {                            LOG.warn("Could not find the" +                                            " stats for less than me: " + lastChildName.getName());                        }                    } else {                        if (isOwner()) {                            if (callback != null) {                                callback.lockAcquired();                            }                            return Boolean.TRUE;                        }                    }                }            }        }        while (id == null);        return Boolean.FALSE;    }    public String getDir() {        return dir;    }    public boolean isOwner() {        return id != null && ownerId != null && id.equals(ownerId);    }    public String getId() {       return this.id;    }}

使用本方案实现的分布式锁,可以很好地解决锁重入的问题,而且使用会话结点来避免死锁;性能方面,根据笔者自测结果,加锁解锁各一次算是一个操作,本方案实现的分布式锁,TPS大概为2000-3000,性能比较一般

"zookeeper分布式锁实现的方法是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

结点 分布式 数据 线程 性能 数据库 应用 最小 文件 竞争 方法 最大 事件 事务 内容 外层 工厂 情况 更多 知识 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 什么情况下数据库查询无响应 软件开发的测试和评价技术 mssql 演示数据库 公共卫生网络安全 以管理员身份运行数据库会闪退吗 银行网络安全工程师需要具备 东华智中源互联网科技有限公司 自动网络技术开发行业 服务器远程桌面安全工具 网络安全技术与实训参考文献 思科网络技术是什么意思 手机法连接到服务器 东营专业软件开发咨询 易语言获取网页彩票数据库 网络技术的发展轨迹思维导图 软件开发量及报价 企业家数据库 数据库怎么搜索列不为空 华远网络技术有限公司 软件开发调研报告百度文库 杨浦区技术网络技术供应 国土空间总体规划数据库工作思路 服务器修改ip地址 网络技术出来都是干什么的 关于网络安全宣传会的新闻 本钢工业网络安全 数据库怎么自动输出表格 数据库搭建技术 日常网络安全教育内容 手机游戏软件开发岗位
0