Java并发容器(转载整理)

前言


在之前的博客中,我介绍了HashMap的具体实现以及一些值得注意的地方。HashMap是线程不安全的,换句话说,当我们在并发的情况下使用HashMap的时候,会造成数据的错误。所以在并发的情况下,通常使用HashTable和ConcurrentHashMap这种线程安全的并发容器来代替HashMap这种线程不安全的集合。本篇主要介绍HashTable和ConcurrentHashMap是如何在并发的情况下实现线程安全的。

一、HashTable

HashTable容器使用的是synchronized来保证线程安全,所以在线程竞争激烈的情况下HashTable的效率是非常低下的。主要原因是当有多个线程访问HashTable的时候,会造成阻塞和自旋的情况,我在之前的博客中介绍过这种情况,所以在线程竞争激烈的情况下是不适合用HashTable的。下面我通过HashTable的源码对它如何实现线程安全进行分析。

1.底层数据存储结构

首先要知道的是,HashTable是基于数组+链表的形式,和HashMap类似,其基本的节点类型是Entry<K,V>,都是实现了Map.Entry<K,V>接口,Entry类中的变量都是和HashMap中的一致。另外一点,HashTable与HashMap不同,HashTable是继承自古老的Dictionary类,这个类似乎已经被废弃。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
private static class Entry<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
V value;
Entry<K,V> next;

protected Entry(int hash, K key, V value, Entry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}

@SuppressWarnings("unchecked")
protected Object clone() {
return new Entry<>(hash, key, value,
(next==null ? null : (Entry<K,V>) next.clone()));
}

// Map.Entry Ops

public K getKey() {
return key;
}

public V getValue() {
return value;
}

public V setValue(V value) {
if (value == null)
throw new NullPointerException();

V oldValue = this.value;
this.value = value;
return oldValue;
}

public boolean equals(Object o) {
if (!(o instanceof Map.Entry))
return false;
Map.Entry<?,?> e = (Map.Entry<?,?>)o;

return (key==null ? e.getKey()==null : key.equals(e.getKey())) &&
(value==null ? e.getValue()==null : value.equals(e.getValue()));
}

public int hashCode() {
return hash ^ Objects.hashCode(value);
}

public String toString() {
return key.toString()+"="+value.toString();
}
}

2.构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Hashtable(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal Capacity: "+
initialCapacity);
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal Load: "+loadFactor);

if (initialCapacity==0)
initialCapacity = 1;
this.loadFactor = loadFactor;
table = new Entry<?,?>[initialCapacity];
threshold = (int)Math.min(initialCapacity * loadFactor, MAX_ARRAY_SIZE + 1);
}

public Hashtable(int initialCapacity) {
this(initialCapacity, 0.75f);
}

public Hashtable() {
this(11, 0.75f);
}

public Hashtable(Map<? extends K, ? extends V> t) {
this(Math.max(2*t.size(), 11), 0.75f);
putAll(t);
}

HashTable的默认初始化容量是11,HashMap中的是16,而且我们知道,HashMap中的初始容量initialCapacity仅仅是用来计算阈值用的,table的初始化也不在构造函数中进行,而是在扩容的时候初始化。但是HashTable不同, 它在构造函数中就进行table的初始化,并且初始化容量就是initialCapacity,阈值是取了MAX_ARRAY_SIZE + 1initialCapacity * loadFactor中的最小值。另外我们可以发现,Hashtable并没有太多的常量,比如默认容量大小都是直接写在代码中,而没使用常量。

3.get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public synchronized V get(Object key) {
Entry<?,?> tab[] = table;
int hash = key.hashCode();
// 计算key在table中的位置
int index = (hash & 0x7FFFFFFF) % tab.length;
// 找到index位置,遍历链表找到等于key的对象并返回
for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return (V)e.value;
}
}
return null;
}

get方法比较简单,添加了synchronized用来保证线程安全。

4.put方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public synchronized V put(K key, V value) {
// Make sure the value is not null
if (value == null) {
throw new NullPointerException();
}

// Makes sure the key is not already in the hashtable.
Entry<?,?> tab[] = table;
int hash = key.hashCode();
// 查找key对应的数组下标 以便获取所在的链表
int index = (hash & 0x7FFFFFFF) % tab.length;
@SuppressWarnings("unchecked")
Entry<K,V> entry = (Entry<K,V>)tab[index];
// 如果在hashtable中找到了key值相同的一个点,则替换掉它,并返回旧值。
for(; entry != null ; entry = entry.next) {
if ((entry.hash == hash) && entry.key.equals(key)) {
V old = entry.value;
entry.value = value;
return old;
}
}
// 添加该插入的节点
addEntry(hash, key, value, index);
return null;
}

首先要注意的一点,key不能为null,和HashMap不同,HashMap中的key是允许为空的。并且同样加上了synchronized关键字来保证线程安全。下面来看一下addEntry()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void addEntry(int hash, K key, V value, int index) {
modCount++;

Entry<?,?> tab[] = table;
if (count >= threshold) {
// Rehash the table if the threshold is exceeded
// 如果大于阈值,则扩容
rehash();

tab = table;
hash = key.hashCode();
index = (hash & 0x7FFFFFFF) % tab.length;
}

// Creates the new entry.
@SuppressWarnings("unchecked")
// index指的是当前桶的位置,这边添加一个新节点用的头插法
Entry<K,V> e = (Entry<K,V>) tab[index];
// 把当前新节点的next指向e,其中e里面存的是原来的头,现在新的节点放在了index位置,作为头。
tab[index] = new Entry<>(hash, key, value, e);
count++;
}

这边值得注意的一点是,在插入的时候用的是头插法,和HashMap的有所不同,HashMap在插入的时候是在链表或者红黑树的尾端,需要遍历到最后再进行插入操作。

再来看看扩容的rehash()函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected void rehash() {
int oldCapacity = table.length;
Entry<?,?>[] oldMap = table;

// overflow-conscious code
// 扩容的容量为原来的两倍加1
int newCapacity = (oldCapacity << 1) + 1;
// 扩容后的数量校验,不能超过数组的最大长度MAX_ARRAY_SIZE
if (newCapacity - MAX_ARRAY_SIZE > 0) {
if (oldCapacity == MAX_ARRAY_SIZE)
// Keep running with MAX_ARRAY_SIZE buckets
return;
newCapacity = MAX_ARRAY_SIZE;
}
// 重新开辟一个新数组
Entry<?,?>[] newMap = new Entry<?,?>[newCapacity];

modCount++;
// 新的阈值计算
threshold = (int)Math.min(newCapacity * loadFactor, MAX_ARRAY_SIZE + 1);
table = newMap;

// 将原数组中的重新求hash,然后放到新数组的对应位置,注意,这边用的是头插法。
for (int i = oldCapacity ; i-- > 0 ;) {
for (Entry<K,V> old = (Entry<K,V>)oldMap[i] ; old != null ; ) {
Entry<K,V> e = old;
old = old.next;

int index = (e.hash & 0x7FFFFFFF) % newCapacity;
e.next = (Entry<K,V>)newMap[index];
newMap[index] = e;
}
}
}

这里值得注意的一点是,在rehash的时候采用的依然是头插法,而HashMap在jdk1.7以及之前也用的是头插法,但是因为没有synchronized关键字,不能保证线程安全,并且会产生死循环的问题,虽然在1.8之后改用了另外的方法,避免了死循环的问题,但是线程安全的问题仍然不能解决,而HashTable用的也是头插法,但是用了synchronized,保证线程安全。

但是,总的来说HashTable使用的频率非常的低,主要和它的性能有关,所以一般在考虑并发的Map的时候,我们多会使用ConcurrentHashMap来实现。

二、ConcurrentHashMap

从第一节我们可以看到,HashTable效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器中的一部分数据。在jdk1.8之前,ConcurrentHashMap的实现原理是当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分为一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程所访问。但是在jdk1.8后,实现线程安全的思想也完全不一样了,是利用CAS+synchronized来实现的,底层是和HashMap一样的数组+链表+红黑树的实现方式。由于ConcurrentHashMap和HashMap结构类似,所以本篇我主要对比他们的源码来进行描述,关于HashMap的可以看这篇博客

ConcurrentHashMap1.7

https://juejin.im/post/5df8d7346fb9a015ff64eaf9https://juejin.im/post/5df8d7346fb9a015ff64eaf9

ConcurrentHashMap1.8

1.基本属性和构造函数

与HashMap类似,ConcurrentHashMap也有初始容量,负载因子等这些属性,初始也是和HashMap一样,默认初始容量是16,负载因子为0.75等等。这边主要介绍几个HashMap中没有的属性

1
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

DEFAULT_CONCURRENCY_LEVEL:默认的最大并发度为16,也就是默认是允许16个线程同时操作一个map实例。

1
private transient volatile int sizeCtl;

sizeCtl:表初始化和扩容的控制位。其中,当为-1时,表示当前table数组正在被初始化;当为-N时,表示有N-1个线程在进行扩容操作;当为0(默认值)时,表示当前table还未使用,此时table为null;当为其余正整数时,表示table的容量,默认是table大小的0.75倍,这边在initTable()函数中进行默认的初始化。

再来看看构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

我们主要关注一下第二个构造函数,它给定默认的初始容量。这边和HashMap一样,也要调用一个tableSizeFor(int x)函数,目的是返回一个大于x且最接近x的2的正数幂的数字,这边的x为initialCapacity + (initialCapacity >>> 1) + 1),最终赋值给sizeCtl。

2.底层数据存储结构

Node类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val; // 设置了volatile同步锁
volatile Node<K,V> next; // 设置了volatile同步锁

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}

与HashMap的Node类对比,有以下几个不同点:

1.val和next都设置了volatile同步锁。

2.它不允许调用setValue方法直接改变value的值。

3.增加了find方法来支持map.get()。

TreeNode类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/**省略部分方法*/
}

该类是树的节点类,我们知道当链表的长度超过8的时候,链表会转化为红黑树,但是与HashMap不同的是,ConcurrentHashMap并不是直接转化为红黑树,而是把树的节点类包装成TreeNode放在TreeBin对象中,再由TreeBin完成红黑树的包装。

再来简单看看TreeBin类吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
//构造方法,把TreeNode作为参数来构造红黑树
TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;
TreeNode<K,V> r = null;
for (TreeNode<K,V> x = b, next; x != null; x = next) {
next = (TreeNode<K,V>)x.next;
x.left = x.right = null;
if (r == null) {
x.parent = null;
x.red = false;
r = x;
}
else {
K k = x.key;
int h = x.hash;
Class<?> kc = null;
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;
assert checkInvariants(root);
}
}

3.插入

put()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 得到hash值,其内部实现是 (h ^ (h >>> 16)) & HASH_BITS; 也就是hashcode的高16位和低16位异或再与上HASH_BITS。这边又和HashMap有所不同,HashMap虽然也是高16位和低16位异或,但是并没有与的操作,这边猜测是为了消除符号位的影响。
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// table没有初始化,初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 如果恰好桶的位置没有数据,则使用CAS操作把数据放进去。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// MOVED是hash值为-1的Node,表示当前节点已经被处理过了,这中情况将出现多线程扩容的情况下。此时当前的线程会支持扩容。
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 如果发生了冲突,则需要锁住当前这个位置上的Node对象,也就是链表的头。然后将新的Node添加到这个链表或者红黑树上。
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
// 如果hash值大于0则代表这个节点里的数据是链表
if (fh >= 0) {
binCount = 1; // 链表长度计数
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 如果hash值和key值相同,则覆盖旧值,结束。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e; //记录链表的最后一个节点
// 如果next为null,则新创建一个Node,添加到队尾。
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果是红黑树
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 前面注释了binCount这个变量,为链表的长度,当链表长度大于阈值8的时候,调用这个方法,也就是转化为红黑树。
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 统计结点数量,检查是否需要resize()操作【扩容】
addCount(1L, binCount);
return null;
}

先不考虑里面调用的一些函数,整个流程看下来是和HashMap的put方法十分类似的,步骤是:

  1. 不允许key或者value为null,否则抛出异常。
  2. spread()函数计算hash值。
  3. 遍历table数组,若table为空,则用initTable()来初始化table。若table不为空,则tabAt()方法获得桶的位置,这个tabAt()是一个CAS操作。
  4. 如果恰好桶的位置没有数据,则使用CAS操作casTabAt()把数据放进去,结束循环。注意3和4两步和HashMap是不同的,这边通过CAS操作要保证操作的可见性和线程的安全性。
  5. 如果当前槽位所对应的hash值是MOVED,说明当前的table正在扩容迁移节点,那么就调用helpTransfer帮助迁移。
  6. 如果前面3、4、5都没有满足的情况,则表示当前这个桶位置的元素很多,可能是小于8个的情况,那就是个链表,也可能大于8,那就是红黑树的结构。所以后续的操作和HashMap基本类似,对链表和红黑树分情况讨论。注意,这步操作是要对桶位置的Node节点加上一个synchronized锁的,从而保证线程的安全性。
    1. 链表:如果在链表中找到一个hash值和key都相等的点,则用新值覆盖掉它。如果没有找到,则说明找到了链表的结尾处,就在尾部插入Node。
    2. 红黑树:如果是红黑树,则调用红黑树的插入方法。
    3. 并且在这个过程中是要用binCount来记录长度的。
  7. 最后如果binCount已经大于或等于8了,那么就调用treeifyBin将链表红黑树化。

上面还提到了MOVED,其实hash值有三种特殊的状态

  • MOVED -1
    hash值为-1的Node,表示当前节点已经被处理过了,这中情况将出现多线程扩容的情况下。
  • TREEBIN -2
    hash值为-2的Node,表示当前的节点是TreeBin
  • RESERVED -3
    保留的hash值,用在ReservationNode上面

下面来详细的看一下里面的函数initTable()tabAt(tab, i = (n - 1) & hash))casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))helpTransfer(tab, f)treeifyBin(tab, i)addCount(1L, binCount)

initTable()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果sc小于0,当为-1时,表示当前table数组正在被初始化;当为-N时,表示有N-1个线程在进行扩容操作。这边应该指的是有其他线程在初始化的情况。线程就挂起,也就是只允许有一个线程初始化table。
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 用CAS操作将SIZECTL值赋值为-1,表示当前已经有线程在初始化,这样可以让其他线程都执行上面的if,被挂起,只允许一个线程执行下来。sc还是为0,因为初始的sizeCtl就是为0,通过CAS改为-1了。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 初始化数组时,因为sc为0,所以将sc的值改为初始的容量16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 创建一个初始容量的table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 将sc的设置为总容量的75%,如果 n 为 16 的话,那么这里 sc = 12
sc = n - (n >>> 2);
}
} finally {
// 最后将sizeCtl设置为sc
sizeCtl = sc;
}
break;
}
}
return tab;
}

这个方法的独到之处就是在多线程情况下,只允许一个线程去初始化table,实现table只初始化一次。

tabAt(tab, i = (n - 1) & hash))

1
2
3
4
5
@SuppressWarnings("unchecked")
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
// 保证可见性地去拿到桶位置上的Node
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))

1
2
3
4
5
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
// 用CAS插入一个点
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

helpTransfer(tab, f)

这个函数用于帮助table的扩容操作,当检测到当前哈希表正在进行扩容操作,就让当前线程去协助扩容,提高扩容的效率。这里面有一个ForwardingNode 的节点类型,它是一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张hash表。而且这个节点的key、value、next指针全部为null,它的hash值为MOVED(static final int MOVED = -1)。简单看一下ForwardingNode

1
2
3
4
5
6
7
8
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
/**find方法这边省略*/
}

下面回到helpTransfer(tab, f)函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { // 新的table,并且指向了下一张哈希表
int rs = resizeStamp(tab.length); //返回一个 16 位长度的扩容校验标识
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) { //sizeCtl<0 如果处于扩容状态的话
//这里判断校验标识是否相等,如果校验符不等或者扩容操作已经完成了,直接退出循环,不用协助它们扩容了
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { //sc + 1 标识增加了一个线程进行扩容
transfer(tab, nextTab); //进行扩容
break;
}
}
return nextTab;
}
return table;
}

既然说到了扩容,那就接着看一下扩容操作吧。扩容的一个整体思想分为两步:1.构建一个nextTable,大小要为table的两倍,把table的数据复制到nextTable中。当然,在扩容过程中要支持并发插入的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//计算每条线程处理的桶个数,每条线程处理的桶数量一样,如果CPU为单核,则使用一条线程处理所有桶
//每条线程至少处理16个桶,如果计算出来的结果少于16,则一条线程处理16个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 刚开始扩容,初始化nextTab
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
// transferIndex指向最右边桶的位置
transferIndex = n;
}
int nextn = nextTab.length;
//新建一个占位对象,该占位对象的 hash 值为 -1 该占位对象存在时表示集合正在扩容状态,key、value、next 属性均为 null ,nextTable 属性指向扩容后的数组
//该占位对象主要有两个用途:
// 1、占位作用,用于标识数组该位置的桶已经迁移完毕,处于扩容中的状态。
// 2、作为一个转发的作用,扩容期间如果遇到查询操作,遇到转发节点,会把该查询操作转发到新的数组上去,不会阻塞查询操作。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//该标识用于控制是否继续处理下一个桶,为 true 则表示已经处理完当前桶,可以继续迁移下一个桶的数据
boolean advance = true;
//该标识用于控制扩容何时结束,该标识还有一个用途是最后一个扩容线程会负责重新检查一遍数组查看是否有遗漏的桶
boolean finishing = false; // to ensure sweep before committing nextTab
//这个循环用于处理一个 stride 长度的任务,i 后面会被赋值为该 stride 内最大的下标,而 bound 后面会被赋值为该 stride 内最小的下标
//通过循环不断减小 i 的值,从右往左依次迁移桶上面的数据,直到 i 小于 bound 时结束该次长度为 stride 的迁移任务
//结束这次的任务后会通过外层 addCount、helpTransfer、tryPresize 方法的 while 循环达到继续领取其他任务的效果
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
//每处理完一个hash桶就将 bound 进行减 1 操作
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
// //transferIndex <= 0 说明数组的hash桶已被线程分配完毕,没有了待分配的hash桶,将 i 设置为 -1 ,后面的代码根据这个数值退出当前线的扩容操作
i = -1;
advance = false;
}
//只有首次进入for循环才会进入这个判断里面去,设置 bound 和 i 的值,也就是领取到的迁移任务的数组区间
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//扩容结束后做后续工作,将 nextTable 设置为 null,表示扩容已结束,将 table 指向新数组,sizeCtl 设置为扩容阈值
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//每当一条线程扩容结束就会更新一次 sizeCtl 的值,进行减 1 操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
//遇到数组上空的位置直接放置一个占位对象,以便查询操作的转发和标识当前处于扩容状态
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//数组上遇到hash值为MOVED,也就是 -1 的位置,说明该位置已经被其他线程迁移过了,将 advance 设置为 true ,以便继续往下一个桶检查并进行迁移操作
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//该节点为链表结构
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//使用高位和低位两条链表进行迁移,使用头插法拼接链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//该节点为红黑树结构
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

treeifyBin(tab, i)

转化为红黑树的相关操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

addCount(1L, binCount)

统计结点数量,检查是否需要resize()操作,扩容操作和之前的是一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

4.查找

在插入的基础上,查找就显得很简单了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 得到hash值。
int h = spread(key.hashCode());
// 这边和HashMap类似的,(n-1)&h = h%n,目的是加快运算速度。
if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
// 当在桶的位置直接查找到就直接返回,也就是在链表的头找到。
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 在红黑树中查找。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 遍历链表查找key对应的val
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

参考:

https://blog.csdn.net/ZOKEKAI/article/details/90051567

https://blog.csdn.net/varyall/article/details/81283231