分布式锁是控制分布式系统之间同步访问共享资源的一种方式。分布式锁的实现方式有很多种,比如 Redis 、数据库 、zookeeper 等。这篇文章主要介绍用 Zookeeper 实现分布式锁。

Zookeeper 分布式锁实现原理

先说结论:Zookeeper 是基于临时顺序节点以及 Watcher 监听器机制实现分布式锁的

(1)ZooKeeper 的每一个节点都是一个天然的顺序发号器

在每一个节点下面创建临时顺序节点(EPHEMERAL_SEQUENTIAL)类型,新的子节点后面会加上一个次序编号,而这个生成的次序编号是上一个生成的次序编号加一。

例如,有一个用于发号的节点 “/test/lock” 为父节点,可以在这个父节点下面创建相同前缀的临时顺序子节点,假定相同的前缀为 “/test/lock/seq-”。第一个创建的子节点基本上应该为 /test/lock/seq-0000000001,下一个节点则为 /test/lock/seq-0000000002,依次类推。

(2)ZooKeeper 节点的递增有序性可以确保锁的公平

一个 ZooKeeper 分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT 类型),然后每个要获得锁的线程都在这个节点下创建一个临时顺序节点,该节点是按照创建的次序依次递增的。

为了确保公平,可以简单的规定:编号最小的那个节点表示获得了锁。所以,每个线程在尝试占用锁之前,首先判断自己是序号是不是当前最小,如果是则获取锁。

(3)ZooKeeper 的节点监听机制,可以保障占有锁的传递有序而且高效

每个线程抢占锁之前,先尝试创建自己的 ZNode。同样,释放锁的时候需要删除创建的 Znode。创建成功后,如果不是序号最小的节点,就处于等待通知的状态。每一个等通知的 Znode 节点,需要监视(watch)序号在自己前面的那个 Znode,以获取其删除事件。只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,自己就获得锁。就这样不断地通知后一个 ZNode 节点。

另外,ZooKeeper 的内部优越的机制,能保证由于网络异常或者其他原因,集群中占用锁的客户端失联时锁能够被有效释放。什么机制呢,就是临时顺序节点。一旦占用 Znode 锁的客户端与 ZooKeeper 集群服务器失去联系,这个临时 Znode 也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。

也正是这个原因,zk 中不需要向 redis 那样考虑锁可能出现的无法释放的问题了,因为当客户端挂了,节点也挂了,锁也释放了。

(四)ZooKeeper 的节点监听机制,能避免羊群效应

ZooKeeper 这种首尾相接、后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是一个节点挂掉,所有节点都去监听,然后做出反应,这样会给服务器带来巨大压力。有了临时顺序节点以及节点监听机制,当一个节点挂掉,只有它后面的那一个节点才做出反应。


具体流程

  1. 一把分布式锁通常使用一个 Znode 节点表示;如果锁对应的 Znode 节点不存在,首先创建 Znode 节点。这里假设为 /test/lock,代表了一把需要创建的分布式锁。
  2. 抢占锁的所有客户端,使用锁的 Znode 节点的子节点列表来表示;如果某个客户端需要占用锁,则在 /test/lock 下创建一个临时顺序的子节点。比如,如果子节点的前缀为 /test/lock/seq-,则第一次抢锁对应的子节点为 /test/lock/seq-000000001,第二次抢锁对应的子节点为 /test/lock/seq-000000002,以此类推。
  3. 当客户端创建子节点后,需要进行判断:自己创建的子节点,是否为当前子节点列表中序号最小的子节点。如果是,则加锁成功;如果不是,则监听前一个 Znode 子节点变更消息,等待前一个节点释放锁。
  4. 一旦队列中的后面的节点,获得前一个子节点变更通知,则开始进行判断,判断自己是否为当前子节点列表中序号最小的子节点,如果是,则认为加锁成功;如果不是,则持续监听,一直到获得锁。
  5. 获取锁后,开始处理业务流程。完成业务流程后,删除自己的对应的子节点,完成释放锁的工作,以方面后继节点能捕获到节点变更通知,获得分布式锁。

代码实现

Curator 是Netflix公司开源的一套 ZooKeeper Java客户端框架,相比于 Zookeeper 自带的客户端 zookeeper 来说,Curator 的封装更加完善,各种 API 都可以比较方便地使用。

这里使用 Curator 作为 Zookeeper 的客户端实现。需要先导入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.2.1</version>
</dependency>

客户端创建工厂类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ClientFactory {
//连接地址
private static final String connectionString = "127.0.0.1:2181";
//等待事件的基础单位,单位毫秒
private static final int BASE_SLEEP_TIME = 1000;
//最大重试次数
private static final int MAX_RETRIES = 3;
private static volatile CuratorFramework zkClient;

public static CuratorFramework getClient() { //单例
if (zkClient == null) {
synchronized (ClientFactory.class) {
if (zkClient == null) {
createSimple();
}
}
}
return zkClient;
}

public static void createSimple() {
//重试策略: 第一次重试等待1秒,第二次重试等待2秒,第三次重试等待4秒
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
zkClient.start();
}

public static void createWithOptions(int connectionTimeoutMs, int sessionTimeoutMs) {
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
zkClient = CuratorFrameworkFactory.builder()
.connectString(connectionString)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeoutMs) //连接超时
.sessionTimeoutMs(sessionTimeoutMs) //会话超时
.build();
zkClient.start();
}
}

创建 Lock 锁接口

1
2
3
4
5
6
public interface Lock {
//加锁
boolean lock() throws Exception;
//释放锁
boolean unlock() throws Exception;
}

Lock 实现类(ZkLock)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
public class ZkLock implements Lock{

private String zkPath; //分布式锁节点,如"/test/lock"
private String lockPrefix; //子节点前缀,如"/test/lock/seq-"
private long waitTime; //超时等待
CuratorFramework zkClient; //ZK客户端
private Thread thread; //当前线程
private String lockPath; //当前加锁节点
private String waitPath; //前一个等待节点
final AtomicInteger lockCount = new AtomicInteger(0); //重入计数器

public ZkLock(String zkPath) throws Exception {
this.zkPath = zkPath;
this.lockPrefix = zkPath + "/seq-";
this.waitTime = 0L;
this.zkClient = ClientFactory.getClient();
try {
if (zkClient.checkExists().forPath(zkPath) == null) {
zkClient.create().creatingParentsIfNeeded().forPath(zkPath);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public ZkLock(String zkPath, long waitTime) {
this.zkPath = zkPath;
this.lockPrefix = zkPath + "/seq-";
this.waitTime = waitTime;
this.zkClient = ClientFactory.getClient();
try {
if (zkClient.checkExists().forPath(zkPath) == null) {
zkClient.create().creatingParentsIfNeeded().forPath(zkPath);
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 加锁
*/
@Override
public boolean lock() throws Exception {
//可重入
synchronized (this) {
if (lockCount.get() == 0) {
thread = Thread.currentThread();
lockCount.incrementAndGet();
} else {
if (!thread.equals(Thread.currentThread())) {
return false;
}
lockCount.incrementAndGet();
return true;
}
}
return tryLock();
}

/**
* 尝试获取锁
*/
private boolean tryLock() throws Exception {
lockPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(lockPrefix);
List<String> childList = zkClient.getChildren().forPath(zkPath);
if (childList.size() == 1) {
return true;
} else {
Collections.sort(childList);
String curNode = lockPath.substring(zkPath.length() + 1);
int index = childList.indexOf(curNode);
if (index < 0) {
throw new Exception("加锁异常");
} else if (index == 0) {
//第一个节点,加锁成功
return true;
} else {
//监听前一个节点
waitPath = zkPath + "/" + childList.get(index - 1);
final CountDownLatch waitLatch = new CountDownLatch(1);
Watcher w = new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getType() == Event.EventType.NodeDeleted &&
watchedEvent.getPath().equals(waitPath)) {
System.out.println("监听到节点删除事件:" + watchedEvent);
waitLatch.countDown();
}
}
};
zkClient.getData().usingWatcher(w).forPath(waitPath);
if (waitTime == 0L) {
waitLatch.await();
return true;
} else {
return waitLatch.await(waitTime, TimeUnit.SECONDS);
}
}
}
}

/**
* 释放锁
*/
@Override
public boolean unlock() throws Exception {
if (!thread.equals(Thread.currentThread())) {
return false;
}
int newLockCount = lockCount.decrementAndGet();
if (newLockCount < 0) {
throw new Exception("解锁异常");
} else if (newLockCount > 0) {
return true;
} else {
try {
if (zkClient.checkExists().forPath(lockPath) != null) {
zkClient.delete().forPath(lockPath);
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
}
}

自定义 ZK 分布式锁测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class ZkLockTest {

public static void main(String[] args) throws Exception {
System.out.println("开始测试ZK分布式锁...");

new Thread(new Runnable() {
@Override
public void run() {
Lock zkLock = new ZkLock("/test/lock", 3L);
System.out.println("线程1启动");
try {
boolean lock = zkLock.lock();
if (lock) {
System.out.println("线程1获取到锁");
Thread.sleep(2000);
zkLock.unlock();
System.out.println("线程1释放锁");
} else {
System.out.println("线程1获取锁失败");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
Lock zkLock = new ZkLock("/test/lock", 3L);
System.out.println("线程2启动");
try {
boolean lock = zkLock.lock();
if (lock) {
System.out.println("线程2获取到锁");
Thread.sleep(2000);
zkLock.unlock();
System.out.println("线程2释放锁");
} else {
System.out.println("线程2获取锁失败");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}).start();
}
}

测试结果:


独占锁 & 共享锁

上面讲的都是基于独占锁的,那么能否实现共享锁呢?答案是可以的。

当操作是读请求,也就是要获取共享锁,如果没有比自己更小的节点,或比自己小的节点都是读请求 ,则可以获取到读锁。若比自己小的节点中有写请求 ,则当前客户端无法获取到读锁,只能等待前面的写请求完成。

如果操作是写请求,也就是要获取独占锁,如果没有比自己更小的节点 ,则表示当前客户端可以直接获取到写锁,对数据进行修改。如果发现有比自己更小的节点,无论是读操作还是写操作,当前客户端都无法获取到写锁,等待前面所有的操作完成。


Curator 实现分布式锁

实际开发过程中,建议使用 Curator 客户端封装的 API 帮助我们实现分布式锁。

Curator 的几种锁方案:

  • InterProcessMutex:分布式可重入排它锁
  • InterProcessSemaphoreMutex:分布式排它锁
  • InterProcessReadWriteLock:分布式读写锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class InterProcessMutexTest {

public static void main(String[] args) {
CuratorFramework zkClient = ClientFactory.getClient();
InterProcessMutex zkMutex = new InterProcessMutex(zkClient, "/test/mutex");

new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程1启动");
try {
zkMutex.acquire(); //阻塞等待,也可超时等待
System.out.println("线程1获取到锁");
Thread.sleep(2000);
zkMutex.release();
System.out.println("线程1释放锁");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}).start();

new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程2启动");
try {
zkMutex.acquire();
System.out.println("线程2获取到锁");
Thread.sleep(2000);
zkMutex.release();
System.out.println("线程2释放锁");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}).start();
}
}

ZooKeeper 分布式锁的优缺点

这里把 Zookeeper 与 Redis 实现分布式锁对比一下:

  • 优点:ZooKeeper分布式锁(如 InterProcessMutex),除了独占锁、可重入锁,还能实现读写锁,并且可靠性比 Redis 更好。
  • 缺点:ZooKeeper实现的分布式锁,性能并不太高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。而 ZK 中创建和删除节点只能通过 Leader 服务器来执行,然后 Leader 服务器还需要将数据同不到所有的 Follower 机器上,这样频繁的网络通信,性能的短板是非常突出的。

在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁,可以使用 Redis 分布式锁。而由于ZooKeeper的高可用特性,所以在并发量不是太高的场景,推荐使用ZooKeeper的分布式锁。

使用 zk 临时节点会存在另一个问题:由于 zk 依靠 session 定期的心跳来维持客户端,如果客户端进入长时间的 GC,可能会导致 zk 认为客户端宕机而释放锁,让其他的客户端获取锁,但是客户端在 GC 恢复后,会认为自己还持有锁,从而可能出现多个客户端同时获取到锁的情形。

针对这种情况,可以通过 JVM 调优,尽量避免长时间 GC 的情况发生。



参考资料:《Java 高并发核心编程——卷1》