问题:
如何在秒杀环境下保证库存不卖超?
如何在保证整点开抢的优惠券能不多不少的发放完毕?
解决方案:
1. 使用MySQL中的库存字段进行扣减。在高并发的情况下由于业务代码的处理延迟,可能导致卖超,如果数据库还有读写分离,那就更加灾难了。
2. 将库存数据读取到内存中加锁进行扣减。在单机器部署的情况下可以很完美的解决问题,但是在分布式系统中,每个独立实例的扣减最终汇总很难完美统一。
3. 利用Redis的单线程特性集中扣减。可以很好的解决分布式系统中集中扣减的问题,但也是由于单线程的原因,单核CPU始终有极限,并发超过一定阈值的时候,由于串行计算的原因,前端需要等待很长时间才能拿到结果。
本文将详细讨论在Redis中进行库存扣减的实践操作。
如果像优惠券这种场景,每次只能领用一张,直接使用decrby对剩余数量减一即可,如果剩余数量小于0,说明领完了,也就领取失败。
直接上代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | // DecreaseStock 根据优惠券ID和需要扣减的库存数量进行扣减并返回是否成功扣减和剩余可发放优惠券数量 func DecreaseStock(couponID string, decreaseNum int64) (bool, int64) { remainCountKey := CouponRemainCountRedisKey(couponID) remainCount, err := redisClient.DecrBy(context.Background(), remainCountKey, decreaseNum).Result() if err != nil { fmt.Println(err.Error()) return false, 0 } if remainCount < 0 { return false, remainCount } return true, remainCount } // InitCouponRemain 在开始发放之前将优惠券余量放入Redis中 func InitCouponRemain(couponID string, remainCount int64) (bool, error) { remainCountKey := CouponRemainCountRedisKey(couponID) // 此处模拟优惠券仅领取二十四小时 _, err := redisClient.Set(context.Background(), remainCountKey, remainCount, time.Hour*24).Result() if err != nil { return false, err } return true, nil } // CouponRemainCountRedisKey 组装缓存key func CouponRemainCountRedisKey(couponID string) string { return "CouponRemainCount_" + couponID } |
初始化Redis连接和数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | func init() { connectRedis() ok, err := InitCouponRemain("3", 1000) if !ok || err != nil { fmt.Println(ok, err) } } var redisClient *redis.Client var redisClientOnce = sync.Once{} func connectRedis() *redis.Client { redisClientOnce.Do(func() { dbHost, _ := web.AppConfig.String("aliyun.redis.endpoint") dbPort, _ := web.AppConfig.String("aliyun.redis.port") dbPassword, _ := web.AppConfig.String("aliyun.redis.password") dbNum, _ := web.AppConfig.Int("aliyun.redis.dbnum") redisClient = redis.NewClient(&redis.Options{ Addr: dbHost + ":" + dbPort, Password: dbPassword, DB: dbNum, ReadTimeout: 60 * time.Second, WriteTimeout: 60 * time.Second, }) }) return redisClient } |
实际调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 | // TestStockDecrease 测试减少库存 func TestStockDecrease() { for i := 0; i < 2000; i++ { go func() { ok, remainCount := DecreaseStock("3", 1) if ok { fmt.Println("领取成功,剩余数量:", remainCount) } else { fmt.Println("领取失败,多余请求:", -remainCount) } }() } } |
当结果到0的时候刚好执行正常逻辑1000次,最终结果为-1000,也就是有1000个请求没有领到优惠券。
但是问题来了,这里是理想的每次只减一的使用场景,如果是卖货,同时可以卖三件,五件,100多件呢?
假设此时库存还剩80,但是有客户想买100件,按照上面的代码,非但这位客户的100件买不了,80个库存也会凭空消失,就会造成库存假卖完。
怎么解决呢?有没有办法判断原子的判断库存如果有,且大于当前需要的时候再减呢?
答案是有的,这里我们借助lua脚本来实现这个操作。
LUA脚本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | const STOCK_LUA_SCRIPT string = ` if (redis.call('exists', KEYS[1]) == 1) then local stock = tonumber(redis.call('get', KEYS[1])); local num = tonumber(ARGV[1]); if (stock == -1) then return -1; end; if (stock >= num) then return redis.call('decrby', KEYS[1], num); end; return -2; end; return -3; ` |
这里我们约定返回规则:
大于等于0:剩余库存数
-1:无限库存
-2:库存不足
-3:没有找到库存key
适配后的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | // DecreaseStock 根据优惠券ID和需要扣减的库存数量进行扣减并返回是否成功扣减和剩余可发放优惠券数量 func DecreaseStock(couponID string, decreaseNum int64) (bool, int64) { remainCountKey := CouponRemainCountRedisKey(couponID) result, err := redisClient.Eval( context.Background(), STOCK_LUA_SCRIPT, []string{remainCountKey}, decreaseNum, ).Result() if err != nil { fmt.Println(err.Error()) return false, 0 } /* 大于等于0:剩余库存数 -1:无限库存 -2:库存不足 -3:没有找到库存key */ if remainCount, ok := result.(int64); ok { if remainCount > 0 { return true, remainCount } else if remainCount == 0 { return false, remainCount } else if remainCount == -1 { return true, remainCount } else if remainCount == -2 { return false, remainCount } else if remainCount == -3 { return false, remainCount } else { return false, 0 } } return false, 0 } |
调用代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | // TestStockDecrease 测试减少库存 func TestStockDecrease() { for i := 0; i < 1000; i++ { go func() { ok, remainCount := DecreaseStock("3", int64(rand.Int()%6)) // 随机减0-5的库存 if ok { fmt.Println("领取成功,剩余数量:", remainCount) } else { /* 大于等于0:剩余库存数 -1:无限库存 -2:库存不足 -3:没有找到库存key */ switch remainCount { case 0, -2: fmt.Println("库存不足") case -1: fmt.Println("无限库存") case -3: fmt.Println("没有找到库存key, 可能未初始化库存数据到Redis") default: fmt.Println("未知错误") } } }() } } |
截取最后一部分运行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | 库存不足 领取成功,剩余数量: 3 库存不足 库存不足 库存不足 领取成功,剩余数量: 14 领取成功,剩余数量: 8 库存不足 库存不足 领取成功,剩余数量: 13 领取成功,剩余数量: 18 领取成功,剩余数量: 14 领取成功,剩余数量: 27 领取成功,剩余数量: 23 库存不足 领取成功,剩余数量: 28 领取成功,剩余数量: 32 库存不足 库存不足 库存不足 领取成功,剩余数量: 38 领取成功,剩余数量: 32 库存不足 领取成功,剩余数量: 6 领取成功,剩余数量: 40 领取成功,剩余数量: 46 领取成功,剩余数量: 28 领取成功,剩余数量: 37 领取成功,剩余数量: 45 领取成功,剩余数量: 47 |
从运行结果可以看出,当库存够时,返回剩余库存数,库存不够抵扣时回返回-2,代表库存不足以减扣。
Redis配合LUA脚本非常好的保证了每次执行的原子性,在分布式系统内也不会出现库存扣减错误的问题;但在请求量巨大的情况下,由于单线程串行计算的原因,可能会导致前端等待很长时间。
执行一百万次试试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | // TestStockDecrease 测试减少库存 func TestStockDecrease() { wg := sync.WaitGroup{} startTime := time.Now().UnixMilli() for i := 0; i < 1000000; i++ { wg.Add(1) go func() { ok, remainCount := DecreaseStock("3", int64(rand.Int()%6)) // 随机减0-5的库存 if ok { fmt.Println("领取成功,剩余数量:", remainCount) } else { /* 大于等于0:剩余库存数 -1:无限库存 -2:库存不足 -3:没有找到库存key */ switch remainCount { case 0, -2: fmt.Println("库存不足") case -1: fmt.Println("无限库存") case -3: fmt.Println("没有找到库存key, 可能未初始化库存数据到Redis") default: fmt.Println("未知错误") } } wg.Done() }() } wg.Wait() fmt.Println("总体耗时:", time.Now().UnixMilli()-startTime, "ms") } |
运行结果:
1 | 总体耗时: 25980 ms |
从运行结果可知,执行一百万次库存扣减计算需要26秒左右,如果想要客户端在这个体量或者以上的量下能够快速返回,可以考虑开多个Redis实例,将库存分成多份,分别进行扣减。
1. 充分利用Redis的单线程特性和LUA脚本,可以实现原子的库存扣减,并且计算效率相当高。
2. 需要注意的是,Redis的单核计算在量特别大的时候等待时长会增加,可以考虑开多个实例,将库存分到不同的实例进行扣减后再汇总。
程序猿老龚(龚杰洪)原创,版权所有,转载请注明出处.