并发包源码解读——ConcurrentLinkedQueue

    科技2025-08-29  14

    并发包源码解读——ConcurrentLinkedQueue


    ​ 先说点题外话,安利一波看jdk源码的好处,如果你不想看,略过也可

    ​ 看源码真的对咱们程序员很有帮助。大家可能觉得看源码就是为了精通内部机制、然后会用,或者就是为了面试。但其实不是的,就我自身而言,我刚接触源码,看ArrayList的时候也是非常非常的慢,很多地方不知道大神写的什么意思,他们写这块逻辑的时候是怎么考虑的,可能得边看边存疑,然后回头再看一遍又一遍,又枯燥又没有成就感,哪有自己写一个页面或者一个后台功能来的爽,像极了当年高考做英语阅读的时候,因为看过的东西太少。但当你慢慢地啃下了第一篇源码,脑子里有了一个大致的原理,再看第二篇,你会发现有很多逻辑跟其他源码一样,能看懂大部分,又能学到一些新东西,再看第三篇、第四篇,良性循环,脑子里的知识越来越多,其他的新知识学起来也会更快

    ​ 虽然第一步很难很难,但是只要你迈出了这一步,后面的道路肯定会越来越自信的

    下面我们进入正题,ConcurrrentLinkedQueue是一个支持并发的高性能排队队列,我们以一个简单的例子开始

    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); queue.offer(""); queue.poll();

    1、构造函数

    很简单,在构造函数中初始化了首尾节点

    public ConcurrentLinkedQueue() { head = tail = new Node<E>(null); }

    节点的构造函数就是初始化节点的值

    所以ConcurrentLinkedQueue初始化了值为空的首尾节点

    private static class Node<E> { volatile E item; volatile Node<E> next; Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node<E> cmp, Node<E> val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }

    2、入队——offer

    public boolean offer(E e) { //初始化一个值为e的节点 checkNotNull(e); final Node<E> newNode = new Node<E>(e); //这是一个尾节点的自旋, for (Node<E> t = tail, p = t;;) { Node<E> q = p.next; //如果尾节点没有后继,说明没有并发入队,就把尾节点的后继指向当前入队的e if (q == null) { if (p.casNext(null, newNode)) { //p原来是尾节点,上次自旋失败后p变成了他的后继,说明后继变了但是尾节点还没更新 //那就先把两个后继连起来,然后再更新尾节点 if (p != t) casTail(t, newNode); return true; } } //这其实是出队时造成的现象,说明当前节点已经出队了,出队的头节点的后继都会指向自己 //虽然尾节点出队了,但是并不表示尾节点出队后,其他节点没有入队,所以只能从头开始找 else if (p == q) p = (t != (t = tail)) ? t : head; else //如果其他线程还没来的及改尾节点,把p赋值成他的后继q,然后重新尝试入队 //如果尾节点已经被改了,那就给p重新赋值尾节点 p = (p != t && t != (t = tail)) ? t : q; } }

    3、出队——poll

    public E poll() { //这又是一个自旋 restartFromHead: for (;;) { for (Node<E> h = head, p = h, q;;) { E item = p.item; //cas出队,把头节点的值对象置为null if (item != null && p.casItem(item, null)) { //p原来是头节点,上一次自旋失败后变成了头节点的后继(第二个节点) //头节点已经变了,要更新头节点,如果头节点还有后继就更新成后继,没有后继正好p的值对象item也是null if (p != h) updateHead(h, ((q = p.next) != null) ? q : p); return item; } //cas失败,说明并发出队改变了头节点 //如果头节点没有后继,说明队列里没有节点了,首尾节点相同是个值对象为null的节点,那就更新头节点为p else if ((q = p.next) == null) { updateHead(h, p); return null; } //(先看下面的else,因为这是第一次自旋进不来的)如果头节点的后继就是他自己,说明该重新取头节点了 else if (p == q) continue restartFromHead; //把头节点p置为他的后继q,然后在下一次循环时再尝试一次出队(下次尝试出队的就是后继q,因为并发出队,头节点已经被别的线程出队了),如果还没出队就跳到外层循环,重新取头节点 else p = q; } } }

    这个updateHead需要注意一下

    他的作用是更新头节点,但包含的操作不仅仅是cas,还有个lasySetNext方法用来将老的头节点的后继指向自己,这样对并发入队出队同一个节点有区分作用

    final void updateHead(Node<E> h, Node<E> p) { if (h != p && casHead(h, p)) h.lazySetNext(h); } void lazySetNext(Node<E> val) { UNSAFE.putOrderedObject(this, nextOffset, val); }
    Processed: 0.122, SQL: 8