Semaphore source code analysis

Semaphore source code analysis

1. Brief introduction to semephore

  generally speaking, the commonly used synchronization tools in Java are lock and synchronized, but Java also adds many new mechanisms, such as semaphores mentioned here. What he actually gives is the semaphore of the operating system. If a thread wants to run and needs to get a resource for a P operation, it is called the acquire method, then the V operation is called after running, and the release method is released to release the resource for other threads. If the resources are robbed by other threads, the thread can only be in the waiting state. That is, the - 1 returned by the P operation.

  the semaphores we mentioned above are multivalued semaphores, which are mainly used for synchronization between processes. Another is called binary semaphore, which is often mentioned in the operating system. Yes, he is the lock at this time! Because after a p operation, other threads can enter the critical area only after a V operation. Now it works the same as lock.

  write a simple program to feel the synchronization tool.

public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 4; i++) {
            new Thread(()->{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            }).start();
        }
    }
}

The final output is as follows: print it first

Thread-0
Thread-1

Printed every second

Thread-2
Thread-3

  you can see that only two threads execute simultaneously in the critical area. In fact, when we understand the AQS in the previous article, we can probably guess the implementation principle of semaphore. With the help of the state attribute in AQS, we will analyze the structure and principle of semaphore in detail.

2. Basic structure

1. Succession

null

2. Realization

Serializable  感觉好多类都能序列化啊!!有没有觉得。

3. Main fields

    private final Sync sync;

   OK, just a lock field. It's really straightforward. Then it proves that the previous guess is also correct. This sync must be an internal class that inherits AQS. In the previous article, we saw some shared locks in AQS. I didn't analyze them. In fact, shared locks are used here. Therefore, this article mainly introduces the remaining shared locks in AQS. Look at the overall structure.

4. Methodological overview

3. Analysis of main methods

1. ctor-2

   two construction methods specify whether the underlying layer adopts fair lock or unfair lock respectively! The value of permits is directly passed to the AQS parent class, that is, the AQS state attribute is set.

//默认创建公平锁,permits指定同一时间访问共享资源的线程数
public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

public Semaphore(int permits,boolean fair) {
     sync = fair ? new FairSync(permits) : new NonfairSync(permits);
 }

2. acquire

   this method is called directly, acquiresharedinterruptible (1). Without looking at the code, let's introduce the process: when the thread calls acquire and the number of resources represented by the state value is sufficient, the requesting thread will obtain the synchronization status, that is, access to shared resources, and update the state value (generally minus 1 for the state value), but if the number of permissions represented by the state value is 0, the requesting thread will not be able to obtain the synchronization status, Threads will be added to the synchronization queue and blocked until other threads release the synchronization state (generally adding 1 to the state value) before they can obtain access to shared resources.

  now, let's analyze the acquireseareinterruptable method. Obviously, it can be interrupted, that is, we can also be interrupted when acquiring. The first thing to enter the method is to determine whether an interrupt event has occurred. Then tryAcquireShared is called to get the shared lock. If it fails, call doacquireesharedinterruptible to join the waiting queue. Doacquiresharedinterruptible this method is really the same as acquirequeueueueueued method, and most of the code is the same!!! Only a small part is different. I have annotated it.

// 这个方法真的是似曾相识,感觉就是 acquireQueued 的方法稍稍修改了一下
    /*
        1. 那个方法不抛异常
        2. 那个方法在调用前把节点加入了等待队列,封装了独占锁
        3. 那个方法设置标志位,这里直接抛异常

     */
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        // 加入等待队列
        final Node node = addWaiter(Node.SHARED);
        boolean Failed = true;
        try {
            for (;;) {
                // 前驱
                final Node p = node.predecessor();
                // 如果前驱是 head 。自旋获取锁  简直一样!!!
                // 就是写法不一样,意思一模一样
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node,r);
                        p.next = null; // help GC
                        Failed = false;
                        return;
                    }
                }
                // 那个方法设置标志位,这里直接抛异常
                if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (Failed)
                cancelAcquire(node);
        }
    }

    then there is the problem of the template method. Tryacquireshared has different implementations in fair locks and unfair locks. Here are the differences between the two. In the fair lock, the operation is the same as that in the reentrantlock. First, judge whether there are other waiting threads in the synchronization queue. If there are, it will directly return the failure. Otherwise, subtract the state value and return the remaining semaphores. Note that if the semaphore we request is greater than the available semaphore, the returned value is negative, and our method is required to return a positive value in doacquiresharedinterstructibly before execution can continue. For example, when we start with 2 semaphores and the first thread directly requests 10, this thread will be suspended and let other threads grab resources. If it is found that the number of resources after the request is negative, it will not update the state.

  then analyze the unfair lock. The unfair lock directly calls nonfairtryacquireshared in the parent class. Like reentrantlock, it calls nonfairtryacquire in the parent class. This method is relatively simple, that is, delete the judgment of synchronization queue in tryacquireshared above. Note that the above methods are all dead loops and will not jump out of the loop until there is a conditional correspondence.

// 非公平锁的获取方式
protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available,remaining))
                    return remaining;
            }
        }
        
// 公平锁获取
protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;   // 仅仅多了这一行代码而已。
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available,remaining))
                    return remaining;
            }
        }

3. release

   the release method is mainly used to release resources. Remind us to ensure the implementation of release, otherwise the thread exits but the resources are not released. So generally, this code is best written in finally. In addition, the number of resources to be released depends on the number of resources obtained. Otherwise, the resources have not been definitely released. For example, acquire (10) is executed at the beginning, and release (10) cannot be written only when it is finally released.

As we can see before, we can see that the release lock operation in the release method is certainly a public operation. After all, releasing the lock does not involve fairness and injustice. The releaseShared in AQS is invoked in release. This method will no doubt call tryrereleaseshared first. This is implemented by subclasses such as template methods. In fact, it is implemented in sync. The logic is also very simple, that is, incremental operation on state.

  if tryrereleaseshared succeeds, execute dorreleaseshared to wake up the subsequent thread. Note that there is a release method in the reentrantlock. It directly attempts to release the lock. If it succeeds, it wakes up the successor node. The logic of the two is basically the same. In fact, the code structure and logic of the release and releaseshared methods are exactly the same. The difference is that the operations of the nodes in the wake-up synchronization queue after tryacquire are different. Otherwise, there is no need to do two methods.

    //  尝试释放锁
    public final boolean release(int arg) {
        // 如果释放锁成功 唤醒同步队列中的后继节点
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    // 为了方便对比把两个代码放在一块 可以看到 release 中的结构完全一样
    // 区别就在于 doReleaseShared 中有更多的判断操作
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();  //在里面执行的 unparkSuccessor(h)
            return true;
        }
        return false;
    }

  focus on doreleaseshared, a special method. Due to the characteristics of sharing, subsequent nodes need to be awakened in the process of obtaining and releasing locks, because multiple threads can enter the critical area at the same time. The main functions of this method are:

// 这个方法中与 release 中的唤醒不同点在于他保证了释放动作的传递
    // 如果后继节点需要唤醒,则执行唤醒操作,如果没有后继节点则把头设置为 PROPAGATE
    // 这里的死循环和其他的操作中的死循环一样,为了检测新的节点进入队列
    // 其实这个方法比较特殊,在 acquireShared 和 releaseShared 中都被执行了,主要就是共享模式允许多个线程进入临界区
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 等待队列中有正在等待的线程
            if (h != null && h != tail) {
                // 获取头节点对应的线程的状态
                int ws = h.waitStatus;
                // 如果头节点对应的线程是SIGNAL状态,则意味着头结点正在运行,后继结点所对应的线程需要被唤醒。
                if (ws == Node.SIGNAL) {
                    // 修改头结点,现在后继节点要成为头结点了,状态设置初始值
                    if (!compareAndSetWaitStatus(h,Node.SIGNAL,0))
                        continue;
                    // 唤醒头结点h的后继结点所对应的线程
                    unparkSuccessor(h);
                }
                // 队列中没有等待线程,只有一个正在运行的线程。
                // 将头结点设置为 PROPAGATE 标志进行传递唤醒
                else if (ws == 0 && !compareAndSetWaitStatus(h,Node.PROPAGATE))
                    continue;                // loop on Failed CAS
            }
            // 如果头结点发生变化有一种可能就是在 acquireShared 的时候会调用 setHeadAndPropagate 导致头结点变化,则继续循环。
            // 从新的头结点开始唤醒后继节点。
            if (h == head)                   // loop if head changed
                break;
        }
    }

    //  被 acquireShard 调用
    private void setHeadAndPropagate(Node node,int propagate) {
        Node h = head; // Record old head for check below
        // 当前线程设为头结点
        setHead(node);
        // propagate 代表的是当前剩余的资源数,如果还有资源就唤醒后面的共享线程,允许多个线程获取锁,
        // 或者还有一个比较有意思的条件就是 h.waitStatus < 0 他其实是说 h.waitStatus 要么是 signal 要么是 propagate
        // 从而唤醒后继节点
        if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

4. Summary

   well, in fact, the analysis is almost here. In fact, some other rarely used methods are written in a similar way, such as judgment without interruption, etc.    in summary, semaphore is a signal synchronization tool. Threads enter the critical area by applying for resources and releasing resources from semaphore. Generally, there are many resources, so we allow multiple threads to enter the critical area at the same time. When there is only one resource, it degenerates into a lock, that is, only one thread is allowed to enter the critical area. However, we use more threads to enter. After all, lock may do better if we want to use lock.    well, semaphore is generally considered that multiple threads can enter the critical area at the same time, so our underlying implementation can not use exclusive locks, but use shared locks, that is, the implementation of another part of "Jiangshan" shared locks in AQS. The number of resources is directly maintained by the state variable in AQS.

  the following is a complete process:

OK, after reading these, I basically know about semaphore, and I have a complete introduction to the shared lock in AQS.

The content of this article comes from the network collection of netizens. It is used as a learning reference. The copyright belongs to the original author.
THE END
分享
二维码
< <上一篇
下一篇>>