Java8下的ConcurrentHashMap源码分析


ConcurrentHashMap分析

ConcurrentHashMap是HashMap的线程安全版本的实现版本

Node的数据结构

想要线程安全就要从数据结构上做调整

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()       { return key; }
    public final V getValue()     { return val; }
    public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
    public final String toString(){ return key + "=" + val; }
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }

    /**
     * Virtualized support for map.get(); overridden in subclasses.
     */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}

Table(位桶)的初始化

重要的变量:sizeCtl。

  • sizeCtl是一个用于同步多个线程的共享变量,如果它的当前值为负数,则说明table正在被某个线程初始化或者扩容

  • 如果某个线程想要初始化table或者对table扩容,需要去竞争sizeCtl这个共享变量,获得变量的线程才有许可去进行接下来的操作

  • volatile关键字的作用时确保被该关键字修饰的共享变量的可见性,以及避免指令重排

    也就是说,java线程内存模型保证了所有线程看到该变量的值是唯一的。

private transient volatile int sizeCtl;

table

有volatile关键字修饰

transient volatile Node<K,V>[] table;

初始化方法

private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

size方法

可以看出来最终的size是baseCount+counterCell

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

变量讲解

baseCount

  • 当没有争用时,使用这个变量计数。一个 volatile 的变量,在 addCount 方法中会使用它,而 addCount 方法在 put 结束后会调用。在 addCount 方法中,会对这个变量做 CAS 加法。
  • 但是如果并发导致 CAS 失败了,怎么办呢?使用 counterCells。详情看文章put方法内的更新数量段。
private transient volatile long baseCount;

CounterCell

@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

@sun.misc.Contended这个注解就牛逼了,通过追加字节码,填充满整个缓存行可以优化性能,这种方式可以避免伪共享,避免了多个变量写到了同一个缓存行中引起互相锁定(追加多少?是否有用? 取决于处理器)

避免伪共享(false sharing)。
先引用个伪共享的解释:
缓存系统中是以缓存行(cache line)为单位存储的。缓存行是2的整数幂个连续字节,
一般为32-256个字节。最常见的缓存行大小是64个字节。当多线程修改互相独立的变量时,
如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。

所以伪共享对性能危害极大。

JDK 8 版本之前没有这个注解,Doug Lea 使用拼接来解决这个问题,把缓存行加满,让缓存之间的修改互不影响。

@sun.misc.Contended 注解标识,这个注解是防止伪共享的。是 1.8 新增的。使用时,需要加上 -XX:-RestrictContended 参数。


sumCount源码

final long sumCount() {
    CounterCell[] as = counterCells; 
    CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

get方法

看起来和HashMap的get方法没什么区别,并发下的读一般是没有什么问题的,只要保证了共享变量的可见性,参考Node的实现的确如此。

且table数组是被volatile关键字修饰的,这就代表我们不需要担心table数组的线程可见性问题,也就没有必要再加锁来实现并发了。

所以该段代码并没有通过锁来实现,因为使用了🔒效率低下。

public V get(Object key) {
      Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
      int h = spread(key.hashCode());
      if ((tab = table) != null && (n = tab.length) > 0 &&
          (e = tabAt(tab, (n - 1) & h)) != null) {
          if ((eh = e.hash) == h) {
              if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                  return e.val;
          }
          else if (eh < 0)
              return (p = e.find(h, key)) != null ? p.val : null;
          while ((e = e.next) != null) {
              if (e.hash == h &&
                  ((ek = e.key) == key || (ek != null && key.equals(ek))))
                  return e.val;
          }
      }
      return null;
  }

put方法

put方法就有学问了,因为线程不安全大多发生在写的时候,会发生不一致问题。观看源码发现用了!!!!!

锁住了根据索引取出的 Node节点!

synchronized (f) 

ConcurrentHashMap和HashMap的区别还有一点,就是HashMap允许一个key和value为null,而ConcurrentHashMap则不允许key和value为null,如果发现key或者value为null,则会抛出NPE。 因为判断key==null在多线程环境下会引发一系列问题,key的值在某个时间点上可能是null也可能不是,所以ConcurrentHashMap禁止键null。

public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

更新数量

HashMap中的数量直接 ++size即可。然ConcurrentHashMap 不是这样的

数量是共享的,涉及到线程安全问题。

addCount(1L, binCount);

ConcurrentHashMap维护baseCount来表示当前的记录数量,这在后面获取记录数量的size方法中会用到,而在put操作和remove操作的时候回通过调用方法addCount来更新它。

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
        !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
              U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

remove方法

删除操作属于写类型的操作,所以在进行删除的时候需要对table中的index位置加锁,ConcurrentHashMap使用synchronized关键字将table中的index位置锁住,然后进行删除,remove方法调用了replaceNode方法来进行实际的操作,而删除操作的步骤首先依然是计算记录的hashCode,然后根据hashCode来计算table中的index值,然后根据table中的index位置上是一条链表还是一棵红黑树来使用不同的方法来删除这个记录,删除记录的操作需要进行记录数量的更新(调用addCount方法进行)。

代码实现和put类似

public V remove(Object key) {
    return replaceNode(key, null, null);
}

/**
 * Implementation for the four public remove/replace methods:
 * Replaces node value with v, conditional upon match of cv if
 * non-null.  If resulting value is null, delete.
 */
final V replaceNode(Object key, V value, Object cv) {
    int hash = spread(key.hashCode());
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        if (tab == null || (n = tab.length) == 0 ||
            (f = tabAt(tab, i = (n - 1) & hash)) == null)
            break;
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            boolean validated = false;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        validated = true;
                        for (Node<K,V> e = f, pred = null;;) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                V ev = e.val;
                                if (cv == null || cv == ev ||
                                    (ev != null && cv.equals(ev))) {
                                    oldVal = ev;
                                    if (value != null)
                                        e.val = value;
                                    else if (pred != null)
                                        pred.next = e.next;
                                    else
                                        setTabAt(tab, i, e.next);
                                }
                                break;
                            }
                            pred = e;
                            if ((e = e.next) == null)
                                break;
                        }
                    }
                    else if (f instanceof TreeBin) {
                        validated = true;
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> r, p;
                        if ((r = t.root) != null &&
                            (p = r.findTreeNode(hash, key, null)) != null) {
                            V pv = p.val;
                            if (cv == null || cv == pv ||
                                (pv != null && cv.equals(pv))) {
                                oldVal = pv;
                                if (value != null)
                                    p.val = value;
                                else if (t.removeTreeNode(p))
                                    setTabAt(tab, i, untreeify(t.first));
                            }
                        }
                    }
                }
            }
            if (validated) {
                if (oldVal != null) {
                    if (value == null)
                        addCount(-1L, -1);
                    return oldVal;
                }
                break;
            }
        }
    }
    return null;
}

文章作者: Bxan
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Bxan !
  目录