千家信息网

​基于redis的分布式锁怎么实现

发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容介绍了"基于redis的分布式锁怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!基于redis的分布式锁……
千家信息网最后更新 2025年01月23日​基于redis的分布式锁怎么实现

本篇内容介绍了"基于redis的分布式锁怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

基于redis的分布式锁

/**
 * Factory for distributed locks.
 *
 * <p>One instance exists per schema (see {@link #getInstance(String)}). Each instance keeps a
 * cache of {@link RedisReentrantLock} proxies keyed by lock key, starts the lock observer thread
 * it is given, and starts a cleaner thread that destroys proxies nobody references any more.
 *
 * <p>Reference counting contract used by this class: {@code getLock(key, true)} takes exactly one
 * reference on the returned proxy. That reference is given back either by this class (lock not
 * needed, lock failed, exception) or by {@link RedisReentrantLock#unlock()} after a successful
 * lock.
 */
public class RedisLockUtil {
    private static final Logger logger = Logger.getLogger(RedisLockUtil.class);

    /** Guards creation of per-schema instances. */
    private static Object schemeLock = new Object();
    private static Map<String, RedisLockUtil> instances =
            new ConcurrentHashMap<String, RedisLockUtil>();

    /**
     * Returns the instance for the given schema, creating it (and its {@link LockObserver}) on
     * first use.
     *
     * @param schema schema name, passed to {@link LockObserver} and used in thread names
     * @return the shared instance for that schema
     */
    public static RedisLockUtil getInstance(String schema) {
        RedisLockUtil u = instances.get(schema);
        if (u == null) {
            synchronized (schemeLock) {
                u = instances.get(schema);
                if (u == null) {
                    LockObserver lo = new LockObserver(schema);
                    u = new RedisLockUtil(schema, lo);
                    instances.put(schema, u);
                }
            }
        }
        return u;
    }

    /** Guards creation of per-key mutex objects. */
    private Object mutexLock = new Object();
    /** One monitor object per lock key; every write to {@link #cache} for a key happens under it. */
    private Map<String, Object> mutexLockMap = new ConcurrentHashMap<String, Object>();
    /** Local lock proxies by key. */
    private Map<String, RedisReentrantLock> cache =
            new ConcurrentHashMap<String, RedisReentrantLock>();
    /** Proxies scheduled for cleanup; they become available once their clear time has passed. */
    private DelayQueue<RedisReentrantLock> dq = new DelayQueue<RedisReentrantLock>();
    private final AbstractLockObserver lo;

    /**
     * Creates the factory, starts the observer on a non-daemon thread and starts the cleaner
     * thread.
     *
     * @param schema schema name, used only for thread names here
     * @param lo observer handed to every lock proxy created by this instance
     */
    public RedisLockUtil(String schema, AbstractLockObserver lo) {
        // Assign before any thread is started so the field is set when other threads run.
        this.lo = lo;
        Thread th = new Thread(lo);
        th.setDaemon(false);
        th.setName("Lock Observer:" + schema);
        th.start();
        clearUselessLocks(schema);
    }

    /**
     * Starts a daemon thread that takes expired entries from the delay queue and, when a proxy
     * can be destroyed ({@link RedisReentrantLock#clear()} returns true), evicts it from the
     * cache.
     *
     * @param schema schema name, used only for the thread name
     */
    public void clearUselessLocks(String schema) {
        Thread th = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!SystemExitListener.isOver()) {
                    try {
                        RedisReentrantLock t = dq.take();
                        if (t.clear()) {
                            String key = t.getKey();
                            synchronized (getMutex(key)) {
                                // getLock() may already have replaced the destroyed proxy with
                                // a fresh one; only evict the proxy that was actually cleared.
                                if (cache.get(key) == t) {
                                    cache.remove(key);
                                }
                            }
                        }
                        t.resetCleartime();
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop condition decides when to stop.
                    }
                }
            }
        });
        th.setDaemon(true);
        th.setName("Lock cleaner:" + schema);
        th.start();
    }

    /** Returns the monitor object for a key, creating it on first use. */
    private Object getMutex(String key) {
        Object mx = mutexLockMap.get(key);
        if (mx == null) {
            synchronized (mutexLock) {
                mx = mutexLockMap.get(key);
                if (mx == null) {
                    mx = new Object();
                    mutexLockMap.put(key, mx);
                }
            }
        }
        return mx;
    }

    /**
     * Returns the proxy for a key, creating it if necessary.
     *
     * @param key lock key
     * @param addref when true, the returned proxy carries exactly one additional reference that
     *     the caller must give back; a destroyed proxy is replaced by a fresh one
     * @return the cached (or newly created) proxy, never null
     */
    private RedisReentrantLock getLock(String key, boolean addref) {
        RedisReentrantLock lock = cache.get(key);
        if (lock == null) {
            synchronized (getMutex(key)) {
                lock = cache.get(key);
                if (lock == null) {
                    lock = new RedisReentrantLock(key, lo);
                    cache.put(key, lock);
                }
            }
        }
        if (addref && !lock.incRef()) {
            // The proxy was destroyed by the cleaner. Re-read under the key mutex: the cleaner
            // may have removed the entry already (null) or may still be about to.
            synchronized (getMutex(key)) {
                lock = cache.get(key);
                if (lock == null || !lock.incRef()) {
                    lock = new RedisReentrantLock(key, lo);
                    // Take the caller's reference before publishing the new proxy.
                    lock.incRef();
                    cache.put(key, lock);
                }
            }
        }
        return lock;
    }

    /** Calls {@code unlock()} on the proxy of every key currently in the cache. */
    public void reset() {
        for (String s : cache.keySet()) {
            getLock(s, false).unlock();
        }
    }

    /**
     * Tries to acquire the lock using the proxy's default timeout.
     *
     * <p>If the current thread already owns the lock, {@link LockStat#NONEED} is returned and the
     * caller must not release it for this call (passing {@code NONEED} to
     * {@link #unlock(String, LockStat)} is a no-op).
     *
     * @param key lock key
     * @return {@link LockStat#SUCCESS}, {@link LockStat#NONEED}, or {@code null} when the lock
     *     could not be acquired
     */
    public LockStat lock(String key) {
        return lock(key, -1);
    }

    /**
     * Tries to acquire the lock.
     *
     * @param key lock key
     * @param timeout timeout handed to {@link RedisReentrantLock#lock(long)}; values {@code <= 0}
     *     select that class's default
     * @return {@link LockStat#SUCCESS}, {@link LockStat#NONEED}, or {@code null} when the lock
     *     could not be acquired
     */
    public LockStat lock(String key, int timeout) {
        // getLock(key, true) already holds the single reference for this attempt; taking another
        // one here would keep the count above zero forever and defeat the cleaner.
        RedisReentrantLock ll = getLock(key, true);
        try {
            if (ll.isOwner(false)) {
                ll.descrRef();
                return LockStat.NONEED;
            }
            if (ll.lock(timeout)) {
                // The reference is kept until RedisReentrantLock.unlock() releases it.
                return LockStat.SUCCESS;
            }
            ll.descrRef();
            if (ll.setCleartime()) {
                dq.put(ll);
            }
            return null;
        } catch (LockNotExistsException e) {
            // The proxy was destroyed underneath us; retry with a fresh one.
            ll.descrRef();
            return lock(key, timeout);
        } catch (RuntimeException e) {
            ll.descrRef();
            throw e;
        }
    }

    /**
     * Releases a lock obtained from {@link #lock(String)}; the proxy is scheduled for cleanup
     * once nobody references it.
     *
     * @param key lock key
     * @param stat value returned by the matching {@code lock} call
     */
    public void unlock(String key, LockStat stat) {
        unlock(key, stat, false);
    }

    /**
     * Releases a lock obtained from {@link #lock(String)}.
     *
     * @param key lock key
     * @param stat value returned by the matching {@code lock} call; anything other than
     *     {@link LockStat#SUCCESS} (including {@code null}) makes this a no-op
     * @param keepalive when true, the proxy is not scheduled for cleanup even if unreferenced
     */
    public void unlock(String key, LockStat stat, boolean keepalive) {
        if (stat == null) {
            return;
        }
        if (LockStat.SUCCESS.equals(stat)) {
            RedisReentrantLock lock = getLock(key, false);
            boolean candestroy = lock.unlock();
            if (candestroy && !keepalive) {
                if (lock.setCleartime()) {
                    dq.put(lock);
                }
            }
        }
    }

    /** Outcome of a lock attempt; a failed attempt is reported as {@code null} instead. */
    public static enum LockStat {
        /** The calling thread already owned the lock; nothing to release for this call. */
        NONEED,
        /** The lock was acquired by this call and must be released with {@code unlock}. */
        SUCCESS
    }
}



/**
 * Local proxy for one distributed lock key.
 *
 * <p>Acquisition is two-level: threads first take the in-process {@link ReentrantLock}, then the
 * winner tries the Redis-backed {@link RedisLock}. The proxy is reference counted
 * ({@link #incRef()} / {@link #descrRef()}) and can only be destroyed by {@link #clear()} while
 * the count is not positive. It implements {@link Delayed} so that it can sit in a delay queue
 * until its clear time is reached.
 */
public class RedisReentrantLock implements Delayed {
    private static final Logger logger = Logger.getLogger(RedisReentrantLock.class);
    private ReentrantLock reentrantLock = new ReentrantLock();

    // Written under refLock by clear(), read by lock()/unlock() without it.
    private volatile RedisLock redisLock;
    /** Default timeout; passed to {@code RedisLock.trylock} and awaited in seconds. */
    private long timeout = 3 * 60;
    // Replaced by the locking thread, counted down by the listener callbacks.
    private volatile CountDownLatch lockcount = new CountDownLatch(1);

    private String key;
    private AbstractLockObserver observer;

    private int ref = 0;
    private Object refLock = new Object();
    private volatile boolean destroyed = false;

    /** Absolute time in milliseconds at which cleanup is due; -1 when not scheduled. */
    private volatile long cleartime = -1;

    /**
     * @param key lock key
     * @param observer observer handed to the underlying {@link RedisLock}
     */
    public RedisReentrantLock(String key, AbstractLockObserver observer) {
        this.key = key;
        this.observer = observer;
        initWriteLock();
    }

    /** @return true once {@link #clear()} has destroyed this proxy */
    public boolean isDestroyed() {
        return destroyed;
    }

    /** Creates the underlying {@link RedisLock} with a listener that wakes the waiting thread. */
    private synchronized void initWriteLock() {
        redisLock = new RedisLock(key, new LockListener() {
            @Override
            public void lockAcquired() {
                lockcount.countDown();
            }

            @Override
            public long getExpire() {
                return 0;
            }

            @Override
            public void lockError() {
                // Wake the waiter on error as well; lock() then decides via doExpire().
                lockcount.countDown();
            }
        }, observer);
    }

    /**
     * Takes a reference.
     *
     * @return false if the proxy is already destroyed (no reference is taken in that case)
     */
    public boolean incRef() {
        synchronized (refLock) {
            if (destroyed) {
                return false;
            }
            ref++;
        }
        return true;
    }

    /** Gives back a reference. */
    public void descrRef() {
        synchronized (refLock) {
            ref--;
        }
    }

    /**
     * Destroys the proxy if nobody references it.
     *
     * @return true if the proxy is destroyed (now or earlier), false if it is still referenced
     */
    public boolean clear() {
        if (destroyed) {
            return true;
        }
        synchronized (refLock) {
            // Re-check under the lock: a concurrent clear() may have finished in between, in
            // which case redisLock is already null.
            if (destroyed) {
                return true;
            }
            if (ref > 0) {
                return false;
            }
            destroyed = true;
            redisLock.clear();
            redisLock = null;
            return true;
        }
    }

    /**
     * Acquires the lock, waiting up to {@code timeout} seconds for the Redis-side lock.
     *
     * <p>On success the in-process lock stays held by the calling thread until
     * {@link #unlock()}. On every other outcome (failure or exception) it is released before
     * returning. If the thread is interrupted while waiting, the interrupt flag is restored and
     * the wait is cancelled; the method then still returns true if the lock had been granted in
     * the meantime.
     *
     * @param timeout timeout in seconds; values {@code <= 0} select the default
     * @return true if the lock is now held by the calling thread
     * @throws LockNotExistsException if this proxy has already been destroyed
     */
    public boolean lock(long timeout) throws LockNotExistsException {
        if (timeout <= 0) {
            timeout = this.timeout;
        }
        reentrantLock.lock(); // under contention, take the first-level (in-process) lock first
        boolean acquired = false;
        try {
            RedisLock current = redisLock;
            if (current == null) {
                throw new LockNotExistsException();
            }
            lockcount = new CountDownLatch(1);
            if (current.trylock(timeout)) {
                acquired = true;
            } else {
                try {
                    lockcount.await(timeout, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                // doExpire() reports whether the lock was granted; if not, it cancels the wait
                // so that a late grant is released instead of being left ownerless.
                acquired = current.doExpire();
            }
            return acquired;
        } finally {
            if (!acquired) {
                // Covers failure, LockNotExistsException and any runtime exception from Redis.
                reentrantLock.unlock();
            }
        }
    }

    /**
     * Acquires the lock with the default timeout.
     *
     * @return true if the lock is now held by the calling thread
     * @throws LockNotExistsException if this proxy has already been destroyed
     */
    public boolean lock() throws LockNotExistsException {
        return lock(timeout);
    }

    /**
     * Releases both lock levels and gives back one reference.
     *
     * <p>If the calling thread is not the owner, an error with a stack trace is logged and
     * nothing is released.
     *
     * @return true if the proxy is no longer referenced afterwards; false if it still is, or if
     *     the caller was not the owner
     */
    public boolean unlock() {
        if (!isOwner(true)) {
            // The exception is created only to log a stack trace of the offending caller.
            RuntimeException e = new RuntimeException("big ================================================ error.key:" + key);
            logger.error("err:" + e, e);
            return false;
        }
        try {
            redisLock.unlock();
        } finally {
            try {
                reentrantLock.unlock(); // under contention, release the outermost lock
            } finally {
                descrRef();
            }
        }
        return canDestroy();
    }

    /** @return true if the reference count is not positive */
    public boolean canDestroy() {
        synchronized (refLock) {
            return ref <= 0;
        }
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    /**
     * Tells whether the calling thread holds both the in-process lock and the Redis lock.
     *
     * @param check when true, a mismatch is logged as an error
     * @return true only if both levels are held; false if the proxy has been destroyed
     */
    public boolean isOwner(boolean check) {
        synchronized (refLock) {
            if (redisLock == null) {
                logger.error("reidsLock is null:key=" + key);
                return false;
            }
            boolean a = reentrantLock.isHeldByCurrentThread();
            boolean b = redisLock.isOwner();
            if (check) {
                if (!a || !b) {
                    logger.error(key + ";a:" + a + ";b:" + b);
                }
            }
            return a && b;
        }
    }

    /**
     * Schedules cleanup 10 seconds from now unless it is already scheduled.
     *
     * @return true if the clear time was set by this call (the caller should enqueue the proxy)
     */
    public boolean setCleartime() {
        synchronized (this) {
            if (cleartime > 0) {
                return false;
            }
            this.cleartime = System.currentTimeMillis() + 10 * 1000;
            return true;
        }
    }

    /** Marks the proxy as not scheduled for cleanup. */
    public void resetCleartime() {
        synchronized (this) {
            this.cleartime = -1;
        }
    }

    /**
     * Orders by clear time (earlier first). Other {@link Delayed} implementations are ordered by
     * remaining delay instead of being treated as equal.
     */
    @Override
    public int compareTo(Delayed object) {
        if (object == this) {
            return 0;
        }
        long mine;
        long theirs;
        if (object instanceof RedisReentrantLock) {
            mine = this.cleartime;
            theirs = ((RedisReentrantLock) object).cleartime;
        } else {
            mine = getDelay(TimeUnit.MILLISECONDS);
            theirs = object.getDelay(TimeUnit.MILLISECONDS);
        }
        // Explicit comparison instead of subtraction, which could overflow.
        return mine < theirs ? -1 : (mine > theirs ? 1 : 0);
    }

    /** @return the time remaining until the clear time, in the requested unit */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

}



/***使用Redis实现的分布式锁*基本工作原理如下:*1. 使用setnx(key,时间戮+超时),如果设置成功,则直接拿到锁*2. 如果设置不成功,获取key的值v1(它的到期时间戮),跟当前时间对比,看是否已经超时*3. 如果超时(说明拿到锁的结点已经挂掉),v2=getset(key,时间戮+超时+1),判断v2是否等于v1,如果相等,加锁成功,否则加锁失败,等过段时间再重试(200MS)*/public class RedisLock implements LockListener{        private String key;        private boolean owner = false;        private AbstractLockObserver observer = null;        private LockListener lockListener = null;        private boolean waiting = false;        private long expire;//锁超时时间,以秒为单位        private boolean expired = false;        public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) {                this.key = key;                this.lockListener = lockListener;                this.observer = observer;        }        public boolean trylock(long expire) {                synchronized(this){                        if(owner){                                return true;                        }                        this.expire = expire;                        this.expired = false;                        if(!waiting){                                owner = observer.tryLock(key,expire);                                if(!owner){                                        waiting = true;                                        observer.addLockListener(key, this);                                }                        }                        return owner;                }        }        public boolean isOwner() {                return owner;        }        public void unlock() {                synchronized(this){                        observer.unLock(key);                        owner = false;                }        }        public void clear() {                synchronized(this){                        if(waiting) {                                observer.removeLockListener(key);                                waiting = false;                        }                }        }        public boolean doExpire(){                
synchronized(this){                        if(owner) return true;                        if(expired) return false;                        expired =  true;                        clear();                }                return false;        }        @Override        public void lockAcquired() {                synchronized(this){                        if(expired){                                unlock();                                return;                        }                        owner = true;                        waiting = false;                }                lockListener.lockAcquired();        }        @Override        public long getExpire() {                return this.expire;        }        @Override        public void lockError() {                synchronized(this){                        owner = false;                        waiting = false;                        lockListener.lockError();                }        }}



public class LockObserver extends AbstractLockObserver implements Runnable{        private CacheRedisClient client;        private Object mutex = new Object();        private Map lockMap = new ConcurrentHashMap();        private boolean stoped = false;        private long interval = 500;        private boolean terminated = false;        private CountDownLatch doneSignal = new CountDownLatch(1);                public LockObserver(String schema){                client = new CacheRedisClient(schema);                                SystemExitListener.addTerminateListener(new ExitHandler(){                        public void run() {                                stoped = true;                                try {                                        doneSignal.await();                                } catch (InterruptedException e) {                                }                        }                });        }                        public void addLockListener(String key,LockListener listener){                if(terminated){                        listener.lockError();                        return;                }                synchronized(mutex){                        lockMap.put(key, listener);                }        }                public void removeLockListener(String key){                synchronized(mutex){                        lockMap.remove(key);                }        }                @Override        public void run() {                while(!terminated){                        long p1 = System.currentTimeMillis();                        Map clone = new HashMap();                        synchronized(mutex){                                clone.putAll(lockMap);                        }                                               Set keyset = clone.keySet();                        if(keyset.size() > 0){                                ConnectionFactory.setSingleConnectionPerThread(keyset.size());                                for(String key 
: keyset){                                        LockListener ll = clone.get(key);                                        try{                                            if(tryLock(key,ll.getExpire())) {                                                    ll.lockAcquired();                                                    removeLockListener(key);                                            }                                        }catch(Exception e){                                                ll.lockError();                                                removeLockListener(key);                                        }                                }                                ConnectionFactory.releaseThreadConnection();                        }else{                                if(stoped){                                        terminated = true;                                        doneSignal.countDown();                                        return;                                }                        }                        try {                                long p2 = System.currentTimeMillis();                                long cost = p2 - p1;                                if(cost <= interval){                                        Thread.sleep(interval - cost);                                }else{                                        Thread.sleep(interval*2);                                }                        } catch (InterruptedException e) {                        }                }                        }                        /**         * 超时时间单位为s!!!         
* @param key         * @param expire         * @return         */        public boolean tryLock(final String key,final long expireInSecond){                if(terminated) return false;                final long tt = System.currentTimeMillis();                final long expire = expireInSecond * 1000;                final Long ne = tt + expire;                List mm = client.multi(key, new MultiBlock(){                        @Override                        public void execute() {                                transaction.setnxObject(key, ne);                                transaction.get(SafeEncoder.encode(key));                        }                });                Long res = (Long)mm.get(0);            if(new Long(1).equals(res)) {                    return true;            }else{                    byte[] bb = (byte[])mm.get(1);                        Long ex = client.deserialize(bb);                    if(ex == null || tt > ex){                            Long old = client.getSet(key, new Long(ne+1));                            if(old == null || (ex == null&&old==null) || (ex!=null&&ex.equals(old))){                                    return true;                            }                    }            }            return false;        }        public void unLock(String key){                client.del(key);        }}

使用本方案实现的分布式锁,可以完美地解决锁重入问题;通过引入超时机制,也避免了死锁问题。性能方面,笔者的自测结果如下:

500线程 tps = 35000
[root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
线程总时间:6553466;平均:13.106932
实际总时间:13609; 平均:0.027218

TPS达到35000,比方案1强了整整一个数量级

"基于redis的分布式锁怎么实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0