千家信息网

如何理解kubernetes数据卷管理的源码

发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,本篇文章给大家分享的是有关如何理解kubernetes数据卷管理的源码,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。概述volume是k
千家信息网最后更新 2025年02月07日如何理解kubernetes数据卷管理的源码

本篇文章给大家分享的是有关如何理解kubernetes数据卷管理的源码,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

概述

volume是k8s中很重要的一个环节,主要用来存储k8s中pod生产的一些系统或者业务数据。k8s在kubelet中提供了volume管理的逻辑

源码分析

首先是kubelet启动方法

func main() {       s := options.NewKubeletServer()       s.AddFlags(pflag.CommandLine)       flag.InitFlags()       logs.InitLogs()       defer logs.FlushLogs()       verflag.PrintAndExitIfRequested()       if err := app.Run(s, nil); err != nil {              fmt.Fprintf(os.Stderr, "error: %v\n", err)              os.Exit(1)       }}

很容易发现run方法中包含了kubelet所有重要信息

func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {                        //配置验证            ...       if kubeDeps == nil {              ...              kubeDeps, err = UnsecuredKubeletDeps(s)              ...       }       //初始化cAdvisor以及containerManager等管理器       ...       if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {              return err       }       ...}

里面有两个与volume管理相关的重要方法

  • UnsecuredKubeletDeps:会初始化docker client、网络管理插件、数据管理插件等系统核心组件,因为不方便对外部开放,所以命名为unsecure。其中我们需要关注的是它对volume plugin的初始化操作

     func UnsecuredKubeletDeps(s *options.KubeletServer) (*kubelet.KubeletDeps, error) {            ...                return &kubelet.KubeletDeps{                        Auth:               nil,                         CAdvisorInterface:  nil,                         Cloud:              nil,                         ContainerManager:   nil,                        DockerClient:       dockerClient,                        KubeClient:         nil,                        ExternalKubeClient: nil,                        Mounter:            mounter,                        NetworkPlugins:     ProbeNetworkPlugins(s.NetworkPluginDir, s.CNIConfDir, s.CNIBinDir),                        OOMAdjuster:        oom.NewOOMAdjuster(),                        OSInterface:        kubecontainer.RealOS{},                        Writer:             writer,                        VolumePlugins:      ProbeVolumePlugins(s.VolumePluginDir),                        TLSOptions:         tlsOptions,                }, nil        }


    在初始化volume plugin的时候会传递VolumePluginDir作为自定义plugin的路径,默认路径为**/usr/libexec/kubernetes/kubelet-plugins/volume/exec/**

           func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin {                allPlugins := []volume.VolumePlugin{}                allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, empty_dir.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, git_repo.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(volume.VolumeConfig{})...)                allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(volume.VolumeConfig{})...)                allPlugins = append(allPlugins, secret.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, iscsi.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, glusterfs.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, rbd.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, quobyte.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, cephfs.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, downwardapi.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, fc.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, flexvolume.ProbeVolumePlugins(pluginDir)...)                allPlugins = append(allPlugins, azure_file.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, configmap.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, vsphere_volume.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, azure_dd.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, photon_pd.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...)                allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)                return allPlugins        }


    可以观察到众多插件中,有一个名为flexvolume,只有这个插件带有参数pluginDir,说明只有这个插件支持自定义实现。具体kubelet怎么和这些插件交互,以及这些插件提供哪些接口,还需要继续阅读代码

  • RunKubelet:这才是kubelet服务的启动方法,其中最重要的功能都藏在startKubelet中

        func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {                //初始化启动器                ...                if runOnce {                        if _, err := k.RunOnce(podCfg.Updates()); err != nil {                                return fmt.Errorf("runonce failed: %v", err)                        }                        glog.Infof("Started kubelet %s as runonce", version.Get().String())                } else {                        startKubelet(k, podCfg, kubeCfg, kubeDeps)                        glog.Infof("Started kubelet %s", version.Get().String())                }                return nil        }


    startKubelet包含两个环节

           func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) {                // 同步pod信息                go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)                // 启动kubelet服务                if kubeCfg.EnableServer {                        go wait.Until(func() {                                k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling)                        }, 0, wait.NeverStop)                }                if kubeCfg.ReadOnlyPort > 0 {                        go wait.Until(func() {                                k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))                        }, 0, wait.NeverStop)                }        }


    跟踪同步pod信息的Run方法,会追查到这段代码

     func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {            ...                go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)                if kl.kubeClient != nil {                        //同步node信息                        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)                }                // 同步pod信息                kl.pleg.Start()                kl.syncLoop(updates, kl)        }


    kl.volumeManager是kubelet进行数据卷管理的核心接口

        type VolumeManager interface {                Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})                WaitForAttachAndMount(pod *v1.Pod) error                GetMountedVolumesForPod(podName types.UniquePodName) container.VolumeMap                GetExtraSupplementalGroupsForPod(pod *v1.Pod) []int64                GetVolumesInUse() []v1.UniqueVolumeName                ReconcilerStatesHasBeenSynced() bool                VolumeIsAttached(volumeName v1.UniqueVolumeName) bool                MarkVolumesAsReportedInUse(volumesReportedAsInUse []v1.UniqueVolumeName)        }


    VolumeManager的Run会执行一个异步循环,当pod被调度到该node,它会检查该pod所申请的所有volume,根据这些volume与pod的关系做attach/detach/mount/unmount操作

        func (vm *volumeManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {                defer runtime.HandleCrash()                go vm.desiredStateOfWorldPopulator.Run(sourcesReady, stopCh)                glog.V(2).Infof("The desired_state_of_world populator starts")                glog.Infof("Starting Kubelet Volume Manager")                go vm.reconciler.Run(stopCh)                <-stopCh                glog.Infof("Shutting down Kubelet Volume Manager")        }


    其中重点关注的地方是vm.desiredStateOfWorldPopulator.Runvm.reconciler.Run这两个方法。在介绍这两个方法之前,需要补充一个关键信息,这也是理解这两个方法的关键信息。

    kubelet管理volume的方式基于两个不同的状态:

    理解了这两个状态,就能大概知道vm.desiredStateOfWorldPopulator.Run这个方法是干什么的呢。很明显,它就是根据从apiserver同步到的pod信息,来更新DesiredStateOfWorld。另外一个方法vm.reconciler.Run,是预期状态和实际状态的协调者,它负责将实际状态调整成与预期状态。预期状态的更新实现,以及协调者具体如何协调,需要继续阅读代码才能理解

    追踪vm.desiredStateOfWorldPopulator.Run,我们发现这段逻辑

           func (dswp *desiredStateOfWorldPopulator) findAndAddNewPods() {                for _, pod := range dswp.podManager.GetPods() {                        if dswp.isPodTerminated(pod) {                                continue                        }                        dswp.processPodVolumes(pod)                }        }


    kubelet会同步新增的pod到desiredStateOfWorldPopulator的podManager中。这段代码就是轮询其中非结束状态的pod,并交给desiredStateOfWorldPopulator处理

      func (dswp *desiredStateOfWorldPopulator) processPodVolumes(pod *v1.Pod) {                ...                for _, podVolume := range pod.Spec.Volumes {                        volumeSpec, volumeGidValue, err :=                                dswp.createVolumeSpec(podVolume, pod.Namespace)                        if err != nil {                                glog.Errorf(                                        "Error processing volume %q for pod %q: %v",                                        podVolume.Name,                                        format.Pod(pod),                                        err)                                continue                        }                        _, err = dswp.desiredStateOfWorld.AddPodToVolume(                                uniquePodName, pod, volumeSpec, podVolume.Name, volumeGidValue)                        if err != nil {                                glog.Errorf(                                        "Failed to add volume %q (specName: %q) for pod %q to desiredStateOfWorld. err=%v",                                        podVolume.Name,                                        volumeSpec.Name(),                                        uniquePodName,                                        err)                        }                        glog.V(10).Infof(                                "Added volume %q (volSpec=%q) for pod %q to desired state.",                                podVolume.Name,                                volumeSpec.Name(),                                uniquePodName)                }                dswp.markPodProcessed(uniquePodName)        }


    desiredStateOfWorldPopulator并不处理很重的逻辑,只是作为一个代理,将控制某个pod预期状态的逻辑交付给desiredStateOfWorld,并标记为已处理

         func (dsw *desiredStateOfWorld) AddPodToVolume(                podName types.UniquePodName,                pod *v1.Pod,                volumeSpec *volume.Spec,                outerVolumeSpecName string,                volumeGidValue string) (v1.UniqueVolumeName, error) {                ...                dsw.volumesToMount[volumeName].podsToMount[podName] = podToMount{                        podName:             podName,                        pod:                 pod,                        spec:                volumeSpec,                        outerVolumeSpecName: outerVolumeSpecName,                }                return volumeName, nil        }


    这段逻辑中,我们忽略了前面一系列预处理操作,直接关注最核心的地方:确定预期状态的方式就是,用一个映射表结构,绑定volume到pod之间的关系,这个关系表就是绑定关系的参考依据

    看完了desiredStateOfWorldPopulator的处理逻辑,接着进入另一个核心接口reconciler。它才是volume manager中最重要的控制器

    追踪reconciler的Run方法,我们定位到最核心的一段代码

           func (rc *reconciler) reconcile() {                //umount                for _, mountedVolume := range rc.actualStateOfWorld.GetMountedVolumes() {                        if !rc.desiredStateOfWorld.PodExistsInVolume(mountedVolume.PodName, mountedVolume.VolumeName) {                                ...                                err := rc.operationExecutor.UnmountVolume(                                        mountedVolume.MountedVolume, rc.actualStateOfWorld)                                ...                        }                }                // attach/mount                for _, volumeToMount := range rc.desiredStateOfWorld.GetVolumesToMount() {                        volMounted, devicePath, err := rc.actualStateOfWorld.PodExistsInVolume(volumeToMount.PodName, volumeToMount.VolumeName)                        volumeToMount.DevicePath = devicePath                        if cache.IsVolumeNotAttachedError(err) {                                ...                                err := rc.operationExecutor.AttachVolume(volumeToAttach, rc.actualStateOfWorld)                                ...                        } else if !volMounted || cache.IsRemountRequiredError(err) {                                ...                                err := rc.operationExecutor.MountVolume(                                        rc.waitForAttachTimeout,                                        volumeToMount.VolumeToMount,                                        rc.actualStateOfWorld)                                ...                        }                }                //detach/unmount                for _, attachedVolume := range rc.actualStateOfWorld.GetUnmountedVolumes() {                        if !rc.desiredStateOfWorld.VolumeExists(attachedVolume.VolumeName) &&                                !rc.operationExecutor.IsOperationPending(attachedVolume.VolumeName, nestedpendingoperations.EmptyUniquePodName) {                                if attachedVolume.GloballyMounted {                                        ...                                        err := rc.operationExecutor.UnmountDevice(                                                attachedVolume.AttachedVolume, rc.actualStateOfWorld, rc.mounter)                                        ...                                } else {                                        ...                                        err := rc.operationExecutor.DetachVolume(                                                        attachedVolume.AttachedVolume, false,rc.actualStateOfWorld)                                        ...                                }                        }                }        }


    我略去了多余的代码,保留最核心的部分。这段控制逻辑就是一个协调器,具体要做的事情就是,根据实际状态与预期状态的差异,做协调操作

    如果采用自定义的flexvolume插件,上述这些方法会对插件中实现的方法进行系统调用

    flex volume提供的lvm插件。如果需要支持mount和unmount操作,可以在这个脚本中补充

         #!/bin/bash        # Copyright 2015 The Kubernetes Authors.        #        # Licensed under the Apache License, Version 2.0 (the "License");        # you may not use this file except in compliance with the License.        # You may obtain a copy of the License at        #        #     http://www.apache.org/licenses/LICENSE-2.0        #        # Unless required by applicable law or agreed to in writing, software        # distributed under the License is distributed on an "AS IS" BASIS,        # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.        # See the License for the specific language governing permissions and        # limitations under the License.        # Notes:        #  - Please install "jq" package before using this driver.        usage() {                err "Invalid usage. Usage: "                err "\t$0 init"                err "\t$0 attach  "                err "\t$0 detach  "                err "\t$0 waitforattach  "                err "\t$0 mountdevice   "                err "\t$0 unmountdevice "                err "\t$0 isattached  "                exit 1        }        err() {                echo -ne $* 1>&2        }        log() {                echo -ne $* >&1        }        ismounted() {                MOUNT=`findmnt -n ${MNTPATH} 2>/dev/null | cut -d' ' -f1`                if [ "${MOUNT}" == "${MNTPATH}" ]; then                        echo "1"                else                        echo "0"                fi        }        getdevice() {                VOLUMEID=$(echo ${JSON_PARAMS} | jq -r '.volumeID')                VG=$(echo ${JSON_PARAMS}|jq -r '.volumegroup')                # LVM substitutes - with --                VOLUMEID=`echo $VOLUMEID|sed s/-/--/g`                VG=`echo $VG|sed s/-/--/g`                DMDEV="/dev/mapper/${VG}-${VOLUMEID}"                echo ${DMDEV}        }        attach() {                JSON_PARAMS=$1                SIZE=$(echo $1 | jq -r '.size')                DMDEV=$(getdevice)                if [ ! -b "${DMDEV}" ]; then                        err "{\"status\": \"Failure\", \"message\": \"Volume ${VOLUMEID} does not exist\"}"                        exit 1                fi                log "{\"status\": \"Success\", \"device\":\"${DMDEV}\"}"                exit 0        }        detach() {                log "{\"status\": \"Success\"}"                exit 0        }        waitforattach() {                shift                attach $*        }        domountdevice() {                MNTPATH=$1                DMDEV=$2                FSTYPE=$(echo $3|jq -r '.["kubernetes.io/fsType"]')                if [ ! -b "${DMDEV}" ]; then                        err "{\"status\": \"Failure\", \"message\": \"${DMDEV} does not exist\"}"                        exit 1                fi                if [ $(ismounted) -eq 1 ] ; then                        log "{\"status\": \"Success\"}"                        exit 0                fi                VOLFSTYPE=`blkid -o udev ${DMDEV} 2>/dev/null|grep "ID_FS_TYPE"|cut -d"=" -f2`                if [ "${VOLFSTYPE}" == "" ]; then                        mkfs -t ${FSTYPE} ${DMDEV} >/dev/null 2>&1                        if [ $? -ne 0 ]; then                                err "{ \"status\": \"Failure\", \"message\": \"Failed to create fs ${FSTYPE} on device ${DMDEV}\"}"                                exit 1                        fi                fi                mkdir -p ${MNTPATH} &> /dev/null                mount ${DMDEV} ${MNTPATH} &> /dev/null                if [ $? -ne 0 ]; then                        err "{ \"status\": \"Failure\", \"message\": \"Failed to mount device ${DMDEV} at ${MNTPATH}\"}"                        exit 1                fi                log "{\"status\": \"Success\"}"                exit 0        }        unmountdevice() {                MNTPATH=$1                if [ ! -d ${MNTPATH} ]; then                        log "{\"status\": \"Success\"}"                        exit 0                fi                if [ $(ismounted) -eq 0 ] ; then                        log "{\"status\": \"Success\"}"                        exit 0                fi                umount ${MNTPATH} &> /dev/null                if [ $? -ne 0 ]; then                        err "{ \"status\": \"Failed\", \"message\": \"Failed to unmount volume at ${MNTPATH}\"}"                        exit 1                fi                log "{\"status\": \"Success\"}"                exit 0        }        isattached() {                log "{\"status\": \"Success\", \"attached\":true}"                exit 0        }        op=$1        if [ "$op" = "init" ]; then                log "{\"status\": \"Success\"}"                exit 0        fi        if [ $# -lt 2 ]; then                usage        fi        shift        case "$op" in                attach)                        attach $*                        ;;                detach)                        detach $*                        ;;                waitforattach)                        waitforattach $*                        ;;                mountdevice)                        domountdevice $*                        ;;                unmountdevice)                        unmountdevice $*                        ;;                isattached)                        isattached $*                        ;;                *)                        log "{ \"status\": \"Not supported\" }"                        exit 0        esac        exit 1


    值得注意的是,为什么会有两次mount操作,一次mountdevice,一次mount。分别是做什么的?

    其实k8s提供的volume管理方式是,一个volume可以被多个pod挂载,如果某个device需要作为多个pod的volume,就需要多次挂载。但是device只能被挂载一次。所以,k8s采用的方式是,先用mountdevice将device挂载到一个全局目录,然后这个全局目录就可以被多次挂载到pod的卷目录。如此一来,就能完成多pod挂载同一个volume

    • AttachVolume:调用attach

    • DetachVolume:调用detach

    • MountVolume:调用mountdevice,mount

    • UnmountVolume:调用unmount

    • UnmountDevice:调用umountdevice

    • volume和pod的预期状态不存在绑定关系,则detach volume,并对pod和volume执行unmount操作

    • volume和pod的预期状态存在绑定关系,则attach volume,并对pod和volume执行mount操作

    • DesiredStateOfWorld:预期中,pod对volume的使用情况,简称预期状态。当pod.yaml定制好volume,并提交成功,预期状态就已经确定

    • ActualStateOfWorld:实际中,pod对voluem的使用情况,简称实际状态。实际状态是kubelet的后台线程监控的结果

    • 不断同步apiserver的pod信息,根据新增、删除的pod对volume状态进行同步更新

    • 启动服务,监听controller manager的请求。其中controller manager可以辅助kubelet管理volume,用户也可以选择禁用controller manager的管理

只有理解了volume manager的代码,在使用它提供的volume plugin或者实现自定义flex volume plugin时才能驾轻就熟。以上代码,都是基于k8s v1.6.6版本

以上就是如何理解kubernetes数据卷管理的源码,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

状态 方法 管理 插件 信息 代码 两个 就是 逻辑 同步 实际 核心 数据 重要 方式 处理 源码 只有 接口 目录 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 数据库外部接口连接 宝塔面板服务器怎样管理 数据库改数据set 软件开发设计与制造专业排名 装服务器一定要进管理地址么 一个人的软件开发公司 数据库公理系统三条推论 网络安全社会工程学攻击案例 网络安全教育平台如何安装 网络技术应用章节目录 郑州做软件开发的公司 宝山区运营软件开发价格走势 数据库使用最多的是什么 数据库角色的安全性分类 网络安全平台密码忘记了怎么办 上海水电缴费软件开发团队 和平精英在哪个服务器上能下载 政府机关能不能用云服务器 app手机软件开发大专院校 非关系型数据库 知乎 慈溪软件开发哪家安全 网络安全小学主题队会教案 服务器装到机柜需要什么 网络安全知识专题讲座直播 网络安全和信息大赛 网络安全法实施后的数据 查询数据库字段值相同的记录 上海水电缴费软件开发团队 互联网中医科技 网络技术服务岗位要求
0