Zookeeper 存储数据的结构是一个树
分为持久节点 和 瞬时节点持久节点你的会话结束或者是 Zookeeper 重启,这些节点呢都是不会消失的,它会一直存在不会消失的,除非你手动删除这些节点瞬时节点(有序),Zookeeper 重启或者连接Zookeeper 会话断了,那么瞬时节点呢就会自动的去消失。瞬时节点不可以再去拥有子节点,如果当前节点是瞬时节点,那么子节点是不可能会存在的,为什么呢,因为如果你自动消失了,你这个瞬时节点的子节点是不是也会跟着消失啊,所以这个顾虑 Zookeeper 规定,瞬时节点是不能拥有子节点的,瞬时节点它还可以一个有序,有序是什么意思,就是你在创建这个瞬时节点的时候呢,你的名称是按照序号排序的,这一点是非常的重要的Zookeeper 另外一个非常重要的概念,就是这个观察器,它是可以检查Zookeeper 里边某个节点的变化,比如这个节点由存在到不存在,就是节点消失了,有可能节点数据发生了变化,比如这个节点里面的内容改变了,也有可能它的子节点发生了变化。我们都可以检测到这些节点的变化,如果它有变化呢,它会立刻通知到客户端,就是咱们在程序当中连接 Zookeeper 对吧。它会立刻通知你的程序,我这个节点数据发生变化呢,你接下来要做哪些操作,这个就是Zookeeper 的观察器 观察器可以监测3个方法: getData(); getChildren(); exists();
观察器只能监听一次,如果你要再次监控这个节点的话呢,你需要重新设置这个观察器。这是Zookeeper 的一个特点
主要是利用了 Zookeeper 瞬时有序节点这样的一个特性
多线程并发创建的时候,它的这个节点并不会只有一个,比如说咱们有10个线程,同时去Zookeeper 里边创建节点,并发并不代表说Zookeeper 只创建了一个节点,不是这样。10个线程同时去创建瞬时节点,它会有10个瞬时节点创建成功,但是这10个瞬时节点呢,它是有序的,就是说你10个线程创建10个节点。它的名称是不一样的,它的序号也是不一样的。咱们规定序号最小的这个线程获得这把锁,其他的线程呢都没有获得锁,这个就是分布式的情况了。你两个JVM 或者多个JVM ,同时去Zookeeper 里边创建这个瞬时节点,你所有的线程都创建成功了,但是序号最小的那个线程获得锁。其他线程都是获取锁失败。就是按照这么一个逻辑去实现 Zookeeper 的分布式锁其他的线程怎么办?
其他的线程就监听 自己序号的前一个序号,比如说你有 10个线程,创建了10个瞬时节点,序号最小的是1 ,那就是第1个线程是获得到了锁,它去执行后面的逻辑。那第2个线程怎么办,第2个线程要监听序号为1的这个节点。等序号为1的节点消失了。消失是什么意思,就是第1个线程执行完了,序号为1的节点消失了,它立刻发送一个通知,因为我们设置观察器嘛,序号为2的这个线程监听序号1的这个节点。序号为1的这个节点消失了。它会马上通知序号为2的这个线程。这个线程2就开始执行了
maven
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.14</version> </dependency>实现层
package com.example.zookeeperlock.util; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import java.io.IOException; import java.util.Collections; import java.util.List; /** * @Author: qiuj * @Description: * @Date: 2020-10-24 11:28 */ @Slf4j public class ZkLock implements AutoCloseable,Watcher { public ZkLock() throws IOException { this.zooKeeper = new ZooKeeper("localhost:2181", 10000,this); } private ZooKeeper zooKeeper; private String currentNode; public boolean getLock (String businessCode) throws KeeperException, InterruptedException { try { // 设置根路径 Stat stat = zooKeeper.exists("/" + businessCode,false); if (stat == null) { zooKeeper.create("/" + businessCode,businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } // 创建子节点 currentNode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_",businessCode.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 查询出所有节点 List<String> nodeList = zooKeeper.getChildren("/" + businessCode,false); // 从小到大排序 Collections.sort(nodeList); // 查询出第一个节点 和当前创建节点进行匹配是否一致 一致则获得到锁 String firstNode = nodeList.get(0); if (currentNode.endsWith(firstNode)) { return true; } // 没有获得到锁 则监听当前节点的上一个节点 for (String node : nodeList) { if (currentNode.endsWith(node)) { zooKeeper.exists("/" + businessCode + "/" + firstNode,true); break; } else { firstNode = node; } } // 让该线程进入等待 把执行wait的前提条件放入synchronized中,这样才可以避免wait执行之前执行notify synchronized (this) { wait(); } return true; } catch (Exception e) { e.printStackTrace();; } return false; } @Override public void process(WatchedEvent event) { // watch观察器 监听触发此方法 当前节点删除则唤醒该线程 ,也间接说明获得了锁 if (Event.EventType.NodeDeleted == event.getType()) { synchronized (this) { notify(); } } } @Override public void close() throws Exception { // 方法调用结束之后将当前节点删除 并释放锁 zooKeeper.delete(currentNode,-1); zooKeeper.close(); log.info("释放锁"); } }Controller 层调用
package com.example.zookeeperlock; import com.example.zookeeperlock.util.ZkLock; import lombok.extern.slf4j.Slf4j; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; /** * @Author: qiuj * @Description: * @Date: 2020-10-05 14:47 */ @Slf4j @RestController public class ZookeeperLockController { @RequestMapping("/zkLock") public String zkLock () { log.info("我进入了方法!"); try (ZkLock zkLock = new ZkLock()) { if (zkLock.getLock("order")) { log.info("我获得了锁"); Thread.sleep(10000); } } catch (InterruptedException e) { e.printStackTrace(); } catch (KeeperException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } log.info("方法执行完成!"); return "方法执行完成!"; } }开启两个应用 8080 8081
8080 19秒进入方法 19秒获得了锁 29秒释放了锁
8081 20秒进入方法 29秒获得了锁
说明在多个JVM 情况下,分布式锁是有效的
maven
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.2.0</version> </dependency>Applicaton 启动类
package com.example.zookeeperlock; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class ZookeeperLockApplication { public static void main(String[] args) { SpringApplication.run(ZookeeperLockApplication.class, args); } /** * 注入Bean * @return * @throws Exception */ @Bean(initMethod="start",destroyMethod="close") public CuratorFramework getCuratorFramework () throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy); return client; } }Controller 层
package com.example.zookeeperlock; import com.example.zookeeperlock.util.ZkLock; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; /** * @Author: qiuj * @Description: * @Date: 2020-10-05 14:47 */ @Slf4j @RestController public class ZookeeperLockController { @Autowired private CuratorFramework client; @RequestMapping("/zk") public String curatorLock () { log.info("我进入了方法!"); InterProcessMutex lock = new InterProcessMutex(client,"/curatorLock"); try { if ( lock.acquire(30, TimeUnit.SECONDS) ) { log.info("我获得了锁"); Thread.sleep(10000); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); log.info("释放锁"); } catch (Exception e) { e.printStackTrace(); } } log.info("方法执行完成!"); return "方法执行完成!"; } }