千家信息网

AbstractQueuedSynchronizer 的示例分析

发表于:2025-01-18 作者:千家信息网编辑
千家信息网最后更新 2025年01月18日,这篇文章给大家介绍AbstractQueuedSynchronizer 的示例分析,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。AbstractQueuedSynchronize
千家信息网最后更新 2025年01月18日AbstractQueuedSynchronizer 的示例分析

这篇文章给大家介绍AbstractQueuedSynchronizer 的示例分析,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。

AbstractQueuedSynchronizer

AQS是一个内部维护了先进先出队列+标识(数字)的模型,标识使用CAS模式修改,作为多线程工具的基础组件

属性

属性名说明
volatile Node head为头节点会清理Node中关联的pre,thread避免GC不回收
volatile Node tail尾节点
volatile int state0为空闲其它组件按需使用,使用cas来赋值,
Thread exclusiveOwnerThread持有线程

state为volatile的int,不同的业务场景按需实现

独占模式:

ReentrantLock.Sync中state为0表示未锁定>0表示被几个线程持有

ThreadPoolExecutor.Worker中state为0表示未执行

共享模式:

CountDownLatch.Sync中state为初始化时指定,表示有多少个线程可持有,

Semaphore.Sync中state与CountDownLatch相同

混合模式:

ReentrantReadWriteLock.Sync中state为共享锁+独占锁组合 通过位运算16位来分割,最大的读锁写锁个数为65535

关键方法

独占模式

持有资源:acquire

acquire: 独占模式获取,独占模式即只有一个线程可更新state.忽略中断标识,在获取之后响应中断。

acquireInterruptibly:独占模式获取,线程标识为中断则抛出异常

public final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}

tryAcquire:子类按需实现,使用独占模式更新state,增加state,成功返回true失败返回false

中断后不会正确响应park,所以需要重置线程中断标识,并在unpark之后进行中断补偿

释放资源:release

release:以独占模式释放资源

public final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}

tryRelease:子类按需实现,使用独占模式更新state,减少state,并处理对应独占线程

共享模式

持有资源:acquireShared
  • acquireShared 共享模式获取,忽略中断线程,在获取之后相应中断。

  • acquireSharedInterruptibly 共享模式获取,响应中断,线程中断则抛出异常。

public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}

tryAcquireShared:子类按需实现,对返回值有如下要求:

负值:失败。 零:共享模式下的获取成功,但是没有后续共享模式下的获取可以成功。 正值: 如果共享模式下的获取成功并且后续共享模式下的获取也可能成功,则为正值,在这种情况下,后续的等待线程必须检查可用性。 (对三个不同返回值的支持使该方法可以在仅有时进行获取的情况下使用。)成功后,就已经获取了此对象。

释放资源:releaseShared
  • releaseShared:以共享模式释放资源

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}

tryReleaseShared:子类按需实现,使用共享模式更新state,减少state

其它关键方法

检查并更新节点状态:shouldParkAfterFailedAcquire

在park线程之前的判断,当前置节点为取消时更新前置节点

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {        int ws = pred.waitStatus;        if (ws == Node.SIGNAL)            /*             * This node has already set status asking a release             * to signal it, so it can safely park.             */            return true;        if (ws > 0) {            /*             * Predecessor was cancelled. Skip over predecessors and             * indicate retry.             */            do {                node.prev = pred = pred.prev;            } while (pred.waitStatus > 0);            pred.next = node;        } else {            /*             * waitStatus must be 0 or PROPAGATE.  Indicate that we             * need a signal, but don't park yet.  Caller will need to             * retry to make sure it cannot acquire before parking.             */            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);        }        return false;    }
唤醒后续节点:unparkSuccessor

唤醒后续节点,并清理取消的节点,

private void unparkSuccessor(Node node) {    /*     * If status is negative (i.e., possibly needing signal) try     * to clear in anticipation of signalling.  It is OK if this     * fails or if status is changed by waiting thread.     */    int ws = node.waitStatus;    if (ws < 0)        compareAndSetWaitStatus(node, ws, 0);    /*     * Thread to unpark is held in successor, which is normally     * just the next node.  But if cancelled or apparently null,     * traverse backwards from tail to find the actual     * non-cancelled successor.     */    Node s = node.next;    if (s == null || s.waitStatus > 0) {        s = null;        for (Node t = tail; t != null && t != node; t = t.prev)            if (t.waitStatus <= 0)                s = t;    }    if (s != null)        LockSupport.unpark(s.thread);}
共享模式设置队列头:setHeadAndPropagate

共享模式下多个线程同时持有资源,头节点会频繁变化需要及时释放资源

private void setHeadAndPropagate(Node node, int propagate) {    Node h = head; // Record old head for check below    setHead(node);    /*     * Try to signal next queued node if:     *   Propagation was indicated by caller,     *     or was recorded (as h.waitStatus either before     *     or after setHead) by a previous operation     *     (note: this uses sign-check of waitStatus because     *      PROPAGATE status may transition to SIGNAL.)     * and     *   The next node is waiting in shared mode,     *     or we don't know, because it appears null     *     * The conservatism in both of these checks may cause     * unnecessary wake-ups, but only when there are multiple     * racing acquires/releases, so most need signals now or soon     * anyway.     */    if (propagate > 0 || h == null || h.waitStatus < 0 ||        (h = head) == null || h.waitStatus < 0) {        Node s = node.next;        if (s == null || s.isShared())            doReleaseShared();    }}

Node

属性名说明
int waitStatus1:CANCELLED,表示当前的线程被取消;
-1:SIGNAL,后续节点需要unpark;
-2:CONDITION,表示当前节点在等待condition,也就是在condition队列中;
-3:PROPAGATE,A releaseShared应该传播到其他节点。 在doReleaseShared中对此进行了设置(仅适用于头节点),以确保传播继续进行,即使此后进行了其他操作也是如此。
0:表示当前节点在sync队列中,等待着获取锁。
Node prev前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
Node next后继节点。
Thread thread入队列时的当前线程。
Node nextWaiter为NULL表示为独占模式
PROPAGATE:共享模式中会通过状态是否小于0来判断是否需要唤醒后续节点,共享模式下多个线程可同时持有state变更,waitStatus会频繁从0切换为SIGNAL,区分SIGNAL增加的中间状态所以称为传播值

关于AbstractQueuedSynchronizer 的示例分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

0