千家信息网

Device Manager在什么时候创建

发表于:2024-12-04 作者:千家信息网编辑
千家信息网最后更新 2024年12月04日,这篇文章主要介绍"Device Manager在什么时候创建",在日常操作中,相信很多人在Device Manager在什么时候创建问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对
千家信息网最后更新 2024年12月04日Device Manager在什么时候创建

这篇文章主要介绍"Device Manager在什么时候创建",在日常操作中,相信很多人在Device Manager在什么时候创建问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答"Device Manager在什么时候创建"的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

Create Device Manager Instance

Device Manager在何时创建

Device Manager和Volume Manager、QoS Container Manager等一样,都属于kubelet管理的众多Manager之一。Device Manager在kubelet启动时的NewContainerManager中创建。

pkg/kubelet/cm/container_manager_linux.go:197func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {                ...        glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)        if devicePluginEnabled {                cm.deviceManager, err = devicemanager.NewManagerImpl()        } else {                cm.deviceManager, err = devicemanager.NewManagerStub()        }        if err != nil {                return nil, err        }        ...}       

ManagerImpl结构体

我们有必要先了解Device Manager的结构体:

// ManagerImpl is the structure in charge of managing Device Plugins.type ManagerImpl struct {        socketname string        socketdir  string        endpoints map[string]endpoint // Key is ResourceName        mutex     sync.Mutex        server *grpc.Server        // activePods is a method for listing active pods on the node        // so the amount of pluginResources requested by existing pods        // could be counted when updating allocated devices        activePods ActivePodsFunc        // sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.        // We use it to determine when we can purge inactive pods from checkpointed state.        sourcesReady config.SourcesReady        // callback is used for updating devices' states in one time call.        // e.g. a new device is advertised, two old devices are deleted and a running device fails.        callback monitorCallback        // healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.        healthyDevices map[string]sets.String        // unhealthyDevices contains all of the unhealthy devices and their exported device IDs.        unhealthyDevices map[string]sets.String        // allocatedDevices contains allocated deviceIds, keyed by resourceName.        allocatedDevices map[string]sets.String        // podDevices contains pod to allocated device mapping.        podDevices podDevices        store      utilstore.Store        pluginOpts map[string]*pluginapi.DevicePluginOptions}

下面是核心field的说明:

  • socketname: 就是kubelet对外暴露的socket名,即 kubelet.sock

  • socketdir: device plugins' socket的存放的目录,/var/lib/kubelet/device-plugins/

  • endpoints: map对象,key为Resource Name,value为endpoint接口(包括run,stop,allocate,preStartContainer,getDevices,callback,isStoped,StopGracePeriodExpired),每个endpoint接口对应一个已注册的device plugin,负责与device plugin的gRPC通信及缓存device plugin反馈的device states。

  • server: Register服务暴露的gRPC Server。

  • activePods: 用来获取该节点上所有active pods,即non-Terminated状态的Pods。在kubelet的initializeRuntimeDependentModules时会注册activePods Func为如下函数:

          // GetActivePods returns non-terminal pods        func (kl *Kubelet) GetActivePods() []*v1.Pod {                allPods := kl.podManager.GetPods()                activePods := kl.filterOutTerminatedPods(allPods)                return activePods        }


  • callback: 是kubelet收到device plugin的ListAndWatch gRCP stream中有devices state变更时的回调函数,包括有新设备增加、旧设备删除、设备状态变化,所以通过ListAndWatch接口的回调方式,可以实现设备的自动发现和热插拔。

            type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)


  • healthyDevices: map对象,key为Resource Name,value为对应的健康的device IDs。

  • unhealthyDevices: map对象,key为Resource Name,value为对应的不健康的device IDs。

  • allocatedDevices: map对象,key为Resource Name,value为已经分配出去的device IDs。

  • podDevices: 记录每个pod中每个容器的device分配情况。

         // ContainerAllocateResponse为容器内某个device对应的分配信息,包括注入的环境变量、挂载信息、Annotations。        type ContainerAllocateResponse struct {                Envs map[string]string                 Mounts []*Mount                 Devices []*DeviceSpec                 Annotations map[string]string         }        // deviceAllocateInfo        type deviceAllocateInfo struct {                deviceIds sets.String                allocResp *pluginapi.ContainerAllocateResponse        }        type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.        type containerDevices map[string]resourceAllocateInfo   // Keyed by containerName.        type podDevices map[string]containerDevices             // Keyed by podUID.


  • store: 是对checkpointData的文件存储(/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint),具体存储了每个Pod分配的Devices信息PodDeviceEntries, 以及已经注册的Resource Name及对应的Devices IDs。

     type checkpointData struct {                PodDeviceEntries  []podDevicesCheckpointEntry                RegisteredDevices map[string][]string // key为Resource Name,value为DeviceIDs        }        type podDevicesCheckpointEntry struct {                PodUID        string                ContainerName string                ResourceName  string                DeviceIDs     []string                AllocResp     []byte        }


  • pluginOpts: map对象,key为Resource Name,value为DevicePluginOptions,目前只有一项内容,就是PreStartRequired bool,表示是否在容器启动前要调用device plugin的PreStartContiner接口。在nvidia-k8s-plugin中,PreStartContainer为空实现。

NewManagerImpl

我们再来看看Device Manager的具体创建实现NewManagerImpl

pkg/kubelet/cm/devicemanager/manager.go:97// NewManagerImpl creates a new manager.func NewManagerImpl() (*ManagerImpl, error) {        // 通过/var/lib/kubelet/device-plugins/kubelet.sock与device plugin交互        return newManagerImpl(pluginapi.KubeletSocket)}func newManagerImpl(socketPath string) (*ManagerImpl, error) {        glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)        if socketPath == "" || !filepath.IsAbs(socketPath) {                return nil, fmt.Errorf(errBadSocket+" %v", socketPath)        }        dir, file := filepath.Split(socketPath)        manager := &ManagerImpl{                endpoints:        make(map[string]endpoint),                socketname:       file,                socketdir:        dir,                healthyDevices:   make(map[string]sets.String),                unhealthyDevices: make(map[string]sets.String),                allocatedDevices: make(map[string]sets.String),                pluginOpts:       make(map[string]*pluginapi.DevicePluginOptions),                podDevices:       make(podDevices),        }        manager.callback = manager.genericDeviceUpdateCallback        // The following structs are populated with real implementations in manager.Start()        // Before that, initializes them to perform no-op operations.        manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }        manager.sourcesReady = &sourcesReadyStub{}        var err error                // 在/var/lib/kubelet/device-plugins/目录下创建file store类型的key-value存储文件kubelet_internal_checkpoint,用来作为kubelet的device plugin的checkpoint。        manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{})        if err != nil {                return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err)        }        return manager, nil}
  • kubelet Device Manager通过/var/lib/kubelet/device-plugins/kubelet.sock与device plugin交互。

  • 注册callback为genericDeviceUpdateCallback,用来处理对应devices的add,delete,update事件。

  • /var/lib/kubelet/device-plugins/目录下创建file store类型的key-value存储文件kubelet_internal_checkpoint,用来作为kubelet的device plugin的checkpoint。

    • 当监听到devices add/delete/update事件发生时,会更新到kubelet_internal_checkpoint文件中。

    • 当device plugin的stop time超过grace period time(代码写死为5min,不可配置),会从checkpoint中删除对应的devices。在这个时间范围内,Device Manager会继续缓存该endpoint及对应的devices。

    • 为Container Allocate Devices后,也会将PodDevices更新到checkpoint中。

我们来看看callback的实现genericDeviceUpdateCallback的实现,了解Device Manager是如何处理devices的add/delete/update消息的。

pkg/kubelet/cm/devicemanager/manager.go:134func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {        kept := append(updated, added...)        m.mutex.Lock()        if _, ok := m.healthyDevices[resourceName]; !ok {                m.healthyDevices[resourceName] = sets.NewString()        }        if _, ok := m.unhealthyDevices[resourceName]; !ok {                m.unhealthyDevices[resourceName] = sets.NewString()        }        for _, dev := range kept {                if dev.Health == pluginapi.Healthy {                        m.healthyDevices[resourceName].Insert(dev.ID)                        m.unhealthyDevices[resourceName].Delete(dev.ID)                } else {                        m.unhealthyDevices[resourceName].Insert(dev.ID)                        m.healthyDevices[resourceName].Delete(dev.ID)                }        }        for _, dev := range deleted {                m.healthyDevices[resourceName].Delete(dev.ID)                m.unhealthyDevices[resourceName].Delete(dev.ID)        }        m.mutex.Unlock()        m.writeCheckpoint()}
  • 将callback中收到的devices状态是Healthy,那么将device ID插入到ManagerImpl中healthDevices中,并从unhealthyDevices中删除。

  • 将callback中收到的devices状态是Unhealthy,那么将device ID插入到ManagerImpl中unhealthDevices中,并从healthyDevices中删除。

  • 将device plugin反馈的需要delete的devices从healthDevices和unhealthDevices中一并删除。

  • 将ManagerImpl中的数据更新到checkpoint文件中。

Device Manager的启动

前面把Device Manager的创建流程分析了一下,还涉及到checkpoint和callback的分析。接下来,我们继续对Device Manager的Start流程进行分析。

Start Device Manager

Device Manager是在containerManagerImpl的Start时启动的。

pkg/kubelet/cm/container_manager_linux.go:527func (cm *containerManagerImpl) Start(node *v1.Node,        activePods ActivePodsFunc,        sourcesReady config.SourcesReady,        podStatusProvider status.PodStatusProvider,        runtimeService internalapi.RuntimeService) error {        ...                // Starts device manager.        if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {                return err        }        return nil}
  • deviceManager.Start的第一个参数是获取该节点的active(non-terminated)Pods的函数。

  • SourcesReady是用来跟踪kubelet配置的Pod Sources,这些Sources包括:

    • file: 通过static file创建静态Pods。

    • http: 通过http接口来获取Pods信息。

    • api: 从Kubernetes API Server获取Pods信息,是Kubernetes默认的内部机制。

    • *: 表示包含以上全部的Sources类型。

ManagerIml Start

ManagerIml.Start负责启动Device Manager,对外提供gRPC服务。

pkg/kubelet/cm/devicemanager/manager.go:204// Start starts the Device Plugin Manager amd start initialization of// podDevices and allocatedDevices information from checkpoint-ed state and// starts device plugin registration service.func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {        m.activePods = activePods        m.sourcesReady = sourcesReady        // Loads in allocatedDevices information from disk.        err := m.readCheckpoint()        ...        socketPath := filepath.Join(m.socketdir, m.socketname)        os.MkdirAll(m.socketdir, 0755)        // Removes all stale sockets in m.socketdir. Device plugins can monitor        // this and use it as a signal to re-register with the new Kubelet.        if err := m.removeContents(m.socketdir); err != nil {                glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)        }        s, err := net.Listen("unix", socketPath)        if err != nil {                glog.Errorf(errListenSocket+" %+v", err)                return err        }        m.server = grpc.NewServer([]grpc.ServerOption{}...)        pluginapi.RegisterRegistrationServer(m.server, m)        go m.server.Serve(s)        glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)        return nil}
  • 首先读取checkpoint file中数据,恢复ManagerImpl的相关数据,包括:

    • podDevices;

    • allocatedDevices;

    • healthyDevices;

    • unhealthyDevices;

    • endpoints,注意这里会将endpoint的stop time设置为当前时间,意味着kubelet restart后,需要等待device plugin进行re-register后,才认为这些resource是可用的。

  • 然后将/var/lib/kubelet/device-plugins/下面的所有文件清空,当然checkpiont文件除外,也就是清空所有的socket文件,包括自己的kubelet.sock,以及其他所有之前的device plugin的socket文件。device plugin会监控kubelet.sock文件是否被删除,如果删除,则会触发自己的向kubelet重新注册自己。

  • 创建kubelet.sock并启动gRPC Server对外提供gRPC服务,目前只注册了Register服务,用于Device plugin调用进行插件注册。

Register服务

我们就来看看kubelet Device Manager对外提供的唯一gRPC接口Register。

Register

pkg/kubelet/cm/devicemanager/manager.go:289// Register registers a device plugin.func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {        glog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)        metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()        var versionCompatible bool        for _, v := range pluginapi.SupportedVersions {                if r.Version == v {                        versionCompatible = true                        break                }        }        if !versionCompatible {                errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)                glog.Infof("Bad registration request from device plugin with resource name %q: %v", r.ResourceName, errorString)                return &pluginapi.Empty{}, fmt.Errorf(errorString)        }        if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {                errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)                glog.Infof("Bad registration request from device plugin: %v", errorString)                return &pluginapi.Empty{}, fmt.Errorf(errorString)        }        // TODO: for now, always accepts newest device plugin. Later may consider to        // add some policies here, e.g., verify whether an old device plugin with the        // same resource name is still alive to determine whether we want to accept        // the new registration.        go m.addEndpoint(r)        return &pluginapi.Empty{}, nil}
  • 注册请求是device plugin向kubelet发送的,注册请求RegisterRequest为:

          type RegisterRequest struct {                Version string  // Kubernetes 1.10对应的device plugin api version为v1beta1                Endpoint string // device plugin对应的socket name                ResourceName string                 Options *DevicePluginOptions         }


  • 这里会检查注册的Resource Name是否符合Extended Resource的规则:

    • Resource Name不能属于kubernetes.io,得有自己的domain,比如nvidia.com。

    • Resource Name中不能包含requests.前缀。

    • 对应的Resource value只能是整数值。

  • 调用addEndpoint进行插件注册。

addEndpoint进行device plugin注册

从上面Register的方法中可见,真正插件注册的逻辑是在addEndpoint中实现的。

pkg/kubelet/cm/devicemanager/manager.go:332func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {        existingDevs := make(map[string]pluginapi.Device)        m.mutex.Lock()        old, ok := m.endpoints[r.ResourceName]        if ok && old != nil {                // Pass devices of previous endpoint into re-registered one,                // to avoid potential orphaned devices upon re-registration                devices := make(map[string]pluginapi.Device)                for _, device := range old.getDevices() {                        devices[device.ID] = device                }                existingDevs = devices        }        m.mutex.Unlock()        socketPath := filepath.Join(m.socketdir, r.Endpoint)        e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)        if err != nil {                glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)                return        }        m.mutex.Lock()        if r.Options != nil {                m.pluginOpts[r.ResourceName] = r.Options        }        // Check for potential re-registration during the initialization of new endpoint,        // and skip updating if re-registration happens.        // TODO: simplify the part once we have a better way to handle registered devices        ext := m.endpoints[r.ResourceName]        if ext != old {                glog.Warningf("Some other endpoint %v is added while endpoint %v is initialized", ext, e)                m.mutex.Unlock()                e.stop()                return        }        // Associates the newly created endpoint with the corresponding resource name.        // Stops existing endpoint if there is any.        m.endpoints[r.ResourceName] = e        glog.V(2).Infof("Registered endpoint %v", e)        m.mutex.Unlock()        if old != nil {                old.stop()        }        go func() {                e.run()                e.stop()                m.mutex.Lock()                if old, ok := m.endpoints[r.ResourceName]; ok && old == e {                        m.markResourceUnhealthy(r.ResourceName)                }                glog.V(2).Infof("Unregistered endpoint %v", e)                m.mutex.Unlock()        }()}
  • 首先检查注册的这个device plugin是否已经注册过,如果注册过,则获取已经缓存的devices。

  • 再检查device plugin的socket是否能dial成功,如果dial失败,则说明device plugin没正常启动。如果dial成功,就根据已经缓存的devices重新初始化Endpoint,EndpointImpl的定义如下:

         type endpointImpl struct {                client     pluginapi.DevicePluginClient                clientConn *grpc.ClientConn                socketPath   string                resourceName string                stopTime     time.Time                devices map[string]pluginapi.Device                mutex   sync.Mutex                cb monitorCallback        }


  • 为了防止在EndpointImpl重新初始化的过程中device plugin进行re-register,初始化完成后再次获取缓存中该device plugin的Endpoint,并与初始化之前的Endpoint对象进行比对:

    • 如果不是同一个对象,则说明在初始化过程中发生了re-register,那么就invoke Endpoint的stop接口,关闭gRPC连接,并设置Endpoint的stopTime为当前时间,Register流程以失败结束。

    • 否则继续后面流程。

  • 如果该device plugin之前注册过,那么再重新调用Endpoint的run()启动之前,先调用Endpoint的stop关闭gRPC连接,并设置Endpoint的stopTime为当前时间。

  • 然后启动golang协程执行Endpoint的run(),在run方法中:

    • 调用device plugin的ListAndWatch gRPC接口,通过长连接持续获取ListAndWatch gRPC stream,

    • 从stream流中获取的devices与Endpoint中缓存的devices进行比对,得到需要add/delete/update的devices,

    • 然后调用Endpoint的callback(也就是ManagerImpl注册的callback方法genericDeviceUpdateCallback)进行Device Manager的缓存更新并写到checkpoint文件中。

  • 直到与device plugin的gRPC连接发生errListAndWatch错误,跳出持续获取stream的死循环,然后调用Endpoint的stop关闭gRPC连接,并设置Endpoint的stopTime为当前时间。

  • invoke stop后,再标记该device plugin对应的所有devices为unhealthy,即设置healthyDevices为空, 所有原来healthy的devices都加到unhealthyDevices中,此时表示注册失败。

调用Device Plugin的Allocate接口

注册UpdatePluginResources为Pod Admit Handler

kubelet在NewMainKubelet中会注册一系列的Pod Admit Handler,当有Pod需要创建的时,都会先调用这些Pod Admit Handler进行处理,其中klet.containerManager.UpdatePluginResources就是kubelet Device Manager为Pod分配devices的。

pkg/kubelet/kubelet.go:893func NewMainKubelet( ... ) (*Kubelet, error) {        ...                klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))                ...}        pkg/kubelet/cm/container_manager_linux.go:618func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {        return cm.deviceManager.Allocate(node, attrs)}

Allocate

kubelet在创建Pod前,会invoke Device Manager的Allocate方法,为Pod中的每个Container请求分配对应的devices,kubelet会将请求转发到对应的Endpoint的Allocate方法, 然后请求会到对应的device plugin进行处理。

pkg/kubelet/cm/devicemanager/manager.go:259func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {        pod := attrs.Pod        devicesToReuse := make(map[string]sets.String)        // TODO: Reuse devices between init containers and regular containers.        for _, container := range pod.Spec.InitContainers {                if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {                        return err                }                m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)        }        for _, container := range pod.Spec.Containers {                if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {                        return err                }                m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)        }        m.mutex.Lock()        defer m.mutex.Unlock()        // quick return if no pluginResources requested        if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {                return nil        }        m.sanitizeNodeAllocatable(node)        return nil}
  • 调用allocateContainerResources为Pod中的init container分配devices,并更新ManagerImpl中PodDevices缓存;

  • 调用allocateContainerResources为Pod中的regular container分配devices,并更新ManagerImpl中PodDevices缓存;

  • 调用sanitizeNodeAllocatable更新scheduler cache中Node对应Resource Name的Allocatable Resource;

allocateContainerResources

pkg/kubelet/cm/devicemanager/manager.go:608func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {        podUID := string(pod.UID)        contName := container.Name        allocatedDevicesUpdated := false        // Extended resources are not allowed to be overcommitted.        // Since device plugin advertises extended resources,        // therefore Requests must be equal to Limits and iterating        // over the Limits should be sufficient.        for k, v := range container.Resources.Limits {                resource := string(k)                needed := int(v.Value())                glog.V(3).Infof("needs %d %s", needed, resource)                if !m.isDevicePluginResource(resource) {                        continue                }                // Updates allocatedDevices to garbage collect any stranded resources                // before doing the device plugin allocation.                if !allocatedDevicesUpdated {                        m.updateAllocatedDevices(m.activePods())                        allocatedDevicesUpdated = true                }                allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])                if err != nil {                        return err                }                if allocDevices == nil || len(allocDevices) <= 0 {                        continue                }                startRPCTime := time.Now()                                m.mutex.Lock()                e, ok := m.endpoints[resource]                m.mutex.Unlock()                if !ok {                        m.mutex.Lock()                        m.allocatedDevices = m.podDevices.devices()                        m.mutex.Unlock()                        return fmt.Errorf("Unknown Device Plugin %s", resource)                }                devs := allocDevices.UnsortedList()                                glog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)                resp, err := e.allocate(devs)                metrics.DevicePluginAllocationLatency.WithLabelValues(resource).Observe(metrics.SinceInMicroseconds(startRPCTime))                if err != nil {                        m.mutex.Lock()                        m.allocatedDevices = m.podDevices.devices()                        m.mutex.Unlock()                        return err                }                // Update internal cached podDevices state.                m.mutex.Lock()                m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])                m.mutex.Unlock()        }        // Checkpoints device to container allocation information.        return m.writeCheckpoint()}
  • device plugin提供的Resource属于Kubernetes Extended Resources,所以其Resource QoS只能是Guaranted。

  • 每次在为Pod分配devices之前,都去检查一下此时的active pods,并与podDevices缓存中的pods进行比对,将已经terminated的Pods的devices从podDevices中删除,即进行了devices的GC操作。

  • 从healthyDevices中随机分配对应数量的devices给该Pod,并注意更新allocatedDevices,否则会导致一个device被分配给多个Pod。

  • 拿到devices后,就调用Endpoint的Allocate方法(进而调用对应device plugin的Allocate gRPC Service),device plugin返回ContainerAllocateResponse(包括注入的环境变量、挂载信息、Annotations)。

  • 更新podDevices缓存信息,并将ManagerImpl中缓存数据更新到checkpoint文件中。

思考:当init container结束后,对应分配的devices会被释放吗? 目前还不会释放devices,在Allocate前只会回收Terminated Pods的devices,并没有回收init container的devices。要优化这个也是比较简单的,只要修改上面代码中updateAllocatedDevices方法内的逻辑就行了,增加init container的devices回收逻辑。
所以当前版本最好不会要在init container中使用devices,虽然这种场景几乎不存在。

维护NodeStatus中Device Plugin管理的Resource Capacity

当kubelet更新node status时会调用GetCapacity更新device plugins对应的Resource信息。

pkg/kubelet/kubelet_node_status.go:599func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {        ...        devicePluginCapacity, devicePluginAllocatable, removedDevicePlugins = kl.containerManager.GetDevicePluginResourceCapacity()        ...}       pkg/kubelet/cm/container_manager_linux.go:881func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {        return cm.deviceManager.GetCapacity()}

下面是GetCapacity的具体代码实现,逻辑很简单:

  • 检测healthyDevices对应的device plugin是否已经从缓存中删除或者已经停止超过5min,如果满足以上条件之一,则从endpoints和healthyDevices缓存中删除这些devices。

  • 检测unhealthyDevices对应的device plugin是否已经从缓存中删除或者已经停止超过5min,如果满足以上条件之一,则从endpoints和unhealthyDevices缓存中删除这些devices。

  • 如果缓存发生变化,则更新到checkpoint文件中。

pkg/kubelet/cm/devicemanager/manager.go:414func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {        needsUpdateCheckpoint := false        var capacity = v1.ResourceList{}        var allocatable = v1.ResourceList{}        deletedResources := sets.NewString()        m.mutex.Lock()        for resourceName, devices := range m.healthyDevices {                e, ok := m.endpoints[resourceName]                if (ok && e.stopGracePeriodExpired()) || !ok {                                        if !ok {                                glog.Errorf("unexpected: healthyDevices and endpoints are out of sync")                        }                        delete(m.endpoints, resourceName)                        delete(m.healthyDevices, resourceName)                        deletedResources.Insert(resourceName)                        needsUpdateCheckpoint = true                } else {                        capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)                        allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)                }        }        for resourceName, devices := range m.unhealthyDevices {                e, ok := m.endpoints[resourceName]                if (ok && e.stopGracePeriodExpired()) || !ok {                        if !ok {                                glog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")                        }                        delete(m.endpoints, resourceName)                        delete(m.unhealthyDevices, resourceName)                        deletedResources.Insert(resourceName)                        needsUpdateCheckpoint = true                } else {                        capacityCount := capacity[v1.ResourceName(resourceName)]                        unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)                        capacityCount.Add(unhealthyCount)                        capacity[v1.ResourceName(resourceName)] = capacityCount                }        }        m.mutex.Unlock()        if needsUpdateCheckpoint {                m.writeCheckpoint()        }        return capacity, allocatable, deletedResources.UnsortedList()}

GetCapacity更新NodeStatus如下数据:

  • registered device plugin resource Capacity

  • registered device plugin resource Allocatable

  • previously registered resources that are no longer active

调用Device Plugin的PreStartContainer接口

在kubelet的GetResource中,会调用DeviceManager的GetDeviceRunContainerOptions,并将这些options添加到kubecontainer.RunContainerOptions中。RunContainerOptions包括Envs、Mounts、Devices、PortMappings、Annotations等信息。

pkg/kubelet/cm/container_manager_linux.go:601// TODO: move the GetResources logic to PodContainerManager.func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {        opts := &kubecontainer.RunContainerOptions{}        // Allocate should already be called during predicateAdmitHandler.Admit(),        // just try to fetch device runtime information from cached state here        devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)        if err != nil {                return nil, err        } else if devOpts == nil {                return opts, nil        }        opts.Devices = append(opts.Devices, devOpts.Devices...)        opts.Mounts = append(opts.Mounts, devOpts.Mounts...)        opts.Envs = append(opts.Envs, devOpts.Envs...)        opts.Annotations = append(opts.Annotations, devOpts.Annotations...)        return opts, nil}
  • Device Manager的GetDeviceRunContainerOptions会根据pluginOpts的PreStartRequired是否为true,决定是否调用device plugin的PreStartContainer gRPC Service。

注意:如果某个device plugin的PreStartRequired为true,那么需要注册kubelet Device Manager调用device plugin的PreStartContainer接口的超时时间是30s,即30s内必须完成PreStartContainer的逻辑并返回。

pkg/kubelet/cm/devicemanager/manager.go:688// GetDeviceRunContainerOptions checks whether we have cached containerDevices// for the passed-in  and returns its DeviceRunContainerOptions// for the found one. An empty struct is returned in case no cached state is found.func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {        podUID := string(pod.UID)        contName := container.Name        for k := range container.Resources.Limits {                resource := string(k)                if !m.isDevicePluginResource(resource) {                        continue                }                err := m.callPreStartContainerIfNeeded(podUID, contName, resource)                if err != nil {                        return nil, err                }        }        m.mutex.Lock()        defer m.mutex.Unlock()        return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil}
  • 然后deviceRunContainerOptions负责封装Container的Envs、Mount points、Device files、Annotations。

到此,关于"Device Manager在什么时候创建"的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注网站,小编会继续努力为大家带来更多实用的文章!

0