本博客日IP超过2000,PV 3000 左右,急需赞助商。
极客时间所有课程通过我的二维码购买后返现24元微信红包,请加博主新的微信号:xttblog2,之前的微信号好友位已满,备注:返现
受密码保护的文章请关注“业余草”公众号,回复关键字“0”获得密码
所有面试题(java、前端、数据库、springboot等)一网打尽,请关注文末小程序
腾讯云】1核2G5M轻量应用服务器50元首年,高性价比,助您轻松上云
看过我博客的网友都知道,我前面写锁写了非常多的文章。包括悲观锁、乐观锁、自旋锁、适应性自旋锁、无锁、偏向锁、轻量级锁、重量级锁、公平锁、非公平锁、可重入锁、非可重入锁、独享锁、共享锁等。本文我们借助 Zookeeper 实现一个不可重入的分布式锁!
前面文章推荐:
- 学会 Java 中的锁,你只需要记住 6 句法则即可!
- 从 ReentrantReadWriteLock 说独享锁(排他锁)与共享锁
- 图解 Java 中的可重入锁与非可重入锁
- 图解 Java 中的公平锁与非公平锁
- InnoDB 的 select 行锁还是表锁
关于锁的文章还有很多,我就先贴出上面的 5 个。
分布式锁对 Java 的 JUC 包来说就显得无能为力了。所以我们要借助 Zookeeper 的最小版本,Redis 的 set 函数,数据库锁来实现分布式锁。
我们知道在 Zookeeper 中是使用文件目录的格式存放节点内容,其中节点类型分为:
- 持久节点(PERSISTENT ):节点创建后,一直存在,直到主动删除了该节点。
- 临时节点(EPHEMERAL):生命周期和客户端会话绑定,一旦客户端会话失效,这个节点就会自动删除。
- 序列节点(SEQUENTIAL ):多个线程创建同一个顺序节点时候,每个线程会得到一个带有编号的节点,节点编号是递增不重复的,如下图:
如上图,三个线程分别创建路径为 /root/node 的节点,可知在 zk 服务器端会在 root 下存在三个 node 节点,并且线程编号是唯一递增。
具体在节点创建过程中,可以混合使用,比如临时顺序节点(EPHEMERAL_SEQUENTIAL),本文我们就使用临时顺序节点来实现分布式锁。
先说一下实现原理:
创建临时顺序节点,比如 /root/node,假设返回结果为 nodeId。
获取 /root 下所有孩子节点,用自己创建的 nodeId 的序号与所有子节点比较,看看自己是不是编号最小的。如果是最小的则就相当于获取到了锁,如果自己不是最小的,则从所有子节点里面获取比自己次小的一个节点,然后设置监听该节点的事件,然后挂起当前线程。
当最小编号的线程获取锁,处理完业务后删除自己对应的 nodeId,删除后会激活比自己大一号的节点的线程从阻塞变为运行态,被激活的线程应该就是当前 node 序列号最小的了,然后就会获取到锁。
明白原理后,代码写起来就非常的简单了。
public class ZookeeperDistributedLock { public final static Joiner j = Joiner.on("|").useForNull(""); //zk客户端 private ZooKeeper zk; //zk是一个目录结构,root为最外层目录 private String root = "/locks"; //锁的名称 private String lockName; //当前线程创建的序列node private ThreadLocal<String> nodeId = new ThreadLocal<>(); //用来同步等待zkclient链接到了服务端 private CountDownLatch connectedSignal = new CountDownLatch(1); private final static int sessionTimeout = 3000; private final static byte[] data= new byte[0]; public ZookeeperDistributedLock(String config, String lockName) { this.lockName = lockName; try { zk = new ZooKeeper(config, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 建立连接 if (event.getState() == KeeperState.SyncConnected) { connectedSignal.countDown(); } } }); connectedSignal.await(); Stat stat = zk.exists(root, false); if (null == stat) { // 创建根节点 zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { throw new RuntimeException(e); } } class LockWatcher implements Watcher { private CountDownLatch latch = null; public LockWatcher(CountDownLatch latch) { this.latch = latch; } @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted) latch.countDown(); } } public void lock() { try { // 创建临时子节点 String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(j.join(Thread.currentThread().getName() + myNode, "created")); // 取出所有子节点 List<String> subNodes = zk.getChildren(root, false); TreeSet<String> sortedNodes = new TreeSet<>(); for(String node :subNodes) { sortedNodes.add(root +"/" +node); } String smallNode = sortedNodes.first(); String preNode = sortedNodes.lower(myNode); if (myNode.equals( smallNode)) { // 如果是最小的节点,则表示取得锁 System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock")); this.nodeId.set(myNode); return; } CountDownLatch latch = new CountDownLatch(1); Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。 // 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听 if (stat != null) { System.out.println(j.join(Thread.currentThread().getName(), myNode, " waiting for " + root + "/" + preNode + " released lock")); latch.await();// 等待,这里应该一直等待其他线程释放锁 nodeId.set(myNode); latch = null; } } catch (Exception e) { throw new RuntimeException(e); } } public void unlock() { try { System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock ")); if (null != nodeId) { zk.delete(nodeId.get(), -1); } nodeId.remove(); } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } } }
ZookeeperDistributedLock 的构造函数创建 zkclient,并且注册了监听事件,然后调用 connectedSignal.await() 挂起当前线程。当 zkclient 链接到服务器后,会给监听器发送 SyncConnected 事件,监听器判断当前链接已经建立了,则调用 connectedSignal.countDown(); 激活当前线程,然后创建 root 节点。
获取锁的方法 lock,内部首先创建 /root/lockName 的顺序临时节点,然后获取 /root 下所有的孩子节点,并对子节点进行排序,然后判断自己是不是最小的编号,如果是直接返回 true 标示获取锁成功。否者看比自己小一个号的节点是否存在,存在则注册该节点的事件,然后挂起当前线程,等待比自己小一个数的节点释放锁后发送节点删除事件,事件里面激活当前线程。
释放锁的方法 unlock 比较简单,就是简单的删除获取锁时候创建的节点。
Zookeeper 非常的强大,当你真正的了解后,你会发现它能做很多事情。
光分布锁方面,它都能帮助我们实现好几种,如下面我列举的这些:
- 可重入锁Shared Reentrant Lock
- 不可重入锁Shared Lock
- 可重入读写锁Shared Reentrant Read Write Lock
- 信号量Shared Semaphore
- 多锁对象 Multi Shared Lock
其实本文所解释的这个分布式锁,还可以在进一步的优化优化。具体怎么优化,大家可以思考一下!
最后,欢迎关注我的个人微信公众号:业余草(yyucao)!可加作者微信号:xttblog2。备注:“1”,添加博主微信拉你进微信群。备注错误不会同意好友申请。再次感谢您的关注!后续有精彩内容会第一时间发给您!原创文章投稿请发送至532009913@qq.com邮箱。商务合作也可添加作者微信进行联系!
本文原文出处:业余草: » Java + Zookeeper 实现不可重入的分布式锁