理解 ConcurrentHashMap

本人零基础转码中,以下内容都是根据视频或者博客学习的笔记,如果您发现了其中的错误,恳请指出,谢谢~

ConcurrentHashMap的实现原理

ConcurrentHashMap 在 JDK1.7 和 JDK1.8 的实现方式是不同的

JDK1.7下的ConcurrentHashMap

实现原理是数组加链表,jdk1.7下,ConcurrentHashMap由segment 数组结构和 hashEntry 数组结构构成的,每个hashEntry相当于HashMap 中的table数组,即 ConcurrentHashMap 把哈希桶数组切分成小数组(Segment ),每个小数组有 n 个 HashEntry 组成。

每个segment配备一把锁,当一个线程访问其中一段数据时,就可以给这段segment 进行上锁,这样在保证segment锁住的同时而不影响其他线程访问其他的segment,实现了真正的并发访问。

Segment 是 ConcurrentHashMap 的一个内部类,主要的组成如下:Segment继承了ReentrantLock,因此Segment 是一种可重入锁,Segment 的默认值为16,即并发度为16。

 存放元素的HashEntry也是一个静态内部类,其中用volatile 修饰了  HashEntry 的数据 value 和 下一个节点 next,保证了多线程环境下数据获取时的可见性

JDK1.8下的ConcurrentHashMap

 JDK1.8 中的ConcurrentHashMap 选择了与 HashMap 相同的Node数组+链表+红黑树结构;在锁的实现上,抛弃了原有的 Segment 分段锁,采用CAS + synchronized实现更加细粒度的锁。

1.8下,ConcurrentHashMap将锁的级别控制在了更细粒度的哈希桶级别,只要锁住这个桶位置的头节点,就不会影响其他桶数组元素的读写,大大提升了并发度。

 JDK1.8 中为什么使用内置锁 synchronized替换 可重入锁 ReentrantLock?

  1. 在jdk1.6中,对synchronized锁的实现进行了大量的优化,支持多种锁状态,会从无锁->偏向锁->轻量级锁->重量级锁一步步的升级转换。
  2. 锁粒度降低,我理解是并发时需要争抢同一把锁的概率大大降低了,这种情况下ReentrantLock 优势也不一定很明显。
  3. 减少内存的开销,假设使用可重入锁来获得同步支持,那么每个节点都需要继承AQS来获得同步支持,但是实际上,只有链表的头节点或者红黑树的根节点才需要加锁,带来了巨大的内存浪费。

ConcurrentHashMap的源码解析

JDK1.7下的ConcurrentHashMap的源码

1、初始化:
initialCapacity :初始容量,这个值指的是整个 ConcurrentHashMap 的初始容量,实际操作的时候需要平均分给每个 Segment。
比如你初始容量是 64,Segment 的容量为16,那么每个段中哈希表的初始容量就为 64/16=4。
loadFactor:这个负载因子是给 段中哈希表 扩容时候使用的

Segment数组的长度是不可以被改变的,初始化如果不规定,那么就采用默认的 16,扩容只是针对segment数组内部的HashEnrty数组

从源码中发现,创建ConcurrentHashMap的时候,在Segment第一个位置初始化了一个HashEntry,其他位置则没有,这是因为每次创建一个Segment对象的时候,需要进行大量的初始化工作,首先初始化一个s0后,后面只要在涉及到创建Segment对象,只要 拿s0里面的数据当做模板就可以,这样会增加一定的效率。

public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    // 非法参数判断。
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    // 并行级别越界重设。
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    
    // Find power-of-two sizes best matching arguments 找到2次幂大小的最佳匹配参数.
    int sshift = 0; // 偏移量吧因该是。
    int ssize = 1; // segment的数组长度,后面根据concurrencyLevel来计算
    
    // 计算并行级别 ssize,因为要保持并行级别是 2 的 n 次方
    // 如果这里我们的并行级别是16的话。
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 到这一步,sshift = 4, ssize = 16。

    // 那么计算出 segmentShift 为 28,segmentMask 为 15,后面会用到这两个值
    // 这两个值的左右其实就是为了计算某个key 在segment数组中的索引
    this.segmentShift = 32 - sshift; // 段偏移量
    this.segmentMask = ssize - 1; // 段掩码

    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;

    // initialCapacity 是设置整个 map 初始的大小,
    // 这里根据 initialCapacity 计算 Segment 数组中每个位置可以分到的大小。
    // 如 initialCapacity 为 64,那么每个 Segment 或称之为"槽"可以分到 4 个。
    // 那么此时段中哈希表的容量就为 4。
    int c = initialCapacity / ssize; // c 用于
    
    // 如果段中容量与segment的数组长度乘积小于了整个 ConcurrentHashMap 的初始容量,
    // 那我们就把目标容量(段中哈希表容量) + 1;
    if (c * ssize < initialCapacity)
        ++c;
    
    // 默认 MIN_SEGMENT_TABLE_CAPACITY 是 2,这个值也是有讲究的,因为这样的话,对于具体的槽上,
    // 插入一个元素不至于扩容,插入第二个的时候才会扩容
    int cap = MIN_SEGMENT_TABLE_CAPACITY; //先给此时的段中容量默认为 2 。
    // 如果段中默认容量小于目标容量的话就一直扩大到段中默认容量等于目标容量。
    while (cap < c)
        cap <<= 1;

    // 创建段数组的第一个元素 segment[0]
    Segment<K,V> s0 =
        new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                         (HashEntry<K,V>[])new HashEntry[cap]);
    // 创建段数组,数组长度就为 ssize 。
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    // 往段数组中有序的写入 segment[0]。
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

2.put操作,这个方法主要代码就是定位到相应的Segment,然后针对这个Segment进行put操作;

public V put(K key, V value) {
    // 先建一个段的临时桶。
    Segment<K,V> s;
    // 如果要put的值为空的话就抛出异常。
    if (value == null)
        throw new NullPointerException();
    
    // 计算 key 的 hash 值
    int hash = hash(key);
    
    // 根据 hash 值找到段表中的位置 j。
    // hash 是 32 位,无符号右移 segmentShift(28) 位,剩下高 4 位,
    // 然后和 segmentMask(15) 做一次与操作,也就是说 j 是 hash 值的高 4 位,也就是槽的数组下标
    /*
    	0000 0000 0000 0000 0000 0000 0000 1101
    	0000 0000 0000 0000 0000 0000 0000 1111
    										&
    	----------------------------------------
    	0000 0000 0000 0000 0000 0000 0000 1101
    */
    
    int j = (hash >>> segmentShift) & segmentMask;
    // 刚刚说了,初始化的时候初始化了 segment[0],但是其他位置还是 null。
    // 所以先判断segments[j]位置是否初始化了桶,如果为空就要去进行初始化
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j); // 初始化 j 处的桶。
    // 插入新值到 s 中。
    return s.put(key, hash, value, false);
}

【ensureSegment方法】

在put方法中,首先尝试获取j位置的桶对象,如果这个位置桶对象不为null,那么就直接执行后续的put操作,如果为空,则进入ensureSegment方法,首先看一下ensureSegment()这个方法;

看到这个方法里面,还是不断的通过UNSAFE.getObjectVolatile(ss, u)这个方法去判断对应坐标位置的Segment对象被其他线程创建了,不断的去判断是因为创建Segment对象需要初始化大量的属性,如果在cas创建Segment之前仅做一次判断,那么可能会浪费内存空间。

确保Segment对象为null时,就通过CAS来创建Segment对象,这个操作是原子性的

最终方法返回seg对象,不管这个seg对象是哪个线程创建的,只要被创建了,就把他返回。

private Segment<K,V> ensureSegment(int k) {
    // 临时段表。
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset 计算下标
    Segment<K,V> seg;
    // 注意,这里是getObjectVolatile这个方法,这个方法的意思就是,别的线程要是修改了segment[k],这个线程是可见的。
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 这里看到为什么之前要初始化 segment[0] 了,
        // segment[0] 就相当于一个初始化模板,
        // 使用当前 segment[0] 处的数组长度和负载因子来初始化 segment[k],
        // 为什么要用“当前”,因为 segment[0] 可能早就扩容过了。
        Segment<K,V> proto = ss[0]; // 获取当前 segment[0] 作为初始化模板。
        int cap = proto.table.length;
        float lf = proto.loadFactor;
        int threshold = (int)(cap * lf);

        // 初始化 segment[k] 内部的哈希表。
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        // 再次检查一遍该槽是否被其他线程初始化了。
        // 也就是在做上面那些操作时,看看是否有别的线程操作过 segment[k]。
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
            == null) { 
			
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 构建新的 segment 对象。
       
            // 再次检查 segment [k] 是否为null。
            // 注意,这里是while,之前的if,也是起到如果下面操作失败,再次检查的作用。
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                   == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    // CAS操作,这里的比较并交换在CPU里面就是一条指令,保证原子性的。
                    // 不存在那种比较完毕之后的间隙,突然切换到别的线程来修改这个值的情况。
                    break;
            }
        }
    }
    
    // 返回 segment 对象。
    // 这里返回的seg可能是自己new的,也可能是别的线程new的,反正只要其中一个就好了。
    return seg;
}

得到Segment对象后,就可以对这个Segment对象内部的HashEntry执行put操作了

ReentrantLock中:

tryLock() 非阻塞的 ---- 加到锁返回true,加不到返回false

Lock() 阻塞的 ---- 加不到锁当前线程就阻塞了。

while(!tryLock()){}   从效果上看就等于Lock(),优点是可以在执行其他的事情,缺点是拿不到锁会一直循环,浪费CPU --- 在scanAndLockForPut方法就就用到了这个

put操作首先获取锁,如果获取成功,就在try代码块中尝试put了,put的过程和hashmap 其实基本是一样的,如果key相同,就替换值,key不相同,就尝试插入,插入前还要判断是否需要扩容等,最后采用头插法插入,然后解锁。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {

    // 再 put 到指定段中之前,我们得获取到当前段表中这个桶的独占的锁。
    // 以此来保证整个过程,只有我们一个线程在对这个桶做操作。
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
   
    V oldValue; // 用来存储被覆盖的值。
    // 下面就是hashmap添加代码的那套方法了
    try {
        // 这个是段表某个桶内部的哈希表。
        HashEntry<K,V>[] tab = table;
        // 再次利用待插入键值对 key 的 hash 值,求应该放置的数组下标。
        int index = (tab.length - 1) & hash;
        // first 是桶中哈希表的待插入桶处的链表的表头。
        HashEntry<K,V> first = entryAt(tab, index);

        // 遍历链表。
        for (HashEntry<K,V> e = first;;) {
            // 这个遍历主要是为了找出,key重复的情况。
            if (e != null) {
                K k;
                // 如果当前链表节点的key重复了的话。
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k)))
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆盖旧的值。
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
            
                e = e.next;
            }
            else {
                // node 到底是不是 null,这个要看获取锁的过程,不过和这里都没有关系。
                // 如果不为 null。
                if (node != null)
                    // 那就把它设置为链表表头,JDK1.7使用头插法。
                    node.setNext(first);
                else
                    // 否则如果是null,就先初始化它再设置为链表表头。
                    node = new HashEntry<K,V>(hash, key, value, first);

                int c = count + 1;
                // 如果超过了段表中该桶中哈希表的阈值,这个哈希表就需要扩容。
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node); // 扩容。
                else
                    // 没有达到阈值,将 node 放到哈希表的 index 位置,
                    // 其实就是将新的节点设置成原链表的表头,使用头插法。
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁
        unlock();
    }
    return oldValue;
}

scanAndLockForPut()获取写入锁

首先尝试获取锁,如果未获取到,则执行scanAndLockForPut方法,这个方法就是不断的循环获取锁,如果获取失败了,就进行一些准备工作。(与其自旋浪费CPU性能,不如趁着这个功夫去做一些准备工作)

当然也不会让线程不断的自旋占用CPU资源,重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁,避免cpu空转。

(retries & 1) == 0这个判断代表重试次数为偶数的时候进行后面的判断(头插法下,头节点是否变更)为什么会有等于偶数才进行判断这个设计呢?我想可能是因为后面的代码执行也需要时间,有可能在执行后续代码的时候,此线程失去了获取锁的机会,造成锁饥饿吧。

    private HashEntry<K, V> scanAndLockForPut(K key, int hash, V value) {
        // 简单来说就是拿到段表中某个桶中的哈希表数组中通过hash计算的那个下标下的第一个节点。
        HashEntry<K, V> first = entryForHash(this, hash);
        // 备份一下当前节点的内容。
        HashEntry<K, V> e = first;
        // 辅助变量。
        HashEntry<K, V> node = null;
        int retries = -1; // 用于记录获取锁的次数。

        // 循环获取锁。
        // 如果获取失败,就会去进行一些准备工作。
        while (!tryLock()) {
            // 辅助变量用于重复检查,
            // 用来检查对应段表中那个桶上的哈希表数组中对应索引桶处,之前取出来的第一个节点是否还是我们之前取得那个。
            HashEntry<K, V> f; // to recheck first below

            // 下面就是准备工作。
            // 因为准备工作也不需要每次循环都去做对吧,最好的预期,做一次准备工作就够了。
            if (retries < 0) {
                // 判断段表中对应桶中的哈希表的对应桶上的节点 HashEntry 是不是还没被初始化。
                if (e == null) {
                    // 进到这里说明数组该位置的链表是空的,没有任何元素。
                    // 当然,进到这里的另一个原因是 tryLock() 失败,所以该槽存在并发,不一定是该位置。
                    // 将我们即将插进去的元素,构建成一个HashEntry节点对象。
                    if (node == null) {
                        node = new HashEntry<K, V>(hash, key, value, null);
                    }
                    // 将 retries 赋值为0,不让准备工作重复执行。
                    retries = 0;
                } else if (key.equals(e.key)) {
                    // 否则的话,判断 key 是否有重复的。
                    // 如果有重复的,将 retries 赋值为0,不让准备工作重复执行。
                    retries = 0;
                } else {
                    // 如果key不重复否则顺着链表往下走。在外面的while循环中还会走到这里面
                    e = e.next;
                }
            } else if (++retries > MAX_SCAN_RETRIES) {
                // 重试次数如果超过 MAX_SCAN_RETRIES(单核1多核64),那么不抢了,进入到阻塞队列等待锁,避免cpu空转。
                // lock() 是阻塞方法,直到获取锁后返回。
                lock();
                break;
            } else if ((retries & 1) == 0 && // 偶数次数才进行后面的判断-- 为了提升效率?
                // 这个时候出现问题了,那就是有新的元素进到了链表,成为了新的表头。
                // 也可以说是链表的表头被其他线程改变了。
                // 所以这边的策略是,相当于重新走一遍这个 scanAndLockForPut 方法。
                // 为什么只要判断头节点是否改变呢?因为1.7用的是头插法
                (f = entryForHash(this, hash)) != first) {

                // 此时怎么做呢,
                // 别的线程修改了该segment的节点,重新赋值e和first为最初值,和第一二行代码一样的效果。
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }

        // 将准备工作制作好的节点返回。
        return node;
    }

 【rehash方法】1.7下的扩容

注意,扩容方法针对的是segment中的hashEntry数组,而不是针对segment,也就是说,扩容可能只涉及到某一个桶(单个线程操作),不同的桶内,hashEntry数组的长度可能会不一样。

    // Segment内HashEntry数组的扩容方法
    private void rehash(HashEntry<K,V> node) {
        HashEntry<K,V>[] oldTable = table;  // 旧数组
        int oldCapacity = oldTable.length;  // 旧长度
        int newCapacity = oldCapacity << 1;  // 新容量为原来的2倍
        threshold = (int)(newCapacity * loadFactor);  // 新阈值
        HashEntry<K,V>[] newTable =
            (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 创建新数组
        int sizeMask = newCapacity - 1;
        for (int i = 0; i < oldCapacity ; i++) {  // 遍历旧数组
            HashEntry<K,V> e = oldTable[i];  
            if (e != null) {  // 如果e不为null
                HashEntry<K,V> next = e.next;  // next
                int idx = e.hash & sizeMask;  // 计算在新数组中的位置
                if (next == null)   //  如果本身只有一个HashEntry
                    newTable[idx] = e;  // 直接插入到新数组
                else { // Reuse consecutive sequence at same slot
                    HashEntry<K,V> lastRun = e;  // 最后一次运行的HashEntry
                    int lastIdx = idx;  // lastRun对应要插入的位置
                    for (HashEntry<K,V> last = next;
                            last != null;
                            last = last.next) {
                        int k = last.hash & sizeMask;
                        if (k != lastIdx) {   // 如果和lastIdx不一样
                            lastIdx = k;    // 进行替换
                            lastRun = last;
                        }
                    }
                    // lastRun保证了后面的HashEntry在新数组中都是相同位置,减少了循环次数
                    newTable[lastIdx] = lastRun;
                    // Clone remaining nodes
                    // 从开头到lastRun
                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                        // 重新计算要插入的位置
                        V v = p.value;
                        int h = p.hash;
                        int k = h & sizeMask;
                        HashEntry<K,V> n = newTable[k];
                        // 迁移到新数组上
                        newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                    }
                }
            }
        }
        // 头插法插入新元素
        int nodeIndex = node.hash & sizeMask; // add the new node
        node.setNext(newTable[nodeIndex]);
        newTable[nodeIndex] = node;
        table = newTable;
    }

【size 方法】

先尝试不加锁获取size,将每个segemnt的count加起来作为总数,期间把每个segment的modCount加起来sum作为结构是否被修改的判断依据

  • 如果相同,说明在统计过程中没有发生并发修改操作,因此统计出来的count就是正确的
  • 如果不同,说明发生了并发修改操作,这种情况下就对所有的segemnt进行加锁,进行统计 --- 一旦全部加锁,统计出来的数据肯定是正确的,但是此时无法进行其他的并发操作,会影响并发效率

在HashMap也有同样的modCount这个变量,它负责记录容器中数据的变化,如每次进行put,remove,replace都会去修改这个值;

而ConcurrentHashMap使用modCount和HashMap略有不同:

  • HashMap使用modCount是为了检测一些比较明显的并发问题,会抛出ConcurrentModificationException异常
  • 而ConcurrentHashMap的modCount则主要是使用在size方法中,使用modCount来记录每次数据变更操作
public int size() {
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts  写操作次数
    long last = 0L;   // previous sum  上一次写操作次数
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            // RETRIES_BEFORE_LOCK为常量2,先尝试两次不加锁,获取不成功在给全部segment加锁
            // 超过统计阈值,给每个Segment加锁
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    //ensureSegment(j)通过CAS得到内存上最新的Segment,保证上锁前没有并发问题
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    // modCount:每个Segment中记录的修改次数
                    // count : 每个Segment中 hashEntry的数目,就是单个桶中的size
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            // 计算过程中未发生元素的写操作表示统计正确
            // 当前统计和上次统计的一样,说明在这个期间没有发生并发写修改操作
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        // 解锁, 进行判断,加锁了才需要解锁
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

JDK1.8下的ConcurrentHashMap的源码

jdk1.8下,ConcurrentHashMap摒弃了segment分段锁的概念,并沿用了同时期hashMap的思想,底层采用“数组+链表+红黑树”的结构,为了做到并发,同时增加了很多辅助类,如TreeBin,Traverser 等内部类

1、ConcurrentHashMap内部重要 的一些属性和内部类

sizeCtl:ConcurrentHashMap中的一个控制符

  • -1:代表正在初始化
  • -(1+n): 代表有n个线程正在进行扩容操作(-2表示一个线程正在扩容,-3表示2个线程正在扩容)
  • 0:未初始化,数组初始容量是16
  • 正数:如果数组没有初始化,记录的数据是数组的初始化容量,如果已经初始化,记录的是数组的扩容阈值(数组的初始容量*扩容因子)

    /**
     * Table initialization and resizing control.  When negative, the
     * table is being initialized or resized: -1 for initialization,
     * else -(1 + the number of active resizing threads).  Otherwise,
     * when table is null, holds the initial table size to use upon
     * creation, or 0 for default. After initialization, holds the
     * next element count value upon which to resize the table.
     */
    private transient volatile int sizeCtl;

Node 类 :最核心的内部类,包装了键值对,与HashMap中的定义类似,但是他对value和next属性用volatile进行了修饰,包装了可见性

TreeNode类:树节点类,链表过长的时候,转为TreeNode,但是与HashMap不同的是,并不会直接转换为红黑树,而是将节点包装成TreeNode,放在TreeBin对象中,然后由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。

TreeBin类 : 负责包装TreeNode节点,代替了TreeNode的根节点,也就是说ConcurrentHashMap的数组中存放的是TreeBin对象,而不是TreeNode对象。(这个点和HashMap是不一样的,因为在多线程并发下,取的锁是table[i]的第一个元素,如果是存放的是红黑树的话,那么取的就是这个 红黑树的根节点,但是红黑树插入的过程中根节点是可能会改变的,即锁也会发生变化,因此ConcurrentHashMap将TreeBin对象代替红黑树的根节点,这样就不会发生锁对象变化的情况。)

ForwardingNode类:用于连接两个table的节点类,包含一个nextTable指针,用于指向下一张表。这个节点的 key value next 指针全为null,hash值为-1;其中的find方法是从nextTable例查询节点,而不是以自身为头节点进行查找。

2、ConcurrentHashMap内部重要 的一些方法

  • tabAt(Node<K,V>[] tab, int i) :获取在i位置上的node节点
  • casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v):利用CAS算法设置i位置上的Node节点
  • setTabAt(Node<K,V>[] tab, int i, Node<K,V> v):利用volatile设置节点位置的值
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }

3、构造器

  1. JDK1.8里面,ConcurrentHashMap调用构造器只是针对传入的值对一些属性进行赋值,并没有构造table数组等,table数组的初始化是在向ConcurrentHashMap中插入元素的时候发生的。
  2. 注意传入initialCapacity时,实际初始化容量的区别(都遵循一个原则,最终初始化的容量为2^n):
    1. JDK1.8:看源码,是要经过一系列计算的,比如传入16,实际初始化容量为32
    2. JDK1.7:传入16,实际就是16
    3. hashmap:传入16,实际就是16
    // 无参构造器,不做操作
    public ConcurrentHashMap() {
    }

    // 进传入一个初始化容量。
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        // 注意看这个位置,tableSizeFor方法还是hashmap那一套计算方法,但是传入的参数不一样了
        // 比如initialCapacity传16,实际初始化容量应该是32了
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        // 计算出来的初始化容量赋值给sizeCtl 
        // 现在数组还没有初始化呢,sizeCtl就是数组的初始容量
        this.sizeCtl = cap;
    }
    

    // 这个loadFactor好像只与计算数组初始化长度有关,实际扩容还是用的0.75
    // 但是hashMap中传入的loadFactor就会参与每次扩容计算
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        // 初始化容量= 比这个算出来的size大的 2^n
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }

    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

4、putVal 方法:其实和HashMap 是很相似的,只不过增加了一些同步的操作。

大致步骤为:

  • 校验key和value的值,这两个值都不能为null,而hashmap中的key和value都是可以为null的
  • 死循环并更新tab数组的值,如果容器没有初始化,就调用initTable 方法进行初始化
  • 根据计算的hash值找到数组下标,如果对应的位置为空,就创建一个Node对象用CAS方式添加进去,跳出循环
  • 如果产生hash冲突,则进行添加操作(链表或者红黑树),这个时候,头节点是被锁住的,保证了同一时间只能有一个线程对这个链表或者红黑树进行修改
  • 判断是否需要从链表转为红黑树(树化)
  • 判断是否需要扩容
    // onlyIfAbsent :如果已经存在了key,取值为true,就不做操作,取值false,就进行修改
    final V putVal(K key, V value, boolean onlyIfAbsent) {
    	// key和 value都不允许为空
        if (key == null || value == null) throw new NullPointerException();
        // 计算key的hash,类似hashmap,高16位也参与运算,减少哈希冲突
        int hash = spread(key.hashCode());
        int binCount = 0;
        // 死循环执行,添加或者修改成功,就会跳出去
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
    		// 如果数组没有初始化,就进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
    		// tabAt:获取tab索引i位置的元素
		    // casTabAt:cas的方式更新tab索引i位置的元素
		    // 这个分支其实就是判断当前索引位置为空后,将新节点放入到对应的位置
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // CAS 进行插入
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 当前索引位置有值了,且 hash 值为 -1,说明是 ForwardingNode 对象(这是一个占位符对象,保存了扩容后的容器)
            // 这种情况说明数组正在扩容,此时元素不能添加到这里,而是去协助扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 如果 hash 冲突了,且 hash 值不为 -1,需要将数据挂在链表或者红黑树上
            else {
                V oldVal = null;
			    // f就是当前索引位置的头节点,jdk1.8就是只锁头节点
                synchronized (f) {
				    // 判断当前索引头节点和锁是不是一个
                    // 防止树化或者扩容导致锁改变?
                    if (tabAt(tab, i) == f) {
					    // 当前节点hash值,大于等于0没有特别含义,就代表是一个链表或者空(此时是不会空的)
                        if (fh >= 0) {
                            // binCount是个计数器,其实就是当前索引下链表的长度
                            binCount = 1;
                            // 循环遍历链表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // key 是同一个,是修改操作
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
								    // 看onlyIfAbsent的取值,是修改,还是什么都不做
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
							    // 添加操作,最新数据插入到链表最后面
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 当前节点是个树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 链表长度大于等于8时,将该节点改成红黑树树
                // treeifyBin里面会先判断数组长度是否小于64,只有大于等于64才会树化
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 判断是否需要扩容
        addCount(1L, binCount);
        return null;
    }


// 计算key的hash,类似hashmap,高16位也参与运算,减少哈希冲突
// &HASH_BITS 的意义是保证计算出来的hash一定是个正数(最高位为0),因为负数有特别含义
static final int spread(int h) {
	return (h ^ (h >>> 16)) & HASH_BITS;
}


static final int MOVED     = -1; // hash for forwarding nodes 
static final int TREEBIN   = -2; // hash for roots of trees 
static final int RESERVED  = -3; // hash for transient reservations 
static final int HASH_BITS = 0x7fffffff; // 32位的二进制,最高位是0,其他任何数和他进行&运算,最高位一定是0,也就是说代表正数

5、initTable 方法

初始化table方法主要是应用了属性sizeCtl ,如果sizeCtl <0,标识其他线程挣扎进行初始化,当前线程就放弃此次操作(初始化只能由一个线程操作)。如果获得了初始化的权限,就用CAS的方法将sizeCtl设置为-1,防止其他线程进入再次初始化,当前线程初完成数组的初始化后,将sizeCtl值设置为当前数组长度的0.75.

    /**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            // 小于0说明 数组正在初始化或者正在扩容,就让出当前线程
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            // CAS 修改 sizeCtl 的值为-1,代表正在初始化,如果成功则进行初始化操作
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
				    //此时准备要初始化了,初始化前要再次判断,原因是:
				    //如果一个线程走到了sc = n - (n >>> 2)表示初始化完成了,sc的值不是-1了,
				    // 又有一个线程要执行刚刚的cas操作,将sc从其他值改为-1,也会成功,就会再次初始化,不符合期望
                    if ((tab = table) == null || tab.length == 0) {
                        // 前面说了,如果数组没有初始化,sc = sizeCtl 记录的数据是数组的初始化容量
                        // 无参构造就是默认值16,否则就是初始化计算出来的初始化容量
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        // 创建数组
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // sizeCtl 计算后作为扩容的阈值 :可以看到,扩容因子就是0.75
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

6、addCount方法

首先理解如下的场景:

例如HashMap中就有一个size属性,用于记录集合中的元素数量,每次有元素数量的改变时,直接修改size就可以。但ConcurrentHashMap是一个并发集合,如果只用一个属性统计元素个数,为了保证线程安全,势必要加锁,竞争激烈的情况下影响性能,那么ConcurrentHashMap是怎么解决效率问题的呢?引入两个参数:

  1. baseCount :就是类似于HashMap中的size,元素个数改变时先去争抢这个锁使用
  2. counterCells:一个数组,baseCount锁拿不到时,在去争抢counterCells[i] 这个锁

那么,ConcurrentHashMap中实际的元素个数,就是通过baseCount + counterCells数组元素累加得到

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    // if分支有个CAS操作,如果CAS修改成功了,就直接在baseCount中修改了,不需要进入if分支
    // counterCells不为空,或者通过CAS没有成功修改baseCount的值,就可以到这个if分支
    // 这个x的值,就是put操作完给的值,为1,意味着加了一个元素
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        // ThreadLocalRandom.getProbe() 线程的哈希探针 尽量避免线程争用同一数组元素
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
                U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            // 这个方法很复杂,简答介绍一下
            // 作用就是初始化CounterCell,来记录元素个数,里面包含CounterCell初始化,扩容等操作
            // 如果CounterCell的长度大于等于CPU核数了,就不需要扩容了
            fullAddCount(x, uncontended);
            // fullAddCount里面处理的事情太多,当前线程就不参与到扩容的逻辑中了
            return;
        }
        // check是是否需要扩容的主要标识
        // put方法中,传进来的binCount = check,
        // binCount >= 1 当前数组下标下链表的长度,为1 也有可能是key相同,即修改的场景,不需要扩容
        // binCount == 0,当前数组下标为空,节点直接放到桶中了,理论上来说走到这里面,不会为0
        // binCount == 2  桶位下已经树化
        // remove时,check =-1
        if (check <= 1)
            return;
        // 获取当前元素的个数
        s = sumCount();
    }
    // 只有添加动作的时候会走到这个里面
    if (check >= 0) {
        // tab 就是当前数组,nt= nextTable 代表扩容后的数组
        // 扩容过程中,会把新的table赋值给nextTable保持引用关系,扩容结束后,从新赋值为null
        Node<K,V>[] tab, nt; int n, sc;
        /**
         * 条件1:s >= (long)(sc = sizeCtl)
         * true: sizeCtl为负数,表示正在扩容, sizeCtl为正数,表示大于了扩容阈值,需要扩容
         * false:不需要扩容 ,跳出循环
         */
        // (tab = table) != null  当前情况下,应该是恒成立的
        // (n = tab.length) < MAXIMUM_CAPACITY 当前长度小于最大限制,可以扩容
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
            (n = tab.length) < MAXIMUM_CAPACITY) {
            // 扩容的标识戳
            int rs = resizeStamp(n);
            // sc < 0 时,表示正在扩容,进入if分支,协助扩容
            if (sc < 0) {
                // 判断当前线程是否需要参与到本次扩容,不需要则直接break了
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                // cas修改sc的低16位,成功代表多了一个线程参与工作
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    // 协助扩容线程说明已经持有了nextTable,直接传入就行
                    transfer(tab, nt);
            }
            // 当前线程是触发扩容的第一个线程
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                (rs << RESIZE_STAMP_SHIFT) + 2))
                // 触发扩容的线程还没有nextTable,需要做一些扩容准备工作
                transfer(tab, null); // 扩容操作
            s = sumCount();
        }
    }
}

7、transfer扩容方法

  • 计算CPU核心数和Map数组长度得到每个线程需要处理多少个桶,并且每个线程处理的桶数量都是平均的,默认每个线程处理16个桶,因此如果长度只有16,那么就只有一个线程扩容。
  • 整个扩容分为两个部分
    • 一个是构建一个新的nextTable,容量是原来table 的两倍,nextTable是volatile修饰的
    • 另一个是将原来table中的元素复制到nextTable中,这是允许多线程操作的
  • 单线程完成扩容的整体流程(16--->32)
    • 首先获得最小下标bound和最大下标i,(这两个下标就是前面所说的每个线程处理的桶的数量)后面转移遍历的时候,采用从后往前的方式进行遍历table
    • 因为>0.75扩容,因此15~12的时候是null,遍历这些桶的时候,会将该桶的位置存入一个forwardingNode节点 (标志着当前节点已经处理过了,而且forwardingNode节点的hash值是初始化为moved 也就是-1)
    • 从11 开始,桶上有节点了,就开始涉及到转移复制了,分成两个分支,链表一个分支,红黑树一个分支,这里我们假设桶上是个链表
    • 首先拿到链表的头节点,并定义一个lastRun节点,循环遍历这个链表,最后将lastRun标记为链表尾部同色节点的第一个节点。
    • 计算runBit:当前节点hash & 原数组长度n ,值为0或者1,标记在新数组的位置,通过runBit来对当前桶上的链表分割成低位链表和高位链表
    • 通过CAS的方式添加到新table中,遍历所有的节点后就完成了扩容的操作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。

  • 多线程扩容是如何完成的呢?
    • 我们注意到,在单线程操作扩容的时候,每个处理过的节点都会将该位置标记为forwarding节点,这个节点的hash值为-1;
    • 在第 行有个判断 (fh = f.hash) == MOVED,如果遍历到的节点时forwarding节点,说明这个节点已经被处理过了,就跳过继续遍历,这样就交叉完成了赋值工作并且很好的解决了线程安全的问题。

/**
 * 扩容方法
 *
 * @param tab 当前数组
 * @param nextTab 扩容的数组,触发扩容的线程这个值为null,协助扩容的这个值不为null
 */
private void transfer(ConcurrentHashMap.Node<K, V>[] tab, ConcurrentHashMap.Node<K, V>[] nextTab) {
    int n = tab.length, stride;
    // NCPU 和当前的电脑配置相关,如四核八线程,则NCPU为8;如果桶比较多的话,就切割成多断,每个线程完成一部分桶的迁移
    // stride为每个线程负责迁移的最小数组长度,这个值如果计算出小于MIN_TRANSFER_STRIDE(16),就赋值为16
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    }
    // 创建新的数组,扩容后的 触发扩容的线程会进入到这个分支里面
    if (nextTab == null) {            // initiating
        try {
            ConcurrentHashMap.Node<K, V>[] nt = (ConcurrentHashMap.Node<K, V>[]) new ConcurrentHashMap.Node<?, ?>[n
                << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        // 转移的下标,比如从16扩容到32,那么就把16赋值给transferIndex,后面迁移的时候从后向前遍历
        // 这个是分配每个线程负责的转移步长的一个下标,从后往前遍历
        transferIndex = n;
    }
    int nextn = nextTab.length;
    // 初始化一个Forwarding节点,他的hash值赋值为-1;
    ConcurrentHashMap.ForwardingNode<K, V> fwd = new ConcurrentHashMap.ForwardingNode<K, V>(nextTab);
    // 推进标记, true表示继续推进
    boolean advance = true;
    // 完成标记
    boolean finishing = false; // to ensure sweep before committing nextTab
    for (int i = 0, bound = 0; ; ) {
        ConcurrentHashMap.Node<K, V> f;
        int fh;
        /**
         * while就是给当前线程分配 要迁移的工作区间,比如tab[12]-tab[15]
         */
        while (advance) {
            // nextIndex分配任务开始下标 -- 会在下面的else if中赋值,第一次的值就是原数组的长度
            // nextBound分配任务结束下标
            int nextIndex, nextBound;
            // --i >= bound 成立表示:当前线程的任务还没有完成,还有相应的区间的桶需要处理
            // 不成立表示: 当前线程任务已经完成,或者是还没有分配
            if (--i >= bound || finishing) {
                advance = false;
            }
            // 条件成立:全局范围内的桶都已经分配完毕了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            // CAS操作,修改transferIndex的值为nextBound
            else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
                nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
                // 第一个开始扩容的线程会进入到这个位置,将bound设置为最小下标,i设置为最大下标
                // bound到i 这个坐标区间就是当前线程需要负责迁移的数组区间。
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                // 所有节点都已经完成复制工作,将nextTable赋值给table,并清空临时对象nextTable,sizeCtl也设置为当前容量的0.75
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // CAS更新扩容阈值,如果sizeCtl-1,说明有一个新的线程加入到扩容操作
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                // 这个判断为true,说明当前线程扩容结束,结束当前线程方法
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
                    return;
                }
                // 代表没有线程帮助扩容了,也就是扩容结束了,下个循环会走上面的if分支,完成扩容
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        // 当前坐标下为空,就往这个位置塞一个 forward节点
        else if ((f = tabAt(tab, i)) == null) {
            // 因为>0.75扩容,因此15~12的时候是null,会进来这个位置,将fwd对象放到对应的位置,advance设置为true,继续循环
            advance = casTabAt(tab, i, null, fwd);
        }
        // 表示当前已经塞了forward节点了
        else if ((fh = f.hash) == MOVED) {
            advance = true; // already processed
        } else {
            // 开始实际的数据迁移工作
            // 对头节点加锁,避免其他线程并发添加元素
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // ln 低位链表的引用
                    // hn 高位链表的引用
                    ConcurrentHashMap.Node<K, V> ln, hn;
                    // fh是当前节点的hash值, 大于等于0 表示是链表节点
                    if (fh >= 0) {
                        // 注意,这里进行了两次遍历,目的是为了对当前链表的节点进行分类
                        // runBit:当前节点hash & 原数组长度n ,值为0或者1,标记在新数组的位置
                        int runBit = fh & n;
                        // lastRun从头节点开始往后,一直遍历到最后
                        ConcurrentHashMap.Node<K, V> lastRun = f;
                        // 开始对当前桶的链表进行循环,这个循环将lastRun标记为最后同色的节点的第一个。
                        // 比如一个链表为 aabababbbb ,那么这个lastrun就代表倒数第四个b
                        // 比如一个链表为 ababababaa ,那么这个lastrun就代表倒数第二个a
                        for (ConcurrentHashMap.Node<K, V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun; // 低位链表
                            hn = null; // 高位链表
                        } else {
                            hn = lastRun;
                            ln = null;
                        }

                        // 再次循环链表,生成两个链表,lastRun作为终止条件,
                        // 因为第一次遍历后,已经得出lastRun及后面的节点都是可以放在同一个桶中的
                        for (ConcurrentHashMap.Node<K, V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash;
                            K pk = p.key;
                            V pv = p.val;
                            if ((ph & n) == 0) {
                                ln = new ConcurrentHashMap.Node<K, V>(ph, pk, pv, ln);
                            } else {
                                hn = new ConcurrentHashMap.Node<K, V>(ph, pk, pv, hn);
                            }
                        }
                        setTabAt(nextTab, i, ln); // 低位链表放到原下标位置
                        setTabAt(nextTab, i + n, hn); // 高位链表放到 i + n 位置
                        setTabAt(tab, i, fwd); // 最后在旧数组tab的i位置插入forwardNode节点,表示已经处理过该处
                        advance = true;
                    } else if (f instanceof ConcurrentHashMap.TreeBin) {
                        ConcurrentHashMap.TreeBin<K, V> t = (ConcurrentHashMap.TreeBin<K, V>) f;
                        ConcurrentHashMap.TreeNode<K, V> lo = null, loTail = null;
                        ConcurrentHashMap.TreeNode<K, V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (ConcurrentHashMap.Node<K, V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            ConcurrentHashMap.TreeNode<K, V> p = new ConcurrentHashMap.TreeNode<K, V>(h, e.key,
                                e.val, null, null);
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null) {
                                    lo = p;
                                } else {
                                    loTail.next = p;
                                }
                                loTail = p;
                                ++lc;
                            } else {
                                if ((p.prev = hiTail) == null) {
                                    hi = p;
                                } else {
                                    hiTail.next = p;
                                }
                                hiTail = p;
                                ++hc;
                            }
                        }
                        ln = (lc <= UNTREEIFY_THRESHOLD)
                            ? untreeify(lo)
                            : (hc != 0) ? new ConcurrentHashMap.TreeBin<K, V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD)
                            ? untreeify(hi)
                            : (lc != 0) ? new ConcurrentHashMap.TreeBin<K, V>(hi) : t;
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

/**
 * 当 node 的 hash 等于 MOVED的时候会进行 helpTransfer方法
 */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
    Node<K,V>[] nextTab; int sc;
    // 如果 table 不是空 且 node 节点是转移类型,数据检验
    // 且 node 节点的 nextTable(新 table) 不是空,同样也是数据校验
    // 尝试帮助扩容
    if (tab != null && (f instanceof ForwardingNode) &&
        (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
        // 根据 length 得到一个标识符号
        int rs = resizeStamp(tab.length);
        // 如果 nextTab 没有被并发修改 且 tab 也没有被并发修改
        // 且 sizeCtl  < 0 (说明还在扩容)
        while (nextTab == nextTable && table == tab &&
            (sc = sizeCtl) < 0) {
            // 如果 sizeCtl 无符号右移  16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了)
            // 或者 sizeCtl == rs + 1  (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)这里其实是个bug
            // 或者 sizeCtl == rs + 65535  (如果达到最大帮助线程的数量,即 65535)
            // 或者转移下标正在调整 (扩容结束)
            // 结束循环,返回 table
            if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                sc == rs + MAX_RESIZERS || transferIndex <= 0)
                break;
            // 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
            if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                // 进行转移
                transfer(tab, nextTab);
                // 结束循环
                break;
            }
        }
        return nextTab;
    }
    return table;
}

ConcurrentHashMap 的get 方法执行逻辑:

1、JDK1.7:根据key计算出hash值定位到具体的segment,然后在获取定位HashEntry对象,并对HashEntry对象进行链表遍历,找到对应的元素。

2、JDK1.8:根据key计算出hash值,判断数组是否为空,是首节点就直接返回,是红黑树就查询红黑树,是链表就遍历链表

那么针对于ConcurrentHashMap 的get方法是否需要加锁呢??答案是不需要的,因为Node 元素的 value 和 指针next都是用volatile修饰的,因此在多线程的环境中,A线程修改节点的value或者新增节点,对B线程是可见的。这也是ConcurrentHashMap比其他并发集合如HashTable 效率高的原因之一。

ConcurrentHashMap中不支持key或者value 为null的原因?

1、首先源码层面进行了非空判断做限制

2、ConcurrentHashMap是多线程的,通过get(key)方法获取对应的value的值为null时,你无法判断他是put的时候value本身就是null还是说通过这个key没有找到value。

参考资料


面试被问到 ConcurrentHashMap答不出 ,看这一篇就够了!_Java烂猪皮V的博客-CSDN博客

ConcurrentHashMap源码分析(JDK8版本)_小小旭GISer的博客-CSDN博客_concurrenthashmap源码分析

理解Java7和8里面HashMap+ConcurrentHashMap的扩容策略