动物园铲屎官Zookeeper——实战篇

    科技2022-07-14  128

    目录

     一、客户端命令行操作 

    1.启动客户端

    2.显示所有操作命令

    3.查看当前znode中所包含的内容

    4.查看当前节点数据并能看到更新次数等数据

    5.创建普通节点

    6.获得节点的值

    7.创建短暂节点

    8.创建带序号的节点

    9.修改节点数据值

    10.节点的值变化监听

    11.节点的子节点变化监听(路径变化)

    12.删除节点

    13.递归删除节点

    14.查看节点状态

    二、API应用

    1、Idea环境搭建

    3、Java客户端操作(带监听)

    三、监听服务器动态上下线案例

    1.需求

    2.需求分析

    3.代码实现


     一、客户端命令行操作 

    命令基本语法

    功能描述

    help

    显示所有操作命令

    ls path [watch]

    使用 ls 命令来查看当前znode中所包含的内容

    ls2 path [watch]

    查看当前节点数据并能看到更新次数等数据

    create

    普通创建

    -s  含有序列

    -e  临时(重启或者超时消失)

    get path [watch]

    获得节点的值

    set

    设置节点的具体值

    stat

    查看节点状态

    delete

    删除节点

    rmr

    递归删除节点

    1.启动客户端

    [root@hadoop003 zookeeper-3.4.10]$ bin/zkCli.sh

    2.显示所有操作命令

    [zk: localhost:2181(CONNECTED) 1] help

    3.查看当前znode中所包含的内容

    [zk: localhost:2181(CONNECTED) 0] ls / [zookeeper]

    4.查看当前节点数据并能看到更新次数等数据

    [zk: localhost:2181(CONNECTED) 1] ls2 / [zookeeper] cZxid = 0x0 ctime = Thu Jan 01 08:00:00 CST 1970 mZxid = 0x0 mtime = Thu Jan 01 08:00:00 CST 1970 pZxid = 0x0 cversion = -1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1

    5.创建普通节点

    [zk: localhost:2181(CONNECTED) 2] create /app1 "hello app1" Created /app1 [zk: localhost:2181(CONNECTED) 4] create /app1/server101 "192.168.1.101" Created /app1/server101

    6.获得节点的值

    [zk: localhost:2181(CONNECTED) 6] get /app1 hello app1 cZxid = 0x20000000a ctime = Mon Jul 17 16:08:35 CST 2017 mZxid = 0x20000000a mtime = Mon Jul 17 16:08:35 CST 2017 pZxid = 0x20000000b cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 10 numChildren = 1 [zk: localhost:2181(CONNECTED) 8] get /app1/server101 192.168.1.101 cZxid = 0x20000000b ctime = Mon Jul 17 16:11:04 CST 2017 mZxid = 0x20000000b mtime = Mon Jul 17 16:11:04 CST 2017 pZxid = 0x20000000b cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 13 numChildren = 0

    7.创建短暂节点

    [zk: localhost:2181(CONNECTED) 9] create -e /app-emphemeral 8888

    (1)在当前客户端是能查看到的

    [zk: localhost:2181(CONNECTED) 10] ls / [app1, app-emphemeral, zookeeper]

    (2)退出当前客户端然后再重启客户端

    [zk: localhost:2181(CONNECTED) 12] quit [bigdata@hadoop104 zookeeper-3.4.10]$ bin/zkCli.sh

    (3)再次查看根目录下短暂节点已经删除

    [zk: localhost:2181(CONNECTED) 0] ls / [app1, zookeeper]

    8.创建带序号的节点

    (1)先创建一个普通的根节点app2

    [zk: localhost:2181(CONNECTED) 11] create /app2 "app2"

    (2)创建带序号的节点

    [zk: localhost:2181(CONNECTED) 13] create -s /app2/aa 888 Created /app2/aa0000000000 [zk: localhost:2181(CONNECTED) 14] create -s /app2/bb 888 Created /app2/bb0000000001 [zk: localhost:2181(CONNECTED) 15] create -s /app2/cc 888 Created /app2/cc0000000002 如果原节点下有1个节点,则再排序时从1开始,以此类推。 [zk: localhost:2181(CONNECTED) 16] create -s /app1/aa 888 Created /app1/aa0000000001

    9.修改节点数据值

    [zk: localhost:2181(CONNECTED) 2] set /app1 999

    10.节点的值变化监听

    (1)在hadoop005主机上注册监听/app1节点数据变化

    [zk: localhost:2181(CONNECTED) 26] get /app1 watch

    (2)在hadoop004主机上修改/app1节点的数据

    [zk: localhost:2181(CONNECTED) 5] set /app1  777

    (3)观察hadoop005主机收到数据变化的监听

    WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/app1

    11.节点的子节点变化监听(路径变化)

    (1)在hadoop005主机上注册监听/app1节点的子节点变化

    [zk: localhost:2181(CONNECTED) 1] ls /app1 watch [aa0000000001, server101]

    (2)在hadoop004主机/app1节点上创建子节点

    [zk: localhost:2181(CONNECTED) 6] create /app1/bb 666 Created /app1/bb

    (3)观察hadoop005主机收到子节点变化的监听

    WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/app1

    12.删除节点

    [zk: localhost:2181(CONNECTED) 4] delete /app1/bb

    13.递归删除节点

    [zk: localhost:2181(CONNECTED) 7] rmr /app2

    14.查看节点状态

    [zk: localhost:2181(CONNECTED) 12] stat /app1 cZxid = 0x20000000a ctime = Mon Jul 17 16:08:35 CST 2017 mZxid = 0x200000018 mtime = Mon Jul 17 16:54:38 CST 2017 pZxid = 0x20000001c cversion = 4 dataVersion = 2 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 3 numChildren = 2

    注意:监听只管用一次,要想再次监听,只能再次执行监控命令。

    二、API应用

    1、Idea环境搭建

    (1)创建一个Maven工程

    (2)添加pom文件

    <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency> </dependencies>

    (3)拷贝log4j.properties文件到项目根目录

    2、Java客户端操作(不带监听)

    imp

    import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; import org.junit.Before; import org.junit.Test; import java.util.List; public class MyZookeeperTestWithoutWatch { ZooKeeper client = null; @Before // 获取客户端 public void getClient()throws Exception { client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, null); System.out.println(client); } @Test // 获取指定节点的所有子节点 public void getAllChildren()throws Exception { List<String> children = client.getChildren("/story", false); for (String child : children) { System.out.println(child); // aaa bbb } } @Test // 创建节点 public void createNode()throws Exception { String s = client.create("/idea", "bigdata".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println(s); // /idea } @Test // 查询指定节点的数据 public void getNodeData()throws Exception { // Stat stat = new Stat(); // stat.setCzxid(0x200000022L); byte[] data = client.getData("/idea", false, null); System.out.println(new String(data)); } @Test // 修改指定节点数据 public void modifyNodeData()throws Exception { // 第一个参数:要修改的节点名称 // 第二个参数:要修改的数据 // 第三个参数:数据版本号 Stat stat = client.setData("/idea", "bigdata".getBytes(), -1); System.out.println("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhh"+stat); } @Test // 删除节点 public void deleteNode()throws Exception { client.delete("/idea",-1); System.out.println("删除成功"); } @Test // 判断节点是否存在 public void existNode()throws Exception { Stat exists = client.exists("/idea", false); if(exists == null){ System.out.println("节点不存在"); } }

    3、Java客户端操作(带监听)

    import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.junit.Before; import org.junit.Test; import java.util.List; public class MyZookeeperTestWithWatch { ZooKeeper client = null; @Before public void getClient()throws Exception { client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, new Watcher() { // 先委托zookeeper帮我监听,如果数据真的发生变化了,那么zookeeper就会调用该方法通知我们, // 我们在这里实现自己的业务 public void process(WatchedEvent event) { System.out.println("回调方法执行:"+event.getPath() + "--" + event.getType()); try { //client.getChildren("/story", true); client.getData("/story", true, null); } catch (Exception e) { e.printStackTrace(); } } }); System.out.println("获取到客户端:"+client); } @Test // 监听某节点的子节点的增减变化 public void getChildrenNode()throws Exception { List<String> children = client.getChildren("/story", true); for (String child : children) { System.out.println(child); } Thread.sleep(Long.MAX_VALUE); } @Test // 监听某节点的数据变化 public void getDataWatch()throws Exception { byte[] data = client.getData("/story", true, null); System.out.println(new String(data)); Thread.sleep(Long.MAX_VALUE); } }

    三、监听服务器动态上下线案例

    1.需求

    某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

    2.需求分析

    1)提供时间查询服务,客户端系统调用该时间查询服务。

    2)动态上线,下线该时间查询服务的节点,让客户端实时感知服务器列表变化,查询时候访问最新的机器节点。

    3)具体实现:

            先在集群上创建/servers节点

    [zk: localhost:2181(CONNECTED) 10] create /servers "servers" Created /servers

    3.代码实现

    import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; public class TimeServer { ZooKeeper client = null; public static void main(String[] args) throws Exception { TimeServer timeServer = new TimeServer(); // 1 获取zookeeper客户端 timeServer.getClient(); // 2 向zookeeper注册自身的ip和端口号,其实就是在/servers/下创建临时节点,并把数据写上去 timeServer.registInfo(args[0],args[1]); // 3 开始授时服务 new TimeService(Integer.parseInt(args[1])).start(); } public void getClient()throws Exception { client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, null); } public void registInfo(String ip,String port) throws KeeperException, InterruptedException { // /servers/server0000001 127.0.0.1:5555 String s = client.create("/servers/server", (ip + ":" + port).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("时间服务器上线了,"+"ip:"+ip+",port:"+port+",创建临时节点:"+s); } } import java.io.IOException; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.Date; public class TimeService extends Thread { private int port ; public TimeService(int port){ this.port = port; } @Override public void run() { System.out.println("授时服务开始了"); try { ServerSocket serverSocket = new ServerSocket(port); while(true){ //是个阻塞方法,当有客户端连接的时候,才会建立Socket对象,跟该客户端通信 Socket accept = serverSocket.accept(); OutputStream outputStream = accept.getOutputStream(); outputStream.write(new Date().toString().getBytes()); outputStream.flush(); } } catch (IOException e) { e.printStackTrace(); } } } import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.Random; public class TimeClient { ZooKeeper client = null; List<String> serverList = null; public static void main(String[] args) throws Exception { TimeClient timeClient = new TimeClient(); // 1 获取zookeeper客户端 timeClient.getClient(); // 2 获取时间服务器列表,并委托监听 timeClient.getServerList(); // 3 跟时间服务器通信,获取时间 timeClient.sendRequest(); } public void getClient()throws Exception { client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, new Watcher() { public void process(WatchedEvent event) { System.out.println("回调方法执行:"+event.getPath() + "--" + event.getType()); try { getServerList(); } catch (Exception e) { e.printStackTrace(); } } }); } public void getServerList() throws KeeperException, InterruptedException { List<String> ipPortList = new ArrayList(); List<String> children = client.getChildren("/servers", true); for (String child : children) { // server0000001 byte[] data = client.getData("/servers/" + child, false, null); String ipAndPort = new String(data); ipPortList.add(ipAndPort); } serverList = ipPortList; System.out.println("获取到最新的时间服务器列表:"+serverList); } public void sendRequest() throws IOException, InterruptedException { while (true){ Random random = new Random(); int index = random.nextInt(serverList.size()); // 127.0.0.1:5555 String ipAndPort = serverList.get(index); String[] split = ipAndPort.split(":"); String ip = split[0]; int port = Integer.parseInt(split[1]); Socket socket = new Socket(ip, port); OutputStream outputStream = socket.getOutputStream(); outputStream.write("haha".getBytes()); outputStream.flush(); InputStream inputStream = socket.getInputStream(); byte[] bytes = new byte[1024]; inputStream.read(bytes); System.out.println("从时间服务器:"+ipAndPort+"获取时间:"+new String(bytes)); inputStream.close(); outputStream.close(); socket.close(); Thread.sleep(5000); } } }

     

    Processed: 0.012, SQL: 8