Java并发数据结构的基石是什么
这篇文章主要介绍了Java并发数据结构的基石是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java并发数据结构的基石是什么文章都会有所收获,下面我们一起来看看吧。
线程阻塞原语
Java 的线程阻塞和唤醒是通过 Unsafe 类的 park 和 unpark 方法做到的。
public class Unsafe {
...
public native void park(boolean isAbsolute, long time);
public native void unpark(Thread t);
...
}
这两个方法都是 native 方法,它们本身是由 C 语言来实现的核心功能。park 的意思是停车,让当前运行的线程 Thread.currentThread() 休眠,unpark 的意思是解除停车,唤醒指定线程。这两个方法在底层是使用操作系统提供的信号量机制来实现的。具体实现过程要深究 C 代码,这里暂时不去具体分析。park 方法的两个参数用来控制休眠多长时间,第一个参数 isAbsolute 表示第二个参数是绝对时间还是相对时间,单位是毫秒。
线程从启动开始就会一直跑,除了操作系统的任务调度策略外,它只有在调用 park 的时候才会暂停运行。锁可以暂停线程的奥秘所在正是因为锁在底层调用了 park 方法。
parkBlocker
线程对象 Thread 里面有一个重要的属性 parkBlocker,它保存当前线程因为什么而 park。就好比停车场上停了很多车,这些车主都是来参加一场拍卖会的,等拍下自己想要的物品后,就把车开走。那么这里的 parkBlocker 大约就是指这场「拍卖会」。它是一系列冲突线程的管理者协调者,哪个线程该休眠该唤醒都是由它来控制的。
class Thread {
...
volatile Object parkBlocker;
...
}
当线程被 unpark 唤醒后,这个属性会被置为 null。Unsafe.park 和 unpark 并不会帮我们设置 parkBlocker 属性,负责管理这个属性的工具类是 LockSupport,它对 Unsafe 这两个方法进行了简单的包装。
class LockSupport {
...
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null); // 醒来后置null
}
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
}
...
}
Java 的锁数据结构正是通过调用 LockSupport 来实现休眠与唤醒的。线程对象里面的 parkBlocker 字段的值就是下面我们要讲的「排队管理器」。
排队管理器
当多个线程争用同一把锁时,必须有排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁。每一把锁内部都会有这样一个队列管理器,管理器里面会维护一个等待的线程队列。ReentrantLock 里面的队列管理器是 AbstractQueuedSynchronizer,它内部的等待队列是一个双向列表结构,列表中的每个节点的结构如下。
class AbstractQueuedSynchronizer {
volatile Node head; // 队头线程将优先获得锁
volatile Node tail; // 抢锁失败的线程追加到队尾
volatile int state; // 锁计数
}
class Node {
Node prev;
Node next;
Thread thread; // 每个节点一个线程
// 下面这两个特殊字段可以先不去理解
Node nextWaiter; // 请求的是共享锁还是独占锁
int waitStatus; // 精细状态描述字
}
加锁不成功时,当前的线程就会把自己纳入到等待链表的尾部,然后调用 LockSupport.park 将自己休眠。其它线程解锁时,会从链表的表头取一个节点,调用 LockSupport.unpark 唤醒它。
AbstractQueuedSynchronizer 类是一个抽象类,它是所有的锁队列管理器的父类,JDK 中的各种形式的锁其内部的队列管理器都继承了这个类,它是 Java 并发世界的核心基石。比如 ReentrantLock、ReadWriteLock、CountDownLatch、Semaphone、ThreadPoolExecutor 内部的队列管理器都是它的子类。这个抽象类暴露了一些抽象方法,每一种锁都需要对这个管理器进行定制。而 JDK 内置的所有并发数据结构都是在这些锁的保护下完成的,它是JDK 多线程高楼大厦的地基。
锁管理器维护的只是一个普通的双向列表形式的队列,这个数据结构很简单,但是仔细维护起来却相当复杂,因为它需要精细考虑多线程并发问题,每一行代码都写的无比小心。
JDK 锁管理器的实现者是 Douglas S. Lea,Java 并发包几乎全是他单枪匹马写出来的,在算法的世界里越是精巧的东西越是适合一个人来做。
Douglas S. Lea是纽约州立大学奥斯威戈分校计算机科学教授和现任计算机科学系主任,专门研究并发编程和并发数据结构的设计。他是Java Community Process的执行委员会成员,主持JSR 166,它为Java编程语言添加了并发实用程序。
后面我们将 AbstractQueuedSynchronizer 简写成 AQS。我必须提醒各位读者,AQS 太复杂了,如果在理解它的路上遇到了挫折,这很正常。目前市场上并不存在一本可以轻松理解 AQS 的书籍,能够吃透 AQS 的人太少太少,我自己也不算。
公平锁与非公平锁
公平锁会确保请求锁和获得锁的顺序,如果在某个点锁正处于自由状态,这时有一个线程要尝试加锁,公平锁还必须查看当前有没有其它线程排在排队,而非公平锁可以直接插队。联想一下在肯德基买汉堡时的排队场景。
也许你会问,如果某个锁处于自由状态,那它怎么会有排队的线程呢?我们假设此刻持有锁的线程刚刚释放了锁,它唤醒了等待队列中第一个节点线程,这时候被唤醒的线程刚刚从 park 方法返回,接下来它就会尝试去加锁,那么从 park 返回到加锁之间的状态就是锁的自由态,这很短暂,而这短暂的时间内还可能有其它线程也在尝试加锁。
其次还有一点需要注意,执行了 Lock.park 方法的线程自我休眠后,并不是非要等到其它线程 unpark 了自己才会醒来,它可能随时会以某种未知的原因醒来。我们看源码注释,park 返回的原因有四种
其它线程 unpark 了当前线程
时间到了自然醒(park 有时间参数)
其它线程 interrupt 了当前线程
其它未知原因导致的「假醒」
文档中没有明确说明何种未知原因会导致假醒,它倒是说明了当 park 方法返回时并不意味着锁自由了,醒过来的线程在重新尝试获取锁失败后将会再次 park 自己。所以加锁的过程需要写在一个循环里,在成功拿到锁之前可能会进行多次尝试。
计算机世界非公平锁的服务效率要高于公平锁,所以 Java 默认的锁都使用了非公平锁。不过现实世界似乎非公平锁的效率会差一点,比如在肯德基如果可以不停插队,你可以想象现场肯定一片混乱。为什么计算机世界和现实世界会有差异,大概是因为在计算机世界里某个线程插队并不会导致其它线程抱怨。
public ReentrantLock() {
this.sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
this.sync = fair ? new FairSync() : new NonfairSync();
}
共享锁与排他锁
ReentrantLock 的锁是排他锁,一个线程持有,其它线程都必须等待。而 ReadWriteLock 里面的读锁不是排他锁,它允许多线程同时持有读锁,这是共享锁。共享锁和排他锁是通过 Node 类里面的 nextWaiter 字段区分的。
class AQS {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
boolean isShared() {
return this.nextWaiter == SHARED;
}
}
那为什么这个字段没有命名成 mode 或者 type 或者干脆直接叫 shared?这是因为 nextWaiter 在其它场景还有不一样的用途,它就像 C 语言联合类型的字段一样随机应变,只不过 Java 语言没有联合类型。
条件变量
关于条件变量,需要提出的第一个问题是为什么需要条件变量,只有锁还不够么?考虑下面的伪代码,当某个条件满足时,才去干某件事
void doSomething() {
locker.lock();
while(!condition_is_true()) { // 先看能不能搞事
locker.unlock(); // 搞不了就歇会再看看能不能搞
sleep(1);
locker.lock(); // 搞事需要加锁,判断能不能搞事也需要加锁
}
justdoit(); // 搞事
locker.unlock();
}
当条件不满足时,就循环重试(其它线程会通过加锁来修改条件),但是需要间隔 sleep,不然 CPU 就会因为空转而飙高。这里存在一个问题,那就是 sleep 多久不好控制。间隔太久,会拖慢整体效率,甚至会错过时机(条件瞬间满足了又立即被重置了),间隔太短,又回导致 CPU 空转。有了条件变量,这个问题就可以解决了
void doSomethingWithCondition() {
cond = locker.newCondition();
locker.lock();
while(!condition_is_true()) {
cond.await();
}
justdoit();
locker.unlock();
}
await() 方法会一直阻塞在 cond 条件变量上直到被另外一个线程调用了 cond.signal() 或者 cond.signalAll() 方法后才会返回,await() 阻塞时会自动释放当前线程持有的锁,await() 被唤醒后会再次尝试持有锁(可能又需要排队),拿到锁成功之后 await() 方法才能成功返回。
阻塞在条件变量上的线程可以有多个,这些阻塞线程会被串联成一个条件等待队列。当 signalAll() 被调用时,会唤醒所有的阻塞线程,让所有的阻塞线程重新开始争抢锁。如果调用的是 signal() 只会唤醒队列头部的线程,这样可以避免「惊群问题」。
await() 方法必须立即释放锁,否则临界区状态就不能被其它线程修改,condition_is_true() 返回的结果也就不会改变。 这也是为什么条件变量必须由锁对象来创建,条件变量需要持有锁对象的引用这样才可以释放锁以及被 signal 唤醒后重新加锁。创建条件变量的锁必须是排他锁,如果是共享锁被 await() 方法释放了并不能保证临界区的状态可以被其它线程来修改,可以修改临界区状态的只能是排他锁。这也是为什么 ReadWriteLock.ReadLock 类的 newCondition 方法定义如下
public Condition newCondition() {
throw new UnsupportedOperationException();
}
有了条件变量,sleep 不好控制的问题就解决了。当条件满足时,调用 signal() 或者 signalAll() 方法,阻塞的线程可以立即被唤醒,几乎没有任何延迟。
条件等待队列
当多个线程 await() 在同一个条件变量上时,会形成一个条件等待队列。同一个锁可以创建多个条件变量,就会存在多个条件等待队列。这个队列和 AQS 的队列结构很接近,只不过它不是双向队列,而是单向队列。队列中的节点和 AQS 等待队列的节点是同一个类,但是节点指针不是 prev 和 next,而是 nextWaiter。
class AQS {
...
class ConditionObject {
Node firstWaiter; // 指向第一个节点
Node lastWaiter; // 指向第二个节点
}
class Node {
static final int CONDITION = -2;
static final int SIGNAL = -1;
Thread thread; // 当前等待的线程
Node nextWaiter; // 指向下一个条件等待节点
Node prev;
Node next;
int waitStatus; // waitStatus = CONDITION
}
...
}
ConditionObject 是 AQS 的内部类,这个对象里会有一个隐藏的指针 this$0 指向外部的 AQS 对象,ConditionObject 可以直接访问 AQS 对象的所有属性和方法(加锁解锁)。位于条件等待队列里的所有节点的 waitStatus 状态都被标记为 CONDITION,表示节点是因为条件变量而等待。
队列转移
当条件变量的 signal() 方法被调用时,条件等待队列的头节点线程会被唤醒,该节点从条件等待队列中被摘走,然后被转移到 AQS 的等待队列中,准备排队尝试重新获取锁。这时节点的状态从 CONDITION 转为 SIGNAL,表示当前节点是被条件变量唤醒转移过来的。
class AQS {
...
boolean transferForSignal(Node node) {
// 重置节点状态
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false
Node p = enq(node); // 进入 AQS 等待队列
int ws = p.waitStatus;
// 再修改状态为SIGNAL
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
...
}
被转移的节点的 nextWaiter 字段的含义也发生了变更,在条件队列里它是下一个节点的指针,在 AQS 等待队列里它是共享锁还是互斥锁的标志。
ReentrantLock 加锁过程
下面我们精细分析加锁过程,深入理解锁逻辑控制。我必须肯定 Dough Lea 的代码写成下面这样的极简形式,阅读起来还是挺难以理解的。
class ReentrantLock {
...
public void lock() {
sync.acquire(1);
}
...
}
class Sync extends AQS {
...
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
...
}
acquire 的 if 判断语句要分为三个部分,tryAcquire 方法表示当前的线程尝试加锁,如果加锁不成功就需要排队,这时候调用 addWaiter 方法,将当前线程入队。然后再调用 acquireQueued 方法,开始了 park 、醒来重试加锁、加锁不成功继续 park 的循环重试加锁过程。直到加锁成功 acquire 方法才会返回。
如果在循环重试加锁过程中被其它线程打断了,acquireQueued 方法就会返回 true。这时候线程就需要调用 selfInterrupt() 方法给当前线程设置一个被打断的标识位。
// 打断当前线程,其实就是设置一个标识位
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
线程如何知道自己被其它线程打断了呢?在 park 醒来之后调用 Thread.interrupted() 就知道了,不过这个方法只能调用一次,因为它在调用之后就会立即 clear 打断标志位。这也是为什么 acquire 方法里需要调用 selfInterrupt() ,为的就是重新设置打断标志位。这样上层的逻辑才可以通过 Thread.interrupted() 知道自己有没有被打断。
acquireQueued 和 addWaiter 方法由 AQS 类提供,tryAcquire 需要由子类自己实现。不同的锁会有不同的实现。下面我们来看看 ReentrantLock 的公平锁 tryAcquire 方法的实现
这里有个 if else 分支,其中 else if 部分表示锁的重入,当前尝试加锁的线程是已经持有了这把锁的线程,也就是同一个线程重复加锁,这时只需要增加计数值就行了。锁的 state 记录的就是加锁计数,重入一次就 +1。AQS 对象里有一个 exclusiveOwnerThread 字段,记录了当前持有排他锁的线程。
if(c == 0) 意味着当前锁是自由态,计数值为零。这时就需要争抢锁,因为同一时间可能会有多个线程在调用 tryAcquire。争抢的方式是用 CAS 操作 compareAndSetState,成功将锁计数值从 0 改成 1 的线程将获得这把锁,将当前的线程记录到 exclusiveOwnerThread 中。
代码里还有一个 hasQueuedPredecessors() 判断,这个判断非常重要,它的意思是看看当前的 AQS 等待队列里有没有其它线程在排队,公平锁在加锁之前需要 check 一下,如果有排队的,自己就不能插队。而非公平锁就不需要 check,公平锁和非公平锁的全部的实现差异就在于此,就这一个 check 决定了锁的公平与否。
下面我们再看看 addWaiter 方法的实现,参数 mode 表示是共享锁还是排他锁,它对应 Node.nextWaiter 属性。
addWaiter 需要将新的节点添加到 AQS 等待队列的队尾。如果队尾 tail 是空的意味着队列还没有初始化,那就需要初始化一下。AQS 队列在初始化时需要一个冗余的头部节点,这个节点的 thread 字段是空的。
将新节点添加到队尾也是需要考虑多线程并发的,所以代码里再一次使用了 CAS 操作 compareAndSetTail 来竞争队尾指针。没有竞争到的线程就会继续下一轮竞争 for(;;) 继续使用 CAS 操作将新节点往队尾添加。
下面我们再看看 acquireQueue 方法的代码实现,它会重复 park、尝试再次加锁、加锁失败继续 park 的循环过程。
acquireQueue 在尝试加锁之前会先看看自己是不是 AQS 等待队列的第一个节点,如果不是它就继续去 park。这意味着不管是公平还是非公平锁,在这里它们都统一采取了公平的方案,看看队列中是不是轮到自己了。也就是说「一朝排队,永远排队」。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
线程在 park 返回醒来之后要立即检测一下是否被其它线程中断了。不过即使发生中断了,它还会继续尝试获取锁,如果获取不到,还会继续睡眠,直到锁获取到了才将中断状态返回。这意味着打断线程并不会导致死锁状态(拿不到锁)退出。
同时我们还可以注意到锁是可以取消的 cancelAcquire(),准确地说是取消处于等待加锁的状态,线程处于 AQS 的等待队列中等待加锁。那什么情况下才会抛出异常而导致取消加锁呢,唯一的可能就是 tryAcquire 方法,这个方法是由子类实现的,子类的行为不受 AQS 控制。当子类的 tryAcquire 方法抛出了异常,那 AQS 最好的处理方法就是取消加锁了。cancelAcquire 会将当前节点从等待队列中移除。
ReentrantLock 解锁过程
解锁的过程要简单一些,将锁计数降为零后,唤醒等待队列中的第一个有效节点。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 解铃还须系铃人
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
考虑到可重入锁,需要判断锁计数是否降为零才可以确定锁是否彻底被释放。只有锁彻底被释放了才能唤醒后继等待节点。unparkSuccessor 会跳过无效节点(已取消的节点),找到第一个有效节点调用 unpark() 唤醒相应的线程。
读写锁
读写锁分为两个锁对象 ReadLock 和 WriteLock,这两个锁对象共享同一个 AQS。AQS 的锁计数变量 state 将分为两个部分,前 16bit 为共享锁 ReadLock 计数,后 16bit 为互斥锁 WriteLock 计数。互斥锁记录的是当前写锁重入的次数,共享锁记录的是所有当前持有共享读锁的线程重入总次数。
读写锁同样也需要考虑公平锁和非公平锁。共享锁和互斥锁的公平锁策略和 ReentrantLock 一样,就是看看当前还有没有其它线程在排队,自己会乖乖排到队尾。非公平锁策略不一样,它会比较偏向于给写锁提供更多的机会。如果当前 AQS 队列里有任何读写请求的线程在排队,那么写锁可以直接去争抢,但是如果队头是写锁请求,那么读锁需要将机会让给写锁,去队尾排队。
毕竟读写锁适合读多写少的场合,对于偶尔出现一个写锁请求就应该得到更高的优先级去处理。
写锁加锁过程
读写锁的写锁加锁在整体逻辑上和 ReentrantLock 是一样的,不同的是 tryAcquire() 方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁也需要考虑可重入,如果当前 AQS 互斥锁的持有线程正好是当前要加锁的线程,那么就是写锁在重入,重入只需要递增锁计数值即可。当 c!=0 也就是锁计数不为零时,既可能是因为当前的 AQS 有读锁也可能是因为有写锁,判断 w == 0 就是判断当前的计数是不是读锁带来的。
如果计数值为零,那就开始争抢锁。取决于锁是否公平,在争抢之前调用 writerShouldBlock() 方法看看自己是否需要排队,如果不需要排队,就可以使用 CAS 操作来争抢,成功将计数值从 0 设置为 1 的线程将独占写锁。
读锁加锁过程
读锁加锁过程比写锁要复杂很多,它在整体流程上和写锁一样,但是细节差距很大。特别是它需要为每一个线程记录读锁计数,这部分逻辑占据了不少代码。
public final void acquireShared(int arg) {
// 如果尝试加锁不成功, 就去排队休眠,然后循环重试
if (tryAcquireShared(arg) < 0)
// 排队、循环重试
doAcquireShared(arg);
}
如果当前线程已经持有写锁,它还可以继续加读锁,这是为了达成锁降级必须支持的逻辑。锁降级是指在持有写锁的情况下,再加读锁,再解写锁。相比于先写解锁再加读锁而言,这样可以省去加锁二次排队的过程。因为锁降级的存在,锁计数中读写计数可以同时不为零。
wlock.lock();
if(whatever) {
// 降级
rlock.lock();
wlock.unlock();
doRead();
rlock.unlock();
} else {
// 不降级
doWrite()
wlock.unlock();
}
为了给每一个读锁线程进行锁计数,它设置了一个 ThreadLocal 变量。
private transient ThreadLocalHoldCounter readHolds;
static final class HoldCounter {
int count;
final long tid = LockSupport.getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
但是 ThreadLocal 变量访问起来效率不够高,所以又设置了缓存。它会存储最近一次获取读锁线程的锁计数。在线程争用不是特别频繁的情况下,直接读取缓存会比较高效。
private transient HoldCounter cachedHoldCounter;
Dough Lea 觉得使用 cachedHoldCounter 还是不够高效,所以又加了一层缓存记录 firstReader,记录第一个将读锁计数从 0 变成 1 的线程以及锁计数。当没有线程争用时,直接读取这两个字段会更加高效。
private transient Thread firstReader;
private transient int firstReaderHoldCount;
final int getReadHoldCount() {
// 先访问锁全局计数的读计数部分
if (getReadLockCount() == 0)
return 0;
// 再访问 firstReader
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;
// 再访问最近的读线程锁计数
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == LockSupport.getThreadId(current))
return rh.count;
// 无奈读 ThreadLocal 吧
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}
所以我们看到为了记录这个读锁计数作者煞费苦心,那这个读计数的作用是什么呢?那就是线程可以通过这个计数值知道自己有没有持有这个读写锁。
读加锁还有一个自旋的过程,所谓自旋就是第一次加锁失败,那就直接循环重试,不休眠,听起来有点像死循环重试法。
final static int SHARED_UNIT = 65536
// 读计数是高16位
final int fullTryAcquireShared(Thread current) {
for(;;) {
int c = getState();
// 如果有其它线程加了写锁,还是返回睡觉去吧
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
...
// 超出计数上限
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 拿到读锁了
...
return 1
}
...
// 循环重试
}
}
因为读锁需要使用 CAS 操作来修改底层锁的总读计数值,成功的才可以获得读锁,获取读锁的 CAS 操作失败只是意味着读锁之间存在 CAS 操作的竞争,并不意味着此刻锁被别人占据了自己不能获得。多试几次肯定可以加锁成功,这就是自旋的原因所在。同样在释放读锁的时候也有一个 CAS 操作的循环重试过程。
protected final boolean tryReleaseShared(int unused) {
...
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
...
}
关于"Java并发数据结构的基石是什么"这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对"Java并发数据结构的基石是什么"知识都有一定的了解,大家如果还想学习更多知识,欢迎关注行业资讯频道。