分布式锁的实现

前言


锁的出现防止了在多线程的情况下对一些共享变量的错误操作,一般用的最多的锁无非是synchronized和Lock,在单机的情况下能保证线程的安全性,但是在分布式的情况下这种加锁方式并不能保证线程的安全性,因此提出分布式锁的概念。分布式锁的实现主要有Redis、Zookeeper、Mysql等,本篇主要介绍基于Redis和Zookeeper的分布式锁实现。

基于Redis的分布式锁

分布式锁的想法

要实现基于Redis的分布式锁,一个非常重要的Redis方法就是setnx,这个方法的作用就是SET IF NOT EXIST,也就是当key的值不存在的时候,就进行set操作,当key存在则不操作。但是光设置了值远远是不够的,万一当在设置完值之后某台服务器的Redis就宕机了,这个锁岂不是永远都释放不了了嘛,所以这边还应该设置一个过期时间,即使在宕机之后也能释放掉锁。设置过期时间用到的就是expire操作。

那么是否把setnx命令和expire一起用就可以了呢,其实也是不行的,依然会存在锁释放不了的情况,因为setnx和expire两句话的执行并不是原子性的操作,在setnx之后宕机了依然还没有设置过期时间,不就和之前的那种情况一样了么。所以Redis还为我们提供了一个set的操作,它可以看作是setnx和expire的合并操作,且能保证原子性。

1
SET key value NX PX 1000

key就是键,这边设置为同一个key值就行,value就是属于每一个加锁的请求的一个id值,每次都要确保不一样,在解锁的时候就是靠这个id值来判断是否是当前请求加的锁。所以value可以用UUID或者雪花算法来确保生成的每个值都不一致。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class sellGoods extends Thread{
private static final String lock_key = "redis_lock"; //锁的key
private static myRedisLock redisLock = new myRedisLock();
private int goodsNums = 10; // 10个商品
private static int CustomsNums = 15; //15个顾客来抢商品
private static CountDownLatch latch = new CountDownLatch(CustomsNums);

@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
Synchronized(this) {
if (goodsNums > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
goodsNums--;
System.out.println(goodsNums);
}
}
}

public static void main(String[] args) {
sellGoods test = new sellGoods();
for (int i = 0 ; i < CustomsNums ; ++i) {
new Thread(test).start();
latch.countDown();
}
}
}

上面是模仿15个用户来争抢10个商品的情况,为了不超卖,一般都会加锁来实现,加Synchronized或者Lock其实都能实现,比如上面的这种直接加Synchronized的情况。而在实际情况中,系统是部署在多个机器中的,会出现多个进程并发问题,单纯的加线程的锁是无法解决问题的,为了方便就暂且用这种情况来模拟一下分布式锁的实现。

加锁的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 加锁
* @param lock_key
* @param requestId
* @return
*/
public boolean lock(String lock_key, String requestId) {
Jedis jedis = jedisPool.getResource();
long time1 = System.currentTimeMillis();
try {
while (true) {
// 加锁成功才会跳出
String result = jedis.set(lock_key, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if ("OK".equals(result)) {
return true;
}
// 否则循环等待
Thread.sleep(100);
// 如果超过超时时间,则返回false
if (System.currentTimeMillis()-time1 > timeout) {
return false;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
jedis.close();
}
return false;
}

简单再解释一下这边的jedis.set()

lock_key:key来当锁,并且它是唯一的。

requestId:为了保证解锁的请求是加锁的同一个请求,而不是把其他请求加的锁给解了,这边要求每个请求都要有一个requestId来保证加锁解锁是同一个顾客发起的请求。

SET_IF_NOT_EXIST:这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作。

SET_WITH_EXPIRE_TIME:这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。

expireTime:与第四个参数相呼应,代表key的过期时间。

解锁的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 解锁
* @param lock_key
* @param requestId
* @return
*/
public boolean unlock(String lock_key, String requestId) {
Jedis jedis = jedisPool.getResource();
// LUA脚本
String script =
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end";
try {
Object result = jedis.eval(script, Collections.singletonList(lock_key), Collections.singletonList(requestId));
if ("1".equals(result)) {
return true;
} else {
return false;
}
} finally {
jedis.close();
}
}

在解锁的时候,我们用LUA脚本来完成删除的这一原子操作。先判断当前锁的字符串是否与传入的值相等,是的话就删除Key,解锁成功,返回1。

测试函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class sellGoods extends Thread{
private static final String lock_key = "redis_lock"; //锁的key
private static myRedisLock redisLock = new myRedisLock();
private int goodsNums = 10; // 1000个商品
private static int CustomsNums = 15; //15个顾客来抢商品
private static CountDownLatch latch = new CountDownLatch(CustomsNums);

@Override
public void run() {
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
String requestId = UUID.randomUUID().toString();
redisLock.lock(lock_key, requestId);
if (goodsNums > 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
goodsNums--;
System.out.println(goodsNums);
}
redisLock.unlock(lock_key, requestId);
}

public static void main(String[] args) {
sellGoods test = new sellGoods();
for (int i = 0 ; i < CustomsNums ; ++i) {
new Thread(test).start();
latch.countDown();
}
}
}

其实在使用的时候,可以通过redisson这个客户端工具去直接使用。

基于Zookeeper的分布式锁

分布式锁的想法

通过ZK去实现分布式锁主要是因为ZK的结点唯一,也就是说,不能重复的去创建同一个名字的结点,与Redis中的类比,就像是Redis中的Setnx一样,保证了有值的时候不操作,没有值的时候创建结点。在ZK中有四种类型的结点,分别是持久化的结点、持久化顺序结点、临时的结点和临时顺序结点。这边需要考虑用哪种结点来实现我们的分布式锁。

首先一点,要考虑宕机的情况,如果我们采用持久化结点的做法,一旦获取到分布式锁之后机器宕机,那么结点就不能释放锁,所以一般考虑用临时结点来进行存放,避免死锁的问题。其次,如何知道是否可以去竞争锁了呢,难道要采用和redis一样的循环尝试么?其实ZK为我们提供了watch机制,也就是当一个锁已经释放(结点删除),就可以通知其他请求来争抢锁,但是这样也不是最好的,因为容易产生羊群效应,造成系统资源的浪费,因为大家都去争抢锁了,其实也就只有一个请求能成功获得锁。所以最好的做法是采用顺序的结点,当watch到前一个锁释放了,那么当前请求就获得锁,是按照一个顺序的方式去进行锁的分配。综上,结点的类型应该采用临时顺序结点。

来梳理一下整个实现流程:

分布式锁流程

代码实现

加锁与解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
public class myZKLock implements Lock {

private String lockPath; // 锁的目录
private ZkClient client; // 客户端
private ThreadLocal<String> curPath = new ThreadLocal<>(); // 当前锁的目录
private ThreadLocal<String> beforePath = new ThreadLocal<>(); // 前一个锁的目录

public myZKLock(String lockPath) {
if (lockPath.equals("") || lockPath == null) {
throw new IllegalArgumentException("路径不能为空!");
}
this.lockPath = lockPath;
client = new ZkClient("localhost:2181");
if (!this.client.exists(lockPath)) {
this.client.createPersistent(lockPath, true);
System.out.println("创建成功");
}
}

@Override
public void lock() {
// 如果成功获得锁,返回
if (tryLock()) {
return;
}
// 否则监听等待并尝试加锁
try {
waitForLock();
lock();
} catch (InterruptedException e) {
e.printStackTrace();
}

}

@Override
public void lockInterruptibly() throws InterruptedException {

}

/**
* 尝试获取分布式锁
* @return
*/
@Override
public boolean tryLock() {
if (this.curPath.get() == null || !this.client.exists(this.curPath.get())) {
// 如果是一个新的请求,还未获取过锁,就先创建一个临时有序的结点号,相当于先排队等待
String path = this.client.createEphemeralSequential(lockPath+"/", "lock");
curPath.set(path);
}
// 取得一个最小的结点序号,也就是最先创建的那个先获得锁
List<String> children = this.client.getChildren(lockPath);
Collections.sort(children);
// 如果当前请求是最小的,则获得分布式锁
if (curPath.get().equals(lockPath+"/"+children.get(0))) {
return true;
} else {
// 否则则获取它的前一个结点
int index = children.indexOf(curPath.get().substring(lockPath.length()+1));
System.out.println(Thread.currentThread().getName() + " " + index);
String prepath = lockPath+"/"+children.get(index-1);
beforePath.set(prepath);
}
return false;
}

/**
* 阻塞等待获得锁
* @throws InterruptedException
*/
public void waitForLock() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {

}

@Override
public void handleDataDeleted(String s) throws Exception {
// 监听到被删除,则不阻塞
latch.countDown();
}
};
this.client.subscribeDataChanges(this.beforePath.get(), listener);
// 当发现前一个结点还存在就一直阻塞
if (this.client.exists(this.beforePath.get())) {
latch.await();
}
// before删除,取消监听
client.unsubscribeDataChanges(this.beforePath.get(), listener);
}

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

/**
* 解锁
*/
@Override
public void unlock() {
if (this.curPath.get() != null) {
this.client.delete(this.curPath.get());
this.curPath.set(null);
}
}

@Override
public Condition newCondition() {
return null;
}
}

测试函数与Redis中的类似。

对比

性能上来说,Redis是NoSQL数据库,相对比来说Redis比Zookeeper性能要好。

可靠性来说,Redis有效期不是很好控制,容易产生Redis分布式锁过期时间到了,但是业务代码还没有执行完的情况,这样的话就需要续期(续期一般可以启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端还持有锁key,那么就会不断的延长锁key的生存时间)来操作,而Zookeeper可靠性比Redis更好,由于是临时结点,所以一旦宕机结点就会自动释放,不存在设置有效期的这种情况。