半小时用Spring Boot注解实现Redis分布式锁

Java小叮当 2021-01-14 16:31:59
spring boot 注解 小时 半小时


前言

一、业务背景

有些业务请求,属于耗时操作,需要加锁,防止后续的并发操作,同时对数据库的数据进行操作,需要避免对之前的业务造成影响。

二、分析流程

使用Redis作为分布式锁,将锁的状态放到Redis统一维护,解决集群中单机JVM信息不互通的问题,规定操作顺序,保护用户的数据正确。

梳理设计流程

1.新建注解@interface,在注解里设定入参标志

2.增加AOP切点,扫描特定注解

3.建立@Aspect切面任务,注册bean和拦截特定方法

4.特定方法参数ProceedingJoinPoint,对方法pjp.proceed()前后进行拦截

5.切点前进行加锁,任务执行后进行删除key

核心步骤:加锁、解锁和续时

加锁

使用了RedisTemplate的opsForValue.setIfAbsent 方法,判断是否有key,设定一个随机数UUID.random().toString,生成一个随机数作为value。

从redis中获取锁之后,对key设定expire失效时间,到期后自动释放锁。

按照这种设计,只有第一个成功设定Key的请求,才能进行后续的数据操作,后续其它请求由于无法获得资源,将会失败结束。

超时问题

担心pjp.proceed()切点执行的方法太耗时,导致Redis中的key由于超时提前释放了。

例如,线程A先获取锁,proceed方法耗时,超过了锁超时时间,到期释放了锁,这时另一个线程B成功获取Redis锁,两个线程同时对同一批数据进行操作,导致数据不准确。

解决方案:增加一个「续时」

任务不完成,锁不释放:

维护了一个定时线程池ScheduledExecutorService,每隔2s去扫描加入队列中的Task,判断是否失效时间是否快到了,公式为:【失效时间】<= 【当前时间】+【失效间隔(三分之一超时)】

/**
* 线程池,每个 JVM 使用一个线程去维护 keyAliveTime,定时执行 runnable
*/
private static final ScheduledExecutorService SCHEDULER =
new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
static {
SCHEDULER.scheduleAtFixedRate(() -> {
// do something to extend time
}, 0, 2, TimeUnit.SECONDS);
}

三、设计方案

经过上面的分析,同事设计出了这个方案:

前面已经说了整体流程,这里强调一下几个核心步骤:

拦截注解 @RedisLock,获取必要的参数

加锁操作

续时操作

结束业务,释放锁

四、实操

之前也有整理过AOP使用方法,可以参考一下

相关属性类配置

业务属性枚举设定

public enum RedisLockTypeEnum {
/**
* 自定义 key 前缀
*/
ONE("Business1", "Test1"),
TWO("Business2", "Test2");
private String code;
private String desc;
RedisLockTypeEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
public String getCode() {
return code;
}
public String getDesc() {
return desc;
}
public String getUniqueKey(String key) {
return String.format("%s:%s", this.getCode(), key);
}
}

任务队列保存参数

public class RedisLockDefinitionHolder {
/**
* 业务唯一 key
*/
private String businessKey;
/**
* 加锁时间 (秒 s)
*/
private Long lockTime;
/**
* 上次更新时间(ms)
*/
private Long lastModifyTime;
/**
* 保存当前线程
*/
private Thread currentTread;
/**
* 总共尝试次数
*/
private int tryCount;
/**
* 当前尝试次数
*/
private int currentCount;
/**
* 更新的时间周期(毫秒),公式 = 加锁时间(转成毫秒) / 3
*/
private Long modifyPeriod;
public RedisLockDefinitionHolder(String businessKey, Long lockTime, Long lastModifyTime, Thread currentTread, int tryCount) {
this.businessKey = businessKey;
this.lockTime = lockTime;
this.lastModifyTime = lastModifyTime;
this.currentTread = currentTread;
this.tryCount = tryCount;
this.modifyPeriod = lockTime * 1000 / 3;
}
}

设定被拦截的注解名字

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface RedisLockAnnotation {
/**
* 特定参数识别,默认取第 0 个下标
*/
int lockFiled() default 0;
/**
* 超时重试次数
*/
int tryCount() default 3;
/**
* 自定义加锁类型
*/
RedisLockTypeEnum typeEnum();
/**
* 释放时间,秒 s 单位
*/
long lockTime() default 30;
}

核心切面拦截的操作
RedisLockAspect.java该类分成三部分来描述具体作用

Pointcut 设定

/**
* @annotation 中的路径表示拦截特定注解
*/
@Pointcut("@annotation(cn.sevenyuan.demo.aop.lock.RedisLockAnnotation)")
public void redisLockPC() {
}

Around 前后进行加锁和释放锁
前面步骤定义了我们想要拦截的切点,下一步就是在切点前后做一些自定义操作:

@Around(value = "redisLockPC()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
// 解析参数
Method method = resolveMethod(pjp);
RedisLockAnnotation annotation = method.getAnnotation(RedisLockAnnotation.class);
RedisLockTypeEnum typeEnum = annotation.typeEnum();
Object[] params = pjp.getArgs();
String ukString = params[annotation.lockFiled()].toString();
// 省略很多参数校验和判空
String businessKey = typeEnum.getUniqueKey(ukString);
String uniqueValue = UUID.randomUUID().toString();
// 加锁
Object result = null;
try {
boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(businessKey, uniqueValue);
if (!isSuccess) {
throw new Exception("You can't do it,because another has get the lock =-=");
}
redisTemplate.expire(businessKey, annotation.lockTime(), TimeUnit.SECONDS);
Thread currentThread = Thread.currentThread();
// 将本次 Task 信息加入「延时」队列中
holderList.add(new RedisLockDefinitionHolder(businessKey, annotation.lockTime(), System.currentTimeMillis(),
currentThread, annotation.tryCount()));
// 执行业务操作
result = pjp.proceed();
// 线程被中断,抛出异常,中断此次请求
if (currentThread.isInterrupted()) {
throw new InterruptedException("You had been interrupted =-=");
}
} catch (InterruptedException e ) {
log.error("Interrupt exception, rollback transaction", e);
throw new Exception("Interrupt exception, please send request again");
} catch (Exception e) {
log.error("has some error, please check again", e);
} finally {
// 请求结束后,强制删掉 key,释放锁
redisTemplate.delete(businessKey);
log.info("release the lock, businessKey is [" + businessKey + "]");
}
return result;
}

上述流程简单总结一下:

解析注解参数,获取注解值和方法上的参数值

redis 加锁并且设置超时时间

将本次 Task 信息加入「延时」队列中,进行续时,方式提前释放锁

加了一个线程中断标志

结束请求,finally 中释放锁

续时操作

这里用了ScheduledExecutorService,维护了一个线程,不断对任务队列中的任务进行判断和延长超时时间:

// 扫描的任务队列
private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue();
/**
* 线程池,维护keyAliveTime
*/
private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
{
// 两秒执行一次「续时」操作
SCHEDULER.scheduleAtFixedRate(() -> {
// 这里记得加 try-catch,否者报错后定时任务将不会再执行=-=
Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
while (iterator.hasNext()) {
RedisLockDefinitionHolder holder = iterator.next();
// 判空
if (holder == null) {
iterator.remove();
continue;
}
// 判断 key 是否还有效,无效的话进行移除
if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
iterator.remove();
continue;
}
// 超时重试次数,超过时给线程设定中断
if (holder.getCurrentCount() > holder.getTryCount()) {
holder.getCurrentTread().interrupt();
iterator.remove();
continue;
}
// 判断是否进入最后三分之一时间
long curTime = System.currentTimeMillis();
boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
if (shouldExtend) {
holder.setLastModifyTime(curTime);
redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
holder.setCurrentCount(holder.getCurrentCount() + 1);
}
}
}, 0, 2, TimeUnit.SECONDS);
}

这段代码,用来实现设计图中虚线框的思想,避免一个请求十分耗时,导致提前释放了锁。

这里加了「线程中断」Thread#interrupt,希望超过重试次数后,能让线程中断(未经严谨测试,仅供参考哈哈哈哈)

不过建议如果遇到这么耗时的请求,还是能够从根源上查找,分析耗时路径,进行业务优化或其它处理,避免这些耗时操作。

所以记得多打点Log,分析问题时可以更快一点。如何使用SpringBoot AOP记录操作日志、异常日志?

五、开始测试

在一个入口方法中,使用该注解,然后在业务中模拟耗时请求,使用了 Thread#sleep

@GetMapping("/testRedisLock")
@RedisLockAnnotation(typeEnum = RedisLockTypeEnum.ONE, lockTime = 3)
public Book testRedisLock(@RequestParam("userId") Long userId) {
try {
log.info("睡眠执行前");
Thread.sleep(10000);
log.info("睡眠执行后");
} catch (Exception e) {
// log error
log.info("has some error", e);
}
return null;
}

使用时,在方法上添加该注解,然后设定相应参数即可,根据 typeEnum 可以区分多种业务,限制该业务被同时操作。

测试结果:

2020-04-04 14:55:50.864 INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : 睡眠执行前
2020-04-04 14:55:52.855 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 0
2020-04-04 14:55:54.851 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 1
2020-04-04 14:55:56.851 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 2
2020-04-04 14:55:58.852 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 3
2020-04-04 14:56:00.857 INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : has some error
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method) [na:1.8.0_221]

我这里测试的是重试次数过多,失败的场景,如果减少睡眠时间,就能让业务正常执行。

如果同时请求,你将会发现以下错误信息:

表示我们的锁的确生效了,避免了重复请求。

六、总结

对于耗时业务和核心数据,不能让重复的请求同时操作数据,避免数据的不正确,所以要使用分布式锁来对它们进行保护。

再来梳理一下设计流程:

1.新建注解@interface,在注解里设定入参标志

2.增加 AOP切点,扫描特定注解

3.建立 @Aspect切面任务,注册bean和拦截特定方法

4.特定方法参数ProceedingJoinPoint,对方法pjp.proceed() 前后进行拦截

5.切点前进行加锁,任务执行后进行删除 key

本次学习是通过Review 小伙伴的代码设计,从中了解分布式锁的具体实现,仿照他的设计,重新写了一份简化版的业务处理。对于之前没考虑到的「续时」操作,这里使用了守护线程来定时判断和延长超时时间,避免了锁提前释放。

于是乎,同时回顾了三个知识点:

1、AOP 的实现和常用方法

2、定时线程池ScheduledExecutorService 的使用和参数含义

3、线程 Thread#interrupt的含义以及用法(这个挺有意思的,可以深入再学习一下)

最后

我这边整理了一份:SpringBoot相关资料、Spring全家桶系列,Java的系统化资料,(包括Java核心知识点、面试专题和20年最新的互联网真题、电子书等)有需要的朋友可以关注公众号【程序媛小琬】即可获取。

版权声明
本文为[Java小叮当]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/xiaowan7/p/14277925.html

  1. Rocketmq CPP client visual studio 2019 compilation
  2. Usage of data custom attribute in jquery
  3. Common decompression in Linux
  4. Upload large files in Java
  5. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  6. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  7. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  8. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  9. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  10. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  11. 010_ MySQL
  12. 010_ MySQL
  13. Fast integration of imsdk and Huawei offline push
  14. 消息队列之RabbitMQ
  15. Rabbitmq of message queue
  16. 初学java进制转换方面补充学习
  17. Learn java base conversion supplementary learning
  18. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  19. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  20. 初学java进制转换方面补充学习
  21. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  22. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  23. Learn java base conversion supplementary learning
  24. JDBC测试连接数据库
  25. JDBC test connection database
  26. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  27. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  28. 安卓开发和java开发有什么区别!2021年BATJ30套大厂Android经典高频面试题,面试必问
  29. Spring Security OAuth2.0認證授權四:分散式系統認證授權
  30. What's the difference between Android development and java development! 2021 batj30 Android classic high frequency interview questions
  31. Spring security oauth2.0 authentication and authorization 4: distributed system authentication and authorization
  32. Java微服务 vs Go微服务,究竟谁更强!?
  33. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  34. Who is stronger, Java microservice vs go microservice!?
  35. Java微服务 vs Go微服务,究竟谁更强!?
  36. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  37. Who is stronger, Java microservice vs go microservice!?
  38. springboot异常处理之404
  39. Spring boot exception handling 404
  40. Spring Boot Security 国际化 多语言 i18n 趟过巨坑
  41. springboot异常处理之404
  42. Spring boot security international multilingual I18N
  43. Spring boot exception handling 404
  44. Netty系列化之Google Protobuf编解码
  45. Netty之编解码
  46. Java编解码
  47. Netty解码器
  48. Netty与TCP粘包拆包
  49. Netty开发入门
  50. Java集合遍历时遇到的坑
  51. Spring IOC 源码解析(下)
  52. Spring IoC源码解析(上)
  53. Google protobuf codec of netty serialization
  54. Encoding and decoding of netty
  55. Java codec
  56. Netty decoder
  57. Netty and TCP packet sticking and unpacking
  58. Introduction to netty development
  59. Problems encountered in Java collection traversal
  60. Spring IOC source code analysis (2)