千家信息网

如何理解AQS源码

发表于:2025-02-08 作者:千家信息网编辑
千家信息网最后更新 2025年02月08日,本篇内容介绍了"如何理解AQS源码"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!AQS abstra
千家信息网最后更新 2025年02月08日如何理解AQS源码

本篇内容介绍了"如何理解AQS源码"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

AQS abstractQueueSynchronizer(抽象队列同步器),是什么?

答:它是用来构建锁 或者 其他同步器组件的重量级基础框架,是整个JUC体系的基础。通过内置FIFO队列来完成获取线程取锁的排队工作,并通过一个int类型变量标识持有锁的状态;

前置知识点:

1、可重入锁(递归锁):

sync(隐式锁,jvm管理)和ReentrantLock(Lock显式锁,就是手动加解)是重入锁的典型代表,为可以重复使用的锁。一个变成多个流程,可以获取同一把锁。

可重入锁概念: 是指一个线程,在外层方法获取锁的时候,再次进入该线程的内层方法会自动获取锁(必须是同一个对象),不会被阻塞。可避免死锁

举例: 递归调用同一个 sync修饰的方法或者代码块。必须是一个对象才行。一个线程调用一个对象的method1,method1 调用method2,method2调用method3, 3个方法都是被sync修饰,这样也是一个可重入锁的例子 。

再比如下面这种

static Object lock = new Object();public void mm(){    synchronized (lock){        System.out.println("===========mm method");        synchronized (lock){            System.out.println("=========== method");        }    }}

只有一个对象 和同步代码块,如果sycn中嵌套sync 并都是lock对象,那么该线程就会持有当前对象的锁,并可重入。反编译后发现

public void mm();    Code:       0: getstatic     #7                  // Field lock:Ljava/lang/Object;       3: dup       4: astore_1       5: monitorenter       6: getstatic     #8                  // Field java/lang/System.out:Ljava/io/PrintStream;       9: ldc           #9                  // String ===========mm method      11: invokevirtual #10                 // Method java/io/PrintStream.println:(Ljava/lang/String;)V      14: aload_1      15: monitorexit      16: goto          24      19: astore_2      20: aload_1      21: monitorexit      22: aload_2      23: athrow      24: return    Exception table:       from    to  target type           6    16    19   any          19    22    19   any

sync同步代码块加解锁,使用的命令为monitorenter 和 monitorexit(同步方法标识是ACC_SYNCHRONIZED,在flag中),enter 为加锁,必须成对出现,但这里却又两个exit。原因为第一个exit为程序正常运行后的解锁命令,并执行完后会执行goto到return ,也就是第24行,

第二个exit 为当程序出现异常时,需要执行的解锁命令;

如上就是可重入锁的相关概念

2、什么是LockSupport?

根据jdk8 的api文档显示定义为: 用于创建锁和其他同步类的基本线程阻塞原语;

是一个线程阻塞工具类,所有方法均为静态,可以让线程在任意位置阻塞,阻塞后也有对应的唤醒方法。

先复习下object 对象的wait 和 notify 和Lock 的condition

  • wait 和notify 必须在sync 代码块中才能使用,否则报错。非法的监视器

  • condition的await 和 signal方法也必须在lock 和unlock方法前执行,否则报错,非法的监视器

  • 线程一定要先 等待 ,再 被 唤醒,顺序不能换

LockSupport 有两个关键函数 park 和unpark,该类使用了Permit(许可证)的概念来阻塞和唤醒线程的功能。每个线程都会有一个Permit,该Permit 只有两个值 0 和1 ,默认是0。类似于信号量,但上限是1;

来看park方法

public static void park() {    //unsafe的方法。初始为0    UNSAFE.park(false, 0L);}

禁止当前线程进行线程调度,除非Permit可用,就是1

如果Permit 为1(有可用证书) 将变更为0(线程仍然会处理业务逻辑),并且立即返回。否则当前线程对于线程调度目的将被禁用,并处于休眠状态。直至发生三件事情之一:

  • 一些其他线程调用当前线程作为目标的unpark ; 要么

  • 其他一些线程当前线程为interrupts ; 要么

  • 电话虚假(也就是说,没有理由)返回。

这种方法不报告是哪个线程导致该方法返回。 来电者应重新检查导致线程首先停放的条件。 呼叫者还可以确定线程在返回时的中断状态。

小结:Permit默认0,所以一开始调用park,当前线程被阻塞,直到别的线程将当前线程的Permit修改为1,从park方法处被唤醒,处理业务,然后会将permit修改为0,并返回;如果permit为1,调用park时会将permit修改为0,在执行业务逻辑到线程生命周期。与park方法定义吻合。

在看unpark方法:

public static void unpark(Thread thread) {    if (thread != null)        UNSAFE.unpark(thread);}

在调用unpark方法后,会将Thread线程的许可permit设置成1,会自动唤醒thread线程,即,之前阻塞中的LockSupport.park方法会立即返回,然后线程执行业务逻辑 。 且 unpark可以在park之前执行。相当于执行park没有效果。

3、AQS abstractQueueSynchronizer 源码

剩余前置知识为: 公平锁、非公平锁、自旋锁、链表、模板设计模式

AQS使用volatile修饰的int类型的变量 标识锁的状态,通过内置的FIFO队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成node节点实现锁的分配,通过CAS(自旋锁)完成对state值的修改 ;

(1)node节点源码

static final class Node {        /** Marker to indicate a node is waiting in shared mode */            //共享节点        static final Node SHARED = new Node();        /** Marker to indicate a node is waiting in exclusive mode */            //独占节点        static final Node EXCLUSIVE = null;        /** waitStatus value to indicate thread has cancelled */            //线程被取消状态        static final int CANCELLED =  1;        /** waitStatus value to indicate successor's thread needs unparking */            //      后续线程需要唤醒        static final int SIGNAL    = -1;        /** waitStatus value to indicate thread is waiting on condition */            //邓丹condition唤醒        static final int CONDITION = -2;        /**         * waitStatus value to indicate the next acquireShared should         * unconditionally propagate         */            //共享室同步状态获取 将会无条件传播下去        static final int PROPAGATE = -3;        /**         * Status field, taking on only the values:         *   SIGNAL:     The successor of this node is (or will soon be)         *               blocked (via park), so the current node must         *               unpark its successor when it releases or         *               cancels. To avoid races, acquire methods must         *               first indicate they need a signal,         *               then retry the atomic acquire, and then,         *               on failure, block.         *   CANCELLED:  This node is cancelled due to timeout or interrupt.         *               Nodes never leave this state. In particular,         *               a thread with cancelled node never again blocks.         *   CONDITION:  This node is currently on a condition queue.         *               It will not be used as a sync queue node         *               until transferred, at which time the status         *               will be set to 0. (Use of this value here has         *               nothing to do with the other uses of the         *               field, but simplifies mechanics.)         *   PROPAGATE:  A releaseShared should be propagated to other         *               nodes. This is set (for head node only) in         *               doReleaseShared to ensure propagation         *               continues, even if other operations have         *               since intervened.         *   0:          None of the above         *         * The values are arranged numerically to simplify use.         * Non-negative values mean that a node doesn't need to         * signal. So, most code doesn't need to check for particular         * values, just for sign.         *         * The field is initialized to 0 for normal sync nodes, and         * CONDITION for condition nodes.  It is modified using CAS         * (or when possible, unconditional volatile writes).         */            //初始为0,状态是上面几种,标识当前节点在队列中的状态        volatile int waitStatus;        /**         * Link to predecessor node that current node/thread relies on         * for checking waitStatus. Assigned during enqueuing, and nulled         * out (for sake of GC) only upon dequeuing.  Also, upon         * cancellation of a predecessor, we short-circuit while         * finding a non-cancelled one, which will always exist         * because the head node is never cancelled: A node becomes         * head only as a result of successful acquire. A         * cancelled thread never succeeds in acquiring, and a thread only         * cancels itself, not any other node.         */            //前置节点        volatile Node prev;        /**         * Link to the successor node that the current node/thread         * unparks upon release. Assigned during enqueuing, adjusted         * when bypassing cancelled predecessors, and nulled out (for         * sake of GC) when dequeued.  The enq operation does not         * assign next field of a predecessor until after attachment,         * so seeing a null next field does not necessarily mean that         * node is at end of queue. However, if a next field appears         * to be null, we can scan prev's from the tail to         * double-check.  The next field of cancelled nodes is set to         * point to the node itself instead of null, to make life         * easier for isOnSyncQueue.         */            //后置节点        volatile Node next;        /**         * The thread that enqueued this node.  Initialized on         * construction and nulled out after use.         */            //当线程对象        volatile Thread thread;        /**         * Link to next node waiting on condition, or the special         * value SHARED.  Because condition queues are accessed only         * when holding in exclusive mode, we just need a simple         * linked queue to hold nodes while they are waiting on         * conditions. They are then transferred to the queue to         * re-acquire. And because conditions can only be exclusive,         * we save a field by using special value to indicate shared         * mode.         */        Node nextWaiter;        /**         * Returns true if node is waiting in shared mode.         */        final boolean isShared() {            return nextWaiter == SHARED;        }        /**         * Returns previous node, or throws NullPointerException if null.         * Use when predecessor cannot be null.  The null check could         * be elided, but is present to help the VM.         *         * @return the predecessor of this node         */        final Node predecessor() throws NullPointerException {            Node p = prev;            if (p == null)                throw new NullPointerException();            else                return p;        }

node节点就是每一个等待执行的线程。还有一个waitState状态字段,标识当前等待中的线程状态

根据node节点 绘画一个aqs基本结构图

解释:state为状态位,aqs为同步器。有head 和tail两个 头 尾节点,当state = 1时,表明同步器被占用(或者说当前有线程持有了同一个对象的锁),将后续线程添加到队列中,并用双向链表连接,遵循FIFO。

(2)以ReentrantLock的实现分析。因为他也实现了Lock 并内部持有同步器sync和AQS(以银行柜台例子)

new ReentrantLock()或 new ReentrantLock(false)时,创建的是非公平锁,而 ReentrantLock对象内部还有 两个类 分别为公平同步器和非公平同步器

static final class NonfairSync extends Sync//公平锁 有一个判断队列中是否有排队的线程,这是与上面锁不同的获取方式static final class FairSync extends Sync

公平锁解释:先到先得,新线程在获取锁时,如果这个同步器的等待队列中已经有线程在等待,那么当前线程会先进入等待队列;

非公平锁解释:新进来的线程不管是否有等待的线程,如果可以获取锁,则立刻占有锁。

这里还有一个关键的模板设计模式: 在查询aqs的tryAcquire方法时发现,该方法直接抛出异常,这就是典型的模板设计模式,强制要求子类重写该方法。否则不让用

1.1 当线程a到柜台办理业务时,会调用sync 的lock,即 a线程调用lock方法

final void lock() {    //利用cas将当前对象的state 从0 设置成1,当然里面还有一个偏移量    //意思就是如果是0 就设置为1成功返回true    if (compareAndSetState(0, 1))        setExclusiveOwnerThread(Thread.currentThread());    else        acquire(1);}

因为是第一个运行的线程,肯定是true所以,将当前运行的线程设置为a,即a线程占用了同步器,获取了锁

1.2 当b线程运行lock时,发现不能将0设置成1(cas思想),就会运行acquire(1)方法

public final void acquire(int arg) {    //这里用到了模板设计模式,强制子类实现该方法    //因为默认使用非公平锁,所以看NonfairSync     if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}static final class NonfairSync extends Sync {    private static final long serialVersionUID = 7316153563782823691L;    /**         * Performs lock.  Try immediate barge, backing up to normal         * acquire on failure.         */    final void lock() {        if (compareAndSetState(0, 1))            setExclusiveOwnerThread(Thread.currentThread());        else            acquire(1);    }    protected final boolean tryAcquire(int acquires) {        return nonfairTryAcquire(acquires);    }}//该方法就是非公平锁执行 等待的方法final boolean nonfairTryAcquire(int acquires) {    //获取当前b线程    final Thread current = Thread.currentThread();    //获取当前锁的状态是1,因为a线程已经获取,并将state修改为1    int c = getState();    //有可能b在设置state时,a正办理,到这儿时,a办理完了。state为0了。    if (c == 0) {        //乐观的将state 从0 修改为 1        if (compareAndSetState(0, acquires)) {            //设置当前获取锁的线程为b            setExclusiveOwnerThread(current);            return true;        }    }    //有可能 a线程办完业务。又回头办理了一个,所以当前线程持有锁的线程依旧是a    else if (current == getExclusiveOwnerThread()) {        //2        int nextc = c + acquires;        if (nextc < 0) // overflow            throw new Error("Maximum lock count exceeded");        //设置state的值        setState(nextc);        return true;    }    //如果b线程走到这里,就证明b必须到等待队列里去了    return false;}

再来看逻辑运算符后面的逻辑

private Node addWaiter(Node mode) {    //实例化一个节点,并将b 和 节点类型封装成node    Node node = new Node(Thread.currentThread(), mode);    //等待队列为null 所以tail初始化肯定是null    //如果是线程c在b之后进来,tail就是b 节点    Node pred = tail;    //c节点之后都走这个方法    if (pred != null) {        //node的前置为tail        //c 的前置设置为b        node.prev = pred;        //cas乐观,比较 如果当前节点仍然是b 就将b 设置成c        //b就是tail尾节点,将tail设置成c        //这里根据源码可知,就是将tail的值设置成c 并不影响pred的值,还是b        if (compareAndSetTail(pred, node)) {            //b 的下一个节点设置成c            pred.next = node;            return node;        }    }    //线程b 入等待队列    enq(node);    return node;}//入队列方法private Node enq(final Node node) {    //该方法类似于while(true)    for (;;) {        //获取tail节点        Node t = tail;        //初始化锁等待队列        if (t == null) { // Must initialize            //设置头部节点为新的节点            //这里看出,锁等待队列的第一个节点并非b,而是一个空node,该node为站位节点或者叫哨兵节点            if (compareAndSetHead(new Node()))                //将头尾都指向该节点                tail = head;        } else {            //第二次循环时,t为空node,将b的前置设置为空node            node.prev = t;            //设置tail节点为b节点            if (compareAndSetTail(t, node)) {                //空node节点的下一个节点为b node节点                t.next = node;                return t;            }        }    }}/** * CAS head field. Used only by enq. */private final boolean compareAndSetHead(Node update) {    return unsafe.compareAndSwapObject(this, headOffset, null, update);}

以上b线程的进入等待队列的操作就完成了 ,但线程还是活跃的,如何阻塞的呢?

下面接着看acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        //自旋执行        for (;;) {            //如果是b线程,这里p就是b节点的前置节点            final Node p = node.predecessor();            //空节点就是head节点,但又调用了一次tryAcquire方法,想再尝试获取锁资源            //如果a线程未处理完,那么这里返回false            //如果a线程处理完成,那么这里就可以获取到锁            if (p == head && tryAcquire(arg)) {                //将head设置成b节点                setHead(node);                //原空节点的下连接断开                p.next = null; // help GC                failed = false;                return interrupted;            }            //第一次空节点进入should方法。返回false            //当第二次循环到此处should方法返回true            //执行parkAndCheckInterrupt方法,会将当前线程park,并获取b线程的中断状态,如果未中断返回false,并再次自旋一次 ,中断为true            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {    //head节点就是空节点所以w=0    //空节点第二次进入时就是-1    int ws = pred.waitStatus;    if (ws == Node.SIGNAL)        return true;    //如果b节点状态是其他,则将节点连接变化一下    if (ws > 0) {        do {            node.prev = pred = pred.prev;        } while (pred.waitStatus > 0);        pred.next = node;    } else {        //ws = 0时,使用cas将验证pred 和ws 的值,是空节点和0  并将ws修改为-1        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);    }    return false;}private final boolean parkAndCheckInterrupt() {    LockSupport.park(this);    return Thread.interrupted();}

以上b线程未获取锁 并被挂起的操作就完成了

1.3 当a线程调用unlock方法时:

public void unlock() {    sync.release(1);}public final boolean release(int arg) {    //判断a线程是否完成业务。并释放锁,state=0    if (tryRelease(arg)) {        //获取头部节点,就是空节点        Node h = head;        //空节点在b获取锁时,状态变更为-1,所以这里是true        if (h != null && h.waitStatus != 0)            //唤醒线程            unparkSuccessor(h);        return true;    }    return false;}//aqs父类的模板方法,强制要求子类实现该方法protected boolean tryRelease(int arg) {    throw new UnsupportedOperationException();}protected final boolean tryRelease(int releases) {    //将锁的状态设置为0    int c = getState() - releases;    //判断当前线程与 锁的独占线程是否一致    if (Thread.currentThread() != getExclusiveOwnerThread())        throw new IllegalMonitorStateException();    //所得状态标识    boolean free = false;    if (c == 0) {        free = true;        //如果state=0证明a完成了业务。那么锁的独占状态就应该恢复为null        setExclusiveOwnerThread(null);    }    //恢复锁的state状态    setState(c);    return free;}

注意:这里state是减1操作。如果ReentrantLock不断可重入,那么这里是不能一次就归零的。所以才会有ReentrantLock 调了几次lock 就是要调几次unlock

a线程唤醒b线程

private void unparkSuccessor(Node node) {        int ws = node.waitStatus;    if (ws < 0)        //空节点-1,设置成0        compareAndSetWaitStatus(node, ws, 0);        //获取b节点,非null 且 waitState=0    Node s = node.next;    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    if (s != null)        //将b线程唤醒        LockSupport.unpark(s.thread);}

而 b线程还在acquireQueued方法里自旋呢,不过自旋后就会获取锁。

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        //b线程在a未释放锁之前一直在自旋,        for (;;) {            final Node p = node.predecessor();            //当a释放锁后,b获取到锁,将state设置为1            //并将空节点的所有连接断开等待GC回收            //并返回当前线程 b 的中断状态            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            if (shouldParkAfterFailedAcquire(p, node) &&                //park当前线程b 并获取b的中断状态,肯定是false,且调用的是带参的native方法,多次调用会重置b线程的中断状态                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}//调用该方法的if判断如果进入了。会将b 中断,并在不断循环中再重置中断状态为false

1.4 c线程的执行流程与b线程类似。会将b节点充等待队列中移除,遵循FIFO

"如何理解AQS源码"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

0