分布式锁是控制分布式系统之间同步访问共享资源的一种方式。分布式锁的实现方式有很多种,比如 Redis
、数据库 、zookeeper
等。这篇文章主要介绍用 Zookeeper
实现分布式锁。
Zookeeper 分布式锁实现原理
先说结论:Zookeeper 是基于临时顺序节点以及 Watcher 监听器机制实现分布式锁的。
(1)ZooKeeper 的每一个节点都是一个天然的顺序发号器。
在每一个节点下面创建临时顺序节点(EPHEMERAL_SEQUENTIAL)类型,新的子节点后面会加上一个次序编号,而这个生成的次序编号是上一个生成的次序编号加一。
例如,有一个用于发号的节点 “/test/lock” 为父节点,可以在这个父节点下面创建相同前缀的临时顺序子节点,假定相同的前缀为 “/test/lock/seq-”。第一个创建的子节点基本上应该为 /test/lock/seq-0000000001,下一个节点则为 /test/lock/seq-0000000002,依次类推。
(2)ZooKeeper 节点的递增有序性可以确保锁的公平。
一个 ZooKeeper 分布式锁,首先需要创建一个父节点,尽量是持久节点(PERSISTENT 类型),然后每个要获得锁的线程都在这个节点下创建一个临时顺序节点,该节点是按照创建的次序依次递增的。
为了确保公平,可以简单的规定:编号最小的那个节点表示获得了锁。所以,每个线程在尝试占用锁之前,首先判断自己是序号是不是当前最小,如果是则获取锁。
(3)ZooKeeper 的节点监听机制,可以保障占有锁的传递有序而且高效。
每个线程抢占锁之前,先尝试创建自己的 ZNode。同样,释放锁的时候需要删除创建的 Znode。创建成功后,如果不是序号最小的节点,就处于等待通知的状态。每一个等通知的 Znode 节点,需要监视(watch)序号在自己前面的那个 Znode,以获取其删除事件。只要上一个节点被删除了,就进行再一次判断,看看自己是不是序号最小的那个节点,如果是,自己就获得锁。就这样不断地通知后一个 ZNode 节点。
另外,ZooKeeper 的内部优越的机制,能保证由于网络异常或者其他原因,集群中占用锁的客户端失联时锁能够被有效释放。什么机制呢,就是临时顺序节点。一旦占用 Znode 锁的客户端与 ZooKeeper 集群服务器失去联系,这个临时 Znode 也将自动删除。排在它后面的那个节点,也能收到删除事件,从而获得锁。
也正是这个原因,zk
中不需要向 redis
那样考虑锁可能出现的无法释放的问题了,因为当客户端挂了,节点也挂了,锁也释放了。
(四)ZooKeeper 的节点监听机制,能避免羊群效应。
ZooKeeper 这种首尾相接、后面监听前面的方式,可以避免羊群效应。所谓羊群效应就是一个节点挂掉,所有节点都去监听,然后做出反应,这样会给服务器带来巨大压力。有了临时顺序节点以及节点监听机制,当一个节点挂掉,只有它后面的那一个节点才做出反应。
具体流程
- 一把分布式锁通常使用一个 Znode 节点表示;如果锁对应的 Znode 节点不存在,首先创建 Znode 节点。这里假设为
/test/lock
,代表了一把需要创建的分布式锁。
- 抢占锁的所有客户端,使用锁的 Znode 节点的子节点列表来表示;如果某个客户端需要占用锁,则在
/test/lock
下创建一个临时顺序的子节点。比如,如果子节点的前缀为 /test/lock/seq-
,则第一次抢锁对应的子节点为 /test/lock/seq-000000001
,第二次抢锁对应的子节点为 /test/lock/seq-000000002
,以此类推。
- 当客户端创建子节点后,需要进行判断:自己创建的子节点,是否为当前子节点列表中序号最小的子节点。如果是,则加锁成功;如果不是,则监听前一个 Znode 子节点变更消息,等待前一个节点释放锁。
- 一旦队列中的后面的节点,获得前一个子节点变更通知,则开始进行判断,判断自己是否为当前子节点列表中序号最小的子节点,如果是,则认为加锁成功;如果不是,则持续监听,一直到获得锁。
- 获取锁后,开始处理业务流程。完成业务流程后,删除自己的对应的子节点,完成释放锁的工作,以方面后继节点能捕获到节点变更通知,获得分布式锁。
代码实现
Curator 是Netflix公司开源的一套 ZooKeeper Java客户端框架,相比于 Zookeeper 自带的客户端 zookeeper 来说,Curator 的封装更加完善,各种 API 都可以比较方便地使用。
这里使用 Curator 作为 Zookeeper 的客户端实现。需要先导入依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>5.2.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>5.2.1</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>5.2.1</version> </dependency>
|
客户端创建工厂类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class ClientFactory { private static final String connectionString = "127.0.0.1:2181"; private static final int BASE_SLEEP_TIME = 1000; private static final int MAX_RETRIES = 3; private static volatile CuratorFramework zkClient;
public static CuratorFramework getClient() { if (zkClient == null) { synchronized (ClientFactory.class) { if (zkClient == null) { createSimple(); } } } return zkClient; }
public static void createSimple() { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); zkClient = CuratorFrameworkFactory.newClient(connectionString, retryPolicy); zkClient.start(); }
public static void createWithOptions(int connectionTimeoutMs, int sessionTimeoutMs) { ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); zkClient = CuratorFrameworkFactory.builder() .connectString(connectionString) .retryPolicy(retryPolicy) .connectionTimeoutMs(connectionTimeoutMs) .sessionTimeoutMs(sessionTimeoutMs) .build(); zkClient.start(); } }
|
创建 Lock 锁接口
1 2 3 4 5 6
| public interface Lock { boolean lock() throws Exception; boolean unlock() throws Exception; }
|
Lock 实现类(ZkLock)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
| public class ZkLock implements Lock{ private String zkPath; private String lockPrefix; private long waitTime; CuratorFramework zkClient; private Thread thread; private String lockPath; private String waitPath; final AtomicInteger lockCount = new AtomicInteger(0);
public ZkLock(String zkPath) throws Exception { this.zkPath = zkPath; this.lockPrefix = zkPath + "/seq-"; this.waitTime = 0L; this.zkClient = ClientFactory.getClient(); try { if (zkClient.checkExists().forPath(zkPath) == null) { zkClient.create().creatingParentsIfNeeded().forPath(zkPath); } } catch (Exception e) { e.printStackTrace(); } } public ZkLock(String zkPath, long waitTime) { this.zkPath = zkPath; this.lockPrefix = zkPath + "/seq-"; this.waitTime = waitTime; this.zkClient = ClientFactory.getClient(); try { if (zkClient.checkExists().forPath(zkPath) == null) { zkClient.create().creatingParentsIfNeeded().forPath(zkPath); } } catch (Exception e) { e.printStackTrace(); } }
@Override public boolean lock() throws Exception { synchronized (this) { if (lockCount.get() == 0) { thread = Thread.currentThread(); lockCount.incrementAndGet(); } else { if (!thread.equals(Thread.currentThread())) { return false; } lockCount.incrementAndGet(); return true; } } return tryLock(); }
private boolean tryLock() throws Exception { lockPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(lockPrefix); List<String> childList = zkClient.getChildren().forPath(zkPath); if (childList.size() == 1) { return true; } else { Collections.sort(childList); String curNode = lockPath.substring(zkPath.length() + 1); int index = childList.indexOf(curNode); if (index < 0) { throw new Exception("加锁异常"); } else if (index == 0) { return true; } else { waitPath = zkPath + "/" + childList.get(index - 1); final CountDownLatch waitLatch = new CountDownLatch(1); Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (watchedEvent.getType() == Event.EventType.NodeDeleted && watchedEvent.getPath().equals(waitPath)) { System.out.println("监听到节点删除事件:" + watchedEvent); waitLatch.countDown(); } } }; zkClient.getData().usingWatcher(w).forPath(waitPath); if (waitTime == 0L) { waitLatch.await(); return true; } else { return waitLatch.await(waitTime, TimeUnit.SECONDS); } } } }
@Override public boolean unlock() throws Exception { if (!thread.equals(Thread.currentThread())) { return false; } int newLockCount = lockCount.decrementAndGet(); if (newLockCount < 0) { throw new Exception("解锁异常"); } else if (newLockCount > 0) { return true; } else { try { if (zkClient.checkExists().forPath(lockPath) != null) { zkClient.delete().forPath(lockPath); } } catch (Exception e) { e.printStackTrace(); return false; } return true; } } }
|
自定义 ZK 分布式锁测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class ZkLockTest {
public static void main(String[] args) throws Exception { System.out.println("开始测试ZK分布式锁...");
new Thread(new Runnable() { @Override public void run() { Lock zkLock = new ZkLock("/test/lock", 3L); System.out.println("线程1启动"); try { boolean lock = zkLock.lock(); if (lock) { System.out.println("线程1获取到锁"); Thread.sleep(2000); zkLock.unlock(); System.out.println("线程1释放锁"); } else { System.out.println("线程1获取锁失败"); } } catch (Exception e) { throw new RuntimeException(e); } } }).start();
new Thread(new Runnable() { @Override public void run() { Lock zkLock = new ZkLock("/test/lock", 3L); System.out.println("线程2启动"); try { boolean lock = zkLock.lock(); if (lock) { System.out.println("线程2获取到锁"); Thread.sleep(2000); zkLock.unlock(); System.out.println("线程2释放锁"); } else { System.out.println("线程2获取锁失败"); } } catch (Exception e) { throw new RuntimeException(e); } } }).start(); } }
|
测试结果:
独占锁 & 共享锁
上面讲的都是基于独占锁的,那么能否实现共享锁呢?答案是可以的。
当操作是读请求,也就是要获取共享锁,如果没有比自己更小的节点,或比自己小的节点都是读请求 ,则可以获取到读锁。若比自己小的节点中有写请求 ,则当前客户端无法获取到读锁,只能等待前面的写请求完成。
如果操作是写请求,也就是要获取独占锁,如果没有比自己更小的节点 ,则表示当前客户端可以直接获取到写锁,对数据进行修改。如果发现有比自己更小的节点,无论是读操作还是写操作,当前客户端都无法获取到写锁,等待前面所有的操作完成。
Curator 实现分布式锁
实际开发过程中,建议使用 Curator 客户端封装的 API 帮助我们实现分布式锁。
Curator 的几种锁方案:
- InterProcessMutex:分布式可重入排它锁
- InterProcessSemaphoreMutex:分布式排它锁
- InterProcessReadWriteLock:分布式读写锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class InterProcessMutexTest {
public static void main(String[] args) { CuratorFramework zkClient = ClientFactory.getClient(); InterProcessMutex zkMutex = new InterProcessMutex(zkClient, "/test/mutex");
new Thread(new Runnable() { @Override public void run() { System.out.println("线程1启动"); try { zkMutex.acquire(); System.out.println("线程1获取到锁"); Thread.sleep(2000); zkMutex.release(); System.out.println("线程1释放锁"); } catch (Exception e) { throw new RuntimeException(e); } } }).start();
new Thread(new Runnable() { @Override public void run() { System.out.println("线程2启动"); try { zkMutex.acquire(); System.out.println("线程2获取到锁"); Thread.sleep(2000); zkMutex.release(); System.out.println("线程2释放锁"); } catch (Exception e) { throw new RuntimeException(e); } } }).start(); } }
|
ZooKeeper 分布式锁的优缺点
这里把 Zookeeper 与 Redis 实现分布式锁对比一下:
- 优点:ZooKeeper分布式锁(如 InterProcessMutex),除了独占锁、可重入锁,还能实现读写锁,并且可靠性比 Redis 更好。
- 缺点:ZooKeeper实现的分布式锁,性能并不太高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。而 ZK 中创建和删除节点只能通过 Leader 服务器来执行,然后 Leader 服务器还需要将数据同不到所有的 Follower 机器上,这样频繁的网络通信,性能的短板是非常突出的。
在高性能,高并发的场景下,不建议使用ZooKeeper的分布式锁,可以使用 Redis 分布式锁。而由于ZooKeeper的高可用特性,所以在并发量不是太高的场景,推荐使用ZooKeeper的分布式锁。
使用 zk 临时节点会存在另一个问题:由于 zk 依靠 session 定期的心跳来维持客户端,如果客户端进入长时间的 GC,可能会导致 zk 认为客户端宕机而释放锁,让其他的客户端获取锁,但是客户端在 GC 恢复后,会认为自己还持有锁,从而可能出现多个客户端同时获取到锁的情形。
针对这种情况,可以通过 JVM 调优,尽量避免长时间 GC 的情况发生。
参考资料:《Java 高并发核心编程——卷1》