千家信息网

zookeeper分布式锁实现的方法是什么

发表于:2024-12-13 作者:千家信息网编辑
千家信息网最后更新 2024年12月13日,本篇内容介绍了"zookeeper分布式锁实现的方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所
千家信息网最后更新 2024年12月13日zookeeper分布式锁实现的方法是什么

本篇内容介绍了"zookeeper分布式锁实现的方法是什么"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一。为何使用分布式锁?
当应用服务器数量超过1台,对相同数据的访问可能造成访问冲突(特别是写冲突)。单纯使用关系数据库比如MYSQL的应用可以借助于事务来实现锁,也可以使用版本号等实现乐观锁,最大的缺陷就是可用性降低(性能差)。对于GLEASY这种满足大规模并发访问请求的应用来说,使用数据库事务来实现数据库就有些捉襟见肘了。另外对于一些不依赖数据库的应用,比如分布式文件系统,为了保证同一文件在大量读写操作情况下的正确性,必须引入分布式锁来约束对同一文件的并发操作。

二。对分布式锁的要求
1.高性能(分布式锁不能成为系统的性能瓶颈)
2.避免死锁(拿到锁的结点挂掉不会导致其它结点永远无法继续)
3.支持锁重入

三。方案1,基于zookeeper的分布式锁

/*** DistributedLockUtil.java* 分布式锁工厂类,所有分布式请求都由该工厂类负责**/public class DistributedLockUtil {        private static Object schemeLock = new Object();        private static Object mutexLock = new Object();        private static Map mutexLockMap = new ConcurrentHashMap();           private String schema;        private Map cache  = new ConcurrentHashMap();                private static Map instances = new ConcurrentHashMap();        public static DistributedLockUtil getInstance(String schema){                DistributedLockUtil u = instances.get(schema);                if(u==null){                        synchronized(schemeLock){                                u = instances.get(schema);                                if(u == null){                                        u = new DistributedLockUtil(schema);                                        instances.put(schema, u);                                }                        }                }                return u;        }                private DistributedLockUtil(String schema){                this.schema = schema;        }                private Object getMutex(String key){                Object mx = mutexLockMap.get(key);                if(mx == null){                        synchronized(mutexLock){                                mx = mutexLockMap.get(key);                                if(mx==null){                                        mx = new Object();                                        mutexLockMap.put(key,mx);                                }                        }                }                return mx;        }                private DistributedReentrantLock getLock(String key){                DistributedReentrantLock lock = cache.get(key);                if(lock == null){                        synchronized(getMutex(key)){                                lock = cache.get(key);                                if(lock == null){                                        lock = new DistributedReentrantLock(key,schema);                                        cache.put(key, lock);                                }                        }                }                return lock;        }                public void reset(){                for(String s : cache.keySet()){                        getLock(s).unlock();                }        }                /**         * 尝试加锁         * 如果当前线程已经拥有该锁的话,直接返回false,表示不用再次加锁,此时不应该再调用unlock进行解锁         *          * @param key         * @return         * @throws InterruptedException         * @throws KeeperException         */        public LockStat lock(String key) throws InterruptedException, KeeperException{                if(getLock(key).isOwner()){                        return LockStat.NONEED;                }                getLock(key).lock();                return LockStat.SUCCESS;        }                public void clearLock(String key) throws InterruptedException, KeeperException{                synchronized(getMutex(key)){                        DistributedReentrantLock l = cache.get(key);                        l.clear();                        cache.remove(key);                }        }                       public void unlock(String key,LockStat stat) throws InterruptedException, KeeperException{                unlock(key,stat,false);        }        public void unlock(String key,LockStat stat,boolean keepalive) throws InterruptedException, KeeperException{                if(stat == null) return;                if(LockStat.SUCCESS.equals(stat)){                        DistributedReentrantLock lock =  getLock(key);                        boolean hasWaiter = lock.unlock();                        if(!hasWaiter && !keepalive){                                synchronized(getMutex(key)){                                        lock.clear();                                        cache.remove(key);                                }                        }                }        }                public static enum LockStat{                NONEED,                SUCCESS        }}



/***DistributedReentrantLock.java*本地线程之间锁争用,先使用虚拟机内部锁机制,减少结点间通信开销*/public class DistributedReentrantLock {        private static final Logger logger =  Logger.getLogger(DistributedReentrantLock.class);    private ReentrantLock reentrantLock = new ReentrantLock();    private WriteLock writeLock;    private long timeout = 3*60*1000;        private final Object mutex = new Object();    private String dir;    private String schema;        private final ExitListener exitListener = new ExitListener(){                @Override                public void execute() {                        initWriteLock();                }        };                private synchronized void initWriteLock(){                logger.debug("初始化writeLock");                writeLock = new WriteLock(dir,new LockListener(){                        @Override                        public void lockAcquired() {                                synchronized(mutex){                                        mutex.notify();                                }                        }                        @Override                        public void lockReleased() {                        }                                },schema);                                if(writeLock != null && writeLock.zk != null){                        writeLock.zk.addExitListener(exitListener);                }                                synchronized(mutex){                        mutex.notify();                }        }            public DistributedReentrantLock(String dir,String schema) {             this.dir = dir;            this.schema = schema;            initWriteLock();    }    public void lock(long timeout) throws InterruptedException, KeeperException {        reentrantLock.lock();//多线程竞争时,先拿到第一层锁        try{                boolean res = writeLock.trylock();                if(!res){                        synchronized(mutex){                                        mutex.wait(timeout);                                }                        if(writeLock == null || !writeLock.isOwner()){                                throw new InterruptedException("锁超时");                        }                }        }catch(InterruptedException e){                reentrantLock.unlock();                throw e;        }catch(KeeperException e){                reentrantLock.unlock();                throw e;        }    }        public void lock() throws InterruptedException, KeeperException {            lock(timeout);    }    public void destroy()  throws KeeperException {            writeLock.unlock();    }        public boolean unlock(){            if(!isOwner()) return false;        try{                writeLock.unlock();                reentrantLock.unlock();//多线程竞争时,释放最外层锁        }catch(RuntimeException e){                reentrantLock.unlock();//多线程竞争时,释放最外层锁                throw e;        }                return reentrantLock.hasQueuedThreads();    }    public boolean isOwner() {        return reentrantLock.isHeldByCurrentThread() && writeLock.isOwner();    }        public void clear() {                writeLock.clear();        }}



/***WriteLock.java*基于zk的锁实现*一个最简单的场景如下:*1.结点A请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号1*2.结点B请求加锁,在特定路径下注册自己(会话自增结点),得到一个ID号2*3.结点A获取所有结点ID,判断出来自己是最小结点号,于是获得锁*4.结点B获取所有结点ID,判断出来自己不是最小结点,于是监听小于自己的最大结点(结点A)变更事件*5.结点A拿到锁,处理业务,处理完,释放锁(删除自己)*6.结点B收到结点A变更事件,判断出来自己已经是最小结点号,于是获得锁。*/public class WriteLock extends ZkPrimative {   private static final Logger LOG =  Logger.getLogger(WriteLock.class);    private final String dir;    private String id;    private LockNode idName;    private String ownerId;    private String lastChildId;    private byte[] data = {0x12, 0x34};    private LockListener callback;        public WriteLock(String dir,String schema) {        super(schema,true);        this.dir = dir;    }        public WriteLock(String dir,LockListener callback,String schema) {            this(dir,schema);        this.callback = callback;    }    public LockListener getLockListener() {        return this.callback;    }        public void setLockListener(LockListener callback) {        this.callback = callback;    }    public synchronized void unlock() throws RuntimeException {            if(zk == null || zk.isClosed()){                    return;            }        if (id != null) {            try {                     zk.delete(id, -1);               } catch (InterruptedException e) {                LOG.warn("Caught: " + e, e);                //set that we have been interrupted.               Thread.currentThread().interrupt();            } catch (KeeperException.NoNodeException e) {                // do nothing            } catch (KeeperException e) {                LOG.warn("Caught: " + e, e);                throw (RuntimeException) new RuntimeException(e.getMessage()).                    initCause(e);            }finally {                if (callback != null) {                    callback.lockReleased();                }                id = null;            }        }    }        private class LockWatcher implements Watcher {        public void process(WatchedEvent event) {            LOG.debug("Watcher fired on path: " + event.getPath() + " state: " +                     event.getState() + " type " + event.getType());            try {                trylock();            } catch (Exception e) {                LOG.warn("Failed to acquire lock: " + e, e);            }        }    }        private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir)         throws KeeperException, InterruptedException {        List names = zookeeper.getChildren(dir, false);        for (String name : names) {            if (name.startsWith(prefix)) {                id = dir + "/" + name;                if (LOG.isDebugEnabled()) {                    LOG.debug("Found id created last time: " + id);                }                break;            }        }        if (id == null) {            id = zookeeper.create(dir + "/" + prefix, data,                     acl, EPHEMERAL_SEQUENTIAL);            if (LOG.isDebugEnabled()) {                LOG.debug("Created id: " + id);            }        }    }        public void clear() {                if(zk == null || zk.isClosed()){                    return;            }                try {                        zk.delete(dir, -1);                } catch (Exception e) {                         LOG.error("clear error: " + e,e);                }         }            public synchronized boolean trylock() throws KeeperException, InterruptedException {            if(zk == null){                    LOG.info("zk 是空");                    return false;            }        if (zk.isClosed()) {                LOG.info("zk 已经关闭");            return false;        }        ensurePathExists(dir);                LOG.debug("id:"+id);        do {            if (id == null) {                long sessionId = zk.getSessionId();                String prefix = "x-" + sessionId + "-";                idName = new LockNode(id);                LOG.debug("idName:"+idName);            }            if (id != null) {                List names = zk.getChildren(dir, false);                if (names.isEmpty()) {                    LOG.warn("No children in: " + dir + " when we've just " +                    "created one! Lets recreate it...");                    id = null;                } else {                    SortedSet sortedNames = new TreeSet();                    for (String name : names) {                        sortedNames.add(new LockNode(dir + "/" + name));                    }                    ownerId = sortedNames.first().getName();                    LOG.debug("all:"+sortedNames);                    SortedSet lessThanMe = sortedNames.headSet(idName);                    LOG.debug("less than me:"+lessThanMe);                    if (!lessThanMe.isEmpty()) {                            LockNode lastChildName = lessThanMe.last();                        lastChildId = lastChildName.getName();                        if (LOG.isDebugEnabled()) {                            LOG.debug("watching less than me node: " + lastChildId);                        }                        Stat stat = zk.exists(lastChildId, new LockWatcher());                        if (stat != null) {                            return Boolean.FALSE;                        } else {                            LOG.warn("Could not find the" +                                            " stats for less than me: " + lastChildName.getName());                        }                    } else {                        if (isOwner()) {                            if (callback != null) {                                callback.lockAcquired();                            }                            return Boolean.TRUE;                        }                    }                }            }        }        while (id == null);        return Boolean.FALSE;    }    public String getDir() {        return dir;    }    public boolean isOwner() {        return id != null && ownerId != null && id.equals(ownerId);    }    public String getId() {       return this.id;    }}

使用本方案实现的分布式锁,可以很好地解决锁重入的问题,而且使用会话结点来避免死锁;性能方面,根据笔者自测结果,加锁解锁各一次算是一个操作,本方案实现的分布式锁,TPS大概为2000-3000,性能比较一般

"zookeeper分布式锁实现的方法是什么"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0