Can you tell me what AQS is? How do you use it?

preface

During the Java interview, the knowledge related to multithreading cannot be avoided and will certainly be asked. I was asked about the knowledge of AQS, so I asked directly, do you know what AQS is? Let's talk about how it is implemented and where it is used. I really didn't speak well at that time, so I'll summarize this knowledge point this time.

What is AQS

The full name of AQS is abstractqueuedsynchronizer, which is the abstract queue synchronizer. AQS defines two resource sharing modes:

It maintains a volatile state variable and a FIFO (first in first out) queue. The state variable represents the competitive resource ID, and the queue represents the container stored when the thread that fails to compete for resources queues.

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
	...
	/**
	 * The synchronization state.
	 */
	private volatile int state;
	/**
     * Wait queue node class.
     **/
     static final class Node {
		...
	}
	...
}

AQS provides methods for operating state:

protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
protected final boolean compareAndSetState(int expect,int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this,stateOffset,expect,update);
}

Abstractqueuedsynchronizer is an abstract class. It uses the design pattern of template method to specify the methods to be implemented in exclusive and shared patterns, and has implemented some general functions. Therefore, the use methods of different patterns only need to be defined to achieve the acquisition and release of shared resources, As for the maintenance of specific threads in the waiting queue (obtaining resources into the queue, waking up out of the queue, etc.), AQS has been implemented.

Therefore, according to the mode of sharing resources, the general implementation methods are as follows:

None of the above methods is defined as abstract in the abstract class abstractqueuedsynchronizer, indicating that these methods can be implemented on demand, In the shared mode, you can only implement the methods in the shared mode (such as countdownlatch), and in the exclusive mode, you can only implement the methods in the exclusive mode (such as reentrantlock). You can also implement both modes and use both modes (such as reentrantreadwritelock).

AQS source code analysis

First, we briefly introduce how the implementation classes of the two modes of AQS, reentrantlock (exclusive mode) and countdownlatch (shared mode), are a process of sharing resources, and then analyze the whole implementation process in detail through the source code of AQS.

Exclusive mode analysis

There is a static inner class node in the abstractqueuedsynchronizer class. It represents each node in the queue. The node node has the following attributes:

// 节点的状态
volatile int waitStatus;
// 当前节点的前一个节点
volatile Node prev;
// 当前节点的后一个节点
volatile Node next;
// 当前节点中所包含的线程对象
volatile Thread thread;
// 等待队列中的下一个节点
Node nextWaiter;

What each attribute represents has been written in the code comments. There are also several constants in the node class, which represent the waitstatus values of several nodes.

	/** waitStatus value to indicate thread has cancelled */
	static final int CANCELLED =  1;
	/** waitStatus value to indicate successor's thread needs unparking */
	static final int SIGNAL    = -1;
	/** waitStatus value to indicate thread is waiting on condition */
	static final int CONDITION = -2;
	/**
	 * waitStatus value to indicate the next acquireShared should
	 * unconditionally propagate
	 */
	static final int PROPAGATE = -3;

First, the node status value waitstatus defaults to 0, and then the following constants have their own specific meanings. CANCELLED = 1; It means that the current node is cancelled from the synchronization queue, When timeout or is interrupted (in case of response interruption), the node will be triggered to change to this state, and the node will not change after entering this state. Signal = - 1; it means that the successor node is in the waiting state. When the successor node joins the queue, the state of the predecessor node will be updated to signal. Condition = - 2; the node is in the waiting queue, the node thread is waiting on the condition, and when other threads call the condition After the signal () method is, the node will be transferred from the waiting queue to the synchronization queue and added to the acquisition of synchronization status. PROPAGATE = -3; Indicates that in the sharing mode, the predecessor node will wake up the successor node after releasing resources and propagate this sharing mode.

Through the above fixed constant values, we can see that in the node state, negative values usually mean that the node is in an effective waiting state, while positive values mean that the node has been cancelled.

Therefore, there are many places in the AQS source code to judge whether the nodes in the queue are normal by waitstatus > 0 or waitstatus < 0.

In the exclusive mode, only one thread can occupy the lock resource. After the competition fails, the threads of other competing resources will enter the waiting queue, wait for the thread occupying the lock resource to release the lock, and then wake up the competing resources again.

Reentrantlock locking process

Reentrantlock is a non fair lock by default, that is, threads do not acquire locks in the order of first come first served when competing for locks, but reentrantlock also supports fair locks. You can pass in a parameter value when creating. Let's analyze the source code of AQS with the lock of reentrantlock by default. Reentrantlock does not directly inherit AQS classes, but inherits AQS classes through internal classes. In this way, its own implementation functions can be used by itself. When we lock with reentrantlock, we call the lock () method. Let's take a look at the source code of the lock () method under the default unfair lock:

/**
 * Sync object for non-fair locks
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge,backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0,1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

As can be seen from the source code, the lock () method first preempts the lock through CAS. If the preemption is successful, set the value of state to 1. Then set the object exclusive thread to the current thread.

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

If the lock preemption fails, the acquire () method will be called. The implementation of this acquire () method is in the AQS class, which describes the logic after the lock preemption fails. AQS has specified the template.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
        selfInterrupt();
}

As described above, the exclusive mode needs to implement the tryacquire () method. Here, the first step is to preempt the lock through the tryacquire () method. If it succeeds, it returns true and if it fails, it returns false. The specific implementation of the tryacquire () method is in reentrantlock. By default, exceptions are thrown directly in the AQS class.

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
     final Thread current = Thread.currentThread();
     int c = getState();// 获取state值
     if (c == 0) { 
     // 如果state值为0,说明无锁,那么就通过cas方式,尝试加锁,成功后将独占线程设置为当前线程
         if (compareAndSetState(0,acquires)) {
             setExclusiveOwnerThread(current);
             return true;
         }
     }
     else if (current == getExclusiveOwnerThread()) { // 如果是同一个线程再次来获取锁,那么就将state的值进行加1处理(可重入锁的,重入次数)。
         int nextc = c + acquires;
         if (nextc < 0) // overflow
             throw new Error("Maximum lock count exceeded");
         setState(nextc);
         return true;
     }
     return false;
 }

Let's continue to look at the acquire () method. After the tryacquire () method is executed, if the lock fails, the addwaiter () method and acquirequeueueueueueueueued () method will be executed. These two methods are used to put the thread that failed to compete for the lock into the waiting queue.

private Node addWaiter(Node mode) {
	// 用参数指定的模式将当前线程封装成队列中的节点(EXCLUSIVE【独占】,SHARED【共享】)
    Node node = new Node(Thread.currentThread(),mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // tail是队列的尾部节点,初始时队列为空,尾部节点为null,直接调用enq将节点插入队列
    if (pred != null) {
    // 将当前线程节点的前级节点指向队列的尾部节点。
        node.prev = pred;
        // 通过CAS方式将节点插入到队列中
        if (compareAndSetTail(pred,node)) {
        // 插入成功后,将原先的尾部节点的后级节点指向新的尾部节点
            pred.next = node;
            return node;
        }
    }
    // 如果尾部节点为空或通过CAS插入队列失败则通过enq方法插入节点
    enq(node);
    return node;
}

Addwaiter () mainly does three things:

So how does the enq () method insert nodes?

The source code of enq () method is as follows:

private Node enq(final Node node) {
	// 看到死循环,就明白是通过自旋咯
    for (;;) {
    // 当tail节点为空时直接将当前节点设置成尾部节点,并插入到队列中,以及设置它为head节点。
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
        // 若是因为在addWaiter()方法中插入失败或第二次进入循环,那么将当前线程的前级节点指向尾部节点,并通过CAS方式将尾部节点指向当前线程的节点。
            node.prev = t;
            if (compareAndSetTail(t,node)) {
                t.next = node;
                return t;
            }
        }
    }
}

In fact, enq () method is mainly used to insert data into the queue through spin:

In this way, the addwaiter () method constructs a queue and adds the current thread to the queue. Let's go back to the acquire () method.

final boolean acquireQueued(final Node node,int arg) {
    boolean Failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
        // 获取前级节点,如果未null,则抛出异常
            final Node p = node.predecessor();
        // 如果前级节点为head,并且执行抢占锁成功。
            if (p == head && tryAcquire(arg)) {
            // 抢占锁成功,当前节点成功新的head节点
                setHead(node);
                // 然后将原先的head节点指向null,方便垃圾回收进行回收
                p.next = null; // help GC
                Failed = false;
                return interrupted;
            }
            // 如果当前节点不为head,或者抢占锁失败。就根据节点的状态决定是否需要挂起线程。
            if (shouldParkAfterFailedAcquire(p,node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (Failed) // 如果获取锁异常,则出取消获取锁操作。
            cancelAcquire(node);
    }
}
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}

Let's take a look at how shouldparkafterfailedacquire() and parkandcheckinterrupt() methods suspend threads.

private static boolean shouldParkAfterFailedAcquire(Node pred,Node node) {
    int ws = pred.waitStatus;// 获取前级节点
    if (ws == Node.SIGNAL)// 如果前级节点的waitStatus值为SIGNAL(-1),说明当前节点也已经在等待唤醒了,直接返回true。
        return true;
  // 如果前级节点的waitStatus值大于0说明前级节点已经取消了。
    if (ws > 0) {
   // 如果前级节点已经是CANCEL状态了,那么会继续向前找,直到找到的节点不是CANCEL(waitStatue>0)状态的节点,然后将其设置为当前节点的前级节点。
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
    // 如果前级节点为0或者其他不为-1的小于0的值,则将当前节点的前级节点设置为 SIGNAL(-1)
        compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
    }
    return false;
}

The parkandcheckinterrupt() method is used to suspend the thread. If the shouldparkafterfailedacquire() method succeeds, it will execute the parkandcheckinterrupt() method. It suspends the current thread (waiting) through the park() method of locksupport. It needs the unpark() method to wake it up and realize the lock operation through such a FIFO mechanism.

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

Locksupport is a thread synchronization source language tool class provided by JDK from 1.6. Its two methods are mainly used here: suspend thread and wake thread:

public static void park() {
    UNSAFE.park(false,0L);
}
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

The suspended and awakened threads of locksupport are non reentrant. It is marked by a permission flag. When park() is called, the permission will be set to 0. The thread will be suspended. If park() is called again, the thread will be blocked. The license flag is set to 1 only when unpark() is called.

Reentrantlock release lock process

Reentrantlock releases locks in two stages:

public void unlock() {
   sync.release(1);
}

The method of releasing the lock is written in the parent class, abstractqueuedsynchronizer class. The source code is as follows:

// 释放独占模式下的锁资源
public final boolean release(int arg) {
    if (tryRelease(arg)) { // 尝试释放资源
        Node h = head;
  //释放成功后,判断头节点的状态是否为无锁状态,如果不为无锁状态就将头节点中的线程唤醒。
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false; // 释放资源失败,直接返回false
}

Let's first release the resources and look at the source code of the tryrelease () method to see how the resources are released.

protected final boolean tryRelease(int releases) {
// 从state中减去传入参数的相应值(一般为1)
    int c = getState() - releases;
    // 当释放资源的线程与独占锁现有线程不一致时,非法线程释放,直接抛出异常。
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 这里是处理重入锁的机制,因为可重入机制,所以每次都重入state值都加1,
    //所以在释放的时候也要相应的减1,直到state的值为0才算完全的释放锁资源。
    if (c == 0) {
        free = true;
        // 完全释放资源后,将独占线程设置为null,这样后面的竞争线程才有可能抢占。
        setExclusiveOwnerThread(null);
    }
    // 重新赋值state
    setState(c);
    return free;
}

When tryrelease () method releases lock resources, it can be simply understood as modifying the state value of exclusive mode and emptying the occupied thread. Subtract the corresponding parameter value from the value of state (usually 1). If the calculation result is 0, its exclusive thread will be set to null, and other threads will have a chance to preempt successfully. When adding a lock, the state value of the same thread will increase by 1. If it is not unlocked once, it will decrease by 1. The same lock can be re entered. Resources will be released only when the number of locks is the same as the number of unlocks. Set the exclusive thread to null 。

After releasing the resources, let's look at the process of waking up the suspended thread. This process is in the unparksuccess () method.

private void unparkSuccessor(Node node) {
    /* 获取当前节点的等待状态,一般是头节点,占有锁的节点是在头节点上。 */
    int ws = node.waitStatus;
    // 将当前节点的线程的状态值设为0,成为无锁状态。
    if (ws < 0)
        compareAndSetWaitStatus(node,0);
    /*
     * Thread to unpark is held in successor,which is normally
     * just the next node.  But if cancelled or apparently null,* traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;// 获取下一个需要唤醒的节点线程。
    if (s == null || s.waitStatus > 0) {// 如果获取到的节点线程为空或已经取消
        s = null;
        // 就从队列的后面向前找,直到找到一个未取消的节点。
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) // 如果获得的下一个可以唤醒的节点线程不为空,那么就唤醒它。
        LockSupport.unpark(s.thread);
}

This release process is to set the thread node to be released to the unlocked state, and then go to the queue to find the node that can be awakened to wake up the thread. One thing that needs to be explained is why you should look back and forward when looking for a node that can wake up? There is an English comment (7-12 lines) in the source code of the unparkwinner () method above, which I have retained.

When a thread wakes up, it usually starts from the next node thread of the current thread, but the next node may have been cancelled or null, so look back and forward until a non cancelled node thread is found.

Because the article is too long, I will summarize the locking and unlocking process of exclusive mode here. In the next article, I will summarize the process of AQS in shared mode through the locking and unlocking process of countdownlatch.

Analysis of AQS sharing mode

In fact, the sharing mode of AQS is a little easier to sum up than the exclusive mode, just a little easier to sum up.

Countdownlatch's process of obtaining shared resources

When using countdownlatch, the countdownlatch object is created first, and then the countdown () method is executed once after each task is executed. The execution is not completed until the value obtained through getcount() is 0. If the count value is not 0, the main thread can wait through the await() method until all tasks are completed, and the count value is set to 0. Let's first look at the method of creating countdownlatch.

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

private static final class Sync extends AbstractQueuedSynchronizer {
   private static final long serialVersionUID = 4982264981922014374L;
   Sync(int count) {
       setState(count);
   }
}

We see that the process of creating countdownlatch is actually the process of assigning the count value to state.

Let's look at the source code of await () method:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);// 等待可中断的获取共享资源的方法
}
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted()) // 如果线程已经中断,直接抛出异常结束。
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)// 尝试获取共享资源,获取失败后,自旋入队列
        doAcquireSharedInterruptibly(arg);// 可中断的入队列过程
}

The whole wait process of await () is to try to obtain the shared resources first. If the acquisition succeeds, the task will be executed. If the acquisition fails, the method will be called to spin into the waiting queue. When we first introduced AQS, we said that in the sharing mode, we need to implement the tryacquireshared () method to obtain shared resources. Let's see how countdownlatch realizes the acquisition of shared resources.

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

It's easy to understand. Just one line of code to directly obtain the state value. If it is equal to 0, it means success, and if it is not equal to 0, it means failure.

So how does the doacquiresharedinterruptible () method execute after the resource acquisition fails. The source code is as follows:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);// addWaiter()方法已经总结过了,这一步操作的目的就是将当前线程封装成节点加入队尾,并设置成共享模式。
    boolean Failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();// 获取前级节点
            if (p == head) {
            // 如果前级节点是头节点,直接尝试获取共享资源。
                int r = tryAcquireShared(arg);
                if (r >= 0) {// 如果获取共享资源成功,将head节点指向自己
                    setHeadAndPropagate(node,r);
                    p.next = null; // help GC 将原head节点指向空,方便垃圾回收。
                    Failed = false;
                    return;
                }
            }
            // 如果不是前级节点不是head节点,就根据前级节点状态,判断是否需要挂起线程。
            if (shouldParkAfterFailedAcquire(p,node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (Failed) // 如果执行失败,取消获取共享资源的操作。
            cancelAcquire(node);
    }
}

The method here is very similar to the acquirequeueueueueued () method in exclusive mode, except that it is different when setting the header node to wake up a new thread. It is in the setheadandpropagate () method.

private void setHeadAndPropagate(Node node,int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
   // 如果在唤醒完下一个节点后,资源还有剩余,并且新唤醒的节点状态不为无效状态,就继续唤醒队列中的后面节点里的线程。
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

The name of setheadandpropagate() method is translated into Chinese as "set header node and propagate". In fact, when obtaining shared lock resources, if there is more resources than used to wake up the next node, it will be used to wake up the subsequent nodes until the resources are used up. This fully reflects the "sharing" of the sharing mode.

Countdownlatch free resources

Let's look at how the countdown () method frees up resources. The source code is as follows:

public void countDown() {
    sync.releaseShared(1);
}

The releaseshared() method of the internal class sync in countdownlatch is the releaseshared() method of AQS.

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {// 尝试释放资源
        doReleaseShared();// 释放资源成功后,唤醒节点。
        return true;
    }
    return false;
}

The tryrereleaseshared() method that attempts to release resources is required to be implemented by AQS. The implementation of countdownlatch is as follows:

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0) // 若state为0,说明已经不需要释放资源了,直接返回false。
            return false;
        int nextc = c-1;
        if (compareAndSetState(c,nextc))// 真正的释放资源,是通过CAS的方式将state的值减1。
            return nextc == 0;
    }
}

In fact, the main operation is to reduce the value of state by 1 through CAS. After the resource is released successfully, it is time to wake up the node in the doreleaseshared () method.

private void doReleaseShared() {
  for (;;) {
      Node h = head;
      if (h != null && h != tail) {// 当头节点不为空,并且不等于尾节点时,从头开始唤醒。
          int ws = h.waitStatus;// 获取头节点的等待状态
          if (ws == Node.SIGNAL) {// 如果头节点状态为等待唤醒,那么将头节点的状态设置为无锁状态,若CAS设置节点状态失败,就自旋。
              if (!compareAndSetWaitStatus(h,Node.SIGNAL,0))
                  continue;            // loop to recheck cases
              unparkSuccessor(h);// 唤醒头节点
          }// 如果head节点的状态已经为无锁状态了,那么将head节点状态设置为可以向下传播唤醒的状态(PROPAGATE)。
          else if (ws == 0 &&
                   !compareAndSetWaitStatus(h,Node.PROPAGATE))
              continue;                // loop on Failed CAS
      }
      // 若在执行过程中,head节点发生的变化,直接跳出循环。
      if (h == head)                   // loop if head changed
          break;
  }
}

So far, the exclusive mode and sharing mode of AQS have been summarized in the process of obtaining and releasing shared resources. There are a lot of contents, which need to be digested. I can see the last powerful characters, because when I summarize this part of the content, I also consulted a lot of materials and read a lot of source code. It took me several days to summarize and understand what AQS is and how it is implemented.

In fact, AQS contains not only the contents summarized above, but also conditions As well as the interruptible acquisition of resources (acquireinterruptible [exclusive], acquireseredinterruptible [shared] acquire() and acquiresered() ignore interrupts during thread waiting), and how reentrantlock implements fair lock (in fact, when competing for resources, if there is a new thread, first judge whether there is a node in the queue. If there is a node, directly insert it into the tail of the queue and wait, and obtain resources in order).

By summarizing AQS, we can basically understand the source code of reentrantlock, countdownlatch, semaphore, etc. based on AQS, and even the cyclicbarrier and copyonwritearraylist on the upper layer. We can know what process it is by looking at the source code.

Finally, there are a lot of contents, and you are welcome to correct some bad things (if I can make it clear, I can't change it if I don't understand)

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>