怎么从源码看AQS
本篇内容介绍了"怎么从源码看AQS"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
AQS实际上是操作以Node为元素的队列,Node包含了所属线程,先以不公平锁分析:lock时先尝试获取锁,获取失败则进入队列且被阻塞(期间可以被打断)等待,当锁被释放的时候,如果队列不为空,则唤醒头节点的下一个节点NEXT,如果此时被新线程NT拿到锁,则NEXT继续进入阻塞等待,当NT释放锁时,头节点的下一个节点还是NEXT再次被唤醒,如果此时NEXT获得锁,则将NEXT设置为头节点,这就是一个大概的流程,顺着这个思路,再来一步步看实现代码。
AQS->AbstractQueuedSynchronizer,看名字 抽象队列同步器,首先得有队列。
先来看队列节点类,Node:
static final class Node { static final Node SHARED = new Node(); //共享模式标记 static final Node EXCLUSIVE = null;//独占模式标记 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus;//取值为 上面的4个int,初始化为0 volatile Node prev;//前一个节点 volatile Node next;//下一个节点 volatile Thread thread; Node nextWaiter;//下一个等待节点 SHARED,EXCLUSIVE //如果节点在共享模式下等待,则返回true final boolean isShared() { return nextWaiter == SHARED; } //返回前一个节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { } //以下是Node的两种不同的使用方式 Node(Thread thread, Node mode) { this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { this.waitStatus = waitStatus; this.thread = thread; } }
注意上面两种不同的节点使用方式,要构建队列那么还得有 队列的首尾表示节点,
//队列头节点,除开初始化,只能通过setHead方法设置 private transient volatile Node head; //队列尾节点,只能通过enq方法设置 private transient volatile Node tail; //同步状态 private volatile int state; //初始状态为0
那么具体的实现或者用法,我们还是通过举例来说明:
ReentrantLock-可重入锁,下面以RL代表 ,以AQS代表AbstractQueuedSynchronizer。
在RL中有个静态内部类,Sync 是AbstractQueuedSynchronizer的抽象子类。在Sync的基础上继续扩展出两个实现子类 NonfairSync(非公平)和FairSync(公平)。而RL实现锁机制就是建立在 Sync的基础之上的。
先说说NonfairSync(以nfs简称),非公平锁:
当我们使用RL.lock()获取锁,实际调用是nfs的lock方法
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1);}
compareAndSetState就是使用CAS乐观锁来修改AQS的state值,由0 改为1,有疑问可以参考UnSafe类实现CAS乐观锁,然后将当前线程设置为锁的占有线程。acquire方法是由AQS实现的,
//尝试获取锁失败,则进入队列public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果在acquireQueued期间线程标记中断,那么将进入线程中断selfInterrupt。 selfInterrupt(); }
tryAcquire是由子类来实现的,这里也就是由nfs实现,主要是再次尝试获取锁(可重入锁),来看具体实现:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //如果state为0 , 则尝试获取锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果锁已经被占用,但是占有者是当前线程,那么将state + 1,即重入锁,最大值为Integer.MAX_VALUE else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;//否则返回false }
第二个条件 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),EXCLUSIVE独占锁标志,两个方法都是AQS实现的:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); //先判断队列的尾节点是否为空,此处是一个快速尝试,失败了就继续走enq方法 Node pred = tail; if (pred != null) {//尝试将尾节点设置为node,设置成功则返回 node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node);//将node插入队列尾部,然后返回,也是由CAS实现,注意 enq会初始化一个空的Node(无参构造函数)作为head节点 return node; }final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //进入循环 for (;;) { final Node p = node.predecessor();//获取node的上一个节点p if (p == head && tryAcquire(arg)) {//如果p是头节点,而且当前线程拿到了锁,那么就将node设为头节点,注意这里可能会有最新来的线程和p节点所属的线程进行竞争,竞争失败则继续进入阻塞等待,当锁被释放时,又会唤醒头节点的下一个节点 setHead(node); p.next = null; // 方便回收 failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //线程会阻塞,等待释放,直到prev节点完全释放锁的时候会被唤醒,然后开始再次循环。shouldParkAfterFailedAcquire 主要是修改node的waitStatus。parkAndCheckInterrupt 主要阻塞当前线程,期间线程可能会被中断,当线程被唤醒之后,再判断是否被中断。 interrupted = true; } } finally { if (failed) cancelAcquire(node); //取消获得的锁 } }//因为是普通节点,所以waitStatus的初始值为0,忘了的话可以回到上面看下Node的两种构造函数//所以这里主要由两步,第一步尝试将pred的 waitStatus 改为SIGNAL,第二次再进来就直接返回true了private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
成功获取锁之后才能释放锁:
//AQS实现public final boolean release(int arg) { if (tryRelease(arg)) { //由子类实现,包括重入锁持有,直到所有锁全部释放 Node h = head; if (h != null && h.waitStatus != 0) //如果head节点的等待状态不为0,在获取锁失败的时候,node节点会将prev的waitstatus改为SIGNAL,也就是p后面还有节点, 具体是在 shouldParkAfterFailedAcquire方法 unparkSuccessor(h);//唤醒h的下一个节点 return true;//完全释放锁 } return false; }//nfs实现:protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) // 判断当前线程是否是锁的持有线程 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //如果c == 0表示当前线程获得的重入锁已经全部释放,则修改锁的持有线程为null free = true; setExclusiveOwnerThread(null); } setState(c);//否则只是将state - 1 return free; }
从上面加锁、释放锁的步骤可以看出,不公平锁不公平的地方在于,当head节点完全释放锁的时候,这个时候最新来的线程和head节点唤醒的下一个节点会同时竞争锁。
"怎么从源码看AQS"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!