多线程并发编程——阻塞队列(简要)

    科技2024-11-29  15

    文章目录

    一、阻塞队列概述二、ArrayBlockingQueue三、LinkedBlockingQueue三、DelayQueue四、SynchronusQueue

    在本篇文章中,我们将会对 Java 并发中的同步(阻塞)队列做基本的简要总结。

    虽然在高并发情况下,我们也并不会真的手动使用这些容器来存放数据,但是高并发情况下我们一定都是会使用线程池的,而不同的线程池,其底层对于任务的处理都是依赖于不同的同步队列器来实现的,所以对于阻塞队列器的学习,最起码有一个简单的了解还是很有必要的。

    由于笔者目前还是在校学生,还没有能力把各个阻塞队列的底层实现(数据结构、算法、增删改方法等)详细的总结出来,但是相信看完本篇文章,对于面试中可能问到的有关阻塞队列的相关问题,我们还是能 Hold 住的。除此之外,总结这篇文章的目的,更多还是为了后面的线程池做前置知识铺垫,相信看完这篇文章,再来学习有关线程池的底层实现,能够起到事半功倍的效果。

    一、阻塞队列概述

    整体集合框架图:

    希望提到容器,我们都能在脑海中构建出这张结构图

    阻塞队列与我们平常接触的普通队列(LinkedList或ArrayList等)的最大不同点,在于阻塞队列支持阻塞添加和阻塞删除方法。

    阻塞添加 所谓的阻塞添加是指当阻塞队列元素已满时,队列会阻塞加入元素的线程,直队列元素不满时才重新唤醒线程执行元素加入操作。阻塞删除 阻塞删除是指在队列元素为空时,删除队列元素的线程将被阻塞,直到队列不为空再执行删除操作(一般都会返回被删除的元素)

    由于Java中的阻塞队列接口BlockingQueue继承自Queue接口,因此先来看看阻塞队列接口为我们提供的主要方法

    public interface BlockingQueue<E> extends Queue<E> { //将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量) //在成功时返回 true,如果此队列已满,则抛IllegalStateException。 boolean add(E e); //将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量) //将指定的元素插入此队列的尾部,如果该队列已满, //则在到达指定的等待时间之前等待可用的空间,该方法可中断 boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //将指定的元素插入此队列的尾部,如果该队列已满,则一直等到(阻塞)。 void put(E e) throws InterruptedException; //获取并移除此队列的头部,如果没有元素则等待(阻塞), //直到有元素将唤醒等待线程执行该操作 E take() throws InterruptedException; //获取并移除此队列的头部,在指定的等待时间前一直等到获取元素, //超过时间方法将结束 E poll(long timeout, TimeUnit unit) throws InterruptedException; //从此队列中移除指定元素的单个实例(如果存在)。 boolean remove(Object o); } //除了上述方法还有继承自Queue接口的方法 //获取但不移除此队列的头元素,没有则跑异常NoSuchElementException E element(); //获取但不移除此队列的头;如果此队列为空,则返回 null。 E peek(); //获取并移除此队列的头,如果此队列为空,则返回 null。 E poll();

    这里我们把上述操作进行分类

    插入方法: add(E e) : 添加成功返回true,失败抛IllegalStateException异常offer(E e) : 成功返回 true,如果此队列已满,则返回 false。put(E e) :将元素插入此队列的尾部,如果该队列已满,则一直阻塞 删除方法: remove(Object o) :移除指定元素,成功返回true,失败返回falsepoll() : 获取并移除此队列的头元素,若队列为空,则返回 nulltake():获取并移除此队列头元素,若没有元素则一直阻塞。 检查方法 element() :获取但不移除此队列的头元素,没有元素则抛异常peek() :获取但不移除此队列的头;若队列为空,则返回 null。

    未加粗的方法是从 Queue 继承而来,功能和用法与我们常用的容器相同。重点理解 put() 方法和 take() 方法,这两个方法是 BlockingQueue 的定义,由具体子类实现的,真正实现了阻塞的方法

    面试常问:Queue 和 list 有什么区别?

    Queue实现了很多多线程友好的API方法(offer、peek、poll)等;Queue接口有很多实现子类(主要ArrayBlockingQueue、LinkedBlockingQueue等),都是线程友好的。这些实现子类重点实现了两个阻塞方法——put()、take()。这两个阻塞方法天生适合用于生产者消费者模型,而多线程情况下,生产者消费者模型是最重要的模型,如MQ,Redis等都用到了生产者消费者模型。

    阻塞队列的对元素的增删查操作主要就是上述的三类方法,通常情况下我们都是通过这3类方法操作阻塞队列,了解完阻塞队列接口 BlockingQueue 的基本方法后,下面我们就来看一下具体的子类实现

    二、ArrayBlockingQueue

    ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,其内部按先进先出的原则对元素进行排序,其中put()方法和take()方法为添加和删除的阻塞方法

    有点需要注意的是ArrayBlockingQueue内部的阻塞队列是通过重入锁ReenterLock和Condition条件队列实现的,所以ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序

    创建公平与非公平阻塞队列代码如下:

    //默认非公平阻塞队列 ArrayBlockingQueue queue = new ArrayBlockingQueue(2); //公平阻塞队列 ArrayBlockingQueue queue1 = new ArrayBlockingQueue(2,true); //构造方法源码 public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }

    三、LinkedBlockingQueue

    LinkedBlockingQueue是一个底层由链表实现的无界阻塞队列,其大小默认值为Integer.MAX_VALUE,所以我们在使用LinkedBlockingQueue时建议手动传值,使其提供我们所需的大小,避免队列过大造成机器负载或者内存爆满等情况

    LinkedBlockingQueue是一个基于链表的阻塞队列,其内部维持一个基于链表的数据队列,实际上我们对LinkedBlockingQueue的API操作都是间接操作该数据队列

    BlockingQueue 天生就是实现了生产者-消费者模型。下面我们来模拟一个生产者和五个消费者的模型:

    public class TestLinkedBlockingQueue { //使用阻塞队列完成生产者-消费者,队列任务容量最多为 50 static BlockingQueue<String> strs = new LinkedBlockingQueue<>(50); static Random r = new Random(); public static void main(String[] args) { //一个生产者,共生产100个任务 new Thread(() -> { for (int i = 0; i < 100; i++) { try { //如果任务队列满了,就会等待消费者线程消费 strs.put("a" + i); //每次生产一个任务就随机睡一会儿 TimeUnit.MILLISECONDS.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } }, "p1").start(); //共5 for (int i = 0; i < 5; i++) { new Thread(() -> { for (;;) { try { //如果任务队列空了,就会阻塞等待生产者生产 System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }, "c" + i).start(); } } }

    运行结果:5 个消费者能够随机消费掉生产者生产的任务

    c0 take -a0 c0 take -a1 c1 take -a2 c2 take -a3 c3 take -a4 c4 take -a5 ....

    从源码看,有三种方式可以构造LinkedBlockingQueue,通常情况下,我们建议创建指定大小的LinkedBlockingQueue阻塞队列。LinkedBlockingQueue队列也是按 FIFO(先进先出)排序元素。队列的头部是在队列中时间最长的元素,队列的尾部 是在队列中时间最短的元素,新元素插入到队列的尾部,而队列执行获取操作会获得位于队列头部的元素。在正常情况下,链接队列的吞吐量要高于基于数组的队列(ArrayBlockingQueue),因为其内部实现添加和删除操作使用的两个ReenterLock来控制并发执行,而ArrayBlockingQueue内部只是使用一个ReenterLock控制并发,因此LinkedBlockingQueue的吞吐量要高于ArrayBlockingQueue。

    三、DelayQueue

    特殊用途:实现按照优先级处理任务

    按照在队列中等待的时间进行排序,可以实现按时间进行任务调度。一般用来实现按时间进行任务调度的功能。其底层数据结构是优先级队列 PriorityQueue(小根堆)

    在向 DelayQueue 存放任务的时候,要求任务必须实现 Delay 接口,

    public class T07_DelayQueue { static BlockingQueue<MyTask> tasks = new DelayQueue<>(); static Random r = new Random(); static class MyTask implements Delayed { String name; long runningTime; MyTask(String name, long rt) { this.name = name; this.runningTime = rt; } @Override public int compareTo(Delayed o) {//自定义比较策略—谁紧迫谁先执行 if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) return -1; else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1; else return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString() { return name + " " + runningTime; } } public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); //以下根据紧迫程度,排序为t5 -> t1 -> t3 -> t2 -> t4 MyTask t1 = new MyTask("t1", now + 1000); MyTask t2 = new MyTask("t2", now + 2000); MyTask t3 = new MyTask("t3", now + 1500); MyTask t4 = new MyTask("t4", now + 2500); MyTask t5 = new MyTask("t5", now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0; i<5; i++) { //输出顺序为:t5 -> t1 -> t3 -> t2 -> t4 System.out.println(tasks.take()); } } }

    任务存放顺序:t1 -> t2 -> t3 -> t4 -> t5

    任务执行顺序:按照紧迫程度排序(时间长短) t5 -> t1 -> t3 -> t2 -> t4

    四、SynchronusQueue

    特殊应用场景:完成两个线程间的通信,用于两个线程间直接交换数据

    SynchronusQueue 的特点是容量为 0,所以它的功能并不是为了存放内容的,而是用来让一个线程给另一个线程下达任务的。

    用 SynchronusQueue完成一个线程给另一个线程传送数据的场景,实现很简单,且效率很高。

    SynchronusQueue看起来很没用,但是在整个线程池中它的用处是最多的,很多线程互相间进行任务调度的时候,都是用的SynchronusQueue


    关联文章:

    多线程—Java内存模型与线程

    多线程——Volatile 关键字详解

    多线程——线程安全及实现机制

    多线程——深入剖析 Synchronized

    多线程\并发编程——ReentrantLock 详解

    多线程/并发编程——CAS、Unsafe及Atomic

    多线程/并发编程——两万字详解AQS

    多线程/并发编程——同步工具类(CountDownLatch、Semaphore、ReadWriteLock、CyclicBarrier )

    多线程/并发编程——面试再也不怕 ThreadLocal 了

    Processed: 0.029, SQL: 8