接上文单机Zookeeper服务端启动过程
1.上文提到启动过程会调用startup方法org.apache.zookeeper.server.NIOServerCnxnFactory#startup
public synchronized void startup() { //session跟踪器 if (sessionTracker == null) { createSessionTracker(); } startSessionTracker(); //初始化RequestProcessor调用链 setupRequestProcessors(); //限流很重要 startRequestThrottler(); registerJMX(); startJvmPauseMonitor(); registerMetrics(); //更新zk为运行中状态 setState(State.RUNNING); requestPathMetricsCollector.start(); localSessionEnabled = sessionTracker.isLocalSessionsEnabled(); //唤醒前面等待的线程处理请求 notifyAll(); }startRequestThrottler()该方法内启动RequestThrottler线程,该线程从submittedRequests队列获取请求,并判断正在处理请求的数量是否小于maxRequests如果超过就停止处理请求,如果通过那么通过zks.submitRequestNow(request)方法调用下文的三大处理器。 2.org.apache.zookeeper.server.ZooKeeperServer#setupRequestProcessors初始化调用链
protected void setupRequestProcessors() { // PrepRequestProcessor线程-->SyncRequestProcessor线程-->FinalRequestProcessor RequestProcessor finalProcessor = new FinalRequestProcessor(this); RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor); ((SyncRequestProcessor) syncProcessor).start(); firstProcessor = new PrepRequestProcessor(this, syncProcessor); ((PrepRequestProcessor) firstProcessor).start(); }其中PrepRequestProcessor与SyncRequestProcessor都为线程,也就是说这两个处理器都是异步进行FinalRequestProcessor是在SyncRequestProcessor处理完由它调用。下面分析这三个处理器都做了那些事情。 3.PrepRequestProcessor处理器,主要方法org.apache.zookeeper.server.PrepRequestProcessor#processRequest仅有一行关键代码submittedRequests.add(request);仅把request添加到队列实现解耦,下面看看该队列由谁去消费。 4.上面提到了三大处理器而且前两个处理器都是线程,那么先来分析PrepRequestProcessor的run()方法,代码比较长就不贴出来了,大部分都是switch case操作针对create,delete,acl,multi等分别构造相应的request给后面的处理器处理。比如我们以create命令作为切入点,该处理器先把队列的request转化为record并做了一些常规校验,以及修改parent一些信息注入版本加一,子节点数量加一,生成CreateMode(节点类型)等,注意这里会生成两条record信息,一条是父record,另一条是本record,想想create -s /parent/child1会发生什么吧就很好理解了。最后把生成的两条record加入队列。再有就是run()方法最后一步调用了nextProcessor.processRequest(request)调用下一个处理器SyncRequestProcessor。 5.org.apache.zookeeper.server.PrepRequestProcessor#addChangeRecord
//父子record分别调用此方法把生成的ChangeRecord加入队列 protected void addChangeRecord(ChangeRecord c) { synchronized (zks.outstandingChanges) { zks.outstandingChanges.add(c); zks.outstandingChangesForPath.put(c.path, c); ServerMetrics.getMetrics().OUTSTANDING_CHANGES_QUEUED.add(1); } }6.org.apache.zookeeper.server.SyncRequestProcessor#processRequest是由PrepRequestProcessor调用过来的
public void processRequest(final Request request) { Objects.requireNonNull(request, "Request cannot be null"); request.syncQueueStartTime = Time.currentElapsedTime(); //又是入队列,在FinalRequestProcessor会用到该队列 queuedRequests.add(request); ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1); }prep处理器调用sync处理器把请求扔到了sync处理器的队列(queuedRequests)中 7.SyncRequestProcessor处理器主要负责生成日志与快照文件。类似redis的AOF与RDB文件。 日志文件:为每条修改命令生成日志并持久化到磁盘。 快照文件:对内存生成镜像到磁盘中。 主要分析一下org.apache.zookeeper.server.SyncRequestProcessor#run方法
public void run() { try { // we do this in an attempt to ensure that not all of the servers // in the ensemble take a snapshot at the same time resetSnapshotStats(); lastFlushTime = Time.currentElapsedTime(); while (true) { ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size()); long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay()); //从队列中取request Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS); //说明过了pollTime也没有请求过来,那么不等待了直接把日志提交,先说明下日志提交是两阶段,第一个阶段仅仅写入outputstream中,第二阶段flush时候才持久化到磁盘 if (si == null) { /* We timed out looking for more writes to batch, go ahead and flush immediately */ flush(); //持久化完日志再从队列里获取请求,如果没有获取到那么阻塞等待 si = queuedRequests.take(); } //毒丸消息停止服务端 if (si == REQUEST_OF_DEATH) { break; } long startProcessTime = Time.currentElapsedTime(); //监控指标忽略 ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(startProcessTime - si.syncQueueStartTime); //客户端发来的修改命令(写请求)写入到log文件中,自己跟进去看很简单 if (zks.getZKDatabase().append(si)) { // 判断是否打快照 if (shouldSnapshot()) { resetSnapshotStats(); //下次进来会生成新的log zks.getZKDatabase().rollLog(); // take a snapshot if (!snapThreadMutex.tryAcquire()) { LOG.warn("Too busy to snap, skipping"); } else { //开启单独线程异步打快照DataTree new ZooKeeperThread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch (Exception e) { LOG.warn("Unexpected exception", e); } finally { snapThreadMutex.release(); } } }.start(); } } } else if (toFlush.isEmpty()) { //之前的请求都flush完之后来了一个读请求会进来 //flush完之后toFlush队列为空,这时候可以调用后面的处理器 if (nextProcessor != null) { //调用FinalRequestProcessor nextProcessor.processRequest(si); if (nextProcessor instanceof Flushable) { ((Flushable) nextProcessor).flush(); } } continue; } //读写请求都入队列 toFlush.add(si); //判断是否满足日志提交条件,两阶段的最后一段,提交 if (shouldFlush()) { //提交log,flush到磁盘 flush(); } ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime); } } catch (Throwable t) { handleException(this.getName(), t); } LOG.info("SyncRequestProcessor exited!"); }上面代码的思路还是不错的,作为开发人员可以从中受益。 8.最有一个处理器FinalRequestProcessor该处理器主要功能如下
触发watch机制移除不合格的写请求的ChangeRecord记录,4中提到的record集群模式下会提交proposal我们讲的单机模式下可以忽略记录auditlog给客户端发送response 有兴趣的读者可以自己看下org.apache.zookeeper.server.FinalRequestProcessor#processRequest