【hadoop二次开发】检查磁盘资源的守护线程类

    科技2022-08-15  102

    007-hadoop二次开发-NameNode启动流程

    启动NameNode的场景:打开源码文件NameNode.java,找到main函数,从中进入createNameNode方法,经过模式匹配,匹配到NameNode后进入其构造函数:

    /** * 1、对namenode做参数的注册(fs.defaultFS、rpc地址等) * 2、初始化 * 3、根据初始化处理的结果,namenode进入对应的状态(active、backup、standby) * */ protected NameNode(Configuration conf, NamenodeRole role) throws IOException { this.conf = conf; this.role = role;//保存NameNode的角色信息 //设置clients访问nomenode或nameservice的访问地址 配置项fs.defaultFS:hadoop01:9000 setClientNamenodeAddress(conf); String nsId = getNameServiceId(conf); String namenodeId = HAUtil.getNameNodeId(conf, nsId); //ha相关 this.haEnabled = HAUtil.isHAEnabled(conf, nsId); //根据用户设置的启动参数,确定启动以后的初始状态,如果是正常启动,则全部直接进入Standby状态 state = createHAState(getStartupOption(conf)); this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf); //TODO 在创建HA的时候,也启动了standByNameNode的服务 this.haContext = createHAContext(); try { //给联邦模式下准备的,主要是设置联邦模式下namenode的地址和RPC地址 initializeGenericKeys(conf, nsId, namenodeId); //TODO initialize(conf); // HA相关 try { haContext.writeLock(); state.prepareToEnterState(haContext); state.enterState(haContext); } finally { haContext.writeUnlock(); } } catch (IOException e) { this.stop(); throw e; } catch (HadoopIllegalArgumentException e) { this.stop(); throw e; } this.started.set(true); }

    在初始化的过程中,会调用createHAContext()方法,进入该方法:

    /**
     * Creates the HA context used by this NameNode.
     *
     * @return a newly constructed {@code NameNodeHAContext}
     */
    protected HAContext createHAContext() {
      final HAContext context = new NameNodeHAContext();
      return context;
    }

    该方法返回一个NameNodeHAContext对象,这个类中有很多@Override回调函数,这里看一下startActiveServices():

    @Override public void startActiveServices() throws IOException { try { //HA启动开始阶段,初始化 namesystem.startActiveServices(); startTrashEmptier(conf); } catch (Throwable t) { doImmediateShutdown(t); } }

    进入namesystem.startActiveServices(),整体流程是在写锁范围内进行的:首先通过元数据获取editLog文件,然后判断editLog文件是否被写入,在判断中构造了一个守护线程Daemon,其中运行的是资源监控类,说明它是一个线程类。进入NameNodeResourceMonitor:

    /** * Periodically calls hasAvailableResources of NameNodeResourceChecker, and if * there are found to be insufficient resources available, causes the NN to * enter safe mode. If resources are later found to have returned to * acceptable levels, this daemon will cause the NN to exit safe mode. */ class NameNodeResourceMonitor implements Runnable { boolean shouldNNRmRun = true; @Override public void run () { try { while (fsRunning && shouldNNRmRun) { //TODO checkAvailableResources(); if(!nameNodeHasResourcesAvailable()) { String lowResourcesMsg = "NameNode low on available disk space. "; if (!isInSafeMode()) { LOG.warn(lowResourcesMsg + "Entering safe mode."); } else { LOG.warn(lowResourcesMsg + "Already in safe mode."); } //进入安全模式了 enterSafeMode(true); } try { // 每隔5秒进行资源的检查 Thread.sleep(resourceRecheckInterval); } catch (InterruptedException ie) { // Deliberately ignore } } } catch (Exception e) { FSNamesystem.LOG.error("Exception in NameNodeResourceMonitor: ", e); } } public void stopMonitor() { shouldNNRmRun = false; } }

    NameNodeResourceMonitor实现了Runnable,可以运行run函数:在while循环中每隔5秒,不断地通过checkAvailableResources()去检查资源数。

    源码文件FSNamesystem.java

    /**
     * Runs the disk-space check and caches its outcome in
     * {@code hasResourcesAvailable}.
     *
     * <p>{@code nnResourceChecker} must already be set; this is enforced with
     * {@code Preconditions.checkState} (presumably Guava's, which throws
     * {@code IllegalStateException} -- verify against the file's imports).
     */
    void checkAvailableResources() {
      final boolean checkerInitialized = nnResourceChecker != null;
      Preconditions.checkState(checkerInitialized,
          "nnResourceChecker not initialized");
      hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();
    }

    源码文件NameNodeResourceChecker.java

    /** * Return true if disk space is available on at least one of the configured * redundant volumes, and all of the configured required volumes. * * @return True if the configured amount of disk space is available on at * least one redundant volume and all of the required volumes, false * otherwise. */ //监控NameNode主机上的磁盘还是否可用(空间) //此处代码是在class NameNodeResourceMonitor implements Runnable中循环调用 /** * 如果一旦发现有资源不足的情况,会使NameNode进入安全模式。 * 如果随后返回的状态代表资源大小到达可使用的级别,那么这个线程就使NameNode退出安全模式。 依照这个注释,去解读run()方法的代码逻辑:在一个while循环里,首先判断资源是否可用, 如果不可用,日志里就会发出一个警告信息,然后调用enterSafeMode();进入安全模式。 * */ public boolean hasAvailableDiskSpace() { return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(), minimumRedundantVolumes); }

    源码文件NameNodeResourcePolicy.java

    /** * Given a set of checkable resources, this class is capable of determining * whether sufficient resources are available for the NN to continue operating. */ @InterfaceAudience.Private final class NameNodeResourcePolicy { /** * Return true if and only if there are sufficient NN * resources to continue logging edits. * * @param resources the collection of resources to check. * @param minimumRedundantResources the minimum number of redundant resources * required to continue operation. * @return true if and only if there are sufficient NN resources to * continue logging edits. */ /** * 主要对volumns里面的url进行检查,看看这些url路径是否可用,是否满足继续运行的最小资源数 * */ static boolean areResourcesAvailable( Collection<? extends CheckableNameNodeResource> resources, int minimumRedundantResources) { // TODO: workaround: // - during startup, if there are no edits dirs on disk, then there is // a call to areResourcesAvailable() with no dirs at all, which was // previously causing the NN to enter safemode //如果resources为null,则说明没有本地的edits目录,那么可能是刚启动或者刚格式化 if (resources.isEmpty()) { return true; } //需要的数量 int requiredResourceCount = 0; //冗余的数量 int redundantResourceCount = 0; //无法使用的冗余资源数 int disabledRedundantResourceCount = 0; /** * for{ * if(不需要的){ * 冗余的数量++ * if(url不可用){ * 不可用 ++ * } * }else{ * 需要的数量 ++ * if(需要的,但是确不可用){ * return false * } * } * * } * */ for (CheckableNameNodeResource resource : resources) { //如果不是当前namenode需要的资源(edits路径),则redundantResourceCount++; if (!resource.isRequired()) { redundantResourceCount++; //如果目录不可用,则disabledRedundantResourceCount++; if (!resource.isResourceAvailable()) {//isResourceAvailable --》检查目录空间大小 disabledRedundantResourceCount++; } } else {//如果当前的路径是namenode需要的,并且空间不够100M,那么返回false,直接进入安全模式 requiredResourceCount++; if (!resource.isResourceAvailable()) { // Short circuit - a required resource is not available. 
return false; } } } if (redundantResourceCount == 0) { // If there are no redundant resources, return true if there are any // required resources available. return requiredResourceCount > 0; } else { //minimumRedundantResources 继续运行所需要的最少冗余资源数 //冗余的数量 - 无法使用的冗余资源数 >= 继续运行所需要的最少冗余资源数 return redundantResourceCount - disabledRedundantResourceCount >= minimumRedundantResources; } } }
    Processed: 0.010, SQL: 9