Zookeeper-watcher

    科技2024-06-27  65

    1.Watcher监听机制

    Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper实现分布式锁,发布订阅(多个订阅者同时监听某一个主题对象,当这个主题对象自身状态发生变化时,会通知所有订阅者)等功能。

    2.Watcher 特性

    当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息(Watcher 是一次性的操作)。 可以通过循环监听去达到永久监听效果。

    特性说明一次性watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册客户端顺序回调watcher回调是顺序串行的,只有回调后客户端才能看到最新数据状态。一个 watcher回调逻辑不应太多,以免影响其他 watcher的执行轻量级watchEvent是最小的通信单位,结构上只包含通知状态、事件类型和节点路径。,并不会告诉节点变化前后的具体内容时效性watcher只有在当前session彻底失效时才会无效,若session有效期内快速重连成功,则 watcher依然存在,仍可接收通知

    3.如何注册事件

    ZooKeeper 的 Watcher 机制,总的来说可以分为三个过程:客户端注册 Watcher、服务器处理Watcher 和客户端回调 Watcher客户端。 注册 watcher 有 3 种方式,exists、getData、getChildren

    事件类型说明None客户端与服务端成功建立会话NodeCreated节点创建事件NodeDeleted节点删除事件NodeDataChanged节点数据变化事件NodeChildrenChanged子节点变化事件(创建、删除)

    4.Java API

    POM的JAR版本根据使用Zookeeper服务的版本而定(zookeeper-3.6.2)

    <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.6.2</version> </dependency> import org.apache.commons.lang.StringUtils; import org.apache.zookeeper.data.Stat; import org.jboss.netty.util.internal.StringUtil; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; public class Zk_Watcher { ZooKeeper zk; @Before public void init() throws IOException, KeeperException, InterruptedException { zk= new ZooKeeper("127.0.0.1:2181", Integer.MAX_VALUE,new Watcher() { //全局监听 public void process(WatchedEvent watchedEvent) { //客户端回调Watcher System.out.println("-----------------------------------------"); System.out.println("连接状态:" + watchedEvent.getState()); System.out.println("事件类型:" + watchedEvent.getType()); System.out.println("节点路径:" + watchedEvent.getPath()); System.out.println("-----------------------------------------"); } } ); } /** * exists监听事件: * NodeCreated:节点创建 * NodeDeleted:节点删除 * NodeDataChanged:节点内容变化 * @throws IOException * @throws KeeperException * @throws InterruptedException */ @Test public void test1() throws IOException, KeeperException, InterruptedException { //exists注册监听 zk.exists("/watcher-exists", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("-----------------------------------------"); System.out.println("连接状态:" + watchedEvent.getState()); System.out.println("事件类型:" + watchedEvent.getType()); System.out.println("节点路径:" + watchedEvent.getPath()); System.out.println("-----------------------------------------"); try { zk.exists("/watcher-exists",this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); //不开启ACL,以持久化自动生成序列方式创建 zk.create("/watcher-exists", "watcher-exists".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //通过修改的事务类型操作来触发监听事件 zk.setData("/watcher-exists", "watcher-exists2".getBytes(), -1); //删除节点看看能否触发监听事件 zk.delete("/watcher-exists", -1); Thread.sleep(Integer.MAX_VALUE); } /** * getData监听事件: * NodeDeleted:节点删除 * NodeDataChange:节点内容发生变化 * @throws IOException * @throws KeeperException * @throws InterruptedException */ @Test public void test2() throws IOException, KeeperException, InterruptedException { //不开启ACL,以持久化自动生成序列方式创建 zk.create("/watcher-getData", "watcher-getData".getBytes() , ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //getData注册监听 zk.getData("/watcher-getData", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("-----------------------------------------"); System.out.println("连接状态:" + watchedEvent.getState()); System.out.println("事件类型:" + watchedEvent.getType()); System.out.println("节点路径:" + watchedEvent.getPath()); System.out.println("-----------------------------------------"); try { zk.exists("/watcher-getData",this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } },null); //通过修改的事务类型操作来触发监听事件 zk.setData("/watcher-getData", "watcher-getData2".getBytes(), -1); ///删除节点看看能否触发监听事件 zk.delete("/watcher-getData", -1); //Thread.sleep(Integer.MAX_VALUE); } /** * getChildren监听事件: * NodeChildrenChanged:子节点发生变化(创建,删除) * NodeDeleted:节点删除 * @throws IOException * @throws KeeperException * @throws InterruptedException */ @Test public void test3() throws IOException, KeeperException, InterruptedException { zk.create("/watcher-getChildren",null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //getChildren注册监听 zk.getChildren("/watcher-getChildren", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { System.out.println("-----------------------------------------"); System.out.println("连接状态:" + watchedEvent.getState()); System.out.println("事件类型:" + watchedEvent.getType()); System.out.println("节点路径:" + watchedEvent.getPath()); System.out.println("-----------------------------------------"); try { zk.getChildren("/watcher-getChildren",this); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); zk.create("/watcher-getChildren/watcher-getChildren01","watcher-getChildren01".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk.setData("/watcher-getChildren/watcher-getChildren01","watcher-getChildren02".getBytes(), -1);//修改子节点 zk.delete("/watcher-getChildren/watcher-getChildren01", -1);//删除子节点 zk.setData("/watcher-getChildren","ccccc".getBytes(),-1); zk.delete("/watcher-getChildren", -1);//删除根节点 } }
    Processed: 0.013, SQL: 9