Preface

The last article wrote Redis The principle and defect of distributed lock , I don't like it , Just a brief introduction Redisson This framework , The specific principle has not been mentioned yet . While the project was almost busy years ago , Anyway, idle is also idle , How about putting Redisson Learn the source code of .

Although it was a whim , But after careful study, it turns out that Redisson The workload of source code interpretation is quite heavy , There's a lot of Java Concurrent class , And quoted Netty As a communication tool , Realization and Redis Remote calls to components , It is not realistic to explain all these knowledge points , The main point of this paper is about Redisson The implementation principle of distributed lock , So the code interpretation of network communication and concurrency principle will not be too careful , I'm sorry for the shortcomings !

Redis Publish subscribe

I said before , There are actually three core functions of distributed lock : Lock 、 Unlock 、 Set lock timeout . These three functions are also our research Redisson The direction of distributed lock principle .

Before learning , It is necessary for us to understand a knowledge point first , That's about it Redis The publish and subscribe function of .

Redis Publish subscribe (pub/sub) It's a message communication mode : sender (pub) Send a message , subscriber (sub) receive messages , The publisher can send the (channel) Send a message , Subscribers can receive messages if they subscribe to the channel , So as to realize the communication effect of multiple clients .

The order to subscribe is SUBSCRIBE channel[channel ...], You can subscribe to one or more channels , When new news passes PUBLISH When a command is sent to a channel , The subscriber will receive the message , It's like this

Open two clients , One subscribed to the channel channel1, The other passed through PUBLISH After sending the message , You can get the subscription , With this mode, the communication between different clients can be realized .

Of course , We won't start with the magical scenarios of this communication mode , You can check it on the Internet by yourself , Our protagonist is still Redisson, After warming up , It's time for the main course .

Redisson Source code

In the use of Redisson Before locking , You need to get one first RLock Instance object , With this object, you can call lock、tryLock Method to complete the function of locking

Config config = new Config();
config.useSingleServer()
  .setPassword("")
  .setAddress("redis://127.0.0.1:6379");
RedissonClient redisson = Redisson.create(config);
// RLock object
RLock lock = redisson.getLock("myLock");

Configure the corresponding host, Then you can create a RLock object .RLock It's an interface , The specific synchronizer needs to implement the interface , When we call redisson.getLock() when , The program initializes a default synchronization actuator RedissonLock

It initializes a few parameters ,

commandExecutor: Asynchronous Executor actuator ,Redisson All of the commands in are through ...Executor Executive ;

id: only ID, Initialization is done with UUID Created ;

internalLockLeaseTime: Waiting for lock acquisition time , This is the default definition in the configuration class , Time is 30 second ;

meanwhile , I also marked a method in the picture getEntryName, The return is “ID : Lock name ” String , Represents that the current thread holds an ID of the corresponding lock , These parameters need to be impressed , The following source code analysis often appears .

That's all about initialization , We can start to learn the source code of locking and unlocking .

Lock

Redisson There are two ways to lock ,tryLock and lock, The difference in use is tryLock You can set the expiration time of the lock leaseTime And waiting time waitTime, The logic of core processing is almost the same , Let's start with tryLock Speak up .

tryLock

The code is a bit long ... It's not convenient to make the whole picture , Just stick it up ,

/**
 * @param waitTime  Waiting for the lock  
 * @param leaseTime  Lock holding time  
 * @param unit  Time unit
 * @return
 * @throws InterruptedException
 */
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {    
        //  The remaining waiting time for the lock
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        
        final long threadId = Thread.currentThread().getId();
        //  Attempt to acquire lock , If you don't get the lock , The remaining timeout of the lock is returned
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // ttl by null, It means we can get the lock , return true
        if (ttl == null) {
            return true;
        }
        
        //  If waitTime It's over time , Just go back to false, Failed to apply for lock
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        //  Subscribe to distributed locks ,  Notify when unlocking , see , Here we use the above release - Subscribe
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        //  Block waiting for lock to release ,await() return false, Indicates that the waiting time has timed out
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                         //  It's all overtime , Unsubscribe directly
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
         //  Into the dead cycle , Call again and again tryAcquire Attempt to acquire lock , It's the same logic as the one above
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

The code is quite long , But the process is just two steps , Either the thread gets the lock and returns success ; Or if you don't get the lock and the waiting time is not over, you can continue to cycle to get the lock , At the same time, monitor whether the lock is released .

The way to hold the lock is tryAcquire, The parameters passed in are the holding time of the lock , Units of time and... Representing the current thread ID, Follow up the code to see the call stack , It'll be tuned to a place called tryAcquireAsync Methods :

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        //  If there is a waiting time to set the lock , Just call directly tryLockInnerAsync Method to acquire lock
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //  If the waiting time is not set , Add one more monitor , That is to call lock.lock() The logic of running , I'll say later
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

Let's go on with , have a look tryLockInnerAsync Method source code :

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
String getLockName(long threadId) {
    return id + ":" + threadId;
}

Here is the underlying call stack , Direct operation command , Integrated into lua After script , call netty The tool class of is similar to redis communicate , So as to achieve the function of obtaining the lock .

This script command is a bit interesting , A simple interpretation of :

  • First use exists key Command to determine whether the lock is occupied , If not, use hset Command write ,key Is the name of the lock ,field by “ The client is unique ID: Threads ID”,value by 1;
  • The lock is occupied , Determine whether it is occupied by the current thread , If yes value It's worth adding 1;
  • The lock is not occupied by the current thread , Returns the remaining expiration time of the lock ;

The logic of the command is not complicated , But I have to say , The author's design is very intentional , It was used redis Of Hash Structure stores data , If it is found that the current thread already holds the lock , Just use hincrby The order will value It's worth adding 1,value The value of will determine the number of times the unlock command is called when the lock is released , Achieve the effect of lock reentry .

The logic corresponding to each step of the command is indicated in the figure below , You can read :

Let's continue with the code , According to the above command, it can be seen that , If the thread gets the lock ,tryLock Method will return directly true, Everything will be fine .

If you can't get it , Will return the remaining expiration time of the lock , What's the use of this duration ? We go back to tryLock The dead loop in the method :

Here's one for waitTime and key Compare the size of the remaining expiration time of , Take the smaller of the two , And then use Java Of Semaphore Semaphore tryAcquire Method to block the thread .

that Semaphore Who controls the semaphore , When can we release Well . Here we need to go back to the top , You guys should remember , It's on us tryLock There's also this section in the code :

current = System.currentTimeMillis();
//  Subscribe to distributed locks ,  Notify when unlocking
final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);

The logic of subscription is obviously subscribe In the method , Follow the chain of method calls , It's going to go in PublishSubscribe.Java in :

The purpose of this code is to convert the current thread's threadId Add to a AsyncSemaphore in , And set up a redis The monitor for , This monitor is through redis Release 、 Subscription function .

Once the monitor receives redis Message sent , From it, we can get the current thread dependent , If it's a lock released message , Immediately through the operation Semaphore( That is to call release Method ) To let go of the blockage .

After release, the thread continues to execute , It's still about judging whether it's time-out . If you haven't timed out yet , Go to the next cycle and get the lock again , Get it and go back true, If you don't get it, continue the process .

Here's an explanation , The reason for the cycle , Because the lock may be scrambled by multiple clients at the same time , The moment after the thread block is released, it is likely that the lock will not be available , But the waiting time of the thread has not passed yet , At this time, you need to run the cycle again to get the lock .

This is it. tryLock The whole process of getting the lock , If you draw a flow chart, it means something like this :

lock

except tryLock, In general, we often call lock To get the lock ,lock The process of holding the lock is the same as that of holding the lock tryLock Basically consistent , The difference lies in lock There is no parameter to manually set the lock expiration time , The call chain of this method also runs to tryAcquire Method to get the lock , The difference is , It goes to this part of the logic :

This code does two things :

1、 Preset 30 The expiration time of seconds , And then get the lock

2、 Turn on a monitor , If you find out you've got the lock , Start the timing task to refresh the expiration time of the lock

The way to refresh the expiration time is scheduleExpirationRenewal, Post the source code :

private void scheduleExpirationRenewal(final long threadId) {
 // expirationRenewalMap It's a ConcurrentMap, The storage flag is " Current thread ID:key name " The task of
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                //  Detecting the existence of a lock lua Script , If it exists, use pexpire Command refresh expiration time
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

The flow of the code is relatively simple , It's about starting a timed task , every other internalLockLeaseTime / 3 Time for ( This time is 10 second ) To detect whether the lock is still held by the current thread , If yes, reset the expiration time internalLockLeaseTime, That is to say 30 The second time .

And these timing tasks are stored in a ConcurrentHashMap object expirationRenewalMap in , Stored key for “ Threads ID:key name ”, If you find that expirationRenewalMap There is no corresponding current thread in key Words , Regular tasks don't run , This is also an important step in unlocking .

This is the code above Redisson The so-called ” watchdog “ Program , Use an asynchronous thread to detect and execute , In case it's expired before manual unlocking .

The other logic is like tryLock() It's almost the same , Let's have a look

Unlock

There's a way to hold the lock , Naturally, there will be unlocking .Redisson The upper calling method of distributed lock unlocking is unlock(), By default, no parameters are passed

@Override
    public void unlock() {
     //  Initiate command request to release lock
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
         //  Lock released successfully , Cancel " watchdog " Continuous thread of
            cancelExpirationRenewal();
        }
    }

Unlock related command operations in unlockInnerAsync Defined in method ,

Another bunch of lua Script , It's a little more complicated than the command in the previous script , But that's okay , Let's just sort it out , The logic of the command is like this :

1、 Determine if the lock exists , If it doesn't exist, use publish Command to release the lock , After the subscriber receives it, he can do the next lock handling ;

2、 The lock exists but is not held by the current thread , Return to vacant nil;

3、 The current thread holds the lock , use hincrby Command will lock the number of reentrant times -1, Then judge whether the number of reentry is greater than 0, If yes, refresh the expiration time of the lock , return 0, Otherwise, delete the lock , And release the lock , return 1;

When the thread completely releases the lock , Will call cancelExpirationRenewal() Method cancel " watchdog " Continuous thread of

void cancelExpirationRenewal() {
 // expirationRenewalMap Remove the corresponding key, Will not be executed for the current thread " watchdog " Procedure
    Timeout task = expirationRenewalMap.remove(getEntryName());
    if (task != null) {
        task.cancel();
    }
}

This is the process of releasing the lock , What about? , Is it relatively simple , It's much more comfortable to read than to lock the code , Of course! , Simple to simple , In order to facilitate you to sort out the whole process of distributed locking , Of course, I still painstakingly draw a flow chart to show you ( Just for that , Should I have a third company , ha-ha ):

RedLock

That's all Redisson Explain the principle of distributed lock , in general , It's simple to use lua Script integration basic set The command realizes the function of lock , That's a lot Redis The design principle of distributed lock tool . besides ,Redisson It also supports the use of "RedLock Algorithm " To achieve the lock effect , This tool class is RedissonRedLock.

It's easy to use , Create multiple Redisson Node, By these unrelated Node You can form a complete distributed lock

RLock lock1 = Redisson.create(config1).getLock(lockKey);
RLock lock2 = Redisson.create(config2).getLock(lockKey);
RLock lock3 = Redisson.create(config3).getLock(lockKey);

RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
try {
   redLock.lock();
} finally {
   redLock.unlock();
}

RedLock I will not elaborate on the principle of algorithm , If you are interested, please read my previous article , Or search online , In short, it can effectively prevent Redis Examples of single point of failure problems , But it's not entirely reliable , No matter what kind of design it is , The light on Redis It is impossible to guarantee the strong consistency of the lock itself .

Or that sentence , You can't have both fish and bear paws , This is often the case with performance and security ,Redis Powerful performance and ease of use are enough to meet the daily needs of distributed locks , If the business scenario is intolerable of lock security risks , The most secure way is to do idempotent processing in the business layer .

summary

Read the source code analysis of this article , I believe you are right Redisson We also know enough about the design of distributed locks , Of course! , Although it's about the source code , Our main focus is on the principle of distributed lock , Some code that has nothing to do with the process does not take the interpretation of the sentence , If you are interested, you can read it yourself , Many places in the source code show the magical use of some basic concurrency tools and network communication , It's very rewarding to learn .

Finally, I still want to make complaints about it ,Redisson It's true that there are very few comments on it ......

If you find the article useful , You're welcome to support , It would be the best encouragement for me to create !

author : I'm Xue , An Internet person who is not limited to technology , Like to use easy to understand language to deconstruct the knowledge of back-end technology , I want to read more interesting articles to pay attention to my official account , WeChat search 【 I'm Xue 】 Can focus on