
NVIDIA k8s-device-plugin Source Code Analysis


How do you analyze the NVIDIA k8s-device-plugin source code? This article walks through the code in detail, in the hope of giving readers a simpler path into how the plugin works.

k8s-device-plugin internal implementation diagram

In "How Kubernetes Uses NVIDIA GPUs Through Device Plugins", the working principle of NVIDIA/k8s-device-plugin was analyzed in depth. For convenience, its internal implementation diagram is reproduced here:

The two interfaces PreStartContainer and GetDevicePluginOptions can be ignored in NVIDIA/k8s-device-plugin; they are effectively empty implementations. We focus on the implementations of ListAndWatch and Allocate.
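For orientation, here is the device plugin gRPC service that NvidiaDevicePlugin implements, paraphrased from the v1beta1 device plugin API (k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1); the comments summarize each method's role as described in this article:

// The v1beta1 DevicePlugin gRPC service, shown for orientation.
type DevicePluginServer interface {
    // GetDevicePluginOptions returns options for the device manager
    // (effectively an empty implementation in NVIDIA/k8s-device-plugin).
    GetDevicePluginOptions(context.Context, *Empty) (*DevicePluginOptions, error)
    // ListAndWatch streams the device list and re-sends it on every change.
    ListAndWatch(*Empty, DevicePlugin_ListAndWatchServer) error
    // Allocate prepares the requested devices at container creation.
    Allocate(context.Context, *AllocateRequest) (*AllocateResponse, error)
    // PreStartContainer runs before container start if the plugin asks for it
    // (also effectively empty in NVIDIA/k8s-device-plugin).
    PreStartContainer(context.Context, *PreStartContainerRequest) (*PreStartContainerResponse, error)
}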

Startup

Everything starts from the main function. The core code (with log lines elided by the original author marked as ...) is as follows:

func main() {
    log.Println("Loading NVML")
    if err := nvml.Init(); err != nil {
        select {}
    }
    ...
    log.Println("Fetching devices.")
    if len(getDevices()) == 0 {
        select {}
    }

    log.Println("Starting FS watcher.")
    watcher, err := newFSWatcher(pluginapi.DevicePluginPath)
    if err != nil {
        os.Exit(1)
    }
    ...
    log.Println("Starting OS watcher.")
    sigs := newOSWatcher(syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

    restart := true
    var devicePlugin *NvidiaDevicePlugin

L:
    for {
        if restart {
            if devicePlugin != nil {
                devicePlugin.Stop()
            }

            devicePlugin = NewNvidiaDevicePlugin()
            if err := devicePlugin.Serve(); err != nil {
                ...
            } else {
                restart = false
            }
        }

        select {
        case event := <-watcher.Events:
            // A re-created kubelet.sock means the kubelet restarted: re-register.
            if event.Name == pluginapi.KubeletSocket && event.Op&fsnotify.Create == fsnotify.Create {
                restart = true
            }

        case err := <-watcher.Errors:
            log.Printf("inotify: %s", err)

        case s := <-sigs:
            switch s {
            case syscall.SIGHUP:
                restart = true
            default:
                devicePlugin.Stop()
                break L
            }
        }
    }
}
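main relies on a few helpers not shown above. As a minimal sketch of one of them, assuming the upstream layout of this era (the nvml binding calls and the check helper are assumptions), getDevices enumerates the GPUs via NVML and advertises each one as a Healthy pluginapi.Device:

// Sketch of getDevices, assuming the upstream helper's shape: it queries
// NVML for the GPU count and reports every GPU as Healthy, keyed by UUID.
func getDevices() []*pluginapi.Device {
    n, err := nvml.GetDeviceCount()
    check(err) // assumed helper that aborts on error

    var devs []*pluginapi.Device
    for i := uint(0); i < n; i++ {
        d, err := nvml.NewDeviceLite(i)
        check(err)
        devs = append(devs, &pluginapi.Device{
            ID:     d.UUID,
            Health: pluginapi.Healthy,
        })
    }
    return devs
}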

The rest needs little commentary; refer to the startup flow logic diagram below:

Serve

In the k8s-device-plugin startup flow, devicePlugin.Serve first starts the gRPC server (via Start) to serve requests, and then registers the plugin with the kubelet.

// Serve starts the gRPC server and registers the device plugin with the Kubelet.
func (m *NvidiaDevicePlugin) Serve() error {
    err := m.Start()
    if err != nil {
        log.Printf("Could not start device plugin: %s", err)
        return err
    }
    log.Println("Starting to serve on", m.socket)

    err = m.Register(pluginapi.KubeletSocket, resourceName)
    if err != nil {
        log.Printf("Could not register device plugin: %s", err)
        m.Stop()
        return err
    }
    log.Println("Registered device plugin with Kubelet")

    return nil
}

Start

The code of Start is as follows:

// Start starts the gRPC server of the device plugin.
func (m *NvidiaDevicePlugin) Start() error {
    err := m.cleanup()
    if err != nil {
        return err
    }

    sock, err := net.Listen("unix", m.socket)
    if err != nil {
        return err
    }

    m.server = grpc.NewServer([]grpc.ServerOption{}...)
    pluginapi.RegisterDevicePluginServer(m.server, m)

    go m.server.Serve(sock)

    // Wait for the server to start by launching a blocking connection.
    conn, err := dial(m.socket, 5*time.Second)
    if err != nil {
        return err
    }
    conn.Close()

    go m.healthcheck()

    return nil
}
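Start (and Register below) depends on a small dial helper for opening a blocking gRPC connection over a Unix socket. A sketch, assuming the grpc-go API of that era (grpc.WithTimeout and grpc.WithDialer are deprecated in current grpc-go):

// Sketch of dial, assuming the upstream helper's shape: it blocks until the
// Unix-socket gRPC connection is established or the timeout expires.
func dial(unixSocketPath string, timeout time.Duration) (*grpc.ClientConn, error) {
    return grpc.Dial(unixSocketPath,
        grpc.WithInsecure(),
        grpc.WithBlock(),
        grpc.WithTimeout(timeout),
        grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
            // The device plugin API is served over Unix sockets, not TCP.
            return net.DialTimeout("unix", addr, timeout)
        }),
    )
}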

We will not walk the deeper call chain here; the implementation logic diagram of Start is shown below:

The Start flow is responsible for creating the nvidia.sock file.

The healthcheck part deserves special attention:

  • healthcheck starts a goroutine that monitors the health of the managed devices. When a device becomes unhealthy, it is sent to the NvidiaDevicePlugin's health channel; the plugin's ListAndWatch reads these unhealthy devices from the health channel and notifies the kubelet so it can update its view (a sketch of this goroutine follows the list).

  • Only the nvmlEventTypeXidCriticalError event is monitored; as soon as this event is observed for a device, that device is considered unhealthy. For details on nvmlEventTypeXidCriticalError, refer to NVIDIA's NVML API documentation.

  • Health checking can be disabled by setting the environment variable DP_DISABLE_HEALTHCHECKS to "all" inside the NVIDIA device plugin Pod. If the variable is unset or set to any other value, healthcheck runs; the default deployment leaves it unset.
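A minimal sketch of the healthcheck goroutine, assuming the upstream structure (watchXIDs and unhealthy are the upstream names; the bodies here are simplified):

// Sketch of healthcheck, simplified from the upstream repo: it watches for
// critical Xid events and forwards failing devices to the health channel
// that ListAndWatch drains.
func (m *NvidiaDevicePlugin) healthcheck() {
    ctx, cancel := context.WithCancel(context.Background())

    var xids chan *pluginapi.Device
    // DP_DISABLE_HEALTHCHECKS="all" disables the Xid watcher entirely; a
    // receive on the then-nil channel blocks forever, so the loop below
    // only ever reacts to m.stop.
    if os.Getenv("DP_DISABLE_HEALTHCHECKS") != "all" {
        xids = make(chan *pluginapi.Device)
        go watchXIDs(ctx, m.devs, xids) // emits devices hit by nvmlEventTypeXidCriticalError
    }

    for {
        select {
        case <-m.stop:
            cancel()
            return
        case dev := <-xids:
            m.unhealthy(dev) // pushes dev into m.health (sketched later)
        }
    }
}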

Register

After Start, the flow proceeds to Register. Its code is as follows:

// Register registers the device plugin for the given resourceName with Kubelet.
func (m *NvidiaDevicePlugin) Register(kubeletEndpoint, resourceName string) error {
    conn, err := dial(kubeletEndpoint, 5*time.Second)
    if err != nil {
        return err
    }
    defer conn.Close()

    client := pluginapi.NewRegistrationClient(conn)
    reqt := &pluginapi.RegisterRequest{
        Version:      pluginapi.Version,
        Endpoint:     path.Base(m.socket),
        ResourceName: resourceName,
    }

    _, err = client.Register(context.Background(), reqt)
    if err != nil {
        return err
    }
    return nil
}

The implementation flow of Register is as follows:

  • The registered Resource Name is nvidia.com/gpu (see the constants sketch after this list).

  • The registered Version is v1beta1.
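Both values come from package-level definitions. A sketch of them, assuming the upstream naming:

// Assumed constants, following the upstream repo's naming.
const (
    resourceName = "nvidia.com/gpu"
    serverSock   = pluginapi.DevicePluginPath + "nvidia.sock"
)
// pluginapi.Version is "v1beta1" in this revision of the device plugin API.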

Stop

The code of Stop is as follows:

// Stop stops the gRPC server.
func (m *NvidiaDevicePlugin) Stop() error {
    if m.server == nil {
        return nil
    }

    m.server.Stop()
    m.server = nil
    close(m.stop)

    return m.cleanup()
}

The implementation flow of Stop is as follows:

  • The Stop flow stops the gRPC server and removes nvidia.sock; the removal is done by cleanup (sketched below), which Start also calls.
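cleanup is called both here and at the top of Start, so a stale socket from a previous run cannot block net.Listen. A sketch, assuming the upstream shape:

// Sketch of cleanup, assuming the upstream helper's shape: it removes a
// stale nvidia.sock, treating "file does not exist" as success.
func (m *NvidiaDevicePlugin) cleanup() error {
    if err := os.Remove(m.socket); err != nil && !os.IsNotExist(err) {
        return err
    }
    return nil
}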

ListAndWatch

The ListAndWatch interface mainly monitors the health channel; when it finds that a GPU has become unhealthy, it sends the complete GPU list (IDs and health state) to the kubelet for an update.

// ListAndWatch lists devices and updates the list according to health status.
func (m *NvidiaDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    // Send the initial device list to the kubelet.
    s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})

    for {
        select {
        case <-m.stop:
            return nil
        case d := <-m.health:
            // FIXME: there is no way to recover from the Unhealthy state.
            d.Health = pluginapi.Unhealthy
            // Re-send the full device list with the updated health state.
            s.Send(&pluginapi.ListAndWatchResponse{Devices: m.devs})
        }
    }
}

The implementation flow of ListAndWatch is as follows:
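The health channel drained here is fed by the healthcheck goroutine through a tiny helper; a sketch, assuming the upstream naming:

// Sketch of unhealthy, assuming the upstream helper: healthcheck calls it to
// hand a failing device over to ListAndWatch via the health channel.
func (m *NvidiaDevicePlugin) unhealthy(dev *pluginapi.Device) {
    m.health <- dev
}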

Allocate

Allocate handles the kubelet's requests to allocate GPUs for a container. The request struct is defined as follows:

// - Allocate is expected to be called during pod creation since allocation
//   failures for any container would result in pod startup failure.
// - Allocate allows the kubelet to expose additional artifacts in a pod's
//   environment as directed by the plugin.
// - Allocate allows the Device Plugin to run device-specific operations on
//   the requested Devices.
type AllocateRequest struct {
    ContainerRequests []*ContainerAllocateRequest `protobuf:"bytes,1,rep,name=container_requests,json=containerRequests" json:"container_requests,omitempty"`
}

type ContainerAllocateRequest struct {
    DevicesIDs []string `protobuf:"bytes,1,rep,name=devicesIDs" json:"devicesIDs,omitempty"`
}

The Response struct of the device plugin's Allocate is defined as follows:

// AllocateResponse includes the artifacts that need to be injected into
// a container for accessing the 'deviceIDs' mentioned in the 'AllocateRequest'.
// Failure handling:
// If the Kubelet sends an allocation request for dev1 and dev2, and
// allocation on dev1 succeeds but allocation on dev2 fails, the device
// plugin should send a ListAndWatch update and fail the Allocate request.
type AllocateResponse struct {
    ContainerResponses []*ContainerAllocateResponse `protobuf:"bytes,1,rep,name=container_responses,json=containerResponses" json:"container_responses,omitempty"`
}

type ContainerAllocateResponse struct {
    // List of environment variables to be set in the container to access one or more devices.
    Envs map[string]string `protobuf:"bytes,1,rep,name=envs" json:"envs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
    // Mounts for the container.
    Mounts []*Mount `protobuf:"bytes,2,rep,name=mounts" json:"mounts,omitempty"`
    // Devices for the container.
    Devices []*DeviceSpec `protobuf:"bytes,3,rep,name=devices" json:"devices,omitempty"`
    // Container annotations to pass to the container runtime.
    Annotations map[string]string `protobuf:"bytes,4,rep,name=annotations" json:"annotations,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
}

The implementation of Allocate is as follows:

// Allocate returns the per-container artifacts (here: environment variables)
// for the requested device IDs.
func (m *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    devs := m.devs
    responses := pluginapi.AllocateResponse{}
    for _, req := range reqs.ContainerRequests {
        // NVIDIA_VISIBLE_DEVICES tells the NVIDIA container runtime which
        // GPUs to expose to this container.
        response := pluginapi.ContainerAllocateResponse{
            Envs: map[string]string{
                "NVIDIA_VISIBLE_DEVICES": strings.Join(req.DevicesIDs, ","),
            },
        }

        for _, id := range req.DevicesIDs {
            if !deviceExists(devs, id) {
                return nil, fmt.Errorf("invalid allocation request: unknown device: %s", id)
            }
        }

        responses.ContainerResponses = append(responses.ContainerResponses, &response)
    }

    return &responses, nil
}

Its implementation logic diagram is shown below:

  • Allocate iterates over the ContainerRequests and packs the DevicesIDs into the ContainerAllocateResponse env var NVIDIA_VISIBLE_DEVICES, in the format "${ID_1},${ID_2},...".

  • Beyond that, it does not populate Mounts, Devices, or Annotations (the deviceExists helper used for validation is sketched after this list).
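The deviceExists helper referenced in Allocate is a plain lookup over the advertised devices; a sketch, assuming the upstream shape:

// Sketch of deviceExists, assuming the upstream helper's shape: it checks
// whether the requested device ID is one this plugin advertised.
func deviceExists(devs []*pluginapi.Device, id string) bool {
    for _, d := range devs {
        if d.ID == id {
            return true
        }
    }
    return false
}

Since NVIDIA_VISIBLE_DEVICES is the only artifact returned, the actual device injection is delegated to the NVIDIA container runtime rather than expressed through kubelet-managed Mounts or Devices.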

That concludes this analysis of the NVIDIA k8s-device-plugin source code. Hopefully the material above is of some help to readers working through the same questions.