千家信息网

JAVA中怎么利用阻塞队列实现一个并发容器

发表于:2025-01-22 作者:千家信息网编辑
千家信息网最后更新 2025年01月22日,今天就跟大家聊聊有关JAVA中怎么利用阻塞队列实现一个并发容器,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。在并发编程中,有时候需要使用线程安
千家信息网最后更新 2025年01月22日JAVA中怎么利用阻塞队列实现一个并发容器

今天就跟大家聊聊有关JAVA中怎么利用阻塞队列实现一个并发容器,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两 种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。

阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  1. 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。

  2. 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

应用场景

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是 从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

插入和移除操作的4中处理方式

  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。

  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。

  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞消费者线程,直到队列不为空。

  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程 一段时间,如果超过了指定的时间,生产者线程就会退出。

注意: 如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永 远不会被阻塞,而且使用offer方法时,该方法永远返回true。

Java里的阻塞队列

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。

  • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。

  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。

  • DelayQueue:一个使用优先级队列实现的无界阻塞队列。

  • SynchronousQueue:一个不存储元素的阻塞队列。

  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列,在初始化时需要指定队列的长度。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,但是在初始化的队列的时候指定阻塞队列的公平性,如:ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);。它使用ReentrantLock来实现队列的线程安全。

核心属性

    /** The queued items */    final Object[] items;    /** items index for next take, poll, peek or remove */    int takeIndex;    /** items index for next put, offer, or add */    int putIndex;    /** Number of elements in the queue */    int count;    /** Main lock guarding all access */    final ReentrantLock lock;    /** Condition for waiting takes */    private final Condition notEmpty;    /** Condition for waiting puts */    private final Condition notFull;

核心方法

    public boolean offer(E e, long timeout, TimeUnit unit)        throws InterruptedException {        checkNotNull(e);        long nanos = unit.toNanos(timeout);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length) {                if (nanos <= 0)                    return false;                nanos = notFull.awaitNanos(nanos);            }            enqueue(e);            return true;        } finally {            lock.unlock();        }    }    private void enqueue(E x) {        // assert lock.getHoldCount() == 1;        // assert items[putIndex] == null;        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        notEmpty.signal();    }    public E poll(long timeout, TimeUnit unit) throws InterruptedException {        long nanos = unit.toNanos(timeout);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == 0) {                if (nanos <= 0)                    return null;                nanos = notEmpty.awaitNanos(nanos);            }            return dequeue();        } finally {            lock.unlock();        }    }    private E dequeue() {        // assert lock.getHoldCount() == 1;        // assert items[takeIndex] != null;        final Object[] items = this.items;        @SuppressWarnings("unchecked")        E x = (E) items[takeIndex];        items[takeIndex] = null;        if (++takeIndex == items.length)            takeIndex = 0;        count--;        if (itrs != null)            itrs.elementDequeued();        notFull.signal();        return x;    }

LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。出队和入队使用两把锁来实现。

核心属性

    /** The capacity bound, or Integer.MAX_VALUE if none */    private final int capacity;    /** Current number of elements */    private final AtomicInteger count = new AtomicInteger();    /**     * Head of linked list.     * Invariant: head.item == null     */    transient Node head;    /**     * Tail of linked list.     * Invariant: last.next == null     */    private transient Node last;    /** Lock held by take, poll, etc */    private final ReentrantLock takeLock = new ReentrantLock();    /** Wait queue for waiting takes */    private final Condition notEmpty = takeLock.newCondition();    /** Lock held by put, offer, etc */    private final ReentrantLock putLock = new ReentrantLock();    /** Wait queue for waiting puts */    private final Condition notFull = putLock.newCondition();

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证 同优先级元素的顺序。底层使用数组实现,默认初始容量是11,最大值是Integer.MAX_VALUE - 8。容量不够时会进行扩容

核心方法

    // 入队    private static  void siftUpComparable(int k, T x, Object[] array) {        Comparable key = (Comparable) x;        while (k > 0) {            int parent = (k - 1) >>> 1;            Object e = array[parent];            if (key.compareTo((T) e) >= 0)                break;            array[k] = e;            k = parent;        }        array[k] = key;    }    // 扩容    private void tryGrow(Object[] array, int oldCap) {        lock.unlock(); // must release and then re-acquire main lock        Object[] newArray = null;        if (allocationSpinLock == 0 &&            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,                                     0, 1)) {            try {                int newCap = oldCap + ((oldCap < 64) ?                                       (oldCap + 2) : // grow faster if small                                       (oldCap >> 1));                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow                    int minCap = oldCap + 1;                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)                        throw new OutOfMemoryError();                    newCap = MAX_ARRAY_SIZE;                }                if (newCap > oldCap && queue == array)                    newArray = new Object[newCap];            } finally {                allocationSpinLock = 0;            }        }        if (newArray == null) // back off if another thread is allocating            Thread.yield();        lock.lock();        if (newArray != null && queue == array) {            queue = newArray;            System.arraycopy(array, 0, newArray, 0, oldCap);        }    }

DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队 列中的元素必须实现Delayed接口和Comparable接口,在创建元素时可以指定多久才能从队列中获取当前元素。 只有在延迟期满时才能从队列中提取元素。

应用场景

  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。

  • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从 DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

如何实现Delayed接口

DelayQueue队列的元素必须实现Delayed接口。我们可以参考ScheduledThreadPoolExecutor 里ScheduledFutureTask类的实现,一共有三步。 第一步:在对象创建的时候,初始化基本数据。使用time记录当前对象延迟到什么时候可 以使用,使用sequenceNumber来标识元素在队列中的先后顺序。代码如下:

private static final AtomicLong sequencer = new AtomicLong(0);ScheduledFutureTask(Runnable r, V result, long ns, long period) {    ScheduledFutureTask(Runnable r, V result, long ns, long period){        super(r, result);        this.time = ns;        this.period = period;        this.sequenceNumber = sequencer.getAndIncrement();    }}

第二步:实现getDelay方法,该方法返回当前元素还需要延时多长时间,单位是纳秒,代码 如下:

public long getDelay(TimeUnit unit) {    return unit.convert(time - now(), NANOSECONDS);}

注意当time小于当前时间时,getDelay会返回负数,这时才可以出队。

第三步:实现compareTo方法来指定元素的顺序。例如,让延时时间最长的放在队列的末 尾。实现代码如下:

public int compareTo(Delayed other) {    if (other == this) // compare zero if same object        return 0;    if (other instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {        ScheduledThreadPoolExecutor.ScheduledFutureTask x = (ScheduledThreadPoolExecutor.ScheduledFutureTask)other;        // 过期时间小的排前面,大的排后面,如果一样就使用sequenceNumber 来排序。        long diff = time - x.time;        if (diff < 0)            return -1;        else if (diff > 0)            return 1;        else if (sequenceNumber < x.sequenceNumber)            return -1;        else            return 1;    }    // 快要过期的排在前面    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;}

如何实现延时阻塞队列

延时阻塞队列的实现很简单,当消费者从队列里获取元素时,如果元素没有达到延时时 间,就阻塞当前线程。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {    long nanos = unit.toNanos(timeout);    final ReentrantLock lock = this.lock;    lock.lockInterruptibly();    try {        for (;;) {            E first = q.peek();            if (first == null) {                if (nanos <= 0)                    return null;                else                    // 队列为NULL,阻塞线程直到超时                    nanos = available.awaitNanos(nanos);            } else {                long delay = first.getDelay(NANOSECONDS);                if (delay <= 0)                    return q.poll();                if (nanos <= 0)                    return null;                first = null; // don't retain ref while waiting                // 等待时间小于第一个元素的过期时间                if (nanos < delay || leader != null)                    // 阻塞线程直到超时                    nanos = available.awaitNanos(nanos);                else {                    Thread thisThread = Thread.currentThread();                    leader = thisThread;                    try {                        // 等待时间大于第一个元素的过期时间,阻塞线程直到第一个元素过期                        long timeLeft = available.awaitNanos(delay);                        nanos -= delay - timeLeft;                    } finally {                        if (leader == thisThread)                            leader = null;                    }                }            }        }    } finally {        if (leader == null && q.peek() != null)            // 唤醒其他阻塞线程            available.signal();        lock.unlock();    }}

SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作, 否则不能继续添加元素。 它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。

SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费 者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于 LinkedBlockingQueue和ArrayBlockingQueue。

LinkedTransferQueue

LinkedTransferQueue是一个由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻 塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。

transfer方法

如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法 时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等 待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返 回。

LinkedBlockingDeque

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以 从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队 时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、 获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双 端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于 removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First 和Last后缀的方法更清楚。

在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在"工作窃取"模式中。

看完上述内容,你们对JAVA中怎么利用阻塞队列实现一个并发容器有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

队列 元素 阻塞 方法 线程 消费 时间 消费者 生产者 生产 支持 结构 排序 双向 方式 优先级 场景 情况 接口 核心 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 软件开发部经理 寄语 软件开发和运维的相同点 元神怎样更换服务器 王者qq区哪个服务器名字好听 常见的数据库连接技术不包括 反恐精英服务器信息 公安机关关于网络安全的法律 网络技术与应用学习心得 更改登入的域控制器服务器 服务器只有一个网口亮 交换服务器 商品化的数据库管理系统 江西质量网络技术咨询产品 知名专业软件开发联系方式 山西大学校园网络安全设计方案 lwip服务器发送数据失败 pb怎么避免一直连数据库 wps表乱码怎么找回数据库 网络安全手抄报怎么画最难带字 sql数据库关系图怎么生成 万户网络技术有限公司陈世杰 虚拟现实软件开发技术 网络安全宣传主题可爱 计算机技术跟网络技术区别 坦克连接服务器 汉字拼音数据库 网络安全工程师下半年 女生学会计还是软件开发 海北州网络技术联系方式 数据库的名词怎么读
0