Preface
I. Business background
Some business requests are time-consuming. They need to hold a lock so that later concurrent requests cannot run at the same time, and their database operations must not interfere with work that is already in progress.
II. Analysis process
Redis is used as a distributed lock. The lock state lives in Redis and is maintained in one place, which solves the problem that the JVMs in a cluster do not share information with each other. This fixes the order of operations and protects the user's data.
The design process:
1. Create a new annotation with @interface and define the input flags in the annotation
2. Add an AOP pointcut that scans for the annotation
3. Create an @Aspect class, register it as a bean, and intercept the annotated methods
4. The advice receives a ProceedingJoinPoint and wraps the call to pjp.proceed() with logic before and after it
5. Acquire the lock before the intercepted method runs, and delete the key after the task finishes
The core steps: lock, unlock, and renew
Lock
The lock uses the opsForValue().setIfAbsent method of RedisTemplate, which sets the key only if it does not exist yet. The value is a random string generated with UUID.randomUUID().toString().
After the lock has been obtained from Redis, an expire time is set on the key so that the lock is released automatically when it expires.
With this design, only the first request that sets the key successfully can go on to operate on the data. Later requests cannot obtain the lock and end in failure.
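The locking rule above can be sketched without a Redis server. The class below is only an in-memory stand-in (the name ExpiringLockTable and the explicit nowMillis parameter are assumptions made for illustration, not part of the original code). It shows that the first request wins, a concurrent request is rejected, and the key becomes available again once it has expired.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the Redis key space; NOT a real distributed lock. */
class ExpiringLockTable {
    /** Lock value plus the absolute time (ms) at which the entry expires. */
    private static final class Entry {
        final String value;
        final long expireAtMillis;

        Entry(String value, long expireAtMillis) {
            this.value = value;
            this.expireAtMillis = expireAtMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /**
     * Mimics "set if absent, then expire": succeeds only when the key is
     * missing or its previous entry has already expired.
     */
    synchronized boolean tryLock(String key, String value, long ttlMillis, long nowMillis) {
        Entry current = entries.get(key);
        if (current != null && current.expireAtMillis > nowMillis) {
            return false; // another request still holds the lock
        }
        entries.put(key, new Entry(value, nowMillis + ttlMillis));
        return true;
    }

    public static void main(String[] args) {
        ExpiringLockTable table = new ExpiringLockTable();
        System.out.println(table.tryLock("Business1:1024", "token-A", 3000, 0));    // first request
        System.out.println(table.tryLock("Business1:1024", "token-B", 3000, 1000)); // concurrent request
        System.out.println(table.tryLock("Business1:1024", "token-B", 3000, 3000)); // after expiry
    }
}
```

Running main prints true, false, true. The last line is exactly the situation that becomes a problem when the first request is still running after its key has expired.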
Timeout problem
The worry is that the method executed by pjp.proceed() takes too long, so the key in Redis expires and the lock is released early.
For example, thread A gets the lock first, but proceed() takes longer than the lock timeout and the key expires. Thread B then acquires the Redis lock successfully, and the two threads operate on the same batch of data at the same time, which leads to inaccurate data.
Solution: add a 「renewal」
While the task is not finished, the lock is not released:
A scheduled thread pool, ScheduledExecutorService, scans the tasks in a queue every 2 s and checks whether each one is close to its expiry time. The formula is: 【expiry time】 <= 【current time】 + 【renewal interval (one third of the timeout)】
/**
 * Thread pool: each JVM uses one thread to maintain keyAliveTime and runs the runnable on a schedule
 */
private static final ScheduledExecutorService SCHEDULER =
        new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());

static {
    SCHEDULER.scheduleAtFixedRate(() -> {
        // do something to extend time
    }, 0, 2, TimeUnit.SECONDS);
}
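As a worked example of the renewal formula stated in the prose, here is a minimal sketch. The class name RenewalCheck and the method shouldRenew are hypothetical, and all times are assumed to be in milliseconds.

```java
/** Renewal check following the formula in the text; all times are in milliseconds. */
class RenewalCheck {
    /** True when the key will expire within the next third of the lock timeout. */
    static boolean shouldRenew(long expireAtMillis, long nowMillis, long timeoutMillis) {
        long renewWindowMillis = timeoutMillis / 3;
        return expireAtMillis <= nowMillis + renewWindowMillis;
    }

    public static void main(String[] args) {
        long timeout = 30_000; // a 30 s lock, so the renewal window is 10 s
        System.out.println(shouldRenew(30_000, 5_000, timeout));  // 25 s left, no renewal yet
        System.out.println(shouldRenew(30_000, 21_000, timeout)); // 9 s left, renew now
    }
}
```

With a 30 s lock the check stays false until fewer than 10 s remain, so main prints false and then true.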
III. Design scheme
Based on the analysis above, my colleague came up with this design:
The whole flow has already been described, so here are only the core steps:
Intercept the @RedisLock annotation and read the necessary parameters
Lock
Renew
Finish the business logic and release the lock
IV. Practice
I have written about how to use AOP before; you can refer to that article.
Related property class configuration
Business attribute enumeration
public enum RedisLockTypeEnum {
    /**
     * Custom key prefix
     */
    ONE("Business1", "Test1"),
    TWO("Business2", "Test2");

    private final String code;
    private final String desc;

    RedisLockTypeEnum(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    public String getCode() {
        return code;
    }

    public String getDesc() {
        return desc;
    }

    public String getUniqueKey(String key) {
        return String.format("%s:%s", this.getCode(), key);
    }
}
Parameters saved in the task queue
public class RedisLockDefinitionHolder {
    /**
     * Unique business key
     */
    private String businessKey;
    /**
     * Lock time (seconds)
     */
    private Long lockTime;
    /**
     * Last update time (ms)
     */
    private Long lastModifyTime;
    /**
     * The thread that holds the lock
     */
    private Thread currentTread;
    /**
     * Total number of attempts allowed
     */
    private int tryCount;
    /**
     * Number of attempts so far
     */
    private int currentCount;
    /**
     * Update period (ms), computed as lock time (in ms) / 3
     */
    private Long modifyPeriod;

    public RedisLockDefinitionHolder(String businessKey, Long lockTime, Long lastModifyTime, Thread currentTread, int tryCount) {
        this.businessKey = businessKey;
        this.lockTime = lockTime;
        this.lastModifyTime = lastModifyTime;
        this.currentTread = currentTread;
        this.tryCount = tryCount;
        this.modifyPeriod = lockTime * 1000 / 3;
    }

    // Getters and setters are omitted here; the aspect below calls getBusinessKey(), getLockTime(), and so on
}
Define the annotation to intercept
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface RedisLockAnnotation {
    /**
     * Index of the method argument used to build the lock key; defaults to the argument at index 0
     */
    int lockFiled() default 0;

    /**
     * Number of timeout retries
     */
    int tryCount() default 3;

    /**
     * Custom lock type
     */
    RedisLockTypeEnum typeEnum();

    /**
     * Time until the lock is released, in seconds
     */
    long lockTime() default 30;
}
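To show how annotation values like these are read at runtime and combined with a method argument into a key, here is a self-contained sketch that uses plain reflection. DemoLock, updateUser, and buildKey are hypothetical names; DemoLock is a trimmed-down stand-in for RedisLockAnnotation with a String prefix in place of the enum, so that the example runs without Spring.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationKeyDemo {
    /** Hypothetical, trimmed-down stand-in for RedisLockAnnotation. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoLock {
        int lockFiled() default 0; // index of the argument used as the key

        String prefix();           // plays the role of the enum's code
    }

    @DemoLock(prefix = "Business1")
    void updateUser(Long userId, String name) {
        // business logic would go here
    }

    /** Reads the annotation from the method and builds "prefix:argument". */
    static String buildKey(Method method, Object[] args) {
        DemoLock lock = method.getAnnotation(DemoLock.class);
        return String.format("%s:%s", lock.prefix(), args[lock.lockFiled()]);
    }

    /** Builds the key for a sample call updateUser(1024L, "alice"). */
    static String demoKey() {
        try {
            Method m = AnnotationKeyDemo.class.getDeclaredMethod("updateUser", Long.class, String.class);
            return buildKey(m, new Object[] {1024L, "alice"});
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoKey()); // Business1:1024
    }
}
```

RetentionPolicy.RUNTIME is what makes getAnnotation work here; with the default CLASS retention the annotation would not be visible through reflection.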
The core aspect
The RedisLockAspect.java class is described in three parts
Pointcut setup
/**
 * The path inside @annotation names the annotation to intercept
 */
@Pointcut("@annotation(cn.sevenyuan.demo.aop.lock.RedisLockAnnotation)")
public void redisLockPC() {
}
Around: lock before, release after
The previous step defined the pointcut to intercept. The next step is to add custom logic before and after the intercepted method:
@Around(value = "redisLockPC()")
public Object around(ProceedingJoinPoint pjp) throws Throwable {
    // Parse the parameters
    Method method = resolveMethod(pjp);
    RedisLockAnnotation annotation = method.getAnnotation(RedisLockAnnotation.class);
    RedisLockTypeEnum typeEnum = annotation.typeEnum();
    Object[] params = pjp.getArgs();
    String ukString = params[annotation.lockFiled()].toString();
    // Parameter validation and null checks are omitted
    String businessKey = typeEnum.getUniqueKey(ukString);
    String uniqueValue = UUID.randomUUID().toString();
    // Lock
    Object result = null;
    // True only once this request holds the lock, so that finally never deletes another request's key
    boolean locked = false;
    try {
        // setIfAbsent returns a Boolean that may be null, so compare against Boolean.TRUE
        locked = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(businessKey, uniqueValue));
        if (!locked) {
            throw new Exception("You can't do it,because another has get the lock =-=");
        }
        redisTemplate.expire(businessKey, annotation.lockTime(), TimeUnit.SECONDS);
        Thread currentThread = Thread.currentThread();
        // Add this task to the 「renewal」 queue
        holderList.add(new RedisLockDefinitionHolder(businessKey, annotation.lockTime(), System.currentTimeMillis(),
                currentThread, annotation.tryCount()));
        // Run the business operation
        result = pjp.proceed();
        // The thread was interrupted: throw an exception and abort this request
        if (currentThread.isInterrupted()) {
            throw new InterruptedException("You had been interrupted =-=");
        }
    } catch (InterruptedException e) {
        log.error("Interrupt exception, rollback transaction", e);
        throw new Exception("Interrupt exception, please send request again");
    } catch (Exception e) {
        log.error("has some error, please check again", e);
    } finally {
        // After the request, delete the key to release the lock, but only if this request acquired it
        if (locked) {
            redisTemplate.delete(businessKey);
            log.info("release the lock, businessKey is [" + businessKey + "]");
        }
    }
    return result;
}
A brief summary of the flow above:
Parse the annotation parameters, and get the annotation values and the argument values of the method
Lock in Redis and set the timeout
Add this task to the 「renewal」 queue so that the lock is renewed and not released early
Add a thread-interrupt flag
When the request ends, release the lock in finally
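The lock, run, release-in-finally flow summarized above can be sketched in plain Java. This is only an in-memory illustration: LocalLockTemplate and withLock are hypothetical names, a ConcurrentHashMap stands in for Redis, and there is no expiry or renewal. It also shows one possible use for the random value: the key is removed only while it still holds the caller's own token, so a request can never release a lock it does not own.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

class LocalLockTemplate {
    // In-memory stand-in for Redis: key -> token of the current holder.
    private final ConcurrentMap<String, String> locks = new ConcurrentHashMap<>();

    /** Runs the task while holding the lock for key; fails fast if the key is taken. */
    <T> T withLock(String key, Supplier<T> task) {
        String token = UUID.randomUUID().toString();
        // putIfAbsent returns null only when this call created the entry.
        boolean locked = locks.putIfAbsent(key, token) == null;
        if (!locked) {
            throw new IllegalStateException("lock is held by another request: " + key);
        }
        try {
            return task.get();
        } finally {
            // Remove the key only while it still maps to our own token.
            locks.remove(key, token);
        }
    }

    boolean isLocked(String key) {
        return locks.containsKey(key);
    }

    public static void main(String[] args) {
        LocalLockTemplate template = new LocalLockTemplate();
        System.out.println(template.withLock("Business1:1024", () -> "done"));
        System.out.println(template.isLocked("Business1:1024"));
    }
}
```

Against a real Redis server, the compare-then-delete step would have to be made atomic on the server side (for example with a script), which this sketch does not attempt.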
Renewal
A ScheduledExecutorService is used here. It maintains one thread that keeps checking the tasks in the task queue and extending their timeout:
// Task queue to scan
private static ConcurrentLinkedQueue<RedisLockDefinitionHolder> holderList = new ConcurrentLinkedQueue<>();
/**
 * Thread pool that maintains keyAliveTime
 */
private static final ScheduledExecutorService SCHEDULER = new ScheduledThreadPoolExecutor(1,
        new BasicThreadFactory.Builder().namingPattern("redisLock-schedule-pool").daemon(true).build());
{
    // Run the 「renewal」 every two seconds
    SCHEDULER.scheduleAtFixedRate(() -> {
        // The try-catch is required: if the task throws, the scheduler stops running it =-=
        try {
            Iterator<RedisLockDefinitionHolder> iterator = holderList.iterator();
            while (iterator.hasNext()) {
                RedisLockDefinitionHolder holder = iterator.next();
                // Null check
                if (holder == null) {
                    iterator.remove();
                    continue;
                }
                // Check whether the key still exists; if not, remove the task
                if (redisTemplate.opsForValue().get(holder.getBusinessKey()) == null) {
                    iterator.remove();
                    continue;
                }
                // Retry limit exceeded: interrupt the worker thread
                if (holder.getCurrentCount() > holder.getTryCount()) {
                    holder.getCurrentTread().interrupt();
                    iterator.remove();
                    continue;
                }
                // Renew once a third of the lock time has passed since the last update
                long curTime = System.currentTimeMillis();
                boolean shouldExtend = (holder.getLastModifyTime() + holder.getModifyPeriod()) <= curTime;
                if (shouldExtend) {
                    holder.setLastModifyTime(curTime);
                    redisTemplate.expire(holder.getBusinessKey(), holder.getLockTime(), TimeUnit.SECONDS);
                    log.info("businessKey : [" + holder.getBusinessKey() + "], try count : " + holder.getCurrentCount());
                    holder.setCurrentCount(holder.getCurrentCount() + 1);
                }
            }
        } catch (Exception e) {
            log.error("renew lock failed", e);
        }
    }, 0, 2, TimeUnit.SECONDS);
}
This code implements the part inside the dotted box of the design diagram. It prevents a time-consuming request from causing the lock to be released early.
A 「thread interrupt」, Thread#interrupt, is also added here, in the hope that the thread can be interrupted once the retry limit is exceeded (this has not been rigorously tested and is for reference only, ha ha).
Still, if you run into a request this slow, it is better to go to the root cause: analyze where the time goes, then optimize the business logic or handle it another way, so that such time-consuming operations are avoided.
So remember to log generously; it makes analyzing problems much faster. Related: How to use SpringBoot AOP to record operation logs and exception logs?
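The try-catch reminder in the renewal code matters because scheduleAtFixedRate suppresses all later executions once one run throws. The sketch below demonstrates this with a task that always fails. GuardedScheduleDemo and countRuns are hypothetical names, and the 10 ms period and 500 ms wait are arbitrary values chosen only to keep the demo short.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedScheduleDemo {
    /** Schedules a task that always fails and returns how many times it was started. */
    static int countRuns(boolean guarded) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        Runnable failing = () -> {
            runs.incrementAndGet();
            threeRuns.countDown();
            throw new IllegalStateException("simulated Redis failure");
        };
        Runnable guardedTask = () -> {
            try {
                failing.run();
            } catch (RuntimeException e) {
                // a real task would log the error here and carry on
            }
        };
        try {
            scheduler.scheduleAtFixedRate(guarded ? guardedTask : failing, 0, 10, TimeUnit.MILLISECONDS);
            // Wait for three runs, or give up after half a second.
            threeRuns.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + countRuns(false));
        System.out.println("guarded runs:   " + countRuns(true));
    }
}
```

The unguarded task runs exactly once and is then dropped silently, which for a renewal thread would mean that no lock is ever extended again. The guarded task keeps running.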
V. Testing
Put the annotation on an entry method, then simulate a time-consuming request in the business code with Thread#sleep
@GetMapping("/testRedisLock")
@RedisLockAnnotation(typeEnum = RedisLockTypeEnum.ONE, lockTime = 3)
public Book testRedisLock(@RequestParam("userId") Long userId) {
    try {
        log.info("Before sleep execution");
        Thread.sleep(10000);
        log.info("After sleep execution");
    } catch (Exception e) {
        // log error
        log.info("has some error", e);
    }
    return null;
}
To use it, add the annotation to the method and set the corresponding parameters. typeEnum distinguishes between different businesses and limits which business can be operated on at the same time.
Test result:
2020-04-04 14:55:50.864 INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : Before sleep execution
2020-04-04 14:55:52.855 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 0
2020-04-04 14:55:54.851 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 1
2020-04-04 14:55:56.851 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 2
2020-04-04 14:55:58.852 INFO 9326 --- [k-schedule-pool] c.s.demo.aop.lock.RedisLockAspect : businessKey : [Business1:1024], try count : 3
2020-04-04 14:56:00.857 INFO 9326 --- [nio-8081-exec-1] c.s.demo.controller.BookController : has some error
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method) [na:1.8.0_221]
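The "sleep interrupted" exception in the log above is the standard reaction of Thread.sleep to Thread#interrupt. A minimal stand-alone sketch of that interaction follows. InterruptDemo is a hypothetical name, and the worker thread plays the role of the request thread while the caller plays the role of the renewal thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptDemo {
    /** Starts a sleeping worker, interrupts it, and reports whether the sleep was cut short. */
    static boolean sleepWasInterrupted() {
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(10_000); // stands in for the slow business call
            } catch (InterruptedException e) {
                interrupted.set(true); // the same exception type as in the log
            }
        });
        worker.start();
        worker.interrupt(); // what the renewal thread does once the retry limit is exceeded
        try {
            worker.join(2_000); // the worker should finish almost immediately
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println(sleepWasInterrupted());
    }
}
```

Note that interrupt() only sets a flag that interruptible calls such as sleep react to. Business code that never checks the flag, or that blocks in a non-interruptible call, keeps running, which is one reason to treat the interrupt step in this design as best effort.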
This test covers the failure scenario where the retry limit is exceeded. If you reduce the sleep time, the business runs normally.
If you send requests at the same time, the later request fails to acquire the lock and an error message is logged.
This shows that the lock works and that duplicate requests are rejected.
VI. Summary
For time-consuming business and core data, duplicate requests must not operate on the data at the same time, or the data ends up incorrect. A distributed lock protects against this.
To recap the design process:
1. Create a new annotation with @interface and define the input flags in the annotation
2. Add an AOP pointcut that scans for the annotation
3. Create an @Aspect class, register it as a bean, and intercept the annotated methods
4. The advice receives a ProceedingJoinPoint and wraps the call to pjp.proceed() with logic before and after it
5. Acquire the lock before the intercepted method runs, and delete the key after the task finishes
This write-up came from reviewing a colleague's code design. I learned the concrete implementation of a distributed lock, copied his design, and rewrote a simplified version of the business handling. The 「renewal」 operation was something I had not thought about before: a daemon thread checks on a schedule and extends the timeout so that the lock is not released early.
Three topics were reviewed along the way:
1. How AOP is implemented and its common methods
2. How to use the scheduled thread pool ScheduledExecutorService and what its parameters mean
3. The meaning and usage of Thread#interrupt (this one is interesting and worth studying further)