队列同步器AQS

前言


我们知道,Lock接口的实现基本都是通过聚合了一个同步器的子类来完成线程的访问控制的。队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,以下简称AQS。网上关于AQS的总结已经很到位了,这边主要记录一下自己的学习过程。

一、AQS到底是什么?

AQS是一个用来构建锁和其他同步组件的基础框架,像是ReentrantLock、ReentrantReadWriteLock、CountDownLatch等等,这些锁或者同步组件都不陌生,但是之前我并不了解他们的具体底层实现,在了解了AQS之后才感觉到它们其实都是有联系的,而其中的联系就是AQS。AQS内部通过内部类Node构成一个双向的FIFO队列来进行锁的获取,通过一个int类型的成员变量state来控制同步状态,当state=0时,说明没有任何线程占有共享资源的锁,当state=1时,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待,这个state是volatile类型的,也就是可以保证对所有线程的可见性。注意,在AQS中有两种队列的形式,一种是同步队列,用于存放所有等待获取锁的线程,另一种是等待队列(非必要的,只有在Condition的时候才会有,是一个单向的链表),通过Condition调用await()方法释放锁后,将加入等待队列。

二、AQS简单分析

AQS示意图

同步状态是用一个int类型的state来表示的。

队列中每一个节点都相当于一个线程,都在等待当前占用资源的线程释放锁。每个节点(Node)的数据结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
static final class Node {
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;

//此线程取消了争抢这个锁
static final int CANCELLED = 1;
//等待被唤醒状态,当前node的后继节点对应的线程需要被唤醒。也就是说如果A节点被设置为SIGNAL,假如B是A的后继节点,那么B需要依赖A节点来唤醒才能拿到锁
static final int SIGNAL = -1;
// 节点线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中
static final int CONDITION = -2;
// 下一次共享式同步状态获取将会无条件地被传播下去
static final int PROPAGATE = -3;

// 可以设置四种状态,就是上面的四种。
volatile int waitStatus;

// 同步队列中的前驱
volatile Node prev;

// 同步队列中的后继
volatile Node next;

// 当前同步队列中该节点代表的线程
volatile Thread thread;

// 等待队列中的后继节点。注意,等待队列是一个单向的链表,是与condition相关的
Node nextWaiter;

//判断是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}

// 获取前驱结点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

// 三种构造函数
Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

同步状态state可以通过三个函数进行操作

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final int getState() {
return state;
}

protected final void setState(int newState) {
state = newState;
}

//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

在没有看ReentrantLock源码前简单想一想,如果我们想要用AQS来实现一个ReentrantLock,应该怎么做呢?首先需要有一个双向的FIFO队列来管理锁,线程会首先获取锁,如果失败的话就包装成一个Node节点加入到同步队列中,我们知道ReentrantLock是独占锁,所以只允许有一个线程持有当前的锁,所以在同步队列中,只有当前节点的前驱是head的时候才有资格去不断地自旋去获得这个锁。当持有锁的线程释放锁的时候,才会唤醒队列的后继线程。

同样的,实现一个Semaphore/CountDownLatch的时候,因为是共享锁的原因,允许有多个线程同时竞争锁,这样就要另外一种操作同步队列的方式。

在AQS中只写了相应的接口,在不同的同步容器或者锁中需要根据具体的情况来实现,所以说,AQS更像是一个框架,而不提供具体的实现。

1
2
3
4
5
6
7
8
9
10
11
12
AQS中的接口
//独占模式
//尝试获取资源,获取成功的话返回true,否则false
tryAcquire(int)
//尝试释放资源,释放成功的话返回true,否则false
tryRealease(int)

//共享模式
//尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryAcquireShared(int)
//尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
tryReleaseShared(int)

三、ReentrantLock的实现

在了解的AQS之后,再来了解一下ReentrantLock是如何实现的。我们知道ReentrantLock是独占锁,并且默认是非公平锁。

ReentrantLock类

ReentrantLock中有三个内置类,其中Sync继承了AbstractQueuedSynchronizer类,因为ReentrantLock有公平锁和非公平锁两种实现,所以NonfairSync和FairSync都是继承自Sync类。

下面以非公平锁NonfairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

// 加锁
final void lock() {
// 执行CAS操作,获取锁
if (compareAndSetState(0, 1))
// 成功则将独占锁线程设置为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
//否则再次请求同步状态
acquire(1);
}

// 尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

对比公平锁FairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

// 尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

其中,它们两个的acquire(1);方法是一样的,都调用的是AbstractQueuedSynchronizer类中的方法如下

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

但是tryAcquire(arg)中的实现是不一样的,分别调用的是NonfairSync内置类和FairSync内置类中的方法,这也是造成公平锁和非公平锁两种实现的本质区别。总结来说,公平锁的tryAcquire(arg)中,会调用hasQueuedPredecessors()判断同步队列是否存在结点,如果存在必须先执行完同步队列中结点的线程,当前线程封装成Node加入同步队列中进行等待。而非公平锁的话,当线程请求到来时,不管同步队列是否存在线程结点,直接尝试获取同步状态,获取成功直接访问共享资源。所以这也是非公平锁的效率高于公平锁的原因。

具体的代码分析可以参考如下两篇博客,都写的十分不错,值得一读:

深入剖析基于并发AQS的(独占锁)重入锁(ReetrantLock)及其Condition实现原理

初步了解AQS是什么(一)

四、再写生产者消费者问题

在此前介绍wait()和notify()的博客中我们实现了一个基于Synchronized的生产者消费者问题的解决方案,但是存在一个问题,notifyAll()的时候唤醒的是所有的线程,会将所有的生产者和消费者都唤醒,而不能仅唤醒生产者或者仅唤醒消费者,我们知道在ReentrantLock中,可以存在多个等待队列,所以,这边也就能用这个来实现仅唤醒生产者或者消费者了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package test.waitAndnotify;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @Author: hqf
* @description:
* @Data: Create in 16:37 2019/10/27
* @Modified By:
*/
public class ProducerConsumer2 {
private static final int CAPACITY = 5;
//创建一个锁对象。
static Lock lock = new ReentrantLock();
//通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者。
static Condition producer_con = lock.newCondition();
static Condition consumer_con = lock.newCondition();
/**
* 生产者
*/
static class Producer extends Thread{
private Queue<Integer> queue;
int maxSize;
int i = 0;

public Producer(Queue<Integer> queue, int maxSize) {
this.queue = queue;
this.maxSize = maxSize;
}

@Override
public void run() {
while (true) {
lock.lock();
try {
while (queue.size() == maxSize) {
try {
System.out.println(Thread.currentThread().getName() + " 生产者生产数量已满,等待消费者线程消费");
producer_con.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + " 生产出" + i);
queue.add(i);
i++;
consumer_con.signalAll(); //通知所有消费者可以消费
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}

/**
* 消费者
*/
static class Consumer extends Thread{
private Queue<Integer> queue;
int maxSize;

public Consumer(Queue<Integer> queue, int maxSize) {
this.queue = queue;
this.maxSize = maxSize;
}

@Override
public void run() {
while (true) {
lock.lock();
try {
while (queue.isEmpty()) {
System.out.println(Thread.currentThread().getName() + " 当前生产队列为空,消费者等待生产者生产");
try {
consumer_con.await();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int x = queue.poll();
System.out.println(Thread.currentThread().getName() + " 消费者消费 " + x);
producer_con.signalAll(); //通知所有生产者可以继续生产
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}

public static void main(String[] args) {
Queue<Integer> queue = new LinkedList<>();
ProducerConsumer.Producer producer1 = new ProducerConsumer.Producer(queue, CAPACITY);
ProducerConsumer.Producer producer2 = new ProducerConsumer.Producer(queue, CAPACITY);
ProducerConsumer.Consumer consumer1 = new ProducerConsumer.Consumer(queue, CAPACITY);
ProducerConsumer.Consumer consumer2 = new ProducerConsumer.Consumer(queue, CAPACITY);
ProducerConsumer.Consumer consumer3 = new ProducerConsumer.Consumer(queue, CAPACITY);
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
consumer3.start();
}
}

五、Synchronized和ReentrantLock区别

ReentrantLock除了能实现Synchronized的功能外,还有三个高级的功能。

等待可中断:在持有锁的线程长时间不释放锁的时候,等待的线程可以选择放弃等待,tryLock(long timeout, TimeUnit unit)

公平锁:按照申请锁的顺序来一次获得锁称为公平锁,synchronized的是非公平锁,ReentrantLock可以通过构造函数实现公平锁。new RenentrantLock(boolean fair)

绑定多个Condition:通过多次newCondition可以获得多个Condition对象,可以简单的实现比较负责的线程同步的功能,通过await(),signal();,比如上面第四节的生产者消费者问题。