基于redis的分布式锁怎么实现
发表于:2025-02-24 作者:千家信息网编辑
千家信息网最后更新 2025年02月24日,本篇内容介绍了"基于redis的分布式锁怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!基于r
千家信息网最后更新 2025年02月24日基于redis的分布式锁怎么实现
本篇内容介绍了"基于redis的分布式锁怎么实现"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
基于redis的分布式锁
/***分布式锁工厂类*/public class RedisLockUtil { private static final Logger logger = Logger.getLogger(RedisLockUtil.class); private static Object schemeLock = new Object(); private static Mapinstances = new ConcurrentHashMap(); public static RedisLockUtil getInstance(String schema){ RedisLockUtil u = instances.get(schema); if(u==null){ synchronized(schemeLock){ u = instances.get(schema); if(u == null){ LockObserver lo = new LockObserver(schema); u = new RedisLockUtil(schema,lo); instances.put(schema, u); } } } return u; } private Object mutexLock = new Object(); private Map mutexLockMap = new ConcurrentHashMap(); private Map cache = new ConcurrentHashMap (); private DelayQueue dq = new DelayQueue (); private AbstractLockObserver lo; public RedisLockUtil(String schema, AbstractLockObserver lo){ Thread th = new Thread(lo); th.setDaemon(false); th.setName("Lock Observer:"+schema); th.start(); clearUselessLocks(schema); this.lo = lo; } public void clearUselessLocks(String schema){ Thread th = new Thread(new Runnable(){ @Override public void run() { while(!SystemExitListener.isOver()){ try { RedisReentrantLock t = dq.take(); if(t.clear()){ String key = t.getKey(); synchronized(getMutex(key)){ cache.remove(key); } } t.resetCleartime(); } catch (InterruptedException e) { } } } }); th.setDaemon(true); th.setName("Lock cleaner:"+schema); th.start(); } private Object getMutex(String key){ Object mx = mutexLockMap.get(key); if(mx == null){ synchronized(mutexLock){ mx = mutexLockMap.get(key); if(mx==null){ mx = new Object(); mutexLockMap.put(key,mx); } } } return mx; } private RedisReentrantLock getLock(String key,boolean addref){ RedisReentrantLock lock = cache.get(key); if(lock == null){ synchronized(getMutex(key)){ lock = cache.get(key); if(lock == null){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } if(addref){ if(!lock.incRef()){ synchronized(getMutex(key)){ lock = cache.get(key); if(!lock.incRef()){ lock = new RedisReentrantLock(key,lo); cache.put(key, lock); } } } } return lock; } public void reset(){ for(String s : cache.keySet()){ getLock(s,false).unlock(); } } /** * 尝试加锁 * 如果当前线程已经拥有该锁的话,直接返回,表示不用再次加锁,此时不应该再调用unlock进行解锁 * * @param key * @return * @throws Exception * @throws InterruptedException * @throws KeeperException */ public LockStat lock(String key) { return lock(key,-1); } public LockStat lock(String key,int timeout) { RedisReentrantLock ll = getLock(key,true); ll.incRef(); try{ if(ll.isOwner(false)){ ll.descrRef(); return LockStat.NONEED; } if(ll.lock(timeout)){ return LockStat.SUCCESS; }else{ ll.descrRef(); if(ll.setCleartime()){ dq.put(ll); } return null; } }catch(LockNotExistsException e){ ll.descrRef(); return lock(key,timeout); }catch(RuntimeException e){ ll.descrRef(); throw e; } } public void unlock(String key,LockStat stat) { unlock(key,stat,false); } public void unlock(String key,LockStat stat,boolean keepalive){ if(stat == null) return; if(LockStat.SUCCESS.equals(stat)){ RedisReentrantLock lock = getLock(key,false); boolean candestroy = lock.unlock(); if(candestroy && !keepalive){ if(lock.setCleartime()){ dq.put(lock); } } } } public static enum LockStat{ NONEED, SUCCESS }}
/**
*分布式锁本地代理类
*/
public class RedisReentrantLock implements Delayed{
private static final Logger logger = Logger.getLogger(RedisReentrantLock.class);
private ReentrantLock reentrantLock = new ReentrantLock();
private RedisLock redisLock;
private long timeout = 3*60;
private CountDownLatch lockcount = new CountDownLatch(1);
private String key;
private AbstractLockObserver observer;
private int ref = 0;
private Object refLock = new Object();
private boolean destroyed = false;
private long cleartime = -1;
public RedisReentrantLock(String key,AbstractLockObserver observer) {
this.key = key;
this.observer = observer;
initWriteLock();
}
public boolean isDestroyed() {
return destroyed;
}
private synchronized void initWriteLock(){
redisLock = new RedisLock(key,new LockListener(){
@Override
public void lockAcquired() {
lockcount.countDown();
}
@Override
public long getExpire() {
return 0;
}
@Override
public void lockError() {
/*synchronized(mutex){
mutex.notify();
}*/
lockcount.countDown();
}
},observer);
}
public boolean incRef(){
synchronized(refLock){
if(destroyed) return false;
ref ++;
}
return true;
}
public void descrRef(){
synchronized(refLock){
ref --;
}
}
public boolean clear() {
if(destroyed) return true;
synchronized(refLock){
if(ref > 0){
return false;
}
destroyed = true;
redisLock.clear();
redisLock = null;
return true;
}
}
public boolean lock(long timeout) throws LockNotExistsException{
if(timeout <= 0) timeout = this.timeout;
//incRef();
reentrantLock.lock();//多线程竞争时,先拿到第一层锁
if(redisLock == null){
reentrantLock.unlock();
//descrRef();
throw new LockNotExistsException();
}
try{
lockcount = new CountDownLatch(1);
boolean res = redisLock.trylock(timeout);
if(!res){
lockcount.await(timeout, TimeUnit.SECONDS);
//mutex.wait(timeout*1000);
if(!redisLock.doExpire()){
reentrantLock.unlock();
return false;
}
}
return true;
}catch(InterruptedException e){
reentrantLock.unlock();
return false;
}
}
public boolean lock() throws LockNotExistsException {
return lock(timeout);
}
public boolean unlock(){
if(!isOwner(true)) {
try{
throw new RuntimeException("big ================================================ error.key:"+key);
}catch(Exception e){
logger.error("err:"+e,e);
}
return false;
}
try{
redisLock.unlock();
reentrantLock.unlock();//多线程竞争时,释放最外层锁
}catch(RuntimeException e){
reentrantLock.unlock();//多线程竞争时,释放最外层锁
throw e;
}finally{
descrRef();
}
return canDestroy();
}
public boolean canDestroy(){
synchronized(refLock){
return ref <= 0;
}
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public boolean isOwner(boolean check) {
synchronized(refLock){
if(redisLock == null) {
logger.error("reidsLock is null:key="+key);
return false;
}
boolean a = reentrantLock.isHeldByCurrentThread();
boolean b = redisLock.isOwner();
if(check){
if(!a || !b){
logger.error(key+";a:"+a+";b:"+b);
}
}
return a && b;
}
}
public boolean setCleartime() {
synchronized(this){
if(cleartime>0) return false;
this.cleartime = System.currentTimeMillis() + 10*1000;
return true;
}
}
public void resetCleartime(){
synchronized(this){
this.cleartime = -1;
}
}
@Override
public int compareTo(Delayed object) {
if(object instanceof RedisReentrantLock){
RedisReentrantLock t = (RedisReentrantLock)object;
long l = this.cleartime - t.cleartime;
if(l > 0) return 1 ; //比当前的小则返回1,比当前的大则返回-1,否则为0
else if(l < 0 ) return -1;
else return 0;
}
return 0;
}
@Override
public long getDelay(TimeUnit unit) {
long d = unit.convert(cleartime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return d;
}
}
/***使用Redis实现的分布式锁*基本工作原理如下:*1. 使用setnx(key,时间戮+超时),如果设置成功,则直接拿到锁*2. 如果设置不成功,获取key的值v1(它的到期时间戮),跟当前时间对比,看是否已经超时*3. 如果超时(说明拿到锁的结点已经挂掉),v2=getset(key,时间戮+超时+1),判断v2是否等于v1,如果相等,加锁成功,否则加锁失败,等过段时间再重试(200MS)*/public class RedisLock implements LockListener{ private String key; private boolean owner = false; private AbstractLockObserver observer = null; private LockListener lockListener = null; private boolean waiting = false; private long expire;//锁超时时间,以秒为单位 private boolean expired = false; public RedisLock(String key, LockListener lockListener, AbstractLockObserver observer) { this.key = key; this.lockListener = lockListener; this.observer = observer; } public boolean trylock(long expire) { synchronized(this){ if(owner){ return true; } this.expire = expire; this.expired = false; if(!waiting){ owner = observer.tryLock(key,expire); if(!owner){ waiting = true; observer.addLockListener(key, this); } } return owner; } } public boolean isOwner() { return owner; } public void unlock() { synchronized(this){ observer.unLock(key); owner = false; } } public void clear() { synchronized(this){ if(waiting) { observer.removeLockListener(key); waiting = false; } } } public boolean doExpire(){ synchronized(this){ if(owner) return true; if(expired) return false; expired = true; clear(); } return false; } @Override public void lockAcquired() { synchronized(this){ if(expired){ unlock(); return; } owner = true; waiting = false; } lockListener.lockAcquired(); } @Override public long getExpire() { return this.expire; } @Override public void lockError() { synchronized(this){ owner = false; waiting = false; lockListener.lockError(); } }}
public class LockObserver extends AbstractLockObserver implements Runnable{ private CacheRedisClient client; private Object mutex = new Object(); private MaplockMap = new ConcurrentHashMap(); private boolean stoped = false; private long interval = 500; private boolean terminated = false; private CountDownLatch doneSignal = new CountDownLatch(1); public LockObserver(String schema){ client = new CacheRedisClient(schema); SystemExitListener.addTerminateListener(new ExitHandler(){ public void run() { stoped = true; try { doneSignal.await(); } catch (InterruptedException e) { } } }); } public void addLockListener(String key,LockListener listener){ if(terminated){ listener.lockError(); return; } synchronized(mutex){ lockMap.put(key, listener); } } public void removeLockListener(String key){ synchronized(mutex){ lockMap.remove(key); } } @Override public void run() { while(!terminated){ long p1 = System.currentTimeMillis(); Map clone = new HashMap(); synchronized(mutex){ clone.putAll(lockMap); } Set keyset = clone.keySet(); if(keyset.size() > 0){ ConnectionFactory.setSingleConnectionPerThread(keyset.size()); for(String key : keyset){ LockListener ll = clone.get(key); try{ if(tryLock(key,ll.getExpire())) { ll.lockAcquired(); removeLockListener(key); } }catch(Exception e){ ll.lockError(); removeLockListener(key); } } ConnectionFactory.releaseThreadConnection(); }else{ if(stoped){ terminated = true; doneSignal.countDown(); return; } } try { long p2 = System.currentTimeMillis(); long cost = p2 - p1; if(cost <= interval){ Thread.sleep(interval - cost); }else{ Thread.sleep(interval*2); } } catch (InterruptedException e) { } } } /** * 超时时间单位为s!!! * @param key * @param expire * @return */ public boolean tryLock(final String key,final long expireInSecond){ if(terminated) return false; final long tt = System.currentTimeMillis(); final long expire = expireInSecond * 1000; final Long ne = tt + expire; List
使用本方案实现的分布式锁,可以完美地解决锁重入问题;通过引入超时也避免了死锁问题;性能方面,笔者自测试结果如下:
500线程 tps = 35000
[root@DB1 benchtest-util]# target/benchtest/bin/TestFastRedis /data/config/util/config_0_11.properties lock 500 500000
线程总时间:6553466;平均:13.106932
实际总时间:13609; 平均:0.027218
TPS达到35000,比方案1强了整整一个数量级
"基于redis的分布式锁怎么实现"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!
时间
分布式
线程
成功
竞争
内容
单位
外层
实际
更多
知识
问题
实用
学有所成
接下来
不用
再次
原理
困境
工厂
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
任子行网络安全问董秘
手持频谱分析仪软件开发
php服务器教学视频
在什么部门申请软件开发
网络安全法是哪年哪月开启
数据库安全性实验答案
网络安全周首届
用简单数据库做的游戏
溪水源泉网络技术公司
xml中修改数据库
网络安全法 十大点
为什么维护网络安全
唯乐官方服务器关了吗
车载网络技术的定义是什么
默认服务器sptaki不可用
斐讯路由器k2显示dns服务器
吉林市网络安全知识
.小吉互联网科技有限公司
挖钻石服务器视频
数据库表连续编号
数据库创建日期的代码
怎么学高超的网络技术
怎样通过网页访问服务器软件
崇明区口碑好的数据库服务电话
网络安全团队破晓
电脑服务器正在加载就卡住了
网络安全周首届
潍城软件开发
前置机数据库中的推送
联行号mysql数据库