千家信息网

Springboot整合Redis如何实现超卖问题

发表于:2025-01-21 作者:千家信息网编辑
千家信息网最后更新 2025年01月21日,这篇文章将为大家详细讲解有关Springboot整合Redis如何实现超卖问题,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。超卖简单代码写一段简单正常的超卖逻辑代码
千家信息网最后更新 2025年01月21日Springboot整合Redis如何实现超卖问题

这篇文章将为大家详细讲解有关Springboot整合Redis如何实现超卖问题,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

    超卖简单代码

    写一段简单正常的超卖逻辑代码,多个用户同时操作同一段数据,探究出现的问题。

    Redis中存储一项数据信息,请求对应接口,获取商品数量信息;
    商品数量信息如果大于0,则扣减1,重新存储Redis中;
    运行代码测试问题。

    /** * Redis数据库操作,超卖问题模拟 * @author  * */@RestControllerpublic class RedisController {                // 引入String类型redis操作模板        @Autowired        private StringRedisTemplate stringRedisTemplate;          // 测试数据设置接口        @RequestMapping("/setStock")        public String setStock() {                stringRedisTemplate.opsForValue().set("stock", "100");                return "ok";        }                // 模拟商品超卖代码        @RequestMapping("/deductStock")        public String deductStock() {                // 获取Redis数据库中的商品数量                Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));                // 减库存                if(stock > 0) {                        int realStock = stock -1;                        stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));                        System.out.println("商品扣减成功,剩余商品:"+realStock);                }else {                        System.out.println("库存不足.....");                }                return "end";        }}

    超卖问题

    单服务器单应用情况下

    在单应用模式下,使用jmeter压测。

    测试结果:

    每个请求相当于一个线程,当几个线程同时拿到数据时,线程A拿到库存为84,这个时候线程B也进入程序,并且抢占了CPU,访问库存为84,最后两个线程都对库存减一,导致最后修改为83,实际上多卖出去了一件

    既然线程和线程之间,数据处理不一致,能否使用synchronized加锁测试?

    设置synchronized

    依旧还是先测试单服务器

    // 模拟商品超卖代码,        // 设置synchronized同步锁        @RequestMapping("/deductStock1")        public String deductStock1() {                synchronized (this) {                        // 获取Redis数据库中的商品数量                        Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));                        // 减库存                        if(stock > 0) {                                int realStock = stock -1;                                stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));                                System.out.println("商品扣减成功,剩余商品:"+realStock);                        }else {                                System.out.println("库存不足.....");                        }                }                return "end";        }

    数量100

    重新压测,得到的日志信息如下所示:

    在单机模式下,添加synchronized关键字,的确能够避免商品的超卖现象!

    但是在分布式微服务中,针对该服务设置了集群,synchronized依旧还能保证数据的正确性吗?

    假设多个请求,被注册中心负载均衡,每个微服务中的该处理接口,都添加有synchronized,

    依然会出现类似的超卖问题:

    synchronized只是针对单一服务器JVM进行加锁,但是分布式是很多个不同的服务器,导致两个线程或多个在不同服务器上共同对商品数量信息做了操作!


    Redis实现分布式锁

    在Redis中存在一条命令setnx (set if not exists)

    setnx key value
    如果不存在key,则可以设置成功;否则设置失败。

    修改处理接口,增加key

    // 模拟商品超卖代码        @RequestMapping("/deductStock2")        public String deductStock2() {                // 创建一个key,保存至redis                String key = "lock";                // setnx                // 由于redis是一个单线程,执行命令采取"队列"形式排队!                // 优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败。                boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");                // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false                if (!result) {                        // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!                        return "err";                }                                // 获取Redis数据库中的商品数量                Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));                // 减库存                if(stock > 0) {                        int realStock = stock -1;                        stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));                        System.out.println("商品扣减成功,剩余商品:"+realStock);                }else {                        System.out.println("库存不足.....");                }         // 程序执行完成,则删除这个key                stringRedisTemplate.delete(key);                 return "end";        }

    1、请求进入接口中,如果redis中不存在key,则会新建一个setnx;如果存在,则不会新建,同时返回错误编码,不会继续执行抢购逻辑。
    2、当创建成功后,执行抢购逻辑。
    3、抢购逻辑执行完成后,删除数据库中对应的setnxkey。让其他请求能够设置并操作。

    这种逻辑来说比之前单一使用syn合理的多,但是如果执行抢购操作中出现了异常,导致这个key无法被删除。以至于其他处理请求,一直无法拿到key,程序逻辑死锁!

    可以采取try … finally进行操作

    /**         * 模拟商品超卖代码 设置         *         * @return         */        @RequestMapping("/deductStock3")        public String deductStock3() {                // 创建一个key,保存至redis                String key = "lock";                // setnx                // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败                boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock");                // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false                if (!result) {                        // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示!                        return "err";                }                 try {                        // 获取Redis数据库中的商品数量                        Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));                        // 减库存                        if (stock > 0) {                                int realStock = stock - 1;                                stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));                                System.out.println("商品扣减成功,剩余商品:" + realStock);                        } else {                                System.out.println("库存不足.....");                        }                } finally {                        // 程序执行完成,则删除这个key                        // 放置于finally中,保证即使上述逻辑出问题,也能del掉                        stringRedisTemplate.delete(key);                }                 return "end";        }

    这个逻辑相比上面其他的逻辑来说,显得更加的严谨。

    但是,如果一套服务器,因为断电、系统崩溃等原因出现宕机,导致本该执行finally中的语句未成功执行完成!!同样出现key一直存在,导致死锁

    通过超时间解决上述问题

    在设置成功setnx后,以及抢购代码逻辑执行前,增加key的限时。

    /**         * 模拟商品超卖代码 设置setnx保证分布式环境下,数据处理安全行问题;
    * 但如果某个代码段执行异常,导致key无法清理,出现死锁,添加try...finally;
    * 如果某个服务因某些问题导致释放key不能执行,导致死锁,此时解决思路为:增加key的有效时间;
    * 为了保证设置key的值和设置key的有效时间,两条命令构成同一条原子命令,将下列逻辑换成其他代码。 * * @return */ @RequestMapping("/deductStock4") public String deductStock4() { // 创建一个key,保存至redis String key = "lock"; // setnx // 由于redis是一个单线程,执行命令采取队列形式排队!优先进入队列的命令先执行,由于是setnx,第一个执行后,其他操作执行失败 //boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock"); //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, "this is lock", 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } // 设置key有效时间 //stringRedisTemplate.expire(key, 10, TimeUnit.SECONDS); try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 stringRedisTemplate.delete(key); } return "end"; }

    但是上述代码的逻辑中依旧会有问题:

    如果处理逻辑中,出现超时问题。
    当逻辑执行时,时间超过设定key有效时间,此时会出现什么问题?

    从上图可以清楚的发现问题:
    如果一个请求执行时间超过了key的有效时间。
    新的请求执行过来时,必然可以拿到key并设置时间;
    此时的redis中保存的key并不是请求1的key,而是别的请求设置的。
    当请求1执行完成后,此处删除key,删除的是别的请求设置的key!

    依然出现了key形同虚设的问题!如果失效一直存在,超卖问题依旧不会解决。

    通过key设置值匹配的方式解决形同虚设问题

    既然出现key形同虚设的现象,是否可以增加条件,当finally中需要执行删除操作时,获取数据判断值是否是该请求中对应的,如果是则删除,不是则不管!

    修改上述代码如下所示:

    /**         * 模拟商品超卖代码 
    * 解决`deductStock6`中,key形同虚设的问题。 * * @return */ @RequestMapping("/deductStock5") public String deductStock5() { // 创建一个key,保存至redis String key = "lock"; String lock_value = UUID.randomUUID().toString(); // setnx //让设置key和设置key的有效时间都可以同时执行 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key, lock_value, 10, TimeUnit.SECONDS); // 当不存在key时,可以设置成功,回执true;如果存在key,则无法设置,返回false if (!result) { // 前端监测,redis中存在,则不能让这个抢购操作执行,予以提示! return "err"; } try { // 获取Redis数据库中的商品数量 Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // 减库存 if (stock > 0) { int realStock = stock - 1; stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock)); System.out.println("商品扣减成功,剩余商品:" + realStock); } else { System.out.println("库存不足....."); } } finally { // 程序执行完成,则删除这个key // 放置于finally中,保证即使上述逻辑出问题,也能del掉 // 判断redis中该数据是否是这个接口处理时的设置的,如果是则删除 if(lock_value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(key))) { stringRedisTemplate.delete(key); } } return "end"; }

    由于获得锁的线程必须执行完减库存逻辑才能释放锁,所以在此期间所有其他的线程都会由于没获得锁,而直接结束程序,导致有很多库存根本没有卖出去,所以这里应该可以优化,让没获得锁的线程等待,或者循环检查锁


    最终版

    我们将锁封装到一个实体类中,然后加入两个方法,加锁和解锁

    @Componentpublic class RedisLock {    private final Logger log = LoggerFactory.getLogger(this.getClass());     private final long acquireTimeout = 10*1000;    // 获取锁之前的超时时间(获取锁的等待重试时间)    private final int timeOut = 20;   // 获取锁之后的超时时间(防止死锁)     @Autowired    private StringRedisTemplate stringRedisTemplate;  // 引入String类型redis操作模板     /**     * 获取分布式锁     * @return 锁标识     */    public boolean getRedisLock(String lockName,String lockValue) {        // 1.计算获取锁的时间        Long endTime = System.currentTimeMillis() + acquireTimeout;        // 2.尝试获取锁        while (System.currentTimeMillis() < endTime) {            //3. 获取锁成功就设置过期时间 让设置key和设置key的有效时间都可以同时执行            boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockName, lockValue, timeOut, TimeUnit.SECONDS);            if (result) {                return true;            }        }        return false;    }      /**     * 释放分布式锁     * @param lockName 锁名称     * @param lockValue 锁值     */    public void unRedisLock(String lockName,String lockValue) {        if(lockValue.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(lockName))) {            stringRedisTemplate.delete(lockName);        }    }}
    @RestControllerpublic class RedisController {                // 引入String类型redis操作模板        @Autowired        private StringRedisTemplate stringRedisTemplate;        @Autowired        private RedisLock redisLock;          @RequestMapping("/setStock")        public String setStock() {                stringRedisTemplate.opsForValue().set("stock", "100");                return "ok";        }         @RequestMapping("/deductStock")        public String deductStock() {                // 创建一个key,保存至redis                String key = "lock";                String lock_value = UUID.randomUUID().toString();                try {                        boolean redisLock = this.redisLock.getRedisLock(key, lock_value);//获取锁                        if (redisLock)                        {                                // 获取Redis数据库中的商品数量                                Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));                                // 减库存                                if (stock > 0) {                                        int realStock = stock - 1;                                        stringRedisTemplate.opsForValue().set("stock", String.valueOf(realStock));                                        System.out.println("商品扣减成功,剩余商品:" + realStock);                                } else {                                        System.out.println("库存不足.....");                                }                        }                } finally {                        redisLock.unRedisLock(key,lock_value);   //释放锁                }                return "end";        }}

    可以看到失败的线程不会直接结束,而是会尝试重试,一直到重试结束时间,才会结束


    实际上这个最终版依然存在3个问题

    1、在finally流程中,由于是先判断在处理。如果判断条件结束后,获取到的结果为true。但是在执行del操作前,此时jvm在执行GC操作(为了保证GC操作获取GC roots根完全,会暂停java程序),导致程序暂停。GC操作执行完成后(暂停恢复后),执行del操作,但是此时的key还在当前加锁的key么?

    2、问题如图所示

    关于"Springboot整合Redis如何实现超卖问题"这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。

    0