ConcurrentHashMap1.7和1.8对比
数据结构
1.7中采用Segment
+HashEntry
的方式实现
ConcurrentHashMap
初始化时,计算出Segment
数组的大小ssize
和每个Segment
中HashEntry
数组的大小cap
,并初始化Segment
数组的第一个元素;
其中ssize
大小为2的幂次方,默认为16,cap
大小也是2的幂次方,最小值为2,最终结果根据初始化容量initialCapacity
进行计算,计算过程如下
if (c * ssize < initialCapacity) ++c;int cap = MIN_SEGMENT_TABLE_CAPACITY;while (cap < c) cap <<= 1;
因为Segment
继承了ReentrantLock,所有segment是线程安全的
1.8中放弃了Segment
分段锁的设计,使用的是Node
+CAS
+Synchronized
来保证线程安全性
只有在第一次执行put
方法是才会初始化Node
数组
put操作
1.7 put
当执行put
方法插入数据的时候,根据key的hash值,在Segment
数组中找到对应的位置
如果当前位置没有值,则通过CAS进行赋值,接着执行Segment
的put
方法通过加锁机制插入数据
假如有线程AB同时执行相同Segment
的put
方法
线程A 执行
tryLock
方法成功获取锁,然后把HashEntry
对象插入到相应位置线程B 尝试获取锁失败,则执行
scanAndLockForPut()
方法,通过重复执行tryLock()
方法尝试获取锁在多处理器环境重复64次,单处理器环境重复1次,当执行
tryLock()
方法的次数超过上限时,则执行lock()
方法挂起线程B当线程A执行完插入操作时,会通过
unlock
方法施放锁,接着唤醒线程B继续执行
1.8 put
当执行put
方法插入数据的时候,根据key的hash值在Node
数组中找到相应的位置
如果当前位置的Node
还没有初始化,则通过CAS插入数据
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果当前位置的`Node`还没有初始化,则通过CAS插入数据 if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin}
如果当前位置的Node已经有值,则对该节点加synchronized
锁,然后从该节点开始遍历,直到插入新的节点或者更新新的节点
if (fh >= 0) { binCount = 1; for (Nodee = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { pred.next = new Node (hash, key, value, null); break; } }}
如果当前节点是TreeBin
类型,说明该节点下的链表已经进化成红黑树结构,则通过putTreeVal
方法向红黑树中插入新的节点
else if (f instanceof TreeBin) { Nodep; binCount = 2; if ((p = ((TreeBin )f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } }
如果binCount
不为0,说明put
操作对数据产生了影响,如果当前链表的节点个数达到了8个,则通过treeifyBin
方法将链表转化为红黑树
size 操作
1.7 size实现
统计每个segment
对象中的元素个数,然后进行累加
但是这种方式计算出来的结果不一定准确
因为在计算后面的segment
的元素个数时
前面计算过了的segment
可能有数据的新增或删除
计算方式为:
先采用不加锁的方式,连续计算两次
如果两次结果相等,说明计算结果准确
如果两次结果不相等,说明计算过程中出现了并发新增或者删除操作
于是给每个
segment
加锁,然后再次计算
try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segmentseg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; }} finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); }}
1.8 size实现
使用一个volatile
类型的变量baseCount
记录元素的个数
当新增或者删除节点的时候会调用,addCount()
更新baseCount
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount();}
初始化时counterCells
为空
在并发量很高时,如果存在两个线程同时执行CAS
修改baseCount
值
则失败的线程会继续执行方法体中的逻辑
使用CounterCell
记录元素个数的变化
如果CounterCell
数组counterCells
为空
调用fullAddCount()
方法进行初始化
并插入对应的记录数,通过CAS
设置cellsBusy字段
只有设置成功的线程才能初始化CounterCell
数组
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break;}
如果通过CAS
设置cellsBusy字段失败的话
则继续尝试通过CAS
修改baseCount
字段
如果修改baseCount
字段成功的话,就退出循环
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break;
否则继续循环插入CounterCell
对象;
所以在1.8中的size
实现比1.7简单多,因为元素个数保存baseCount
中,部分元素的变化个数保存在CounterCell
数组中,实现如下:
public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);}final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum;}
通过累加baseCount
和CounterCell
数组中的数量,即可得到元素的总个数;