Standalone ZooKeeper Server Startup Process

    Tech  2022-07-10


    Preparation

    Clone the ZooKeeper source code from GitHub; I used version 3.6.1. The standalone startup flow is fairly simple, and this post walks through it.

    1. The startup script zkServer.cmd

    @echo off
    REM Licensed to the Apache Software Foundation (ASF) under one or more
    REM contributor license agreements.  See the NOTICE file distributed with
    REM this work for additional information regarding copyright ownership.
    REM The ASF licenses this file to You under the Apache License, Version 2.0
    REM (the "License"); you may not use this file except in compliance with
    REM the License.  You may obtain a copy of the License at
    REM
    REM     http://www.apache.org/licenses/LICENSE-2.0
    REM
    REM Unless required by applicable law or agreed to in writing, software
    REM distributed under the License is distributed on an "AS IS" BASIS,
    REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    REM See the License for the specific language governing permissions and
    REM limitations under the License.

    setlocal
    call "%~dp0zkEnv.cmd"

    set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
    set ZOO_LOG_FILE=zookeeper-%USERNAME%-server-%COMPUTERNAME%.log

    echo on
    call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" "-Dzookeeper.log.file=%ZOO_LOG_FILE%" "-XX:+HeapDumpOnOutOfMemoryError" "-XX:OnOutOfMemoryError=cmd /c taskkill /pid %%%%p /t /f" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFG%" %*

    endlocal

    So the script starts the class org.apache.zookeeper.server.quorum.QuorumPeerMain. Its main method comes down to a single effective line:

    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            // the key line
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.INVALID_INVOCATION.getValue());
        } catch (DatadirException e) {
            LOG.error("Unable to access datadir, exiting abnormally", e);
            System.err.println("Unable to access datadir, exiting abnormally");
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_ACCESS_DATADIR.getValue());
        } catch (AdminServerException e) {
            LOG.error("Unable to start AdminServer, exiting abnormally", e);
            System.err.println("Unable to start AdminServer, exiting abnormally");
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.ERROR_STARTING_ADMIN_SERVER.getValue());
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            ZKAuditProvider.addServerStartFailureAuditLog();
            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
        }
        LOG.info("Exiting normally");
        ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
    }
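    Each catch branch in main ends by requesting a process exit with a code specific to that failure. A wrapper script can therefore tell an invalid config apart from an inaccessible data directory. Below is a minimal sketch of that mapping. The exception classes are hypothetical stand-ins for ConfigException, DatadirException and AdminServerException. The numeric exit codes are placeholders for this sketch, not necessarily ZooKeeper's ExitCode values.

```java
import java.util.Objects;

public class ExitCodeSketch {
    // Hypothetical stand-ins for ConfigException, DatadirException, AdminServerException.
    static class ConfigError extends Exception {}
    static class DataDirError extends Exception {}
    static class AdminServerError extends Exception {}

    // Placeholder exit codes for this sketch only.
    enum Exit {
        FINISHED(0), UNEXPECTED(1), INVALID_INVOCATION(2), DATADIR(3), ADMIN_SERVER(4);
        final int value;
        Exit(int value) { this.value = value; }
    }

    interface Startup { void run() throws Exception; }

    // Runs the startup action and classifies the failure, following the order of
    // the catch blocks in QuorumPeerMain#main (most specific first, Exception last).
    static Exit runAndClassify(Startup startup) {
        Objects.requireNonNull(startup);
        try {
            startup.run();
        } catch (IllegalArgumentException | ConfigError e) {
            return Exit.INVALID_INVOCATION;
        } catch (DataDirError e) {
            return Exit.DATADIR;
        } catch (AdminServerError e) {
            return Exit.ADMIN_SERVER;
        } catch (Exception e) {
            return Exit.UNEXPECTED;
        }
        return Exit.FINISHED;
    }

    public static void main(String[] args) {
        Exit exit = runAndClassify(() -> { throw new DataDirError(); });
        System.out.println(exit + " -> " + exit.value);
    }
}
```

    The sketch returns the code instead of exiting so that it stays testable. The real main passes the code to ServiceUtils.requestSystemExit.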

    2. The code of org.apache.zookeeper.server.quorum.QuorumPeerMain#initializeAndRun:

    protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
        QuorumPeerConfig config = new QuorumPeerConfig();
        // parse the configuration file
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the the purge task
        // starts a task that purges surplus transaction-log and snapshot files
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
            // there is only server in the quorum -- run as standalone
            // standalone mode takes this branch
            ZooKeeperServerMain.main(args);
        }
    }
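    The branch at the end of initializeAndRun decides between quorum and standalone mode. A quorum peer runs only when exactly one argument (the config file) is given and config.isDistributed() is true. Every other case falls through to ZooKeeperServerMain. The sketch below models that decision.

    How isDistributed() is computed is an assumption of this sketch. It treats a config as distributed when it lists at least one server and either has more than one voting server or has standalone mode disabled (standaloneEnabled=false). The method and parameter names are hypothetical.

```java
public class ModeSelectionSketch {
    enum Mode { QUORUM, STANDALONE }

    // Hypothetical approximation of QuorumPeerConfig#isDistributed (an assumption,
    // not ZooKeeper's code): distributed if servers are listed and either more than
    // one votes or standalone mode is disabled.
    static boolean isDistributed(int votingServers, boolean standaloneEnabled) {
        return votingServers > 0 && (!standaloneEnabled || votingServers > 1);
    }

    // Mirrors the branch in QuorumPeerMain#initializeAndRun.
    static Mode chooseMode(String[] args, int votingServers, boolean standaloneEnabled) {
        if (args.length == 1 && isDistributed(votingServers, standaloneEnabled)) {
            return Mode.QUORUM;
        }
        return Mode.STANDALONE;
    }

    public static void main(String[] args) {
        String[] cfg = {"conf/zoo.cfg"};
        System.out.println(chooseMode(cfg, 0, true)); // STANDALONE: no server.N entries
        System.out.println(chooseMode(cfg, 3, true)); // QUORUM: three-server ensemble
    }
}
```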

    3. org.apache.zookeeper.server.ZooKeeperServerMain#main has just one key line:

    // ... omitted ...
    main.initializeAndRun(args);
    // ... omitted ...

    4. org.apache.zookeeper.server.ZooKeeperServerMain#initializeAndRun is short. It registers the log4j JMX beans and parses the configuration file. The key line is runFromConfig(config);

    protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        // parse a ServerConfig from the configuration file
        ServerConfig config = new ServerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        } else {
            config.parse(args);
        }

        // the key line
        runFromConfig(config);
    }
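    With a single argument, config.parse(args[0]) reads zoo.cfg, which is a file of key=value properties. The sketch below reads dataDir, clientPort and tickTime with java.util.Properties. It is a simplified stand-in, not ZooKeeper's ServerConfig/QuorumPeerConfig parser:

    1) SimpleServerConfig is hypothetical;
    2) only three keys are handled;
    3) the 3000 ms tickTime default is an assumption of this sketch.

    A missing clientPort is kept as null, mirroring the config.getClientPortAddress() != null check in runFromConfig.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ConfigParseSketch {
    // Hypothetical holder for the few fields this sketch reads.
    static final class SimpleServerConfig {
        final Integer clientPort; // null: no plain client port configured
        final String dataDir;
        final int tickTime;

        SimpleServerConfig(Integer clientPort, String dataDir, int tickTime) {
            this.clientPort = clientPort;
            this.dataDir = dataDir;
            this.tickTime = tickTime;
        }
    }

    // Parses zoo.cfg-style "key=value" text.
    static SimpleServerConfig parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String dataDir = props.getProperty("dataDir");
        if (dataDir == null || dataDir.trim().isEmpty()) {
            throw new IllegalArgumentException("dataDir is not set");
        }
        String port = props.getProperty("clientPort");
        Integer clientPort = (port == null) ? null : Integer.valueOf(port.trim());
        // 3000 ms is this sketch's assumed default when tickTime is absent.
        int tickTime = Integer.parseInt(props.getProperty("tickTime", "3000").trim());
        return new SimpleServerConfig(clientPort, dataDir.trim(), tickTime);
    }

    public static void main(String[] args) {
        SimpleServerConfig cfg = parse("tickTime=2000\ndataDir=/tmp/zookeeper\nclientPort=2181\n");
        System.out.println(cfg.clientPort + " " + cfg.dataDir + " " + cfg.tickTime);
    }
}
```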

    5. org.apache.zookeeper.server.ZooKeeperServerMain#runFromConfig: here is the core code

    public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            try {
                metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
                    config.getMetricsProviderClassName(),
                    config.getMetricsProviderConfiguration());
            } catch (MetricsProviderLifeCycleException error) {
                throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
            }
            ServerMetrics.metricsProviderInitialized(metricsProvider);
            // Note that this thread isn't going to be doing anything else,
            // so rather than spawning another thread, we will just call
            // run() in this thread.
            // create a file logger url from the command line args
            // helper for the transaction-log directory and the data (snapshot) directory
            txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
            // the JVM pause monitor is a thread that detects long JVM pauses (e.g. full GC)
            JvmPauseMonitor jvmPauseMonitor = null;
            if (config.jvmPauseMonitorToRun) {
                jvmPauseMonitor = new JvmPauseMonitor(config);
            }
            final ZooKeeperServer zkServer = new ZooKeeperServer(jvmPauseMonitor, txnLog,
                config.tickTime, config.minSessionTimeout, config.maxSessionTimeout,
                config.listenBacklog, null, config.initialConfig);
            txnLog.setServerStats(zkServer.serverStats());

            // Registers shutdown handler which will be used to know the
            // server error or shutdown state changes.
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            zkServer.registerServerShutdownHandler(new ZooKeeperServerShutdownHandler(shutdownLatch));

            // start the Jetty-based admin server, which lets you inspect the ZK server over HTTP
            adminServer = AdminServerFactory.createAdminServer();
            adminServer.setZooKeeperServer(zkServer);
            adminServer.start();

            boolean needStartZKServer = true;
            if (config.getClientPortAddress() != null) {
                // NIOServerCnxnFactory by default
                cnxnFactory = ServerCnxnFactory.createFactory();
                // bind the ServerSocketChannel to the address and port, and set the client connection limit
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(),
                    config.getClientPortListenBacklog(), false);
                // start the accept thread and the selector threads
                cnxnFactory.startup(zkServer);
                // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
                needStartZKServer = false;
            }
            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(),
                    config.getClientPortListenBacklog(), true);
                secureCnxnFactory.startup(zkServer, needStartZKServer);
            }

            // start the container-node timer
            containerManager = new ContainerManager(
                zkServer.getZKDatabase(),
                zkServer.firstProcessor,
                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                Integer.getInteger("znode.container.maxPerMinute", 10000),
                Long.getLong("znode.container.maxNeverUsedIntervalMs", 0)
            );
            containerManager.start();
            ZKAuditProvider.addZKStartStopAuditLog();

            // Watch status of ZooKeeper server. It will do a graceful shutdown
            // if the server is not running or hits an internal error.
            // Blocks until zkServer shuts down: a stop command counts shutdownLatch
            // down, this call returns, and the ZK server is then shut down.
            shutdownLatch.await();

            // zkServer has stopped, so shut everything else down too
            shutdown();

            if (cnxnFactory != null) {
                cnxnFactory.join();
            }
            if (secureCnxnFactory != null) {
                secureCnxnFactory.join();
            }
            if (zkServer.canShutdown()) {
                zkServer.shutdown(true);
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
        } finally {
            if (txnLog != null) {
                txnLog.close();
            }
            if (metricsProvider != null) {
                try {
                    metricsProvider.stop();
                } catch (Throwable error) {
                    LOG.warn("Error while stopping metrics", error);
                }
            }
        }
    }
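    runFromConfig parks the main thread on shutdownLatch.await(). The ZooKeeperServerShutdownHandler registered earlier counts the latch down when the server's state changes, and the main thread then continues into the cleanup code.

    The sketch below shows the same CountDownLatch handshake in isolation. The ShutdownHandler class and its states are hypothetical simplifications. It assumes that only a terminal state (SHUTDOWN or ERROR) releases the latch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ShutdownLatchSketch {
    enum State { RUNNING, SHUTDOWN, ERROR }

    // Hypothetical handler: counts the latch down once the server reports SHUTDOWN or ERROR.
    static final class ShutdownHandler {
        private final CountDownLatch latch;

        ShutdownHandler(CountDownLatch latch) {
            this.latch = latch;
        }

        void handle(State state) {
            if (state == State.SHUTDOWN || state == State.ERROR) {
                latch.countDown();
            }
        }
    }

    // The calling thread blocks on the latch, as runFromConfig does, while a
    // stand-in "server" thread reports RUNNING and then SHUTDOWN.
    static boolean runUntilShutdown(long timeoutMillis) {
        CountDownLatch shutdownLatch = new CountDownLatch(1);
        ShutdownHandler handler = new ShutdownHandler(shutdownLatch);
        Thread server = new Thread(() -> {
            handler.handle(State.RUNNING);  // not terminal: latch stays at 1
            handler.handle(State.SHUTDOWN); // releases the waiting thread
        });
        server.start();
        try {
            // runFromConfig awaits with no timeout; the timeout only keeps this sketch from hanging.
            boolean released = shutdownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            server.join();
            // Here runFromConfig would go on to shut down the factories and the server.
            return released;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("released: " + runUntilShutdown(5000));
    }
}
```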

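    In summary, runFromConfig does the following:
    1) Instantiates FileTxnSnapLog, the helper for transaction logs and snapshots. The server uses it to periodically write its in-memory DataTree to disk as a snapshot, and to persist the write requests that clients send.
    2) Creates a JvmPauseMonitor (only when jvmPauseMonitorToRun is enabled). It watches JVM pause times and logs a message when a stop-the-world pause exceeds a threshold.
    3) Instantiates ZooKeeperServer.
    4) Starts the Jetty admin server, which lets operators and developers inspect and manage ZK over HTTP.
    5) Creates the NIOServerCnxnFactory and configures the port, the client connection limit, and so on.
    6) Calls cnxnFactory.startup(zkServer). This mainly initializes the WorkerService thread pool, starts the SelectorThreads (which handle read/write events), and starts the AcceptThread (which handles client connection events).
    7) Instantiates and starts the ContainerManager.
    8) Blocks the main thread on shutdownLatch.await(). The thread is released only when a shutdown command arrives, after which resources are reclaimed.
    The main logic is in this if branch: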

    if (config.getClientPortAddress() != null) {
        // NIOServerCnxnFactory by default; NettyServerCnxnFactory can be configured instead
        cnxnFactory = ServerCnxnFactory.createFactory();
        // bind the ServerSocketChannel to the address and port, and set the maximum number
        // of client connections (maxClientCnxns, checked per client IP in doAccept)
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(),
            config.getClientPortListenBacklog(), false);
        // start the accept and selector threads
        cnxnFactory.startup(zkServer);
        needStartZKServer = false;
    }
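    The comment on createFactory() says NIO is the default and Netty can be configured instead. ServerCnxnFactory picks the implementation class by name from the zookeeper.serverCnxnFactory system property and instantiates it reflectively, falling back to NIOServerCnxnFactory.

    The sketch below reproduces that pattern. ConnectionFactory, NioFactory and NettyFactory are hypothetical classes. Only the property name comes from ZooKeeper.

```java
public class FactorySelectionSketch {
    // System property ZooKeeper's ServerCnxnFactory reads to choose an implementation.
    static final String FACTORY_PROPERTY = "zookeeper.serverCnxnFactory";

    // Hypothetical minimal factory hierarchy for this sketch.
    public abstract static class ConnectionFactory {
        abstract String name();
    }

    public static class NioFactory extends ConnectionFactory {
        public NioFactory() {}
        String name() { return "nio"; }
    }

    public static class NettyFactory extends ConnectionFactory {
        public NettyFactory() {}
        String name() { return "netty"; }
    }

    // Reads the class name from the system property (NIO by default) and instantiates it reflectively.
    static ConnectionFactory createFactory() {
        String className = System.getProperty(FACTORY_PROPERTY, NioFactory.class.getName());
        try {
            return Class.forName(className)
                    .asSubclass(ConnectionFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Couldn't instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(createFactory().name()); // nio
        System.setProperty(FACTORY_PROPERTY, NettyFactory.class.getName());
        System.out.println(createFactory().name()); // netty
        System.clearProperty(FACTORY_PROPERTY);
    }
}
```

    Choosing the class by name keeps the core free of a compile-time dependency on the alternative transport. Swapping it only takes a -D flag on the java command line.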

    6. Following cnxnFactory.startup(zkServer); into org.apache.zookeeper.server.NIOServerCnxnFactory#startup (the one-argument overload in ServerCnxnFactory calls startup(zkServer, true)):

    @Override
    public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException {
        // 1. initialize the WorkerService thread pool
        // 2. start the SelectorThreads, which receive read/write-ready events
        // 3. start the AcceptThread, which receives connection events
        start();
        setZooKeeperServer(zks);
        if (startServer) {
            // initialize ZKDatabase and load the data
            zks.startdata();
            // 1. create the sessionTracker
            // 2. set up the RequestProcessor chain
            // 3. create the requestThrottler
            // 4. register JMX beans
            // 5. switch the state to RUNNING
            // 6. notifyAll(): threads started in the steps above may wait() when some
            //    precondition is not yet met; this call wakes them up
            zks.startup();
        }
    }
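    Step 6 in the zks.startup() comments describes a guarded wait. Threads that need a running server wait() on a shared monitor, and startup calls notifyAll() once the state is RUNNING.

    The sketch below shows that handshake with a hypothetical ServerStateGate class. It illustrates the Java wait/notifyAll idiom the comment describes, not ZooKeeperServer's exact code.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ServerStateGateSketch {
    enum State { INITIAL, RUNNING }

    // Hypothetical gate: threads that need a running server block until the state flips.
    static final class ServerStateGate {
        private State state = State.INITIAL;

        synchronized void awaitRunning() throws InterruptedException {
            // Re-check in a loop: wait() can return spuriously.
            while (state != State.RUNNING) {
                wait();
            }
        }

        // Plays the role of the end of zks.startup(): set RUNNING, then wake all waiters.
        synchronized void markRunning() {
            state = State.RUNNING;
            notifyAll();
        }
    }

    // Starts `waiters` threads that block on the gate, opens it, and returns how many got through.
    static int releaseWaiters(int waiters) {
        ServerStateGate gate = new ServerStateGate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.awaitRunning();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        gate.markRunning();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(releaseWaiters(4) + " waiters released");
    }
}
```

    A thread that reaches awaitRunning() after markRunning() simply sees RUNNING and passes. This is why the state check, not the notification, is the source of truth.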

    6.1 org.apache.zookeeper.server.NIOServerCnxnFactory#start

    public void start() {
        stopped = false;
        // worker thread pool
        if (workerPool == null) {
            workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
        }
        // start the selector threads, which handle read/write events
        for (SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
        // start the accept thread: it accepts client connections and hands each
        // resulting SocketChannel to a SelectorThread
        if (acceptThread.getState() == Thread.State.NEW) {
            acceptThread.start();
        }
        if (expirerThread.getState() == Thread.State.NEW) {
            expirerThread.start();
        }
    }
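    start() calls Thread.start() only on threads whose state is still NEW. Calling start() twice on the same Thread throws IllegalThreadStateException, so this guard lets the factory's start() run more than once without failing.

    A minimal sketch of the guard. The startIfNew helper is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class IdempotentStartSketch {
    // Starts only threads that have never been started; returns how many were started now.
    static int startIfNew(List<Thread> threads) {
        int started = 0;
        for (Thread thread : threads) {
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
                started++;
            }
        }
        return started;
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch stop = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            threads.add(new Thread(() -> {
                try {
                    stop.await(); // idle until the demo ends, like a long-running selector thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "selector-" + i));
        }
        System.out.println(startIfNew(threads)); // 3: all threads were NEW
        System.out.println(startIfNew(threads)); // 0: a second call is a no-op
        stop.countDown();
        for (Thread t : threads) {
            t.join();
        }
    }
}
```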

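    How the AcceptThread works:
    1) its run method is a loop;
    2) each iteration checks for client connection events;
    3) when a connection arrives, the resulting SocketChannel is handed to a SelectorThread.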

    public void run() {
        try {
            while (!stopped && !acceptSocket.socket().isClosed()) {
                try {
                    select();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }
        } finally {
            closeSelector();
            // This will wake up the selector threads, and tell the
            // worker thread pool to begin shutdown.
            if (!reconfiguring) {
                NIOServerCnxnFactory.this.stop();
            }
            LOG.info("accept thread exitted run method");
        }
    }

    private void select() {
        try {
            selector.select();

            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }
                // connection (accept) event
                if (key.isAcceptable()) {
                    // accept one socket connection
                    if (!doAccept()) {
                        // If unable to pull a new connection off the accept
                        // queue, pause accepting to give us time to free
                        // up file descriptors and so the accept thread
                        // doesn't spin in a tight loop.
                        pauseAccept(10);
                    }
                } else {
                    LOG.warn("Unexpected ops in accept select {}", key.readyOps());
                }
            }
        } catch (IOException e) {
            LOG.warn("Ignoring IOException while selecting", e);
        }
    }

    private boolean doAccept() {
        boolean accepted = false;
        SocketChannel sc = null;
        try {
            sc = acceptSocket.accept();
            accepted = true;
            if (limitTotalNumberOfCnxns()) {
                throw new IOException("Too many connections max allowed is " + maxCnxns);
            }
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);

            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
                throw new IOException("Too many connections from " + ia + " - max is " + maxClientCnxns);
            }

            LOG.debug("Accepted socket connection from {}", sc.socket().getRemoteSocketAddress());

            sc.configureBlocking(false);

            // Round-robin assign this connection to a selector thread
            // each accepted socket is handed, in round-robin order, to one SelectorThread, which then services it
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();
            }
            SelectorThread selectorThread = selectorIterator.next();
            // add sc to this SelectorThread's queue; the SelectorThread will pick it up
            if (!selectorThread.addAcceptedConnection(sc)) {
                throw new IOException("Unable to add connection to selector queue"
                    + (stopped ? " (shutdown in progress)" : ""));
            }
            acceptErrorLogger.flush();
        } catch (IOException e) {
            // accept, maxClientCnxns, configureBlocking
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
            acceptErrorLogger.rateLimitLog("Error accepting new connection: " + e.getMessage());
            fastCloseSock(sc);
        }
        return accepted;
    }
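    The accept path is a standard NIO pattern. One selector is registered for OP_ACCEPT on the ServerSocketChannel, a non-blocking accept() pulls one connection per ready key, and the SocketChannel is handed to one of several selector threads in round-robin order.

    The sketch below keeps only that skeleton. The per-selector queues are plain ConcurrentLinkedQueues standing in for SelectorThread#addAcceptedConnection. The connection limits, pauseAccept and error logging are left out.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class AcceptRoundRobinSketch implements Closeable {
    private final ServerSocketChannel acceptSocket;
    private final Selector selector;
    // Stand-ins for each SelectorThread's queue of accepted connections.
    private final List<Queue<SocketChannel>> selectorQueues = new ArrayList<>();
    private Iterator<Queue<SocketChannel>> selectorIterator;

    AcceptRoundRobinSketch(int numSelectors) throws IOException {
        acceptSocket = ServerSocketChannel.open();
        acceptSocket.bind(new InetSocketAddress("127.0.0.1", 0)); // ephemeral port
        acceptSocket.configureBlocking(false);
        selector = Selector.open();
        acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
        for (int i = 0; i < numSelectors; i++) {
            selectorQueues.add(new ConcurrentLinkedQueue<>());
        }
        selectorIterator = selectorQueues.iterator();
    }

    // One pass of the accept loop: wait for OP_ACCEPT, accept one connection per
    // ready key (as doAccept does), and hand it to the next queue round-robin.
    int selectOnce(long timeoutMillis) throws IOException {
        int accepted = 0;
        selector.select(timeoutMillis);
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while (keys.hasNext()) {
            SelectionKey key = keys.next();
            keys.remove();
            if (!key.isValid() || !key.isAcceptable()) {
                continue;
            }
            SocketChannel sc = acceptSocket.accept(); // non-blocking: null if nothing pending
            if (sc == null) {
                continue;
            }
            sc.configureBlocking(false);
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorQueues.iterator(); // wrap around
            }
            selectorIterator.next().add(sc);
            accepted++;
        }
        return accepted;
    }

    @Override
    public void close() throws IOException {
        for (Queue<SocketChannel> queue : selectorQueues) {
            for (SocketChannel sc : queue) {
                sc.close();
            }
        }
        selector.close();
        acceptSocket.close();
    }

    // Connects numClients local clients; returns how many channels each queue received.
    static int[] demo(int numSelectors, int numClients) {
        List<SocketChannel> clients = new ArrayList<>();
        try (AcceptRoundRobinSketch server = new AcceptRoundRobinSketch(numSelectors)) {
            int port = server.acceptSocket.socket().getLocalPort();
            for (int i = 0; i < numClients; i++) {
                // A blocking connect completes once the kernel queues it, before accept() runs.
                clients.add(SocketChannel.open(new InetSocketAddress("127.0.0.1", port)));
            }
            int total = 0;
            for (int attempt = 0; attempt < 100 && total < numClients; attempt++) {
                total += server.selectOnce(100);
            }
            return server.selectorQueues.stream().mapToInt(Queue::size).toArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (SocketChannel client : clients) {
                try { client.close(); } catch (IOException ignored) { }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo(2, 3))); // [2, 1]
    }
}
```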

    7. Next, the selector thread: org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread#run

    public void run() {
        try {
            while (!stopped) {
                try {
                    // wait for ready events; select() calls handleIO(key) for each read- or write-ready key
                    select();
                    // register connections handed over by the accept thread with this selector
                    processAcceptedConnections();
                    // restore interest ops for keys whose work has finished
                    processInterestOpsUpdateRequests();
                } catch (RuntimeException e) {
                    LOG.warn("Ignoring unexpected runtime exception", e);
                } catch (Exception e) {
                    LOG.warn("Ignoring unexpected exception", e);
                }
            }

            // Close connections still pending on the selector. Any others
            // with in-flight work, let drain out of the work queue.
            for (SelectionKey key : selector.keys()) {
                NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
                if (cnxn.isSelectable()) {
                    cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                }
                cleanupSelectionKey(key);
            }
            SocketChannel accepted;
            while ((accepted = acceptedQueue.poll()) != null) {
                fastCloseSock(accepted);
            }
            updateQueue.clear();
        } finally {
            closeSelector();
            // This will wake up the accept thread and the other selector
            // threads, and tell the worker thread pool to begin shutdown.
            NIOServerCnxnFactory.this.stop();
            LOG.info("selector thread exitted run method");
        }
    }

    private void handleIO(SelectionKey key) {
        // wrap the key in an IOWorkRequest, to be scheduled on workerPool
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

        // Stop selecting this key while processing on its
        // connection
        // (the key is not selected again until the work on it has finished)
        cnxn.disableSelectable();
        key.interestOps(0);
        // refresh the connection's expiration time
        touchCnxn(cnxn);
        // hand workRequest to the worker pool created earlier in start()
        workerPool.schedule(workRequest);
    }

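    Following the last line of handleIO, workerPool.schedule(workRequest), into WorkerService (the one-argument schedule(workRequest) delegates to schedule(workRequest, 0)):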

    public void schedule(WorkRequest workRequest, long id) {
        if (stopped) {
            workRequest.cleanup();
            return;
        }

        // ScheduledWorkRequest is a thread that processes the client request;
        // here it wraps an IOWorkRequest
        ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);

        // If we have a worker thread pool, use that; otherwise, do the work
        // directly.
        int size = workers.size();
        if (size > 0) {
            try {
                // make sure to map negative ids as well to [0, size-1]
                // in standalone mode, the commands the server receives are handed
                // to worker threads and processed concurrently
                int workerNum = ((int) (id % size) + size) % size;
                ExecutorService worker = workers.get(workerNum);
                worker.execute(scheduledWorkRequest);
            } catch (RejectedExecutionException e) {
                LOG.warn("ExecutorService rejected execution", e);
                workRequest.cleanup();
            }
        } else {
            // When there is no worker thread pool, do the work directly
            // and wait for its completion
            scheduledWorkRequest.run();
        }
    }
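    The expression ((int) (id % size) + size) % size keeps the worker index in [0, size-1] even for negative ids. Java's % keeps the sign of the dividend, so -7 % 3 is -1. Adding size before the second % shifts the result back into range.

    A minimal sketch with a hypothetical workerIndex helper, next to a naive version for comparison:

```java
public class WorkerIndexSketch {
    // Same arithmetic as WorkerService#schedule: maps any long id onto [0, size - 1].
    static int workerIndex(long id, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        return ((int) (id % size) + size) % size;
    }

    // Naive version: can be negative because % keeps the sign of the dividend.
    static int naiveIndex(long id, int size) {
        return (int) (id % size);
    }

    public static void main(String[] args) {
        for (long id : new long[] {0, 5, -7, Long.MIN_VALUE}) {
            System.out.println(id + " -> naive " + naiveIndex(id, 3) + ", safe " + workerIndex(id, 3));
        }
    }
}
```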

    ScheduledWorkRequest is itself a thread. The call chain is ScheduledWorkRequest -> IOWorkRequest -> NIOServerCnxn.

    // ScheduledWorkRequest#run
    public void run() {
        try {
            // Check if stopped while request was on queue
            if (stopped) {
                workRequest.cleanup();
                return;
            }
            // calls IOWorkRequest#doWork
            workRequest.doWork();
        } catch (Exception e) {
            LOG.warn("Unexpected exception", e);
            workRequest.cleanup();
        }
    }

    // IOWorkRequest#doWork
    public void doWork() throws InterruptedException {
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }

        // read-ready or write-ready
        if (key.isReadable() || key.isWritable()) {
            // process the key; at this point requests from different clients
            // are still being handled concurrently
            // NIOServerCnxn#doIO
            cnxn.doIO(key);

            // Check if we shutdown or doIO() closed this connection
            if (stopped) {
                cnxn.close(ServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
                return;
            }
            if (!key.isValid()) {
                selectorThread.cleanupSelectionKey(key);
                return;
            }
            touchCnxn(cnxn);
        }

        // Mark this connection as once again ready for selection
        cnxn.enableSelectable();
        // Push an update request on the queue to resume selecting
        // on the current set of interest ops, which may have changed
        // as a result of the I/O operations we just performed.
        if (!selectorThread.addInterestOpsUpdateRequest(key)) {
            cnxn.close(ServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
        }
    }
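    Together, handleIO and doWork form a round trip on the key's interest set:
    1) handleIO clears it with key.interestOps(0) before scheduling the work, so the selector does not report the same connection again while a worker owns it;
    2) at the end of doWork the key is pushed onto the selector thread's update queue;
    3) the selector thread then restores the interest set in processInterestOpsUpdateRequests.

    The sketch below reproduces that round trip, using a java.nio.channels.Pipe as the selectable channel. The class is hypothetical. It always restores OP_READ, whereas ZooKeeper restores the connection's current interest ops.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class InterestOpsSketch {
    // Keys whose interest ops the selector thread should restore.
    private final Queue<SelectionKey> updateQueue = new ConcurrentLinkedQueue<>();
    private final Selector selector;

    InterestOpsSketch(Selector selector) {
        this.selector = selector;
    }

    // Selector-thread side (cf. handleIO): stop selecting the key while a worker owns it.
    void handleIO(SelectionKey key) {
        key.interestOps(0);
        // ... a real server would now schedule the work on its worker pool
    }

    // Worker side (cf. the end of doWork): queue the key and wake the selector.
    void addInterestOpsUpdateRequest(SelectionKey key) {
        updateQueue.offer(key);
        selector.wakeup();
    }

    // Selector-thread side: apply queued updates (this sketch always restores OP_READ).
    void processInterestOpsUpdateRequests() {
        SelectionKey key;
        while ((key = updateQueue.poll()) != null) {
            if (key.isValid()) {
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }

    // Returns {interest ops while the work is in flight, interest ops after the update}.
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SourceChannel source = pipe.source(); Pipe.SinkChannel sink = pipe.sink()) {
                source.configureBlocking(false);
                SelectionKey key = source.register(selector, SelectionKey.OP_READ);
                sink.write(ByteBuffer.wrap(new byte[] {42}));
                selector.select(1000); // the pipe now has data, so the key is read-ready
                selector.selectedKeys().clear();

                InterestOpsSketch sketch = new InterestOpsSketch(selector);
                sketch.handleIO(key);
                int inFlight = key.interestOps();
                sketch.addInterestOpsUpdateRequest(key);
                sketch.processInterestOpsUpdateRequests();
                return new int[] {inFlight, key.interestOps()};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = demo();
        System.out.println("in flight: " + ops[0] + ", after update: " + ops[1]);
    }
}
```

    The update is queued rather than applied directly by the worker. The selector thread then stays the only thread that re-arms its keys, and wakeup() makes sure a blocked select() notices the change promptly.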

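    In the end, IOWorkRequest#doWork calls org.apache.zookeeper.server.NIOServerCnxn#doIO, which reads the data the client sent. I won't paste the remaining code. It is straightforward, and interested readers can follow it through:
    1) org.apache.zookeeper.server.NIOServerCnxn#readPayload
    2) org.apache.zookeeper.server.NIOServerCnxn#readRequest, which handles the client's command
    3) org.apache.zookeeper.server.ZooKeeperServer#processPacket, which deserializes the packet
    4) org.apache.zookeeper.server.ZooKeeperServer#enqueueRequest, which enqueues the request
    How the request processors handle it after that will be covered in a later post.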
