阻塞队列ArrayBlockingQueue源码学习

    科技2025-05-22  37

    ArrayBlockingQueue总结

    先直接总结ArrayBlockingQueue相关的特性再根据源码来进行说明,它的主要特性如下:

    他是一个由数组实现的FIFO有界阻塞队列,数组由final修饰;ArrayBlockingQueue有界且固定,在构造函数时必须指定大小,确认后不支持改变(确定数组长度且不可变);默认在多线程环境下不保证"公平性";通过 ReentrantLock 与 Condition 实现线程安全;

    重要属性介绍

    ArrayBlockingQueue的属性还是比较简单,首先是存放元素的Object数组,它是final修饰的,因此它无法扩容或者缩容,这也就是ArrayBlockingQueue的长度不可变的原因。 然后三个int属性分别表示下次获取数据时应该从数组的哪里获取,下次保存数据时应该保存到数组的哪里, count 记录着还有多少个可以拿。 所有方法首先都必须获取到 lock 锁, lock 有公平与非公平锁,默认实现的是非公平锁,也可以在初始化ArrayBlockingQueue指定,所以默认ArrayBlockingQueue并不保证公平性。

    公平与非公平我在这里额外解释一下: 当队列满了的时候,如果有线程要往其中添加元素,这些线程就会阻塞,而一旦队列中有元素出队了,线程们就可以向其中添加元素了,如果是非公平锁,那么有可能后拿到锁的线程会先往队列中添加元素,如果是公平锁的话,一定是先来后到的,先拿到锁的线程一定可以先向队列中添加元素。 这里简单介绍了属性的作用,接下来会通过源码再来理解它们的作用。

    最关键的两个私有方法enqueue和dequeue

    enqueue 方法用来把数据保存到数组items中,会递增 putIndex ,也就是下次应该保存的位置,如果 putIndex 等于了数组的长度,则下次为0。最后会唤醒那些调用 notEmpty.await() 阻塞的线程,实际上只有 take 方法调用了notEmpty.await()。 dequeue 方法是获取数据,获取的是 takeIndex 处的数据,在获取过后会把items[takeIndex]处设置为null,同样递增 takeIndex 后如果等于了数组的长度则会被置为0。最后会唤醒那些调用 notFull.await() 阻塞的线程,只有 put 方法调用了notFull.await()。

    由于ArrayBlockingQueue的底层数组无法扩容,所以主要变化在于所以主要的变化在于 putIndex 和 takeIndex,实际上 putIndex 和 takeIndex 可以理解为是两个指针,一个指向队尾,一个指向队首。 大家可能担心 putIndex 走完一轮之后,如果继续添加元素越过 takeIndex 会导致数据被覆盖,或者takeIndex 走的比 putIndex 快导致获取到null值。 实际上ArrayBlockingQueue公共的方法都会对 enqueue 和 dequeue 加以保护,比如所有方法在调用 enqueue 之前一定会先判断 count == item.length ,调用 dequeue 之前一定会先判断 count == 0

    添加元素方法介绍

    ArrayBlockingQueue提供了4个添加方法add、offer、offer(指定阻塞时间)、put; add方法调用的是offer方法,而offer方法先获取到锁,再判断队列是否已满,已满直接返回false,没有满则调用enqueue保存数据。所以add与offer没有阻塞。

    offer还有一个可以在队列已满情况下阻塞指定时间(timeout)在尝试保存的方法,在判断队列已满的情况会调用 nanos = notFull.awaitNanos(nanos) 阻塞线程,一定时间后如果队列还是满的则会返回false。

    put获取到锁,如果数组已满则调用 notFull.await 一直阻塞等待唤醒,唤醒后再次验证是否已满,没满则调用enqueue,否则再次阻塞。

    所以向ArrayBlockingQueue中加元素只有put 方法才真正的阻塞,保证添加成功,其他方法会可能添加失败。

    获取元素方法介绍

    获取数据提供了poll、poll(指定阻塞时间)、take、peek poll方法先获取到锁,如果 count==0 则返回null,否则调用 dequeue 方法获取结果。

    poll(指定阻塞时间)在判断 count == 0 时会先阻塞一段时间,等待阻塞时间结束后再次判断,如果还是 count == 0 则返回null,否则调用dequeue方法获取结果。

    take方法先获取到锁然后在循环判断count是否等于0,如果等于0则调用notEmpty.await()阻塞,否则调用dequeue获取结果,同样take方法也会保证一定能拿到数据,否则会一直阻塞。

    peek是偷看的意思,peek方法在获取到锁后直接获取item[takeIndex]的元素返回,然后不做任何事情,就好像偷偷看下下一次会获取哪一个元素,但是不会删除队首的元素。

    总结

    ArrayBlockingQueue还有一些其他次要的方法,就不再一一说明了,ArrayBlockingQueue通过两个索引来保存和拉取数据,并且实现了先进先出的特性。虽然数组的长度是固定有限的,但是通过循环使用数组,也能通过他同步无限的数据。 分析方法发现只有take方法和put方法才能保证真正获取或者保存数据成功,如果没有成功则会一直阻塞,其他方法则不一定。

    Processed: 0.011, SQL: 8