BlockingQueue接口及ArrayBlockingQueue实现类的方法
这篇文章主要介绍"BlockingQueue接口及ArrayBlockingQueue实现类的方法"的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇"BlockingQueue接口及ArrayBlockingQueue实现类的方法"文章能帮助大家解决问题。
队列是一种 FIFO
(先进先出)的数据结构,本文要讲的 BlockingQueue
也是一种队列,而且强调了线程安全的特性。
BlockingQueue
全称:java.util.concurrent.BlockingQueue
。它是是一个线程安全的队列接口,多个线程能够以并发的方式从队列中插入数据,取出数据的同时不会出现线程安全的问题。
生产者和消费者例子
BlockingQueue
通常用于消费者线程向队列存入数据,消费者线程从队列中取出数据,具体如下
生产者线程不停的向队列中插入数据,直到队列满了,生产者线程被阻塞
消费者线程不停的从队列中取出数据,直到队列为空,消费者线程被阻塞
(推荐教程:Java教程)
BlockingQueue 方法
BlockingQueue
提供 4 种不同类型的方法用于插入数,取出数据以及检查数据,具体如下
操作失败,抛出异常
无论成功/失败,立即返回
true/false
如果队列为空/满,阻塞当前线程
如果队列为空/满,阻塞当前线程并有超时机制插入
add(o)
offer(o)
put(o)
offer(o, timeout, timeunit)
取出remove(o)
poll()
take()
poll(timeout, timeunit)
检查element()
peek()
BlockingQueue 的具体实现类
BlockingQueue
只是一个接口,在实际开发中有如下的类实现了该接口。
ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue
PriorityBlockingQueue
SynchronousQueue
ArrayBlockingQueue 的使用
这里以 BlockingQueue
接口的具体实现类 ArrayBlockingQueue
举例。通过 ArrayBlockingQueue
实现一个消费者和生产者多线程模型。
核心内容如下:
以
ArrayBlockingQueue
作为生产者和消费者的数据容器通过
ExecutorService
启动 3 个线程,2 两个生产者,1 个消费者指定数据总量
生产者线程
ArrayBlockingQueueProducer
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.atomic.AtomicInteger;/** * 生产者线程向容器存入指定总量的 任务 * */public class ArrayBlockingQueueProducer implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueProducer.class); // 容器 private ArrayBlockingQueuequeue; // 生产指定的数量 private AtomicInteger numberOfElementsToProduce; public ArrayBlockingQueueProducer(ArrayBlockingQueue queue, AtomicInteger numberOfElementsToProduce) { this.queue = queue; this.numberOfElementsToProduce = numberOfElementsToProduce; } @Override public void run() { try { while (numberOfElementsToProduce.get() > 0) { try { // 向队列中存入任务 String task = String.format("task_%s", numberOfElementsToProduce.getAndUpdate(x -> x-1)); queue.put(task); logger.info("thread {}, produce task {}", Thread.currentThread().getName(), task); // 任务为0,生产者线程退出 if (numberOfElementsToProduce.get() == 0) { break; } } catch (Exception e) { e.printStackTrace(); } } } catch (Exception e) { logger.error(this.getClass().getName().concat(". has error"), e); } }}
消费者线程
ArrayBlockingQueueConsumer
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.atomic.AtomicInteger;/** * 消费者线程向容器 消费 指定总量的任务 * */public class ArrayBlockingQueueConsumer implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ArrayBlockingQueueConsumer.class); private ArrayBlockingQueuequeue; private AtomicInteger numberOfElementsToProduce; public ArrayBlockingQueueConsumer(ArrayBlockingQueue queue, AtomicInteger numberOfElementsToProduce) { this.queue = queue; this.numberOfElementsToProduce = numberOfElementsToProduce; } @Override public void run() { try { while (!queue.isEmpty() || numberOfElementsToProduce.get() >= 0) { // 从队列中获取任务,并执行任务 String task = queue.take(); logger.info("thread {} consume task {}", Thread.currentThread().getName(),task); // 队列中数据为空,消费者线程退出 if (queue.isEmpty()) { break; } } } catch (Exception e) { logger.error(this.getClass().getName().concat(". has error"), e); } }}
测试TestBlockingQueue
import com.ckjava.synchronizeds.appCache.WaitUtils;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.atomic.AtomicInteger;/** * 1. 以 ArrayBlockingQueue 作为生产者和消费者的数据容器
* 2. 通过 ExecutorService 启动 3 个线程,2 两个生产者,1 个消费者
* 3. 指定数据总量 */public class TestBlockingQueue { public static void main(String[] args) { ArrayBlockingQueuearrayBlockingQueue = new ArrayBlockingQueue<>(10); /*BlockingQueue delayQueue = new DelayQueue(); BlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<>(10); BlockingQueue priorityBlockingQueue = new PriorityBlockingQueue<>(10); BlockingQueue synchronousQueue = new SynchronousQueue<>();*/ ExecutorService executorService = Executors.newFixedThreadPool(3); // 最多生产 5 个数据 AtomicInteger numberOfElementsToProduce = new AtomicInteger(5); // 2 个生产者线程 executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce)); executorService.submit(new ArrayBlockingQueueProducer(arrayBlockingQueue, numberOfElementsToProduce)); // 1 个消费者线程 executorService.submit(new ArrayBlockingQueueConsumer(arrayBlockingQueue, numberOfElementsToProduce)); executorService.shutdown(); WaitUtils.waitUntil(() -> executorService.isTerminated(), 1000L); }}
输出如下:
13:54:17.884 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_513:54:17.884 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_513:54:17.884 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_413:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_413:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_213:54:17.887 [pool-1-thread-1] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-1, produce task task_313:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_313:54:17.887 [pool-1-thread-2] INFO c.c.b.ArrayBlockingQueueProducer - thread pool-1-thread-2, produce task task_113:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_213:54:17.887 [pool-1-thread-3] INFO c.c.b.ArrayBlockingQueueConsumer - thread pool-1-thread-3 consume task task_1
关于"BlockingQueue接口及ArrayBlockingQueue实现类的方法"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注行业资讯频道,小编每天都会为大家更新不同的知识点。