手写AQS非公平锁的示例分析
发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,手写AQS非公平锁的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。1. Unsafe工具类package com.shi.fli
千家信息网最后更新 2025年02月06日手写AQS非公平锁的示例分析
手写AQS非公平锁的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
1. Unsafe工具类
package com.shi.flink.unsafeTest;import sun.misc.Unsafe;import java.lang.reflect.Field;/** * @author shiye * @create 2021-03-30 17:03 */public class UnsafeUtil { public static Unsafe getInstance() { Field field = null; try { field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); return (Unsafe) field.get(null); } catch (NoSuchFieldException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } return null; }}
2. 手写AQS抽象类
package com.shi.flink.shilock;import com.shi.flink.unsafeTest.UnsafeUtil;import sun.misc.Unsafe;import java.util.concurrent.locks.AbstractOwnableSynchronizer;import java.util.concurrent.locks.LockSupport;/** * 自己写抽象AQS实现 * * @author shiye * @create 2021-03-30 14:10 */public abstract class ShiAQS extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; /** * 头指针 */ private transient volatile Node head; /** * 尾指针 */ private transient volatile Node tail; /** * 状态值: * 0:空闲, * 1:正在有人使用 */ private volatile int state; /** * 获取当前状态 * * @return */ protected final int getState() { return state; } /** * 设置当前锁的状态 * * @param state */ public void setState(int state) { this.state = state; } /** * 使用unsafe类来初始化一些参数值 */ private static final Unsafe unsafe = UnsafeUtil.getInstance(); private static long stateOffset; private static long headOffset; private static long tailOffset; private static long waitStatusOffset; private static long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next")); } catch (NoSuchFieldException e) { e.printStackTrace(); } } /** * 设置状态 * * @param expect * @param update * @return */ protected boolean compareAndSetState(int expect, int update) { //读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较 return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } /** * 设置头指针 * * @param update * @return */ private final boolean compareAndSetHead(Node update) { return unsafe.compareAndSwapObject(this, headOffset, null, update); } /** * 如果pre节点得waitStatus值为ws, * 则把signal赋值给waitStatus * * @param pre * @param ws * @param signal * @return */ private static boolean compareAndSetWaitStatus(Node pre, int ws, int signal) { return unsafe.compareAndSwapInt(pre, waitStatusOffset, ws, signal); } /** * 设置尾指针 * * @param expect * @param update * @return */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); } /** * 设置下一个节点 * * @param node * @param expect * @param update * @return */ private static final boolean compareAndSetNext(Node node, Node expect, Node update) { return unsafe.compareAndSwapObject(node, nextOffset, expect, update); } /** * 解锁方法 * * @param arg */ protected void release(int arg) throws Exception { //尝试去释放占用锁得线程 boolean tryRelease = tryRelease(arg); if (tryRelease) { Node h = head; if (h != null && h.waitStatus != 0) { unparkSuccessor(h); } } } /** * 尝试去释放占用锁得线程 * * @param arg * @return * @throws Exception */ protected boolean tryRelease(int arg) throws Exception { if (Thread.currentThread() != getExclusiveOwnerThread()) { //如果当前线程不是占用锁得线程就抛出异常 throw new Exception("解锁失败,当前线程不是占用锁得线程无法解锁"); } else { setExclusiveOwnerThread(null); this.setState(0); return true; } } /** * 打断某个线程 * * @return */ protected boolean interruptThread(Thread thread) throws Exception { Thread ownerThread = getExclusiveOwnerThread(); if (ownerThread == thread) { //如果是正在运行得线程 compareAndSetState(1, 0); setExclusiveOwnerThread(null); } else if (head != null) { //再对类中查找当前线程,并且取消排队 for (Node next1 = head.next; next1 != null; next1 = next1.next) { if (next1.thread == thread) { compareAndSetWaitStatus(next1, next1.waitStatus, 1); } } } //解锁 thread.interrupt(); System.out.println(thread.getName() + " 已经中断了 ====> "); unparkSuccessor(head); System.out.println(thread.getName() + " 已经结束了 ====> "); return false; } /** * 自定义一个内部类Node节点 */ static final class Node { //共享模式标记 static final Node shared = new Node(); //独占锁标记 static final Node excusive = null; //waitStatus值,指示线程已取消 static final int cancelled = 1; //waitStatus值,用于指示后续线程需要解除等待状态 static final int signal = -1; //waitStatus值,指示线程正在等待条件 static final int condition = -2; //waitStatus值,指示下一个acquireShared应该 无条件传播 static final int propagate = -3; //锁的等待状态 volatile int waitStatus; //前指针 volatile Node prev; //后指针 volatile Node next; //线程 volatile Thread thread; Node nextWaiter; //是否是共享锁 final boolean isShared() { return nextWaiter == shared; } //无参构造 public Node() { } public Node(Node nextWaiter, Thread thread) { this.nextWaiter = nextWaiter; this.thread = thread; } /** * 获取前节点 * * @return */ public Node getPrev() { Node p = prev; if (p == null) { throw new NullPointerException("前节点不能为空"); } return p; } } /** * 获得 * * @param arg */ public void acquire(int arg) { //1.尝试去排队 boolean tryAcquire = tryAcquire(arg); if (!tryAcquire) { //2.如果抢占锁失败,就去排队 Node node = addWaiter(Node.excusive); //3.对已经再队列中的节点,进行休眠等侯 acquireQueued(node, arg); } } /** * 先尝试去排队 * 1.先获取锁得状态,如果状态为0,就尝试去占用一次锁 * 否则返回占用失败 * * @param arg * @return true:表示抢占锁成功 * false:表示抢占所失败 */ public final boolean tryAcquire(int arg) { Thread current = Thread.currentThread(); int state = getState(); if (state == 0) { //如果空闲了,就尝试去占用一次锁 if (compareAndSetState(0, arg)) { //抢占成功就返回true,并设置线程 setExclusiveOwnerThread(current); return true; } } else if (getExclusiveOwnerThread() == current) { //如果当前当前线程多次抢占锁,就将状态+arg int nextState = state + arg; if (nextState < 0) { throw new Error("超过最大锁计数"); } setState(nextState); return true; } return false; } /** * 添加等待队列 * * @param mode */ public Node addWaiter(Node mode) { Node node = new Node(mode, Thread.currentThread()); Node temp = tail; if (temp == null) { //入队 enQueue(node); return node; } else { //如果队列中不为空,就把当前节点添加到尾节点中 node.prev = temp; if (compareAndSetTail(temp, node)) { temp.next = node; return node; } } return node; } /** * 入队 * 把node节点添加到队列中, * 如果队列为null就初始化一个队列并且把node节点添加到尾节点中 * * @param node 返回当前节点 */ public Node enQueue(Node node) { while (true) { Node temp = tail; if (temp == null) { //创建一个头指针 compareAndSetHead(new Node()); //让尾指针也指向头指针(空节点) tail = head; } else { node.prev = temp; if (compareAndSetTail(temp, node)) { temp.next = node; return node; } } } } /** * @param node 当前正在侯队中得节点 * @param arg * @return */ protected boolean acquireQueued(Node node, int arg) { boolean failed = true; try { //是否被打断,默认false boolean interrupted = false; while (true) { final Node p = node.getPrev(); if (p == head && tryAcquire(arg)) { //如果是他的头节点是head,并且尝试抢占锁成功就出队,让当前线程运行 setHead(node); p.next = null;//利于gc回收 failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { //pre节点得waitStatus 设置成-1,并且让当前线程阻塞,打断当前线程 interrupted = true; } } } finally { if (failed) cancelAcquire(node); } } protected final void cancelAcquire(Node node) { if (node == null) return; node.thread = null; Node pre = node.prev; while (pre.waitStatus > 0) { node.prev = pre = pre.prev; } Node predNext = pre.next; node.waitStatus = Node.cancelled; if (node == tail && compareAndSetTail(node, pre)) { compareAndSetNext(pre, predNext, null); } else { // If successor needs signal, try to set pred's next-link // so it will get one. Otherwise wake it up to propagate. int ws; if (pre != head && ((ws = pre.waitStatus) == Node.signal || (ws <= 0 && compareAndSetWaitStatus(pre, ws, Node.signal))) && pre.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pre, predNext, next); } else { unparkSuccessor(node); } node.next = node; // help GC } } /** * 解锁必须成功 * * @param node */ protected final void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) { compareAndSetWaitStatus(node, ws, 0); } /** * AQS源码是这样实现得 * 如果当前节点不为空,并且用户取消了,就从尾节点往前遍历一个,直到找到最前面得一个节点,解锁当前线程 */ Node next1 = node.next;// if (next1 == null || next1.waitStatus > 0) {// next1 = null;// for (Node t = tail; t != null && t != node; t = t.prev) {// if (t.waitStatus <= 0) {// next1 = t;// }// }// } /** * 我自己实现,从前往后找 */ if (next1 != null && next1.waitStatus > 0) { for (next1 = next1.next; next1 != null; next1 = next1.next) { if (next1.waitStatus <= 0) { break; } } } if (next1 != null) { //唤醒下一个线程 System.out.println(next1.thread.getName() + " 开始唤醒了 ====> "); LockSupport.unpark(next1.thread); System.out.println(next1.thread.getName() + " 已经唤醒了 ====> "); } } /** * 将pre节点得waitStatus 设置成-1 * * @param pre * @param node * @return */ protected static boolean shouldParkAfterFailedAcquire(Node pre, Node node) { //获取node节点得前一个节点得状态 int ws = pre.waitStatus; //如果是-1 就返回true if (ws == Node.signal) { return true; } if (ws > 0) { do { pre = pre.prev; node.prev = pre; } while (pre.waitStatus > 0); pre.next = node; } else { //设置成-1 boolean flag = compareAndSetWaitStatus(pre, ws, Node.signal);// System.out.println("设置成-1是否成功:" + flag); } return false; } /** * 阻塞当前线程,并且返回当前线程得打断状态 * * @return true: 打断线程成功 */ protected final boolean parkAndCheckInterrupt() { //打断线程,让线程阻塞 LockSupport.park(this); return Thread.interrupted(); } public Node getHead() { return head; } public void setHead(Node head) { this.head = head; }}
3.非公平所实现
package com.shi.flink.shilock;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;/** * 自定义非公平锁 * @author shiye * @create 2021-03-30 11:02 */public class ShiNonfairLock extends ShiAQS implements Lock, java.io.Serializable { private static final long serialVersionUID = 7373984872572414699L; @Override public void lock() { if(compareAndSetState(0, 1)){ //如果抢到了锁,就把当前线程设置进去 setExclusiveOwnerThread(Thread.currentThread()); }else{// 否则就去排队 acquire(1); } } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { try { super.release(1); } catch (Exception e) { e.printStackTrace(); } } @Override public Condition newCondition() { return null; } /** * 打断某个线程(自己瞎写的,有bug) * @param thread * @return */ public boolean interruptThread(Thread thread) throws Exception { return super.interruptThread(thread); }}
4.测试
package com.shi.flink.shilock;import java.util.concurrent.TimeUnit;/** * @author shiye * @create 2021-03-31 17:09 */public class MyLockTest { public static void main(String[] args) throws Exception { ShiNonfairLock lock = new ShiNonfairLock(); new Thread(() -> { try { System.out.println("A 线程进入到...加锁过程"); lock.lock(); System.out.println("A 已经抢占到锁...休眠10s后运行......"); TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("A线程运行完成,开始解锁...."); lock.unlock(); } }, "A").start(); TimeUnit.SECONDS.sleep(1); Thread B = new Thread(() -> { try { System.out.println("B 线程进入到...加锁过程"); lock.lock(); System.out.println("B 已经抢占到锁...休眠10s后运行......"); TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("B线程运行完成,开始解锁...."); lock.unlock(); } }, "B"); B.start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { try { System.out.println("C 线程进入到...加锁过程"); lock.lock(); System.out.println("C 已经抢占到锁...休眠10s后运行......"); TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("C线程运行完成,开始解锁...."); lock.unlock(); } }, "C").start(); TimeUnit.SECONDS.sleep(1); new Thread(() -> { try { System.out.println("D 线程进入到...加锁过程"); lock.lock(); System.out.println("D 已经抢占到锁...休眠10s后运行......"); TimeUnit.SECONDS.sleep(10); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("D线程运行完成,开始解锁...."); lock.unlock(); } }, "D").start();// TimeUnit.SECONDS.sleep(1);// System.out.println("强制让 " + B.getName() + " 线程中断...");// lock.interruptThread(B); }}
看完上述内容,你们掌握手写AQS非公平锁的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!
线程
状态
节点
运行
指针
尝试
成功
休眠
指示
过程
方法
正在
示例
分析
内容
更多
标记
空闲
问题
阻塞
数据库的安全要保护哪些东西
数据库安全各自的含义是什么
生产安全数据库录入
数据库的安全性及管理
数据库安全策略包含哪些
海淀数据库安全审计系统
建立农村房屋安全信息数据库
易用的数据库客户端支持安全管理
连接数据库失败ssl安全错误
数据库的锁怎样保障安全
红色数据库试用
51单片机数据库文件
山东cmmi软件开发
日立电梯服务器故障怎么看
数据库工程师报考
山西省的软件开发公司
蚌埠企业软件开发公司哪家好
网络安全新闻题目
我的世界服务器安全联盟
机读数据库的特点
软件开发用vs哪个版本
网络技术属于什么大类
海关数据库2014
日服LOL手游服务器
关于广告服务器的英语作文
如何保存个人数据库
在数据库中什么叫映射
维普科技期刊数据库
数据库e r图作用
湖南安卓软件开发学费多少
蚌埠企业软件开发公司哪家好
东莞软件开发找哪家
宜兴重型软件开发简介
计算机软件开发工程师月薪
软件开发量计算
永兴软件开发招生
酉阳网络安全审计系统咨询辅导
设计院和软件开发
数据库的生命周期可分两个阶段
标签 数据库 设计