千家信息网

Redis怎么实现分布式锁和等待序列

发表于:2025-01-31 作者:千家信息网编辑
千家信息网最后更新 2025年01月31日,这篇文章主要介绍了Redis怎么实现分布式锁和等待序列,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。在集群下,经常会因为同时处理发生资
千家信息网最后更新 2025年01月31日Redis怎么实现分布式锁和等待序列

这篇文章主要介绍了Redis怎么实现分布式锁和等待序列,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 ReentrankLock 这些锁的作用范围都是 JVM ,说白了在集群下没啥用。这时我们就需要能在多台 JVM 之间决定执行顺序的锁了,现在分布式锁主要有 redis 、 Zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。

背景

最近在做一个消费 Kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 Redis 的实现方式(因为网上例子多)

分析

redis 实现的分布式锁,实现原理是 set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式

  • 丢弃

  • 等待重试 由于我们的系统需要这些数据,那么只能重新尝试获取。这里使用 redis 的 List 类型实现等待序列的作用

代码

直接上代码 其实直接redis的工具类就可以解决了

package com.testimport redis.clients.jedis.Jedis;import java.util.Collections;import java.util.List;/** * @desc redis队列实现方式 * @anthor  * @date  **/public class RedisUcUitl {  private static final String LOCK_SUCCESS = "OK";  private static final String SET_IF_NOT_EXIST = "NX";  private static final String SET_WITH_EXPIRE_TIME = "PX";  private static final Long RELEASE_SUCCESS = 1L;  private RedisUcUitl() {  }  /**   * logger   **/  /**   * 存储redis队列顺序存储 在队列首部存入   *   * @param key  字节类型   * @param value 字节类型   */  public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {    return jedis.lpush(key, value);    }  /**   * 移除列表中最后一个元素 并将改元素添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时   *   * @param srckey   * @param dstkey   * @param timeout 0 表示永不超时   * @return   */  public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {    return jedis.brpoplpush(srckey, dstkey, timeout);  }  /**   * 返回制定的key,起始位置的redis数据   * @param redisKey   * @param start   * @param end -1 表示到最后   * @return   */  public static List lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {        return jedis.lrange(redisKey, start, end);  }  /**   * 删除key   * @param redisKey   */  public static void delete(Jedis jedis, final byte[] redisKey) {         return jedis.del(redisKey);  }  /**   * 尝试加锁   * @param lockKey key名称   * @param requestId 身份标识   * @param expireTime 过期时间   * @return   */  public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);    return LOCK_SUCCESS.equals(result);  }  /**   * 释放锁   * @param lockKey key名称   * @param requestId 身份标识   * @return   */  public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {    final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";    jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));    return RELEASE_SUCCESS.equals(result);  }}

业务逻辑主要代码如下

1.先消耗队列中的

while(true){  // 消费队列  try{    // 被放入redis队列的数据 序列化后的    byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);    if(bytes == null || bytes.isEmpty()){      // 队列中没数据时退出      break;    }    // 反序列化对象    Map singleMap = (Map) ObjectSerialUtil.bytesToObject(bytes);    // 塞入唯一的值 防止被其他线程误解锁    String requestId = UUID.randomUUID().toString();    boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);    if(lockGetFlag){      // 成功获取锁 进行业务处理      //TODO      // 处理完毕释放锁       boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);    }else{      // 未能获得锁放入等待队列     RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));      }      }catch(Exception e){    break;  }  }

2.处理最新接到的数据

同样是走尝试获取锁,获取不到放入队列的流程

一般序列化用 fastJson 之列的就可以了,这里用的是 JDK 自带的,工具类如下

public class ObjectSerialUtil {  private ObjectSerialUtil() {//    工具类  }  /**   * 将Object对象序列化为byte[]   *   * @param obj 对象   * @return byte数组   * @throws Exception   */  public static byte[] objectToBytes(Object obj) throws IOException {    ByteArrayOutputStream bos = new ByteArrayOutputStream();    ObjectOutputStream oos = new ObjectOutputStream(bos);    oos.writeObject(obj);    byte[] bytes = bos.toByteArray();    bos.close();    oos.close();    return bytes;  }  /**   * 将bytes数组还原为对象   *   * @param bytes   * @return   * @throws Exception   */  public static Object bytesToObject(byte[] bytes) {    try {      ByteArrayInputStream bin = new ByteArrayInputStream(bytes);      ObjectInputStream ois = new ObjectInputStream(bin);      return ois.readObject();    } catch (Exception e) {      throw new BaseException("反序列化出错!", e);    }  }}

感谢你能够认真阅读完这篇文章,希望小编分享的"Redis怎么实现分布式锁和等待序列"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

队列 序列 数据 处理 分布式 同时 对象 方式 时候 篇文章 类型 线程 问题 成功 代码 工具 顺序 尝试 消费 业务 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 seer数据库列线图绘制 想做网络游戏服务器多少钱 恩牛网络技术实习生 软件开发外包项目网 华为服务器16核cpu多少钱 oneclick服务器推荐 苹果怎么解决网络安全问题 什么负责协调网络安全 GUI向数据库添加数据 成都第三方软件开发报价 网络安全 能源 眼镜行业软件开发的发展趋势 编完程序之后如何跑数据库 怎么跳过服务器防护狗 姑苏区运营网络技术咨询热线 电影票房数据库技术学校 服务器装数据库装哪个系统版本 公安网络安全管理工作 民生银行软件开发中心年工资总额 开机 服务器正在运行中 点灯猫软件开发 南京一鸣云搜网络技术差 网络安全防护体系建设要求 华为服务器16核cpu多少钱 金山区特殊软件开发 数据通信与ip网络技术实验操作 全文数据库的检索方法有哪些 单位网络安全工作领导小组 才正软件开发有限公司 带10台电脑的服务器要多少钱
0