坚信许多老师都听闻过分布式系统锁,但也前述上逗留在基本概念的认知上,这首诗会从分布式系统锁的应用领域情景说起,从与此同时实现的视角上广度探究redis怎样与此同时实现分布式系统锁。
一、超买难题
他们先上看超买的基本概念: 当小精灵库存量吻合0时,假如数个买主与此同时退款买回此小精灵,或是店面前台在架数目小于库房前述数目,Sonbhadra再次出现超买现像。超买现像其本质上是买好了比库房中数目更多的小精灵。
责任编辑主要就化解超买难题的第二种,与此同时王承恩买回小精灵时,导致超买。
试验标识符
所以超买难题是怎样造成的呢?他们预备几段标识符展开试验:
@Autowired private StringRedisTemplate stringRedisTemplate; /** * 第二种与此同时实现,民主化内就存有缓存安全可靠难题 * 能只开启两个民主化试验 */ @RequestMapping(“/deduct_stock1”) public void deductStock1(){ String stock = stringRedisTemplate.opsForValue().get(“stock”); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //增设库存量减1 int realStock = stockNum – 1; stringRedisTemplate.opsForValue().set(“stock”,realStock + “”); System.out.println(“增设库存量” + realStock); }else{ System.out.println(“库存量严重不足”); } }
这段标识符中,
String stock = stringRedisTemplate.opsForValue().get(“stock”); int stockNum = Integer.parseInt(stock);
接下去,推论库存量数与否小于0:
假如小于0,将库存量数减一,透过set指示,FECredis这儿没采用redis的decrement指示,即使此指示在redis单缓存数学模型下是缓存安全可靠的,而为的是能演示缓存不安全可靠的情形将其分作两步操作方式//增设库存量减1 int realStock = stockNum – 1; stringRedisTemplate.opsForValue().set(“stock”,realStock + “”); System.out.println(“增设库存量” + realStock);
假如小于等于0,提示库存量严重不足JMeter试验
透过JMeter展开并发试验,看下会不会再次出现超买的难题:
1.开启tomcat
这种情形下,只需要开启两个tomcat就会再次出现超买。他们先开启两个tomcat在8080端口上。
2.下载JMeter
Apache JMeter是Apache组织开发的基于Java的压力试验工具。 从官网上下载即可: https://jmeter.apache.org/download_jmeter.cgi 下载完之后解压,运行bin目录下的jmeter.bat,显示如下界面:
假如嫌字体太小,能选择放大:
3.配置JMeter
在Test Plan上点击右键,创建缓存组(Thread Group)
配置一下具体参数:
Number of Threads 与此同时并发缓存数Ramp-Up Period(in-seconds) 代表隔多长时间执行,0代表与此同时并发。假设缓存数为100, 估计的点击率为每秒10次, 所以估计的理想ramp-up period 是 100/10 = 10 秒Loop Count 循环次数这儿给出500是为的是直接试验并发500抢,看看能不能正好把500个货物抢完。
添加Http请求:
添加请求URL:
添加聚合结果,用来显示整体的运行情形:
到此为止JMeter的配置结束。
4.增设库存量量
开启redis-server,采用redis-client连接:
把库存量数增设为500。
5.开始试验
点击运行按钮,开启试验:
首先他们看到聚合报告里输出的结果:
错误率0%,样本数500,证明500个请求都已经执行,但是发现控制台输出如下:
很显然,一份商品都被卖了多次,这显然是不合理的。
原因分析
现在他们只开启了两个tomcat,在单jvm民主化的情形下,tomcat会采用缓存池接收请求:
显示的都是500,然后两个缓存就继续展开扣减库存量操作方式,得出499FECredis中,在这个过程中,显然存有缓存安全可靠的难题。同两个商品被卖出了2份,超买难题就再次出现了。
二、加锁优化
synchronized锁
要保证单jvm中缓存安全可靠,最简单直接的方式是添加synchronized关键字,所以这样行不行呢,他们来做两个试验:
/** * 第二种与此同时实现,采用synchronized加锁 * 能只开启两个民主化测试 */ @RequestMapping(“/deduct_stock2”) public void deductStock2(){ synchronized (this){ String stock = stringRedisTemplate.opsForValue().get(“stock”); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //增设库存量减1 int realStock = stockNum – 1; stringRedisTemplate.opsForValue().set(“stock”,realStock + “”); System.out.println(“增设库存量” + realStock); }else{ System.out.println(“库存量严重不足”); } } }
在展开扣减库存量前,先透过synchronized关键字,对资源加锁,这样就只有两个缓存能进入到扣减库存量的标识符块中。来试验一下:
重置库存量
set stock 500
修改接口地址
试验
能看到,库存量被扣减为0,并且没再次出现超买的情形(增设了500库存量,并且500个人抢,正好抢完)。 但是这种方案显然是不行的,在生产环境上假如部署数个tomcat实例,所以就会再次出现如下情形:
数个民主化无法共享jvm内存中的锁,所以会再次出现多把锁,这种情形下也会再次出现超买难题。
三、分布式系统锁的与此同时实现
多Tomcat实例下的超买演示
接下去他们演示一下怎样在数个Tomcat情形下,演示超买的难题:
1.开启两个tomcat服务
在IDEA中配置两个spring boot的开启项,采用vm参数指定不同的端口号
-Dserver.port=8080
2.配置nginx
编写~/nginx_redis/conf/nginx.conf如下:
user nginx;worker_processes 1;error_log /var/log/nginx/error.log warn;pid /var/run/nginx.pid;events { worker_connections 1024;}http { include /etc/nginx/mime.types; default_type application/octet-stream; log_format main $remote_addr – $remote_user [$time_local] “$request” $status $body_bytes_sent “$http_referer” “$http_user_agent” “$http_x_forwarded_for”;upstream redislock{ server 192.168.226.1:8080 weight=1; server 192.168.226.1:8081 weight=1;} server { listen 80; server_name localhost; location /{ root html; proxy_pass http://redislock; }} access_log /var/log/nginx/access.log main; sendfile on; #tcp_nopush on; keepalive_timeout 65; #gzip on; include /etc/nginx/conf.d/*.conf;}
192.168.226.1这是我宿主机的IP
预备两个虚拟机(也能采用windows下的nginx),采用docker开启nginx:
docker pull nginxdocker run -di -p 10085:80 –name nginx-redis-hc -v ~/nginx_redis/html:/usr/share/nginx/html -v ~/nginx_redis/conf/nginx.conf:/etc/nginx/nginx.conf -v ~/nginx_redis/logs:/var/log/nginx nginx
在宿主机下采用虚拟机的IP地址:10085访问nginx,假如再次出现如下页面就代表成功:
3.试验
修改接口地址为nginx:
运行查看两个tomcat的控制台:
tomcat1tomcat2没将库存量清空,证明存有超买难题。
手动与此同时实现分布式系统锁
采用redis手动与此同时实现分布式系统锁,需要用到指示setnx。先来介绍一下setnx:
SETNX key value[]
可用版本: >= 1.0.0
时间复杂度: O(1)
只在键 key 不存有的情形下, 将键 key 的值增设为 value 。
若键 key 已经存有, 则 SETNX 指示不做任何动作。
SETNX 是『SET if Not eXists』(假如不存有,则 SET)的简写。
返回值
指示在增设成功时返回 1 , 增设失败时返回 0 。
标识符示例
redis> EXISTS job # job 不存有# job 不存有(integer) 0redis> SETNX job “programmer” # job 增设成功(integer) 1redis> SETNX job “code-farmer” # 尝试覆盖 job ,失败(integer) 0redis> GET job # 没被覆盖
采用redis构建分布式系统锁流程如下:
image.png
缓存1申请锁(setn注意这儿缓存没
标识符与此同时实现
/** * 第三种与此同时实现,采用redis中的setIfAbsent(setnx指示)与此同时实现分布式系统锁 */ @RequestUID().toString(); Boolean stockLock = stringRedisTemplate .opsForValue().setIfAbsent(“stockLock”, opId, Duration.ofSeconds(30); if(stockLock){ try{ String stock = stringRedisTemplate.opsForValue().get(“stock”); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //增设库存量减1 int realStock = stockNum – 1; stringRedisTemplate.opsForValue().set(“stock”,realStock + “”); System.out.println(“增设库存量” + realStock); }else{ System.out.println(“库存量严重不足”); } }catch(Exception e){ e.printStackTrace(); }finally { if(opId.equals(stringRedisTemplate .opsForValue().get(“stockLock”))){ stringRedisTemplate.delete(“stockLock”); } } } }
试验略过,这儿有几个知识点需要说明
setIfAbsent增设超时
假如setIfAbsent不增设超时时间,假设缓存执行业务标识符时间时死锁或是其他原因导致长时间不释
Boolean stockLock = stringRedisTemplate .opsForValue().setIfAbsent(“stockLock”, opId, Duration.ofSeconds(30);
增设超时时间为30秒,该时间一般小于业务执行的最大时间。
考虑这样的情景
候校验UUID与否正确,假如不正确,说明加锁缓存不是当前缓存。采用Redisson与此同时实现分布式系统锁
setnx虽好,但是与此同时实现起来毕竟太过麻烦,一不小心就可能陷入并发编程的陷阱中,所以有没更加简单的与此同时实现方式呢?答案是redisson。
Redisson是架设在Redis基础上的两个Java驻内存数据网格(In-Memory Data Grid)。【Redis官方推荐】 Redisson在基于NIO的Netty框架上,充分的利用了Redis键值数据库提供的一系列优势,在Java实用工具包中常用接口的基础上,为采用者提供了一系列具有分布式系统特性的常用工具类。使得原本作为协调单机多缓存并发程序的工具包获得了协调分布式系统多机多缓存并发系统的能力,大大降低了设计和研发大规模分布式系统系统的难度。与此同时结合各富特色的分布式系统服务,更进一步简化了分布式系统环境中程序相互之间的协作。
总而言之,redisson提供了一系列较为完善的工具类,其中就包含了分布式系统锁。用redisson与此同时实现分布式系统锁的流程极为简单。
引入依赖
org.redisson
redisson3.14.0
创建Redisson实例
@Bean public RedissonClient redisson(){ // 1. Create config object Config config = new Config(); config.useSingleServer().setAddress(“redis://127.0.0.1:6379”);// config.useClusterServers()// // use “rediss://” for SSL connection// .addNodeAddress(“redis://127.0.0.1:7181”); return Redisson.create(config); }
编写分布式系统锁标识符
@Autowired private RedissonClient redissonClient; /** * 第四种与此同时实现,采用redisson与此同时实现 */ @RequestMapping(“/deduct_stock4”) public void deductStock4(){ RLock lock = redissonClient.getLock(“redisson:stockLock”); try{ //加锁 lock.lock(); String stock = stringRedisTemplate.opsForValue().get(“stock”); int stockNum = Integer.parseInt(stock); if(stockNum > 0){ //增设库存量减1 int realStock = stockNum – 1; stringRedisTemplate.opsForValue().set(“stock”,realStock + “”); System.out.println(“增设库存量” + realStock); }else{ System.out.println(“库存量严重不足”); } }catch(Exception e){ e.printStackTrace(); }finally { lock.unlock(); } }
其中加锁标识符基本与进程内加锁一致,就不再详细解读,读者自行实践即可。
Redisson分布式系统锁原理
Redisson分布式系统锁的主要就原理非常简单,利用了lua脚本的原子性。 在分布式系统环境下造成并发难题的主要就原因是三个操作方式并不是原子操作方式:
RFuture
tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand
command) { internalLockLeaseTime = unit.toMillis(leaseTime); return evalWriteAsync(getName(), LongCodec.INSTANCE, command, “if (redis.call(exists, KEYS[1]) == 0) then ” + “redis.call(hincrby, KEYS[1], ARGV[2], 1); ” + “redis.call(pexpire, KEYS[1], ARGV[1]); ” + “return nil; ” + “end; ” + “if (redis.call(hexists, KEYS[1], ARGV[2]) == 1) then ” + “redis.call(hincrby, KEYS[1], ARGV[2], 1); ” + “redis.call(pexpire, KEYS[1], ARGV[1]); ” + “return nil; ” + “end; ” + “return redis.call(pttl, KEYS[1]);”, Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
这几段源码中,redisson利用了lua脚本的原子性,校验key与否存有,如果不存有就创建key并利用incrby加一操作方式(这步操作方式主要就是为的是与此同时实现可重入性)。redisson与此同时实现的分布式系统锁具备如下特性:
锁失效锁续租执行时间长的锁快要到期时会自动续租
可重入操作方式原子性锁续租原理
使用如下标识符展开试验锁续租的情形
@Testvoid test() throws InterruptedException { RLock testlock1111 = redissonClient.getLock(“testlock”); testlock1111.lock(); try{ Thread thread = new Thread(() -> { while(true){ Long testlock = redisTemplate.getExpire(“testlock”); System.out.println(LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME) + ” ttl:” + testlock); try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread.start(); thread.join(); }finally { if(testlock1111.isHeldByCurrentThread()){ testlock1111.unlock(); } }}
他们会发现,每隔10秒会自动续租一次,保证锁不被释放。
所以这种续租的行为是怎样与此同时实现的呢?考虑这种情形:假如缓存加锁之后,民主化宕机,缓存无法执行解锁标识符,所以这个锁就无法得到释放(注意,不是加锁缓存不允许乱解锁),为的是避免这种情形的发生,锁都会增设两个过期时间。比如采用lock无参指示会默认增设30秒的过期时间。所以30秒之后呢?假如缓存还在工作,自动释放依然会造成缓存安全可靠的难题。所以Redisson采用了watch dog看门狗机制来与此同时实现自动续租。
核心标识符及注释:
private
RFuture
tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture
ttlRemainingFuture; //lock()无参方法leaseTime为-1,所以进else分值 if (leaseTime > 0) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //透过lua脚本加锁 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } CompletionStage
f = ttlRemainingFuture.thenApply(ttlRemaining -> { // 异步方法,等到加锁成功会回调,第一次加锁ttlRemaining为空,leaseTime为-1 if (ttlRemaining == null) { if (leaseTime > 0) { internalLockLeaseTime = unit.toMillis(leaseTime); } else { //增设延时任务 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper(f);}
接下去分析scheduleExpirationRenewal的过程:
private void renewExpiration() { ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } //创建两个延迟任务 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } //执行lua脚本展开续租 CompletionStage
future = renewExpirationAsync(threadId); future.whenComplete((res, e) -> { if (e != null) { log.error(“Cant update lock ” + getRawName() + ” expiration”, e); EXPIRATION_RENEWAL_MAP.remove(getEntryName()); return; } //执行lua续租,锁还在就续租,锁不在返回false就取消续租的行为 if (res) { // reschedule itself renewExpiration(); } else { cancelExpirationRenewal(null); } }); } }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); //internalLockLeaseTime默认值30,所以每10秒会续租一次,续租到30秒 ee.setTimeout(task);}
其中,renewExpirationAsync执行的lua脚本如下:
protected CompletionStage
renewExpirationAsync(long threadId) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, “if (redis.call(hexists, KEYS[1], ARGV[2]) == 1) then ” + “redis.call(pexpire, KEYS[1], ARGV[1]); ” + “return 1; ” + “end; ” + “return 0;”, Collections.singletonList(getRawName()), internalLockLeaseTime, getLockName(threadId));}
推论hash中与否存有锁,假如存有就增设过期时间为30秒,返回1。假如不存有就返回0。
总结
责任编辑介绍了超买难题造成的原因:操作方式不具备原子性,与此同时提出了集中化解思路。
synchronized锁,无法保证多实例下的缓存安全可靠setnx手动与此同时实现,坑许多、标识符较为复杂redisson与此同时实现,能够保证多实例下线程安全可靠,标识符简单可靠