千家信息网

手写AQS非公平锁的示例分析

发表于:2025-02-06 作者:千家信息网编辑
千家信息网最后更新 2025年02月06日,手写AQS非公平锁的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。1. Unsafe工具类package com.shi.fli
千家信息网最后更新 2025年02月06日手写AQS非公平锁的示例分析

手写AQS非公平锁的示例分析,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。

1. Unsafe工具类

package com.shi.flink.unsafeTest;import sun.misc.Unsafe;import java.lang.reflect.Field;/** * @author shiye * @create 2021-03-30 17:03 */public class UnsafeUtil {    public static Unsafe getInstance() {        Field field = null;        try {            field = Unsafe.class.getDeclaredField("theUnsafe");            field.setAccessible(true);            return (Unsafe) field.get(null);        } catch (NoSuchFieldException e) {            e.printStackTrace();        } catch (IllegalAccessException e) {            e.printStackTrace();        }        return null;    }}

2. 手写AQS抽象类

package com.shi.flink.shilock;import com.shi.flink.unsafeTest.UnsafeUtil;import sun.misc.Unsafe;import java.util.concurrent.locks.AbstractOwnableSynchronizer;import java.util.concurrent.locks.LockSupport;/** * 自己写抽象AQS实现 * * @author shiye * @create 2021-03-30 14:10 */public abstract class ShiAQS extends AbstractOwnableSynchronizer implements java.io.Serializable {    private static final long serialVersionUID = 7373984972572414691L;    /**     * 头指针     */    private transient volatile Node head;    /**     * 尾指针     */    private transient volatile Node tail;    /**     * 状态值:     * 0:空闲,     * 1:正在有人使用     */    private volatile int state;    /**     * 获取当前状态     *     * @return     */    protected final int getState() {        return state;    }    /**     * 设置当前锁的状态     *     * @param state     */    public void setState(int state) {        this.state = state;    }    /**     * 使用unsafe类来初始化一些参数值     */    private static final Unsafe unsafe = UnsafeUtil.getInstance();    private static long stateOffset;    private static long headOffset;    private static long tailOffset;    private static long waitStatusOffset;    private static long nextOffset;    static {        try {            stateOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("state"));            headOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("head"));            tailOffset = unsafe.objectFieldOffset(ShiAQS.class.getDeclaredField("tail"));            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));        } catch (NoSuchFieldException e) {            e.printStackTrace();        }    }    /**     * 设置状态     *     * @param expect     * @param update     * @return     */    protected boolean compareAndSetState(int expect, int update) {        //读取传入对象o在内存中偏移量为offset位置的值与期望值expected作比较        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);    }    /**     * 设置头指针     *     * @param update     * @return     */    private final boolean compareAndSetHead(Node update) {        return unsafe.compareAndSwapObject(this, headOffset, null, update);    }    /**     * 如果pre节点得waitStatus值为ws,     * 则把signal赋值给waitStatus     *     * @param pre     * @param ws     * @param signal     * @return     */    private static boolean compareAndSetWaitStatus(Node pre, int ws, int signal) {        return unsafe.compareAndSwapInt(pre, waitStatusOffset, ws, signal);    }    /**     * 设置尾指针     *     * @param expect     * @param update     * @return     */    private final boolean compareAndSetTail(Node expect, Node update) {        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);    }    /**     * 设置下一个节点     *     * @param node     * @param expect     * @param update     * @return     */    private static final boolean compareAndSetNext(Node node,                                                   Node expect,                                                   Node update) {        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);    }    /**     * 解锁方法     *     * @param arg     */    protected void release(int arg) throws Exception {        //尝试去释放占用锁得线程        boolean tryRelease = tryRelease(arg);        if (tryRelease) {            Node h = head;            if (h != null && h.waitStatus != 0) {                unparkSuccessor(h);            }        }    }    /**     * 尝试去释放占用锁得线程     *     * @param arg     * @return     * @throws Exception     */    protected boolean tryRelease(int arg) throws Exception {        if (Thread.currentThread() != getExclusiveOwnerThread()) {            //如果当前线程不是占用锁得线程就抛出异常            throw new Exception("解锁失败,当前线程不是占用锁得线程无法解锁");        } else {            setExclusiveOwnerThread(null);            this.setState(0);            return true;        }    }    /**     * 打断某个线程     *     * @return     */    protected boolean interruptThread(Thread thread) throws Exception {        Thread ownerThread = getExclusiveOwnerThread();        if (ownerThread == thread) {            //如果是正在运行得线程            compareAndSetState(1, 0);            setExclusiveOwnerThread(null);        } else if (head != null) {            //再对类中查找当前线程,并且取消排队            for (Node next1 = head.next; next1 != null; next1 = next1.next) {                if (next1.thread == thread) {                    compareAndSetWaitStatus(next1, next1.waitStatus, 1);                }            }        }        //解锁        thread.interrupt();        System.out.println(thread.getName() + " 已经中断了 ====> ");        unparkSuccessor(head);        System.out.println(thread.getName() + " 已经结束了 ====> ");        return false;    }    /**     * 自定义一个内部类Node节点     */    static final class Node {        //共享模式标记        static final Node shared = new Node();        //独占锁标记        static final Node excusive = null;        //waitStatus值,指示线程已取消        static final int cancelled = 1;        //waitStatus值,用于指示后续线程需要解除等待状态        static final int signal = -1;        //waitStatus值,指示线程正在等待条件        static final int condition = -2;        //waitStatus值,指示下一个acquireShared应该 无条件传播        static final int propagate = -3;        //锁的等待状态        volatile int waitStatus;        //前指针        volatile Node prev;        //后指针        volatile Node next;        //线程        volatile Thread thread;        Node nextWaiter;        //是否是共享锁        final boolean isShared() {            return nextWaiter == shared;        }        //无参构造        public Node() {        }        public Node(Node nextWaiter, Thread thread) {            this.nextWaiter = nextWaiter;            this.thread = thread;        }        /**         * 获取前节点         *         * @return         */        public Node getPrev() {            Node p = prev;            if (p == null) {                throw new NullPointerException("前节点不能为空");            }            return p;        }    }    /**     * 获得     *     * @param arg     */    public void acquire(int arg) {        //1.尝试去排队        boolean tryAcquire = tryAcquire(arg);        if (!tryAcquire) {            //2.如果抢占锁失败,就去排队            Node node = addWaiter(Node.excusive);            //3.对已经再队列中的节点,进行休眠等侯            acquireQueued(node, arg);        }    }    /**     * 先尝试去排队     * 1.先获取锁得状态,如果状态为0,就尝试去占用一次锁     * 否则返回占用失败     *     * @param arg     * @return true:表示抢占锁成功     * false:表示抢占所失败     */    public final boolean tryAcquire(int arg) {        Thread current = Thread.currentThread();        int state = getState();        if (state == 0) {            //如果空闲了,就尝试去占用一次锁            if (compareAndSetState(0, arg)) {                //抢占成功就返回true,并设置线程                setExclusiveOwnerThread(current);                return true;            }        } else if (getExclusiveOwnerThread() == current) {            //如果当前当前线程多次抢占锁,就将状态+arg            int nextState = state + arg;            if (nextState < 0) {                throw new Error("超过最大锁计数");            }            setState(nextState);            return true;        }        return false;    }    /**     * 添加等待队列     *     * @param mode     */    public Node addWaiter(Node mode) {        Node node = new Node(mode, Thread.currentThread());        Node temp = tail;        if (temp == null) {            //入队            enQueue(node);            return node;        } else {            //如果队列中不为空,就把当前节点添加到尾节点中            node.prev = temp;            if (compareAndSetTail(temp, node)) {                temp.next = node;                return node;            }        }        return node;    }    /**     * 入队     * 把node节点添加到队列中,     * 如果队列为null就初始化一个队列并且把node节点添加到尾节点中     *     * @param node 返回当前节点     */    public Node enQueue(Node node) {        while (true) {            Node temp = tail;            if (temp == null) {                //创建一个头指针                compareAndSetHead(new Node());                //让尾指针也指向头指针(空节点)                tail = head;            } else {                node.prev = temp;                if (compareAndSetTail(temp, node)) {                    temp.next = node;                    return node;                }            }        }    }    /**     * @param node 当前正在侯队中得节点     * @param arg     * @return     */    protected boolean acquireQueued(Node node, int arg) {        boolean failed = true;        try {            //是否被打断,默认false            boolean interrupted = false;            while (true) {                final Node p = node.getPrev();                if (p == head && tryAcquire(arg)) {                    //如果是他的头节点是head,并且尝试抢占锁成功就出队,让当前线程运行                    setHead(node);                    p.next = null;//利于gc回收                    failed = false;                    return interrupted;                }                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {                    //pre节点得waitStatus 设置成-1,并且让当前线程阻塞,打断当前线程                    interrupted = true;                }            }        } finally {            if (failed) cancelAcquire(node);        }    }    protected final void cancelAcquire(Node node) {        if (node == null) return;        node.thread = null;        Node pre = node.prev;        while (pre.waitStatus > 0) {            node.prev = pre = pre.prev;        }        Node predNext = pre.next;        node.waitStatus = Node.cancelled;        if (node == tail && compareAndSetTail(node, pre)) {            compareAndSetNext(pre, predNext, null);        } else {            // If successor needs signal, try to set pred's next-link            // so it will get one. Otherwise wake it up to propagate.            int ws;            if (pre != head &&                    ((ws = pre.waitStatus) == Node.signal ||                            (ws <= 0 && compareAndSetWaitStatus(pre, ws, Node.signal))) &&                    pre.thread != null) {                Node next = node.next;                if (next != null && next.waitStatus <= 0)                    compareAndSetNext(pre, predNext, next);            } else {                unparkSuccessor(node);            }            node.next = node; // help GC        }    }    /**     * 解锁必须成功     *     * @param node     */    protected final void unparkSuccessor(Node node) {        int ws = node.waitStatus;        if (ws < 0) {            compareAndSetWaitStatus(node, ws, 0);        }        /**         * AQS源码是这样实现得         * 如果当前节点不为空,并且用户取消了,就从尾节点往前遍历一个,直到找到最前面得一个节点,解锁当前线程         */        Node next1 = node.next;//        if (next1 == null || next1.waitStatus > 0) {//            next1 = null;//            for (Node t = tail; t != null && t != node; t = t.prev) {//                if (t.waitStatus <= 0) {//                    next1 = t;//                }//            }//        }        /**         * 我自己实现,从前往后找         */        if (next1 != null && next1.waitStatus > 0) {            for (next1 = next1.next; next1 != null; next1 = next1.next) {                if (next1.waitStatus <= 0) {                    break;                }            }        }        if (next1 != null) {            //唤醒下一个线程            System.out.println(next1.thread.getName() + " 开始唤醒了 ====> ");            LockSupport.unpark(next1.thread);            System.out.println(next1.thread.getName() + " 已经唤醒了 ====> ");        }    }    /**     * 将pre节点得waitStatus 设置成-1     *     * @param pre     * @param node     * @return     */    protected static boolean shouldParkAfterFailedAcquire(Node pre, Node node) {        //获取node节点得前一个节点得状态        int ws = pre.waitStatus;        //如果是-1 就返回true        if (ws == Node.signal) {            return true;        }        if (ws > 0) {            do {                pre = pre.prev;                node.prev = pre;            } while (pre.waitStatus > 0);            pre.next = node;        } else {            //设置成-1            boolean flag = compareAndSetWaitStatus(pre, ws, Node.signal);//            System.out.println("设置成-1是否成功:" + flag);        }        return false;    }    /**     * 阻塞当前线程,并且返回当前线程得打断状态     *     * @return true: 打断线程成功     */    protected final boolean parkAndCheckInterrupt() {        //打断线程,让线程阻塞        LockSupport.park(this);        return Thread.interrupted();    }    public Node getHead() {        return head;    }    public void setHead(Node head) {        this.head = head;    }}

3.非公平所实现

package com.shi.flink.shilock;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;/** * 自定义非公平锁 * @author shiye * @create 2021-03-30 11:02 */public class ShiNonfairLock extends ShiAQS implements Lock, java.io.Serializable {    private static final long serialVersionUID = 7373984872572414699L;    @Override    public void lock() {        if(compareAndSetState(0, 1)){            //如果抢到了锁,就把当前线程设置进去            setExclusiveOwnerThread(Thread.currentThread());        }else{//            否则就去排队            acquire(1);        }    }    @Override    public void lockInterruptibly() throws InterruptedException {    }    @Override    public boolean tryLock() {        return false;    }    @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        return false;    }    @Override    public void unlock() {        try {            super.release(1);        } catch (Exception e) {            e.printStackTrace();        }    }    @Override    public Condition newCondition() {        return null;    }    /**     *  打断某个线程(自己瞎写的,有bug)     * @param thread     * @return     */    public boolean interruptThread(Thread thread) throws Exception {        return super.interruptThread(thread);    }}

4.测试

package com.shi.flink.shilock;import java.util.concurrent.TimeUnit;/** * @author shiye * @create 2021-03-31 17:09 */public class MyLockTest {    public static void main(String[] args) throws Exception {        ShiNonfairLock lock = new ShiNonfairLock();        new Thread(() -> {            try {                System.out.println("A 线程进入到...加锁过程");                lock.lock();                System.out.println("A 已经抢占到锁...休眠10s后运行......");                TimeUnit.SECONDS.sleep(10);            } catch (Exception e) {                e.printStackTrace();            } finally {                System.out.println("A线程运行完成,开始解锁....");                lock.unlock();            }        }, "A").start();        TimeUnit.SECONDS.sleep(1);        Thread B = new Thread(() -> {            try {                System.out.println("B 线程进入到...加锁过程");                lock.lock();                System.out.println("B 已经抢占到锁...休眠10s后运行......");                TimeUnit.SECONDS.sleep(10);            } catch (Exception e) {                e.printStackTrace();            } finally {                System.out.println("B线程运行完成,开始解锁....");                lock.unlock();            }        }, "B");        B.start();        TimeUnit.SECONDS.sleep(1);        new Thread(() -> {            try {                System.out.println("C 线程进入到...加锁过程");                lock.lock();                System.out.println("C 已经抢占到锁...休眠10s后运行......");                TimeUnit.SECONDS.sleep(10);            } catch (Exception e) {                e.printStackTrace();            } finally {                System.out.println("C线程运行完成,开始解锁....");                lock.unlock();            }        }, "C").start();        TimeUnit.SECONDS.sleep(1);        new Thread(() -> {            try {                System.out.println("D 线程进入到...加锁过程");                lock.lock();                System.out.println("D 已经抢占到锁...休眠10s后运行......");                TimeUnit.SECONDS.sleep(10);            } catch (Exception e) {                e.printStackTrace();            } finally {                System.out.println("D线程运行完成,开始解锁....");                lock.unlock();            }        }, "D").start();//        TimeUnit.SECONDS.sleep(1);//        System.out.println("强制让 " + B.getName() + " 线程中断...");//        lock.interruptThread(B);    }}

看完上述内容,你们掌握手写AQS非公平锁的示例分析的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注行业资讯频道,感谢各位的阅读!

0