Container启动

    科技2022-07-14  167

    目录

    Container启动源码分析 Yarn资源参数设置

    Container启动

      Container启动命令是由各个ApplicationMaster通过RPC函数ContainerManager.startContainer()向NodeManager发起的,Container启动过程主要经历三个阶段:资源本地化、启动并运行container、资源回收。其中,资源本地化指创建container工作目录,从HDFS下载运行container所需的各种资源(jar包、可执行文件等)等,而资源回收则是资源本地化的逆过程,它负责清理各种资源,它们均由ResourceLocalizationService服务完成的。启动container是由ContainersLauncher服务完成的,而运行container是由插拔式组件ContainerExecutor完成的,YARN提供了两种ContainerExecutor实现,一种是DefaultContainerExecutor,另一种是LinuxContainerExecutor。

    资源本地化   资源本地化是指准备container运行所需的环境,包括创建container工作目录,从HDFS下载运行container所需的各种资源(jar包、可执行文件等)等。   YARN将资源分为两类,一类是public级别的资源,这类资源被放到一个公共目录下,由所有用户共享,另一类是private级别的资源,这类资源是用户私有的,只能在所属用户的各个作业间共享。资源本地化过程实际上就是准备public和private资源的过程,它由ResourceLocalizationService服务完成,其中,所有application的public资源由专门的线程PublicLocalizer下载完成,该线程内部维护了一个线程池以加快资源下载速度,每个application的private资源由一个专门的线程LocalizerRunner下载完成。

    启动并运行Container   启动Container是由ContainersLauncher完成的,该过程主要工作是将运行container对应的完整shell命令写到私有目录下的launch_container.sh中,并将token文件写到container_tokens中。之所以要将container运行命令写到launch_container.sh中,然后通过运行shell脚本的形式运行container,主要是因为直接执行命令可能会有些特殊符号不识别。   而运行container是由插拔式组件ContainerExecutor完成的,YARN提供了两种ContainerExecutor实现,一种是DefaultContainerExecutor,另一种是LinuxContainerExecutor。DefaultContainerExecutor只是简单的以管理员身份运行launch_container.sh脚本,而LinuxContainerExecutor则是以container所属用户身份运行该脚本,它是Hadoop引入安全机制后加入的。

    资源回收   资源回收由ResourceLocalizationService服务完成的,该过程与资源本地化正好相反,它负责撤销container运行过程中使用的各种资源。

    资源隔离方案   YARN对内存资源和CPU资源采用了不同的资源隔离方案。对于内存资源,为了能够更灵活的控制内存使用量,YARN采用了进程监控的方案控制内存使用,即每个NodeManager会启动一个额外监控线程监控每个container内存资源使用量,一旦发现它超过约定的资源量,则会将其杀死。采用这种机制的另一个原因是java中创建子进程采用了fork()+exec()的方案,子进程启动瞬间,它使用的内存量与父进程一致,从外面看来,一个进程使用内存量可能瞬间翻倍,然后又降下来,采用线程监控的方法可防止这种情况下导致swap操作。对于CPU资源,则采用了Cgroups进行资源隔离。

    源码分析

    ApplicationMaster的主要逻辑: ApplicationMaster与 NodeManager通信   ApplicationMaster与 NodeManager通过 NMClientAsync 通信,后者需要调用方提供一个回调类,NodeManager会在合适的时机调用回调类中的方法来通知 ApplicationMaster。回调类被ApplicationMaster实现为 NMCallbackHandler ,其中最重要的两个函数是:

    onContainerStarted() ,当 NodeManager新启动了 Containers 时,会调用该方法,把 Container 列表传给它onContainerStopped() ,当 NodeManager停止了一些 Containers 时,会调用该方法,把 Container 列表传给它

    ApplicationMaster与ResourceManager 通信   ApplicationMaster与ResourceManager通过 AMRMClientAsync 通信。首先通过 AMRMClientAsync.registerApplicationMaster() 向 ResourceManager注册自己。然后 ApplicationMaster开始提交对 Container 的需求,在申请到需要数量的Container之前,先调用 setupContainerAskForRM() 设置对 Container 的具体需求(优先级、资源等),然后调用 AMRMClientAsync.addContainerRequest() 把需求提交给 ResourceManager ,最终该方法会把需求存到一个集合(AMRMClient.ask)里面。   AMRMClientAsync 同样需要调用方提供一个回调类,ApplicationMaster实现为 RMCallbackHandler 。这个回调类主要实现了两个方法:

    onContainersAllocated(),获得新申请的 Container ,创建一个新线程,设置 ContainerLaunchContext , 最终调用 NMClientAsync.startContainerAsync() 来启动 ContaineronContainersCompleted(),检查已完成的 Container 的数量是否达到了需求,没有的话,继续添加需求

      ApplicationMaster需要与 ResourceManager进行心跳,对于使用了 AMRMClientAsync 的 ApplicationMaster(如 DistributedShell ),心跳是通过 AMRMClientAsync 的一个线程实现。最终调用了 AMRMClientImpl.allocate() ,其主要动作就是从 ask 集合中拿到 Container 需求,组装成一个 AllocateRequest ,通过ApplicationMasterProtocol.allocate() 进行 RPC 调用。

    ApplicationMaster的三个主流程   总结上面说的,ApplicationMaster有三个主要流程与 Container 的创建密切相关,这两个流程并行:

    提交需求,通过心跳,把需求发送给 ResourceManager获取Container,通过心跳,拿到申请好的 Container每申请到一个 Container ,与 NodeManager通信,启动这个Container

    Application与ResourceManager 的心跳   在 ApplicationMaster向 ResourceManager 注册时,ResourceManager 最终会生成一个代表这个 APP 的实例,最终是生成了一个 FicaSchedulerApp 。Application与ResourceManager进行心跳,发送的信息中含有:

    ApplicationMaster告诉 ResourceManager 两个信息: a) 自己对Container的要求,b) 已经用完的待回收的Container列表ResourceManager 给 ApplicationMaster的回应:a) 新申请的 Container,b) 已经完成的 Container 的状态

      ApplicationMasterService 是 ResourceManager的一个组成部分。ResourceManager启动时,会初始化这个服务,并根据配置,把相应的调度器 YarnScheduler 传进来。它实现了 ApplicationMasterProtocol 接口,负责对来自 ApplicationMaster的 RPC 请求进行回应。ApplicationMasterService.allocate() 方法会被调用,核心逻辑是:

    触发 RMappAttemptStatusupdateEvent 事件调用 YarnScheduler.allocate() 方法,把执行的结果封装起来返回。YarnScheduler 是与调度器通信的接口。所以,最后调用的是具体调度器的 allocate() 方法

    这里使用的是 FIFO 调度器,FifoScheduler.allocate() 方法的主要做两件事情:

    调用 FicaSchedulerApp.updateResourceRequests() 更新 APP (指从调度器角度看的 APP) 的资源需求通过 FicaSchedulerApp.pullNewlyAllocatedContainersAndNMTokens() 把 FicaSchedulerApp.newlyAllocatedContainers 这个 List 中的Container取出来,封装后返回

      FicaSchedulerApp.newlyAllocatedContainers 这个数据结构中存放的,正是最近申请到的 Container 。那么,List 中的元素是怎么来的呢,这与 NodeManager 的心跳有关。

    NodeManager 与 ResourceManager 的心跳   NodeManager 需要和 ResourceManager 进行心跳,让 ResourceManager 更新自己的信息。心跳的信息包含:

    Request(NodeManager ->ResourceManager ) : NodeManager 上所有 Container 的状态Response(ResourceManager ->NodeManager ) : 已待删除和待清理的 Container 列表

      NodeManager 启动时会向 ResourceManager 注册自己,ResourceManager 生成对应的 RMNode 结构,代表这个 NodeManager ,存放了这个 NodeManager 的资源信息以及其他一些统计信息。负责具体心跳的,在 NodeManager这边是 NodeStatusUpdater 服务,在 ResourceManager 那边则是 ResourceTrackerService 服务。心跳的信息包括这个 NodeManager的状态,其中所有 Container 的状态等。   心跳最终通过 RPC 调用到了 ResourceTrackerService.nodeHeartbeat() 。其核心逻辑就是触发一个RMNodeStatusEvent(RMNodeEventType.STATUS_UPDATE)事件,这个事件由 NodeManager注册时生成的 RMNode 处理。   RMNode 接收 RMNodeStatusEvent(RMNodeEventType.STATUS_UPDATE)消息,更新自己的状态机,然后调用 StatusUpdateWhenHealthyTransition.transition ,该方法从参数中获得这个 NM 所有的 Container 的信息,根据其状态分成两组:a) 刚申请到还未使用的,b) 运行完毕需要回收的,这两组 Container 的信息存放在 RMNode 的一个队列中。接着,发出一个消息: NodeUpdateSchedulerEvent(SchedulerEventType.NODE_UPDATE) 。这个消息,由调度器处理。

    ResourceManager 处理NODE_UPDATE消息   ResourceManager 接收到 NodeManager的心跳后,会发出一个 SchedulerEventType.NODE_UPDATE 的消息,改消息由调度器处理。FifoScheduler 接收到这个消息后,调用了 FifoScheduler.nodeUpdate() 方法。与 Container 申请相关的主要逻辑如下: 【获取已申请到的】   从 RMNode 中获取出那些刚申请还未使用的 Container (NodeManager与 ResourceManager 心跳是获得),发出消息:RMContainerEventType.LAUNCHED,该消息由 RMContainer 处理 【回收已完成的】 从 RMNode 中获取出那些「已经使用完待回收」的 Container,进行回收 【申请新的】 在这个 NodeManager 上申请新的 Container:

    通过 FicaSchedulerApp.getResourceRequest() 拿到资源请求(ResourceRequest)计算可申请的资源,调用 FicaSchedulerApp.allocate(),根据传进来的参数,封装出一个 RMContainer 添加到 newlyAllocatedContainers 中。然后触发事件RMContainerEventType.START。该事件之后会由 RMContainer 处理调用 FicaSchedulerNode.allocateContainer()

    RMContainer 对 RMContainerEventType 事件进行处理处理:

    RMContainerEventType.START : 状态从 NEW 变为 ALLOCATED,最终触发事件 RMAppAttemptEvent(type=CONTAINER_ALLOCATED), 该事件由 RMAppAttemptImpl 处理RMContainerEventType.LAUNCHED : 状态从 ACQUIED 变为 RUNNING

      RMAppAttemptImpl 对 RMAppAttemptEvent 事件进行处理,该事件告诉就是告诉 AppAttempt ,你这个APP有 Container 申请好了,AppAttempt 检查自己的状态,如果当前还没有运行 AM ,就把这个 Container 拿来运行 AM。到此,已经理清楚了 FicaSchedulerApp.newlyAllocatedContainers 中元素的来源,也就理清楚了,AM 与 RM 心跳中获得的那些「新申请」的 Container 的来源。

    ApplicationMaster 与 NodeManager 通信启动 Container

    ApplicationMaster的三个主流程与Container 的创建密切相关,这两个流程并行: 1、提交需求,通过心跳,把需求发送给 ResourceManager 2、获取Container,通过心跳,拿到申请好的 Container 3、每申请到一个 Container ,与 NodeManager 通信,启动这个Container

      下面来具体看看 NodeManager 具体是怎么启动一个 Container的。ApplicationMaster 设置好 ContainerLaunchContext , 调用 NMClientAsync.startContainerAsync() 启动Container。   NMClientAsync 中有一个名叫 events 的事件队列,同时,NMClientAsync 还启动这一个线程,不断地从 events 中取出事件进行处理。startContainerAsync() 方法被调用时,会生成一个ContainerEvent(type=START_CONTAINER)事件放入 events 队列。对于这个事件,处理逻辑是调用 NMClient.startContainer() 同步地启动 Container ,然后调用回调类中的 onContainerStarted() 方法。   NMClient 最终会调用 ContainerManagementProtocol.startContainers() ,以 Google Protocol Buffer 格式,通过 RPC 调用 NM 的对应方法。NM 处理后会返回成功启动的 Container 列表。

    NodeManager 中启动 Container   NodeManager 中负责响应来自 AM 的 RPC 请求的是 ContainerManagerImpl ,它是 NodeManager 的一部分,负责 Container 的管理,在 Nodemanager 启动时,该服务被初始化。该类实现了接口 ContainerManagementProtocol ,接到 RPC 请求后,会调用 ContainerManagerImpl.startContainers() 。该函数的基本逻辑是:

    首先进行 APP 的初始化(如果还没有的话),生成一个 ApplicationImpl 实例,然后根据请求,生成一堆 ContainerImpl 实例触发一个新事件:ApplicationContainerInitEvent ,之前生成的 ApplicationImpl 收到改事件,又出发一个 ContainerEvent(type=INIT_CONTAINER) 事件,这个事件由 ContainerImpl 处理ContainerImpl 收到事件, 更新状态机,启动辅助服务,然后触发一个新事件 ContainersLaucherEvent(type=LAUNCH_CONTAINER) ,处理这个事件的是 ContainersLauncher

    ContainerLauncher 是 ContainerManager 的一个子服务,收到 ContainersLaucherEvent(type=LAUNCH_CONTAINER)事件后,组装出一个 ContainerLaunch 类并使用 ExecutorService 执行。ContainerLaunch 类负责一个 Container 具体的 Lanuch 。基本逻辑如下:

    设置运行环境,包括生成运行脚本,Local Resource ,环境变量,工作目录,输出目录等触发新事件ContainerEvent(type=CONTAINER_LAUNCHED),该事件由 ContainerImpl 处理调用 ContainerExecutor.launchContainer() 执行 Container 的工作,这是一个阻塞方法执行结束后,根据执行的结果设置 Container 的状态。

    ContainerExecutor   ContainerExecutor 是 NodeManager 的一部分,负责 Container 中具体工作的执行。该类是抽象类,可以有不同的实现,如 DefaultContainerExecutor ,DockerContainerExecutor ,LinuxContainerExecutor 等。根据 YARN 的配置,NodeManager 启动时,会初始化具体的 ContainerExecutor 。ContainerExecutor 最主要的方法是 launchContainer() ,该方法阻塞,直到执行的命令结束。DefaultContainerExecutor 是默认的 ContainerExecutor ,支持 Windows 和 Linux 。它的 launchContainer() 的逻辑是:创建 Container 需要的目录;拷贝 Token、运行脚本到工作目录;做一些脚本的封装,然后执行脚本,返回状态码。

    至此,Container 在 NodeManager 中已经启动,ApplicationMaster 中 NMCallback 回调类中的 onContainerStarted() 方法被调用。

    Yarn资源参数设置

    配置文件yarn-site.xml

    yarn.scheduler.minimum-allocation-mb 默认值:1024 yarn.scheduler.maximum-allocation-mb 默认值:8192

    说明:单个容器可申请的最小与最大内存,应用在运行申请内存时不能超过最大值,小于最小值则分配最小值。最小值还有另外一种用途,计算一个节点的最大container数目注:这两个值一经设定不能动态改变。

    yarn.scheduler.minimum-allocation-vcores 默认值:1 yarn.scheduler.maximum-allocation-vcores 默认值:32

    参数解释:单个可申请的最小/最大虚拟CPU个数。比如设置为1和4,则运行MapRedce作业时,每个Task最少可申请1个虚拟CPU,最多可申请4个虚拟CPU。

    yarn.nodemanager.resource.memory-mb 默认值:8G yarn.nodemanager.vmem-pmem-ratio 默认值:2.1

    说明:每个节点可用的最大内存,RM中的两个值不应该超过此值。此数值可以用于计算container最大数目,即:用此值除以RM中的最小容器内存。虚拟内存率,是占task所用内存的百分比,默认值为2.1倍;注意:第一个参数是不可修改的,一旦设置,整个运行过程中不可动态修改,且该值的默认大小是8G,即使计算机内存不足8G也会按着8G内存来使用。

    yarn.nodemanager.resource.cpu-vcores 默认值:8

    参数解释:NodeManager总的可用虚拟CPU个数。

    Processed: 0.014, SQL: 8