Implementing redis distributed lock with spring boot annotation in half an hour

Java tinker 2021-01-14 16:32:51
implementing redis distributed lock spring


Preface

One 、 Business background

Some business requests , It's a time-consuming operation , It needs to be locked , Prevent subsequent concurrent operations , At the same time, the database data operation , Need to avoid impact on previous business .

Two 、 Analysis process

Use Redis As a distributed lock , Put the state of the lock in Redis Unified maintenance , Solve the problem of single machine in cluster JVM The problem of non exchange of information , Define the sequence of operations , Protect the user's data correctly .

Sort out the design process

1. New annotation @interface, Set the input flag in the annotation

2. increase AOP Tangent point , Scan specific annotations

3. establish @Aspect Section mission , register bean And intercept specific methods

4. Specific method parameters ProceedingJoinPoint, The method pjp.proceed() Intercept before and after

5. Lock before tangent point , Delete after task execution key

The core step : Lock 、 Unlock and renew

Lock

Used RedisTemplate Of opsForValue.setIfAbsent Method , To determine if there is key, Set a random number UUID.random().toString, Generate a random number as value.

from redis After getting the lock in , Yes key Set up expire Failure time , Automatically release lock after expiration .

According to this design , Only the first successful setting Key Request , In order to carry out subsequent data operations , Other subsequent requests are unable to obtain resources , It will end in failure .

Timeout problem

worry pjp.proceed() The tangent execution method is too time consuming , Lead to Redis Medium key Released ahead of time due to timeout .

for example , Threads A Get the lock first ,proceed The method is time consuming , Exceeded lock timeout , The expiration releases the lock , Now another thread B Succeed in getting Redis lock , Two threads operate on the same batch of data at the same time , Leading to inaccurate data .

Solution : Add one more 「 Continued 」

The task is not completed , Lock does not release :

Maintains a timed thread pool ScheduledExecutorService, every other 2s To scan the queue for Task, Judge whether the failure time is approaching , Formula for :【 Failure time 】<= 【 current time 】+【 Failure interval ( One third of the time out )】

/**
* Thread pool , Every JVM Use a thread to maintain keyAliveTime, Timing execution runnable
*/
private static final ScheduledExecutorService SCHEDULER =
new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
static {
SCHEDULER.scheduleAtFixedRate(() -> {
// do something to extend time
}, 0, 2, TimeUnit.SECONDS);
}

3、 ... and 、 design scheme

Go through the above analysis , My colleagues designed this plan :

I've talked about the whole process , Here are a few core steps :

Blocking annotations @RedisLock, Get the necessary parameters

Lock operation

Continuous operation

Close the business , Release the lock

Four 、 Practice

It's been sorted out before AOP Usage method , You can refer to it

Business attribute enumeration settings

public enum RedisLockTypeEnum {
/**
* Customize key Prefix
*/
ONE("Business1", "Test1"),
TWO("Business2", "Test2");
private String code;
private String desc;
RedisLockTypeEnum(String code, String desc) {
this.code = code;
this.desc = desc;
}
public String getCode() {
return code;
}
public String getDesc() {
return desc;
}
public String getUniqueKey(String key) {
return String.format("%s:%s", this.getCode(), key);
}
}

Task queue save parameters

public class RedisLockDefinitionHolder {
/**
* The business is unique key
*/
private String businessKey;
/**
* Lock time ( second s)
*/
private Long lockTime;
/**
* Last updated (ms)
*/
private Long lastModifyTime;
/**
* Save the current thread
*/
private Thread currentTread;
/**
* Total number of attempts
*/
private int tryCount;
/**
* Current attempts
*/
private int currentCount;
/**
* Update time period ( millisecond ), The formula = Lock time ( In milliseconds ) / 3
*/
private Long modifyPeriod;
public RedisLockDefinitionHolder(String businessKey, Long lockTime, Long lastModifyTime, Thread currentTread, int tryCount) {
this.businessKey = businessKey;
this.lockTime = lockTime;
this.lastModifyTime = lastModifyTime;
this.currentTread = currentTread;
this.tryCount = tryCount;
this.modifyPeriod = lockTime * 1000 / 3;
}
}

Set the name of the intercepted annotation

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface RedisLockAnnotation {
/**
* Specific parameter identification , The default is the second 0 A subscript
*/
int lockFiled() default 0;
/**
* Timeout retries
*/
int tryCount() default 3;
/**
* Custom lock type
*/
RedisLockTypeEnum typeEnum();
/**
* Release time , second s Company
*/
long lockTime() default 30;
}

Core section interception operation
RedisLockAspect.java The class is divided into three parts to describe the specific role

Pointcut Set up

/**
* @annotation The path in represents the interception of specific annotations
*/
@Pointcut("@annotation(cn.sevenyuan.demo.aop.lock.RedisLockAnnotation)")
public void redisLockPC() {
}

Around Lock and release before and after
The previous steps define the pointcuts we want to intercept , The next step is to do some custom operations before and after the tangent point :

@Around(value = "redisLockPC()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
// Analytical parameters
Method method = resolveMethod(pjp);
RedisLockAnnotation annotation = method.getAnnotation(RedisLockAnnotation.class);
RedisLockTypeEnum typeEnum = annotation.typeEnum();
Object[] params = pjp.getArgs();
String ukString = params[annotation.lockFiled()].toString();
// Omit a lot of parameter checking and null
String businessKey = typeEnum.getUniqueKey(ukString);
String uniqueValue = UUID.randomUUID().toString();
// Lock
Object result = null;
try {
boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(businessKey, uniqueValue);
if (!isSuccess) {
throw new Exception("You can't do it,because another has get the lock =-=");
}
redisTemplate.expire(businessKey, annotation.lockTime(), TimeUnit.SECONDS);
Thread currentThread = Thread.currentThread();
// This time Task Information to join 「 Time delay 」 In line
holderList.add(new RedisLockDefinitionHolder(businessKey, annotation.lockTime(), System.currentTimeMillis(),
currentThread, annotation.tryCount()));
// Perform business operations
result = pjp.proceed();
// Thread interrupted , Throw an exception , Interrupt this request
if (currentThread.isInterrupted()) {
throw new InterruptedException("You had been interrupted =-=");
}
} catch (InterruptedException e ) {
log.error("Interrupt exception, rollback transaction", e);
throw new Exception("Interrupt exception, please send request again");
} catch (Exception e) {
log.error("has some error, please check again", e);
} finally {
// After request , Force to delete key, Release the lock
redisTemplate.delete(businessKey);
log.info("release the lock, businessKey is [" + businessKey + "]");
}
return result;
}

The above process is a simple summary :

Parsing annotation parameters , Get annotation values and parameter values on methods

redis Lock and set timeout

This time Task Information to join 「 Time delay 」 In line , Carry on the continuation , Way to release the lock ahead of time

Added a thread interrupt flag

End request ,finally Middle release lock

Continuous operation

This is used here. ScheduledExecutorService, A thread is maintained , Constantly judge the tasks in the task queue and extend the timeout time :

// Scanning task queue
private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue();
/**
* Thread pool , maintain keyAliveTime
*/
private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
{
// Once in two seconds 「 Continued 」 operation
SCHEDULER.scheduleAtFixedRate(() -> {
// Remember to add try-catch, Otherwise, the scheduled task will not be executed after the error is reported =-=
Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
while (iterator.hasNext()) {
RedisLockDefinitionHolder holder = iterator.next();
// Sentenced to empty
if (holder == null) {
iterator.remove();
continue;
}
// Judge key Is it still valid , If it doesn't work, remove it
if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
iterator.remove();
continue;
}
// Timeout retries , Set interrupt to thread when exceeding
if (holder.getCurrentCount() > holder.getTryCount()) {
holder.getCurrentTread().interrupt();
iterator.remove();
continue;
}
// Judge whether to enter the last third of the time
long curTime = System.currentTimeMillis();
boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
if (shouldExtend) {
holder.setLastModifyTime(curTime);
redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
holder.setCurrentCount(holder.getCurrentCount() + 1);
}
}
}, 0, 2, TimeUnit.SECONDS);
}

This code , It is used to realize the idea of dotted box in the design drawing , It's time consuming to avoid a request , Leading to early release of the lock .

It's added here 「 Thread the interrupt 」Thread#interrupt, I hope after more than retries , Can interrupt threads ( Without rigorous testing , For reference only, ha ha ha )

However, it is suggested that if you encounter such a time-consuming request , We can still find out from the root , Analyze the time-consuming path , Carry out business optimization or other processing , Avoid these time-consuming operations .

So remember to do more Log, It's faster to analyze problems . How to use SpringBoot AOP Record operation log 、 Abnormal log ?

5、 ... and 、 Start testing

In an entry method , Use this annotation , Then simulate time-consuming requests in the business , Used Thread#sleep

@GetMapping("/testRedisLock")
@RedisLockAnnotation(typeEnum = RedisLockTypeEnum.ONE, lockTime = 3)
public Book testRedisLock(@RequestParam("userId") Long userId) {
try {
log.info(" Before sleep execution ");
Thread.sleep(10000);
log.info(" After sleep execution ");
} catch (Exception e) {
// log error
log.info("has some error", e);
}
return null;
}

When using , Add the annotation to the method , Then set the corresponding parameters , according to typeEnum You can differentiate between multiple businesses , Limit the business to be operated at the same time .

test result :

2020-04-04 14:55:50.864 INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : Before sleep execution
2020-04-04 14:55:52.855 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 0
2020-04-04 14:55:54.851 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 1
2020-04-04 14:55:56.851 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 2
2020-04-04 14:55:58.852 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 3
2020-04-04 14:56:00.857 INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : has some error
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method) [na:1.8.0_221]

What I'm testing here is too many retries , The scene of failure , If you reduce sleep time , Can make the business run normally .

If you ask at the same time , You will find the following error message :

It means that our lock does work , Avoid duplicate requests .

6、 ... and 、 summary

For time-consuming business and core data , You can't let duplicate requests manipulate data at the same time , Avoid incorrect data , So use distributed locks to protect them .

Let's sort out the design process :

1. New annotation @interface, Set the input flag in the annotation

2. increase AOP Tangent point , Scan specific annotations

3. establish @Aspect Section mission , register bean And intercept specific methods

4. Specific method parameters ProceedingJoinPoint, The method pjp.proceed() Intercept before and after

5. Lock before tangent point , Delete after task execution key

This study is through Review Small partner's code design , Learn about the specific implementation of distributed lock , Copy his design , Rewriting a simplified version of business processing . For what I didn't think about before 「 Continued 」 operation , Here, a guard thread is used to determine the timing and extend the timeout time , The lock is not released in advance .

So , At the same time, three knowledge points are reviewed :

1、AOP The realization and common methods of virtual reality

2、 Timed thread pool ScheduledExecutorService The use and parameter meaning of

3、 Threads Thread#interrupt The meaning and usage of ( This is interesting , We can learn more about it )

Last

I've arranged a copy here :SpringBoot Related information 、Spring Family bucket series ,Java The systematic information of ,( Include Java Core knowledge points 、 Interview topics and 20 The latest Internet real topic in 、 E-books, etc ) Friends in need can pay attention to the official account. 【 Cheng Xuyuan, Xiao Wan 】 Can get .

版权声明
本文为[Java tinker]所创,转载请带上原文链接,感谢
https://javamana.com/2021/01/20210114163145622X.html

  1. Rocketmq CPP client visual studio 2019 compilation
  2. Usage of data custom attribute in jquery
  3. Common decompression in Linux
  4. Upload large files in Java
  5. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  6. Sentry (v20.12.1) k8s cloud native architecture exploration, sentry for JavaScript manual capture event basic usage
  7. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  8. Docker + MySQL Cluster + read / write separation + MYCAT Management + vertical sub database + load balancing
  9. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  10. Java use interceptor infinite forwarding / redirection infinite loop / redirection times too many error (stack overflow error) solution
  11. 010_ MySQL
  12. 010_ MySQL
  13. Fast integration of imsdk and Huawei offline push
  14. 消息队列之RabbitMQ
  15. Rabbitmq of message queue
  16. 初学java进制转换方面补充学习
  17. Learn java base conversion supplementary learning
  18. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  19. 了解一下RPC,为何诞生RPC,和HTTP有什么不同?
  20. 初学java进制转换方面补充学习
  21. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  22. Learn about RPC, why RPC was born, and what's the difference between RPC and HTTP?
  23. Learn java base conversion supplementary learning
  24. JDBC测试连接数据库
  25. JDBC test connection database
  26. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  27. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  28. 安卓开发和java开发有什么区别!2021年BATJ30套大厂Android经典高频面试题,面试必问
  29. Spring Security OAuth2.0認證授權四:分散式系統認證授權
  30. What's the difference between Android development and java development! 2021 batj30 Android classic high frequency interview questions
  31. Spring security oauth2.0 authentication and authorization 4: distributed system authentication and authorization
  32. Java微服务 vs Go微服务,究竟谁更强!?
  33. 大厂面试官竟然这么爱问Kafka,一连八个Kafka问题把我问蒙了?
  34. Who is stronger, Java microservice vs go microservice!?
  35. Java微服务 vs Go微服务,究竟谁更强!?
  36. The interviewers of big factories love to ask Kafka so much. I'm blinded by eight Kafka questions in a row?
  37. Who is stronger, Java microservice vs go microservice!?
  38. springboot异常处理之404
  39. Spring boot exception handling 404
  40. Spring Boot Security 国际化 多语言 i18n 趟过巨坑
  41. springboot异常处理之404
  42. Spring boot security international multilingual I18N
  43. Spring boot exception handling 404
  44. Netty系列化之Google Protobuf编解码
  45. Netty之编解码
  46. Java编解码
  47. Netty解码器
  48. Netty与TCP粘包拆包
  49. Netty开发入门
  50. Java集合遍历时遇到的坑
  51. Spring IOC 源码解析(下)
  52. Spring IoC源码解析(上)
  53. Google protobuf codec of netty serialization
  54. Encoding and decoding of netty
  55. Java codec
  56. Netty decoder
  57. Netty and TCP packet sticking and unpacking
  58. Introduction to netty development
  59. Problems encountered in Java collection traversal
  60. Spring IOC source code analysis (2)