怎么创建RPC的呢?进入createRpcServer方法
/** * Create the RPC server implementation. Used as an extension point for the * BackupNode. */ protected NameNodeRpcServer createRpcServer(Configuration conf) throws IOException { return new NameNodeRpcServer(conf, this); }在这里new了一个Rpc对象,查看它的构造函数
/** * 根据代码的逻辑划分,主要有三部分: * 第一部分,实例化各种通信协议和服务对象,比如:负责创建目录、管理block、设置权限等操作的客户端同NameNode通信的 协议服务——ClientNameNodeProtocolServerSideTranslatorPB、负责datanode的启动时向NameNode注册、 发送心跳报告、block信息报告等操作的datanode同NameNode通信的协议服务——DatanodeProtocolServerSideTranslatorPB 第二部分,实例化了一个监听datanode请求的rpc server, 并且将第一部分实例化的各种ProtocolService同此rpc server进行绑定, 用于处理rpc server监听到的来自datanode的各种rpc请求。 第三部分,实例化了一个监听客户端请求的rpc server, 并将第一部分实例化的各种ProtocolService同此rpc server进行绑定, 用于处理监听到的来自客户端的rpc请求。 * */ public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException { this.nn = nn; this.namesystem = nn.getNamesystem(); this.retryCache = namesystem.getRetryCache(); this.metrics = NameNode.getNameNodeMetrics(); //NameNode有一个工作线程池用来处理客户端的远程过程调用及集群守护进程的调用,默认线程数是10 int handlerCount = conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, DFS_NAMENODE_HANDLER_COUNT_DEFAULT); //TODO 第一部分 //设置ProtolEngine,目前只支持PB协议。表示接收到的RPC协议如果是ClientNamenodeProtocolPB, //那么处理这个RPC协议的引擎是ProtobufRpcEngine RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);然后对各个协议进行绑定初始化。 接下进入第二部分
//TODO 第二部分 启动ServiceRpcServer //这个服务是用来namenode和datanode之间的调用(握手、注册、心跳、数据块的汇报) this.serviceRpcServer = new RPC.Builder(conf) .setProtocol( org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress(bindHost) .setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()) .build(); //TODO 将前面实例化的各种协议service添加到这个监听客户端请求的RPC server // Add all the RPC protocols that the namenode implements DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, serviceRpcServer); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, serviceRpcServer); DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService, serviceRpcServer); DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class, refreshAuthService, serviceRpcServer); DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, refreshUserMappingService, serviceRpcServer); // We support Refreshing call queue here in case the client RPC queue is full DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class, refreshCallQueueService, serviceRpcServer); DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class, genericRefreshService, serviceRpcServer); DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, serviceRpcServer); DFSUtil.addPBProtocol(conf, TraceAdminProtocolPB.class, traceAdminService, serviceRpcServer); // Update the address with the correct port InetSocketAddress listenAddr = serviceRpcServer.getListenerAddress(); serviceRPCAddress = new InetSocketAddress( serviceRpcAddr.getHostName(), listenAddr.getPort()); nn.setRpcServiceServerAddress(conf, serviceRPCAddress); } else { serviceRpcServer = null; serviceRPCAddress = null; } //获取namenode的RPC地址,默认localhost:9000 InetSocketAddress rpcAddr = nn.getRpcServerAddress(conf); String bindHost = nn.getRpcServerBindHost(conf); if (bindHost == null) { bindHost = rpcAddr.getHostName(); } LOG.info("RPC server is binding to " + bindHost + ":" + rpcAddr.getPort());第三部分
```java //TODO 第三部分 启动clientRpcServer // 这个服务主要是用户使用客户端与Namenode和datanode进行交互服务调用的 this.clientRpcServer = new RPC.Builder(conf) .setProtocol( org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress(bindHost) .setPort(rpcAddr.getPort()) .setNumHandlers(handlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();这样源码文件NameNode.java中的rpcServer构建完成。 然后进一步执行startCommonServices检查资源情况、判断是否进入安全状态、激活BlockManager。
当rpcServer创建完成后,判断当前NameNode地址是否为空,因为localhost:9000所以不会走这里
if (clientNamenodeAddress == null) { // This is expected for MiniDFSCluster. Set it now using // the RPC server's bind address. clientNamenodeAddress = NetUtils.getHostPortString(rpcServer.getRpcAddress()); LOG.info("Clients are to use " + clientNamenodeAddress + " to access" + " this namenode/service."); }然后判断NameNode角色,如果当前NameNode角色为是NameNode,会绑定一些变量到httpServer上,比如NameNode地址,元数据信息,这样才能在Web页面上看到一些相关的信息。
if (NamenodeRole.NAMENODE == role) { httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); }然后创建JvmPauseMonitor,用来监控JVM停顿的,垃圾回收信息等。 进行启动,放到metrics中开始监控
pauseMonitor = new JvmPauseMonitor(conf); pauseMonitor.start(); metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);最重要的部分是startCommonServices(conf);
/** * 启动一些公共服务,NameNode的RPC服务就是在这里启动 * 1):进行资源的检查,检查是否有足够的磁盘来存储元数据 * 比如:hadoop-daemon.sh start namenode 这个时候就会检查存储元数据的磁盘,是否满足100M * 2):进行安全模式检查,检查是否可以退出安全模式 * */ startCommonServices(conf);进入该方法后,启动namesystem下的namesystem.startCommonServices(conf, haContext); 注册一个NameNode的状态类registerNNSMXBean(); 对NameNode角色进行判断,不是NameNode
// 角色非`NamenodeRole.NAMENODE`的在此处启动HttpServer if (NamenodeRole.NAMENODE != role) { startHttpServer(conf); httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); }然后启动rpc服务,对外进行调用 加载插件,进行迭代插件进行启动
//启动服务 rpcServer.start();// plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, ServicePlugin.class); for (ServicePlugin p: plugins) { try { p.start(this); } catch (Throwable t) { LOG.warn("ServicePlugin " + p + " could not be started", t); } }操作下来,重要的还是namesystem下的startCommonServices 输入的参数startCommonServices(Configuration conf, HAContext haContext)。 首先上写锁writeLock(); 给HA上下文赋值this.haContext = haContext; 然后创建一个NameNode资源检查对象NameNodeResourceChecker,开始进行资源检查,进行模式判断,制定启动进程,进入模式阶段,汇报阶段,设置数据块,激活一个blockManager。
```java /** * 1、将需要检查的URL添加到volumes中 , 后台有线程会一直执行hasAvailableDiskSpace来检查 * 2、checkAvailableResources(); 进行资源检查 * 3、NameNode启动,进入到safemode阶段,处于一个等待汇报blocks的状态 * 4、汇报所有的block,用于后面判断是否进入安全模式 * 5、激活BlockManager */ void startCommonServices(Configuration conf, HAContext haContext) throws IOException { this.registerMBean(); // register the MBean for the FSNamesystemState writeLock(); this.haContext = haContext; try { //NameNodeResourceChecker负责检查磁盘资源。 // active状态的namenod会启动一个监控线程NameNodeResourceMonitor, // 定期执行NameNodeResourceChecker#hasAvailableDiskSpace()检查可用的磁盘资源。 /**需要检查3个涉及到元数据的目录: * Namenode2个目录:fsimage、editlog(默认情况下这两个是在同一个目录) * 高可用模式下的journalNode里面也有存储袁术的目录 * */ nnResourceChecker = new NameNodeResourceChecker(conf); //检查可用资源是否足够:如果不够,日志打印警告信息,然后进入安全模式 checkAvailableResources(); // 判断是否进入安全模式,并且副本队列是否应该被同步/复制 /** * 磁盘资源不足的情况下,任何对元数据修改所产生的日志都无法确保能够写入到磁盘, * 即新产生的edits log和fsimage都无法确保写入磁盘。所以要进入安全模式, * 来禁止元数据的变动以避免往磁盘写入新的日志数据 * */ assert safeMode != null && !isPopulatingReplQueues(); //获取StartupProgress实例用来获取NameNode各任务的启动信息 StartupProgress prog = NameNode.getStartupProgress(); // 目前NameNode启动,进入到safemode阶段,处于一个等待汇报blocks的状态 prog.beginPhase(Phase.SAFEMODE); //处于一个等待汇报blocks的状态 prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS, getCompleteBlocksTotal()); //设置所有的block,用于后面判断是否进入安全模式 setBlockTotal(); //TODO 启动BlockManager里面关于block副本处理的后台线程 //激活BlockManager blockManager.activate(conf); } finally { writeUnlock(); } registerMXBean(); DefaultMetricsSystem.instance().register(this); if (inodeAttributeProvider != null) { inodeAttributeProvider.start(); dir.setINodeAttributeProvider(inodeAttributeProvider); } snapshotManager.registerMXBean(); }