前言
在之前的博客中,我介绍了HashMap的具体实现以及一些值得注意的地方。HashMap是线程不安全的,换句话说,当我们在并发的情况下使用HashMap的时候,会造成数据的错误。所以在并发的情况下,通常使用HashTable和ConcurrentHashMap这种线程安全的并发容器来代替HashMap这种线程不安全的集合。本篇主要介绍HashTable和ConcurrentHashMap是如何在并发的情况下实现线程安全的。
一、HashTable
HashTable容器使用的是synchronized来保证线程安全,所以在线程竞争激烈的情况下HashTable的效率是非常低下的。主要原因是当有多个线程访问HashTable的时候,会造成阻塞和自旋的情况,我在之前的博客中介绍过这种情况,所以在线程竞争激烈的情况下是不适合用HashTable的。下面我通过HashTable的源码对它如何实现线程安全进行分析。
1.底层数据存储结构
首先要知道的是,HashTable是基于数组+链表的形式,和HashMap类似,其基本的节点类型是Entry<K,V>,都是实现了Map.Entry<K,V>接口,Entry类中的变量都是和HashMap中的一致。另外一点,HashTable与HashMap不同,HashTable是继承自古老的Dictionary类,这个类似乎已经被废弃。
1 | private static class Entry<K,V> implements Map.Entry<K,V> { |
2.构造函数
1 | public Hashtable(int initialCapacity, float loadFactor) { |
HashTable的默认初始化容量是11,HashMap中的是16,而且我们知道,HashMap中的初始容量initialCapacity仅仅是用来计算阈值用的,table的初始化也不在构造函数中进行,而是在扩容的时候初始化。但是HashTable不同, 它在构造函数中就进行table的初始化,并且初始化容量就是initialCapacity,阈值是取了MAX_ARRAY_SIZE + 1和initialCapacity * loadFactor中的最小值。另外我们可以发现,Hashtable并没有太多的常量,比如默认容量大小都是直接写在代码中,而没使用常量。
3.get方法
1 | public synchronized V get(Object key) { |
get方法比较简单,添加了synchronized用来保证线程安全。
4.put方法
1 | public synchronized V put(K key, V value) { |
首先要注意的一点,key不能为null,和HashMap不同,HashMap中的key是允许为空的。并且同样加上了synchronized关键字来保证线程安全。下面来看一下addEntry()函数
1 | private void addEntry(int hash, K key, V value, int index) { |
这边值得注意的一点是,在插入的时候用的是头插法,和HashMap的有所不同,HashMap在插入的时候是在链表或者红黑树的尾端,需要遍历到最后再进行插入操作。
再来看看扩容的rehash()函数。
1 | protected void rehash() { |
这里值得注意的一点是,在rehash的时候采用的依然是头插法,而HashMap在jdk1.7以及之前也用的是头插法,但是因为没有synchronized关键字,不能保证线程安全,并且会产生死循环的问题,虽然在1.8之后改用了另外的方法,避免了死循环的问题,但是线程安全的问题仍然不能解决,而HashTable用的也是头插法,但是用了synchronized,保证线程安全。
但是,总的来说HashTable使用的频率非常的低,主要和它的性能有关,所以一般在考虑并发的Map的时候,我们多会使用ConcurrentHashMap来实现。
二、ConcurrentHashMap
从第一节我们可以看到,HashTable效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器中的一部分数据。在jdk1.8之前,ConcurrentHashMap的实现原理是当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分为一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程所访问。但是在jdk1.8后,实现线程安全的思想也完全不一样了,是利用CAS+synchronized来实现的,底层是和HashMap一样的数组+链表+红黑树的实现方式。由于ConcurrentHashMap和HashMap结构类似,所以本篇我主要对比他们的源码来进行描述,关于HashMap的可以看这篇博客。
ConcurrentHashMap1.7
https://juejin.im/post/5df8d7346fb9a015ff64eaf9https://juejin.im/post/5df8d7346fb9a015ff64eaf9
ConcurrentHashMap1.8
1.基本属性和构造函数
与HashMap类似,ConcurrentHashMap也有初始容量,负载因子等这些属性,初始也是和HashMap一样,默认初始容量是16,负载因子为0.75等等。这边主要介绍几个HashMap中没有的属性
1 | private static final int DEFAULT_CONCURRENCY_LEVEL = 16; |
DEFAULT_CONCURRENCY_LEVEL:默认的最大并发度为16,也就是默认是允许16个线程同时操作一个map实例。
1 | private transient volatile int sizeCtl; |
sizeCtl:表初始化和扩容的控制位。其中,当为-1时,表示当前table数组正在被初始化;当为-N时,表示有N-1个线程在进行扩容操作;当为0(默认值)时,表示当前table还未使用,此时table为null;当为其余正整数时,表示table的容量,默认是table大小的0.75倍,这边在initTable()函数中进行默认的初始化。
再来看看构造函数
1 | public ConcurrentHashMap() { |
我们主要关注一下第二个构造函数,它给定默认的初始容量。这边和HashMap一样,也要调用一个tableSizeFor(int x)函数,目的是返回一个大于x且最接近x的2的正数幂的数字,这边的x为initialCapacity + (initialCapacity >>> 1) + 1),最终赋值给sizeCtl。
2.底层数据存储结构
Node类
1 | static class Node<K,V> implements Map.Entry<K,V> { |
与HashMap的Node类对比,有以下几个不同点:
1.val和next都设置了volatile同步锁。
2.它不允许调用setValue方法直接改变value的值。
3.增加了find方法来支持map.get()。
TreeNode类
1 | static final class TreeNode<K,V> extends Node<K,V> { |
该类是树的节点类,我们知道当链表的长度超过8的时候,链表会转化为红黑树,但是与HashMap不同的是,ConcurrentHashMap并不是直接转化为红黑树,而是把树的节点类包装成TreeNode放在TreeBin对象中,再由TreeBin完成红黑树的包装。
再来简单看看TreeBin类吧
1 | static final class TreeBin<K,V> extends Node<K,V> { |
3.插入
put()
1 | public V put(K key, V value) { |
先不考虑里面调用的一些函数,整个流程看下来是和HashMap的put方法十分类似的,步骤是:
- 不允许key或者value为null,否则抛出异常。
- 用
spread()函数计算hash值。 - 遍历table数组,若table为空,则用
initTable()来初始化table。若table不为空,则tabAt()方法获得桶的位置,这个tabAt()是一个CAS操作。 - 如果恰好桶的位置没有数据,则使用CAS操作
casTabAt()把数据放进去,结束循环。注意3和4两步和HashMap是不同的,这边通过CAS操作要保证操作的可见性和线程的安全性。 - 如果当前槽位所对应的hash值是MOVED,说明当前的table正在扩容迁移节点,那么就调用helpTransfer帮助迁移。
- 如果前面3、4、5都没有满足的情况,则表示当前这个桶位置的元素很多,可能是小于8个的情况,那就是个链表,也可能大于8,那就是红黑树的结构。所以后续的操作和HashMap基本类似,对链表和红黑树分情况讨论。注意,这步操作是要对桶位置的Node节点加上一个synchronized锁的,从而保证线程的安全性。
- 链表:如果在链表中找到一个hash值和key都相等的点,则用新值覆盖掉它。如果没有找到,则说明找到了链表的结尾处,就在尾部插入Node。
- 红黑树:如果是红黑树,则调用红黑树的插入方法。
- 并且在这个过程中是要用binCount来记录长度的。
- 最后如果binCount已经大于或等于8了,那么就调用treeifyBin将链表红黑树化。
上面还提到了MOVED,其实hash值有三种特殊的状态
- MOVED -1
hash值为-1的Node,表示当前节点已经被处理过了,这中情况将出现多线程扩容的情况下。 - TREEBIN -2
hash值为-2的Node,表示当前的节点是TreeBin - RESERVED -3
保留的hash值,用在ReservationNode上面
下面来详细的看一下里面的函数initTable()、tabAt(tab, i = (n - 1) & hash))、casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))、helpTransfer(tab, f)、treeifyBin(tab, i)、addCount(1L, binCount)
initTable()
1 | private final Node<K,V>[] initTable() { |
这个方法的独到之处就是在多线程情况下,只允许一个线程去初始化table,实现table只初始化一次。
tabAt(tab, i = (n - 1) & hash))
1 | ("unchecked") |
casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))
1 | static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, |
helpTransfer(tab, f)
这个函数用于帮助table的扩容操作,当检测到当前哈希表正在进行扩容操作,就让当前线程去协助扩容,提高扩容的效率。这里面有一个ForwardingNode 的节点类型,它是一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张hash表。而且这个节点的key、value、next指针全部为null,它的hash值为MOVED(static final int MOVED = -1)。简单看一下ForwardingNode。
1 | static final class ForwardingNode<K,V> extends Node<K,V> { |
下面回到helpTransfer(tab, f)函数。
1 | final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { |
既然说到了扩容,那就接着看一下扩容操作吧。扩容的一个整体思想分为两步:1.构建一个nextTable,大小要为table的两倍,把table的数据复制到nextTable中。当然,在扩容过程中要支持并发插入的操作。
1 | private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { |
treeifyBin(tab, i)
转化为红黑树的相关操作。
1 | private final void treeifyBin(Node<K,V>[] tab, int index) { |
addCount(1L, binCount)
统计结点数量,检查是否需要resize()操作,扩容操作和之前的是一样的。
1 | private final void addCount(long x, int check) { |
4.查找
在插入的基础上,查找就显得很简单了。
1 | public V get(Object key) { |
参考: