基于redis的分布式锁怎么实现
发表于:2025-01-23 作者:千家信息网编辑
千家信息网最后更新 2025年01月23日,本篇内容介绍了"基于redis的分布式锁怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!基于r
千家信息网最后更新 2025年01月23日基于redis的分布式锁怎么实现
本篇内容介绍了"基于redis的分布式锁怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
基于redis的分布式锁
/***分布式锁工厂类*/public class RedisLockUtil { private static final Logger logger = Logger.getLogger(RedisLockUtil.class); private static Object schemeLock = new Object(); private static Mapinstances = new ConcurrentHashMap(); public static RedisLockUtil getInstance(String schema){ RedisLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ LockObserver lo = new LockObserver(schema); u = new RedisLockUtil(schema,lo); instances.put(schema, u); } } } return u; } private Object mutexLock = new Object(); private Map mutexLockMap = new ConcurrentHashMap(); private Map cache = new ConcurrentHashMap (); private DelayQueue dq = new DelayQueue (); private AbstractLockObserver lo; public RedisLockUtil(String schema, AbstractLockObserver lo){ Thread th = new Thread(lo); th.setDaemon(false); th.setName("Lock Observer:"+schema); th.start(); clearUselessLocks(schema); this.lo = lo; } public void clearUselessLocks(String schema){ Thread th = new Thread(new Runnable(){ @Override public void run() { while(!SystemExitListener.isOver()){ try { RedisReentrantLock t = dq.take(); if(t.clear()){ String key = t.getKey(); synchronized(getMutex(key)){ cache.remove(key); } } t.resetCleartime(); } catch (InterruptedException e) { } } } }); th.setDaemon(true); th.setName("Lock cleaner:"+schema); th.start(); } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private RedisReentrantLock getLock(String key,boolean addref){ RedisReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } if(addref){ if(!lock.incRef()){ synchronized(getMutex(key)){ lock = cache.get(key); if(!lock.incRef()){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s,false).unlock(); } } /** * 尝试加锁 * 如果当前线程已经拥有该锁的话,直接返回,表示不用再次加锁,此时不应该再调用unlock进行解锁 * * @param key * @return * @throws Exception * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) { return lock(key,-1); } public LockStat lock(String key,int timeout) { RedisReentrantLock ll = getLock(key,true); ll.incRef(); try{ if(ll.isOwner(false)){ ll.descrRef(); return LockStat.NONEED; } if(ll.lock(timeout)){ return LockStat.SUCCESS; }else{ ll.descrRef(); if(ll.setCleartime()){ dq.put(ll); } return null; } }catch(LockNotExistsException e){ ll.descrRef(); return lock(key,timeout); }catch(RuntimeException e){ ll.descrRef(); throw e; } } public void unlock(String key,LockStat stat) { unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive){ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ RedisReentrantLock lock = getLock(key,false); boolean candestroy = lock.unlock(); if(candestroy && !keepalive){ if(lock.setCleartime()){ dq.put(lock); } } } } public static enum LockStat{ NONEED, SUCCESS }}
/**
*分布式锁本地代理类
*/
public class RedisReentrantLock implements Delayed{
private static final Logger logger = Logger.getLogger(RedisReentrantLock.class);
private ReentrantLock reentrantLock = new ReentrantLock();
private RedisLock redisLock;
private long timeout = 3*60;
private CountDownLatch lockcount = new CountDownLatch(1);
private String key;
private AbstractLockObserver observer;
private int ref = 0;
private Object refLock = new Object();
private boolean destroyed = false;
private long cleartime = -1;
public RedisReentrantLock(String key,AbstractLockObserver observer) {
this.key = key;
this.observer = observer;
initWriteLock();
}
public boolean isDestroyed() {
return destroyed;
}
private synchronized void initWriteLock(){
redisLock = new RedisLock(key,new LockListener(){
@Override
public void lockAcquired() {
lockcount.countDown();
}
@Override
public long getExpire() {
return 0;
}
@Override
public void lockError() {
/*synchronized(mutex){
mutex.notify();
}*/
lockcount.countDown();
}
},observer);
}
public boolean incRef(){
synchronized(refLock){
if(destroyed) return false;
ref ++;
}
return true;
}
public void descrRef(){
synchronized(refLock){
ref --;
}
}
public boolean clear() {
if(destroyed) return true;
synchronized(refLock){
if(ref > 0){
return false;
}
destroyed = true;
redisLock.clear();
redisLock = null;
return true;
}
}
public boolean lock(long timeout) throws LockNotExistsException{
if(timeout <= 0) timeout = this.timeout;
//incRef();
reentrantLock.lock();//多线程竞争时,先拿到第一层锁
if(redisLock == null){
reentrantLock.unlock();
//descrRef();
throw new LockNotExistsException();
}
try{
lockcount = new CountDownLatch(1);
boolean res = redisLock.trylock(timeout);
if(!res){
lockcount.await(timeout, TimeUnit.SECONDS);
//mutex.wait(timeout*1000);
if(!redisLock.doExpire()){
reentrantLock.unlock();
return false;
}
}
return true;
}catch(InterruptedException e){
reentrantLock.unlock();
return false;
}
}
public boolean lock() throws LockNotExistsException {
return lock(timeout);
}
public boolean unlock(){
if(!isOwner(true)) {
try{
throw new RuntimeException("big ================================================ error.key:"+key);
}catch(Exception e){
logger.error("err:"+e,e);
}
return false;
}
try{
redisLock.unlock();
reentrantLock.unlock();//多线程竞争时,释放最外层锁
}catch(RuntimeException e){
reentrantLock.unlock();//多线程竞争时,释放最外层锁
throw e;
}finally{
descrRef();
}
return canDestroy();
}
public boolean canDestroy(){
synchronized(refLock){
return ref <= 0;
}
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public boolean isOwner(boolean check) {
synchronized(refLock){
if(redisLock == null) {
logger.error("reidsLock is null:key="+key);
return false;
}
boolean a = reentrantLock.isHeldByCurrentThread();
boolean b = redisLock.isOwner();
if(check){
if(!a || !b){
logger.error(key+";a:"+a+";b:"+b);
}
}
return a && b;
}
}
public boolean setCleartime() {
synchronized(this){
if(cleartime>0) return false;
this.cleartime = System.currentTimeMillis() + 10*1000;
return true;
}
}
public void resetCleartime(){
synchronized(this){
this.cleartime = -1;
}
}
@Override
public int compareTo(Delayed object) {
if(object instanceof RedisReentrantLock){
RedisReentrantLock t = (RedisReentrantLock)object;
long l = this.cleartime - t.cleartime;
if(l > 0) return 1 ; //比当前的小则返回1,比当前的大则返回-1,否则为0
else if(l < 0 ) return -1;
else return 0;
}
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return d;
}
}
/***使用Redis实现的分布式锁*基本工作原理如下:*1. 使用setnx(key,时间戮+超时),如果设置成功,则直接拿到锁*2. 如果设置不成功,获取key的值v1(它的到期时间戮),跟当前时间对比,看是否已经超时*3. 如果超时(说明拿到锁的结点已经挂掉),v2=getset(key,时间戮+超时+1),判断v2是否等于v1,如果相等,加锁成功,否则加锁失败,等过段时间再重试(200MS)*/public class RedisLock implements LockListener{ private String key; private boolean owner = false; private AbstractLockObserver observer = null; private LockListener lockListener = null; private boolean waiting = false; private long expire;//锁超时时间,以秒为单位 private boolean expired = false; public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) { this.key = key; this.lockListener = lockListener; this.observer = observer; } public boolean trylock(long expire) { synchronized(this){ if(owner){ return true; } this.expire = expire; this.expired = false; if(!waiting){ owner = observer.tryLock(key,expire); if(!owner){ waiting = true; observer.addLockListener(key, this); } } return owner; } } public boolean isOwner() { return owner; } public void unlock() { synchronized(this){ observer.unLock(key); owner = false; } } public void clear() { synchronized(this){ if(waiting) { observer.removeLockListener(key); waiting = false; } } } public boolean doExpire(){ synchronized(this){ if(owner) return true; if(expired) return false; expired = true; clear(); } return false; } @Override public void lockAcquired() { synchronized(this){ if(expired){ unlock(); return; } owner = true; waiting = false; } lockListener.lockAcquired(); } @Override public long getExpire() { return this.expire; } @Override public void lockError() { synchronized(this){ owner = false; waiting = false; lockListener.lockError(); } }}
public class LockObserver extends AbstractLockObserver implements Runnable{ private CacheRedisClient client; private Object mutex = new Object(); private MaplockMap = new ConcurrentHashMap(); private boolean stoped = false; private long interval = 500; private boolean terminated = false; private CountDownLatch doneSignal = new CountDownLatch(1); public LockObserver(String schema){ client = new CacheRedisClient(schema); SystemExitListener.addTerminateListener(new ExitHandler(){ public void run() { stoped = true; try { doneSignal.await(); } catch (InterruptedException e) { } } }); } public void addLockListener(String key,LockListener listener){ if(terminated){ listener.lockError(); return; } synchronized(mutex){ lockMap.put(key, listener); } } public void removeLockListener(String key){ synchronized(mutex){ lockMap.remove(key); } } @Override public void run() { while(!terminated){ long p1 = System.currentTimeMillis(); Map clone = new HashMap(); synchronized(mutex){ clone.putAll(lockMap); } Set keyset = clone.keySet(); if(keyset.size() > 0){ ConnectionFactory.setSingleConnectionPerThread(keyset.size()); for(String key : keyset){ LockListener ll = clone.get(key); try{ if(tryLock(key,ll.getExpire())) { ll.lockAcquired(); removeLockListener(key); } }catch(Exception e){ ll.lockError(); removeLockListener(key); } } ConnectionFactory.releaseThreadConnection(); }else{ if(stoped){ terminated = true; doneSignal.countDown(); return; } } try { long p2 = System.currentTimeMillis(); long cost = p2 - p1; if(cost <= interval){ Thread.sleep(interval - cost); }else{ Thread.sleep(interval*2); } } catch (InterruptedException e) { } } } /** * 超时时间单位为s!!! * @param key * @param expire * @return */ public boolean tryLock(final String key,final long expireInSecond){ if(terminated) return false; final long tt = System.currentTimeMillis(); final long expire = expireInSecond * 1000; final Long ne = tt + expire; List
使用本方案实现的分布式锁,可以完美地解决锁重入问题;通过引入超时也避免了死锁问题;性能方面,笔者自测试结果如下:
500线程 tps = 35000
[root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
线程总时间:6553466;平均:13.106932
实际总时间:13609; 平均:0.027218
TPS达到35000,比方案1强了整整一个数量级
"基于redis的分布式锁怎么实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
时间
分布式
线程
成功
竞争
内容
单位
外层
实际
更多
知识
问题
实用
学有所成
接下来
不用
再次
原理
困境
工厂
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
丛台区网络安全和信息化委员会
服务器安全攻击案例
公众号软件开发服务费用
数据库查询相同字段的语句
怎么获得steam服务器
阿里云服务器如何打印
nas服务器做移动硬盘
佳能 打印服务器
vbn数据库
服务器如何查询电源型号
数据库前沿开发技术
工业数据库安全标准
细菌泛基因组数据库
数据库数据用web柱状图
泰安网络安全等级保护测评采购
青铜峡app软件开发
软件开发与设计先学什么条件
两个服务器可以跨服吗
金蝶服务器显示不了加密狗
阿里云服务器主机有什么用
路北区电子网络技术不二之选
网络安全学院难度
redis 游戏服务器
腾讯云服务器域名绑定
网络技术自主创新
网络安全法的几个层面
常用的数据库系统
网络安全四个方面特性
数据库实训项目经历描述
腾讯云数据库mysql费用