ConcurrentHashMap中addCount功能有二:
记录ConcurrentHashMap元素数量扩容ConcurrentHashMap我们说先仅分析记录ConcurrentHashMap元素数量的代码
下面的代码仅分析第一次调用时候
//ConcurrentHashMap.java //@Contended注解是false shared(伪共享)相关知识可以参考本文引用文章 @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } //仅在fullAddCount函数中初始化 private transient volatile CounterCell[] counterCells; private transient volatile long baseCount; /** * * @param x 新添加的元素数量 * @param check if <0, don't check resize, if <= 1 only check if uncontended * * x为1表示添加一个新元素,-1表示删除 ,check往往传入某个位置哈希桶的链表或者红黑树数量。 **/ private final void addCount(long x, int check) { CounterCell[] as; long b; long s; //程序第一次counterCells必然为空 (as = counterCells) != null 为false //!U.compareAndSwapLong(this...) cas 方式增加baseCount数量 //在没有竞争的情况下修改完baseCount返回函数 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //。。假设无竞争所以!U.compareAndSwapLong()返回false,因此函数返回 } //check >= 0进行扩容逻辑 if (check >= 0) { //先不分析 } }当多个线程同时调用!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)必然有一个线程会进入方法体。
fullAddCount初始化过程
// See LongAdder version for explanation private final void fullAddCount(long x, boolean wasUncontended) { int h; //ThreadLocalRandom初始化,关于这块可以看参考文章 if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; //第一次as必然为空 if ((as = counterCells) != null && (n = as.length) > 0) { //...略 } //如果counterCells为空 那么进行初始化counterCells数组 //cellsBusy == 0标识 当前表没有在扩容操作等 //counterCells == as 防止多线程 已经修改counterCells //U.compareAndSwapInt(this, CELLSBUSY, 0, 1) 标识正在对counterCells扩容或初始化 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { //初始化一个大小为2的数组 CounterCell[] rs = new CounterCell[2]; //h & 1得到不是0就是1,h是线程的一个序列值,可以简单理解为线程特有的一个id //当前线程得到一个特定位置的CounterCell[] rs下标然后放置添加元素数量 rs[h & 1] = new CounterCell(x); //赋值回成员属性 counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) //结束循环 break; } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }总结: 总结就是当存在多线程竞争的时候,第一次会初始化counterCells数组(容量为2,后续扩容也是 2的n次方),然后根据线程id计算出counterCells下标放入.
我们再来继续细看fullAddCount其他函数细节
private final void fullAddCount(long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { CounterCell[] as; CounterCell a; int n; long v; //假设我们counterCells已经初始化过 if ((as = counterCells) != null && (n = as.length) > 0) { //as[(n - 1) & h] 利用线程id计算在counterCells的数组位置。 //如果当前位置为没有构造过CounterCell,那么近进入然后构造一个 if ((a = as[(n - 1) & h]) == null) { //cellsBusy == 0 表示: //没有正在进行扩容CounterCell数组或者初始化CounterCell,或者正在插入新节点 if (cellsBusy == 0) { // Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create //CAS设置标志位cellsBusy if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; //再次检查是否符合条件,符合就创建放入然后break结束当前函数 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } //可能因为其他线程插入了或者更新了数组对象,导致当前线程失败 collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //如果当前线程对应CounterCell数组下标存在对应的实例,那么直接用该实例的value相加即可 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; else if (counterCells != as || n >= NCPU)//counterCells数组已经更新 collide = false; // At max size or stale else if (!collide)//当前线程插入时,因为其他某种原因失败失败,重置标志重来 collide = true; else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //除了当前表被更新以外的其他原因来到这里那么进行扩容数组。 try { //这里扩容是由于哈希碰撞以上的else if 都没有解决 所以doug lea 再次扩容降低 并发所带来的自旋次数 if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h); } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //初始化counterCells数组并返回 上文已经分析略 } else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }综上fullAddCount的作用就是构造一个CounterCell数组,或者构造CounterCell类型并在实例中添加插入的元素的数量,最终将实例放入线程对应数组下标。
所以我们最终发现ConcurrentHashMap元素数量其实等于baseCount+CounterCell数组元素存储的数值
我们把上面分析的全部写到下面代码中
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //if 作用1: 如果当前counterCells为空那么证明没有竞争,直接利用baseCount累加元素和 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; //if 作用1:如果当前as为空证明多线程导致baseCount赋值失败,所以需要初始化一个counterCells数组进行统计 //if 作用2:如果当前as不为空,且当前线程对应as数组存储的对象不为空,那么直接用实例对象存储的数值加上当前插入的数据累加和即可 //if 作用3:如果当前as不为空,但是对应线程数组下标实例为空,那么调用fullAddCount构造一个实例再放入新加入的元素个数 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } //很少情况check传入小于1,如果传入了那么证明不需要检查扩容,直接返回就好 if (check <= 1) return; //sumcount返回当前ConcurrentHashMap数量 s = sumCount(); } if (check >= 0) { //..扩容略 } }扩容算法必须首先理解ConcurrentHashMap#sizeCtl作用
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. * 用于表示table的初始化和扩容控制状态。 * 如果为负数: * 表示被正初始化或正在扩容。 * -1: 表示正在初始化 * -x: 高15位表示(扣除符号位):旧容器的最高为1前面有多少个0的数字。1000 0000 0000 1000..... * 表示旧容器 大小为8388608(二进制位0000 0000 1000...),是不是8388608的二进制前面有8个0? * 低16位表示:-(1+正在扩容的线程) 比如-4表示有三个线程正在帮助扩容。如 ...1000 表示十进制的8,于是表示有7个线程在扩容。参考 引用[3]有具体说明 * 为正数: * table为null表示创建table时数组大小。 * 如果table不为null表示:扩容阈值(一般是数组长度乘上扩容系数),当前存储的元素数量大于阈值时触发扩容 * */ private transient volatile int sizeCtl; private final void addCount(long x, int check) { CounterCell[] as; long b, s; //... //...计算ConcurrentHashMap元素大小代码略 //略 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //s表示表的大小 //s >= (long)(sc = sizeCtl) 当前表的大小必须大于等于阈值 //(tab = table) != null 表不为空 //(n = tab.length) < MAXIMUM_CAPACITY 表容量当前小于最大扩容上限 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //resizeStamp返回的rs低15位表示当前容量n扩容标识,用于表示是对n的扩容。(在数值不直接表示,而是表示n前面最高位为1前面有多少个0) //resizeStamp返回的rs的从低位开始数的第16位一定为1(比如说0000 0000 0000 0000 1000 0000 0000 0000) //resizeStamp具体看引用文章[3] int rs = resizeStamp(n); //小于0 证明正在扩容或者正在初始化 if (sc < 0) { //sc >>> RESIZE_STAMP_SHIFT) != rs 表示当前sc和resizeStamp计算返回是否一致.rs具体可以查看引用[3] . // sc == rs + 1 注意代码错误在jdk12修正(sc >>> RESIZE_STAMP_SHIFT) == rs + 1 ,此处主要是第二次循环的时候 有可能出现相等,也就是执行完 else if (U.compareAndSwapInt(...))第二次循环,因为第一次进入elseif时会改变sizeCtl.此处判断是如果相等证明扩容完毕 // sc == rs + MAX_RESIZERS 注意代码错误在jdk12修正(sc >>> RESIZE_STAMP_SHIFT)== rs + MAX_RESIZERS if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //SIZECTL设置当前有一个线程正在扩容 //rs 左移动16个字节(RESIZE_STAMP_SHIFT),一定为负数 ,第16位数一定都是0,+2表示有一个线程再扩容 // else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }