千家信息网

Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的

发表于:2025-02-11 作者:千家信息网编辑
千家信息网最后更新 2025年02月11日,Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单
千家信息网最后更新 2025年02月11日Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的

Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

ResoureQuota介绍

关于ResoureQuota和ResourceController的介绍和使用请参见如下官方文档。这是你理解这篇博客的基础。

  • https://kubernetes.io/docs/admin/resourcequota/

  • https://kubernetes.io/docs/admin/resourcequota/walkthrough/

  • https://kubernetes.io/docs/user-guide/compute-resources/

  • https://kubernetes.io/docs/admin/admission-controllers/

  • https://github.com/kubernetes/community/blob/master/contributors/design-proposals/admission_control_resource_quota.md

ResourceQuota Controller源码目录结构分析

ResourceQuota Controller作为Kubernetes Controller Manager管理的众多Controller中的一员,其主要的源码位于目录k8s.io/kubernetes/pkg/quotak8s.io/kubernetes/pkg/controller/resourcequota,具体分析如下:

k8s.io/kubernetes/pkg/quota.├── evaluator    // 负责各种资源使用的统计│   └── core│       ├── configmap.go   // ConfigMapEvaluator的实现,负责ConfigMap资源的统计│       ├── doc.go│       ├── persistent_volume_claims.go    // PVCEvaluator的实现,负责PVC资源的统计│       ├── persistent_volume_claims_test.go│       ├── pods.go    //PodEvaluator的实现,负责Pod资源的统计│       ├── pods_test.go│       ├── registry.go    // 创建Registry时注册所有的Evaluators│       ├── replication_controllers.go    // RCEvaluator的实现,负责ReplicationController资源的统计│       ├── resource_quotas.go    // ResourceQuotaEvaluator的实现,负责ResourceQuota资源的统计│       ├── secrets.go    // SecretEvaluator的实现,负责Secret资源的统计│       ├── services.go    // ServiceEvaluator的实现,负责Service资源的统计│       └── services_test.go├── generic    // genericEvaluator的定义和实现│   ├── evaluator.go    // 实现了genericEvaluator的接口,包括最重要的CalculateUsageStats接口│   └── registry.go    // 定义GenericRegistry├── install│   └── registry.go    // 定义了startResourceQuotaController时会调用创建ResourceQuota Registry的方法├── interfaces.go    // 定义了Registry和Evaluator Interface├── resources.go    // 定义Resources的集合操作以及CalculateUsage方法└── resources_test.go
k8s.io/kubernetes/pkg/controller/resourcequota.├── doc.go├── replenishment_controller.go    // 定义replenishmentControllerFactory,用来创建replenishmentController├── replenishment_controller_test.go├── resource_quota_controller.go    // 定义ResourceQuotaController及其Run方法,syncResourceQuota方法等,属于核心文件。└── resource_quota_controller_test.go

ResourceQuota Controller内部实现原理图

请下载到本地放大查看。

具体各个模块的功能和交互请看下面的源码分析。

ResourceQuota Controller源码分析

上面的内部实现原理图显示,ResourceQuotaController是Kubenetes Controller Manager启动进行初始化众多Controllers的时候,通过调用startResourceQuotaController来完成ResourceQuotaController的启动。

###从kube-controller-manager的startResourceQuotaController开始

cmd/kube-controller-manager/app/core.go:76func startResourceQuotaController(ctx ControllerContext) (bool, error) {        resourceQuotaControllerClient := ctx.ClientBuilder.ClientOrDie("resourcequota-controller")        resourceQuotaRegistry := quotainstall.NewRegistry(resourceQuotaControllerClient, ctx.InformerFactory)                // 定义ReplenishmentController需要监控的资源对象        groupKindsToReplenish := []schema.GroupKind{                api.Kind("Pod"),                api.Kind("Service"),                api.Kind("ReplicationController"),                api.Kind("PersistentVolumeClaim"),                api.Kind("Secret"),                api.Kind("ConfigMap"),        }                ...                go resourcequotacontroller.NewResourceQuotaController(                resourceQuotaControllerOptions,        ).Run(int(ctx.Options.ConcurrentResourceQuotaSyncs), ctx.Stop)        return true, nil}

startResourceQuotaController启动一个goroutine,通过NewResourceQuotaController创建一个ResourceQuotaController并执行其Run方法开始提供ResourceQuotaController。

下面是ResourceQuotaController和ResourceQuotaControllerOptions结构体的定义。ResourceQuotaController中定义了几个关键Entity,分别是rqController、queue、missingUsageQueue、registry、replenishmentControllers,在上一节中的原理图中也能看到它们的身影。

###ResourceQuotaController定义

pkg/controller/resourcequota/resource_quota_controller.go:40// ResourceQuotaControllerOptions holds options for creating a quota controllertype ResourceQuotaControllerOptions struct {        // Must have authority to list all quotas, and update quota status        KubeClient clientset.Interface        // Controls full recalculation of quota usage        ResyncPeriod controller.ResyncPeriodFunc        // Knows how to calculate usage        Registry quota.Registry        // Knows how to build controllers that notify replenishment events        ControllerFactory ReplenishmentControllerFactory        // Controls full resync of objects monitored for replenihsment.        ReplenishmentResyncPeriod controller.ResyncPeriodFunc        // List of GroupKind objects that should be monitored for replenishment at        // a faster frequency than the quota controller recalculation interval        GroupKindsToReplenish []schema.GroupKind}// ResourceQuotaController is responsible for tracking quota usage status in the systemtype ResourceQuotaController struct {        // Must have authority to list all resources in the system, and update quota status        kubeClient clientset.Interface        // An index of resource quota objects by namespace        rqIndexer cache.Indexer        // Watches changes to all resource quota        rqController *cache.Controller        // ResourceQuota objects that need to be synchronized        queue workqueue.RateLimitingInterface        // missingUsageQueue holds objects that are missing the initial usage informatino        missingUsageQueue workqueue.RateLimitingInterface        // To allow injection of syncUsage for testing.        syncHandler func(key string) error        // function that controls full recalculation of quota usage        resyncPeriod controller.ResyncPeriodFunc        // knows how to calculate usage        registry quota.Registry        // controllers monitoring to notify for replenishment        replenishmentControllers []cache.ControllerInterface}

NewRegistry

接下来,我们看看startResourceQuotaController调用的NewRegistry、NewResourceQuotaController以及ResourceQuotaController的Run方法。

pkg/quota/evaluator/core/registry.go:29// NewRegistry returns a registry that knows how to deal with core kubernetes resources// If an informer factory is provided, evaluators will use them.func NewRegistry(kubeClient clientset.Interface, f informers.SharedInformerFactory) quota.Registry {        pod := NewPodEvaluator(kubeClient, f)        service := NewServiceEvaluator(kubeClient)        replicationController := NewReplicationControllerEvaluator(kubeClient)        resourceQuota := NewResourceQuotaEvaluator(kubeClient)        secret := NewSecretEvaluator(kubeClient)        configMap := NewConfigMapEvaluator(kubeClient)        persistentVolumeClaim := NewPersistentVolumeClaimEvaluator(kubeClient, f)        return &generic.GenericRegistry{                InternalEvaluators: map[schema.GroupKind]quota.Evaluator{                        pod.GroupKind():                   pod,                        service.GroupKind():               service,                        replicationController.GroupKind(): replicationController,                        secret.GroupKind():                secret,                        configMap.GroupKind():             configMap,                        resourceQuota.GroupKind():         resourceQuota,                        persistentVolumeClaim.GroupKind(): persistentVolumeClaim,                },        }}

可见,NewRegistry负责这些资源对象(pod,service,rc,secret,configMap,resourceQuota,PVC)的的Evaluator的创建和注册,供后面Worker中执行quota.CalculateUsage(...)对这些资源对象进行使用统计。

NewResourceQuotaController

NewRegistry执行完后,开始创建ResourceQuotaController,代码如下。

pkg/controller/resourcequota/resource_quota_controller.go:78func NewResourceQuotaController(options *ResourceQuotaControllerOptions) *ResourceQuotaController {        // build the resource quota controller        rq := &ResourceQuotaController{                kubeClient:               options.KubeClient,                queue:                    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_primary"),                missingUsageQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resourcequota_priority"),                resyncPeriod:             options.ResyncPeriod,                registry:                 options.Registry,                replenishmentControllers: []cache.ControllerInterface{},        }                ...                // set the synchronization handler        rq.syncHandler = rq.syncResourceQuotaFromKey        // build the controller that observes quota        rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(                &cache.ListWatch{                        ListFunc: func(options v1.ListOptions) (runtime.Object, error) {                                return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).List(options)                        },                        WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {                                return rq.kubeClient.Core().ResourceQuotas(v1.NamespaceAll).Watch(options)                        },                },                &v1.ResourceQuota{},                rq.resyncPeriod(),                cache.ResourceEventHandlerFuncs{                        AddFunc: rq.addQuota,                        UpdateFunc: func(old, cur interface{}) {                                oldResourceQuota := old.(*v1.ResourceQuota)                                curResourceQuota := cur.(*v1.ResourceQuota)                                if quota.V1Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {                                        return                                }                                rq.addQuota(curResourceQuota)                        },                        DeleteFunc: rq.enqueueResourceQuota,                },                cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},        )        for _, groupKindToReplenish := range options.GroupKindsToReplenish {                controllerOptions := &ReplenishmentControllerOptions{                        GroupKind:         groupKindToReplenish,                        ResyncPeriod:      options.ReplenishmentResyncPeriod,                        ReplenishmentFunc: rq.replenishQuota,                }                replenishmentController, err := options.ControllerFactory.NewController(controllerOptions)                if err != nil {                        glog.Warningf("quota controller unable to replenish %s due to %v, changes only accounted during full resync", groupKindToReplenish, err)                } else {                        rq.replenishmentControllers = append(rq.replenishmentControllers, replenishmentController)                }        }        return rq}

NewResourceQuotaController负责创建ResourceQuotaController,包括queue, missingUsageQueue, syncHandler,rqIndexer, rqController,replenishmentControllers的Entity填充。重点关注 rq.rqIndexer, rq.rqController = cache.NewIndexerInformer(...) 进行了rqController中注册ResourceEventHandlerFuncs:addQuota和enqueueResourceQuota。另外, replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) 负责replenishmentController的创建,NewRegistry中注册了6种replenishmentSource,所以这里replenishmentControllers会添加对应的6中replenishmentController。

###ResourceQuotaController.Run

创建完ResourceQuotaController之后,就执行Run方法开始进行任务处理了。

pkg/controller/resourcequota/resource_quota_controller.go:227// Run begins quota controller using the specified number of workersfunc (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {        ...        // 启动rqController和rq.replenishmentControllers中的6中replenishmentController,开始watch对应的ResourceQuota加入到queue和missingUsageQueue。        go rq.rqController.Run(stopCh)        // the controllers that replenish other resources to respond rapidly to state changes        for _, replenishmentController := range rq.replenishmentControllers {                go replenishmentController.Run(stopCh)        }                // 启动workers数量的worker协程,分别对queue和missingUsageQueue中的Item。        for i := 0; i < workers; i++ {                go wait.Until(rq.worker(rq.queue), time.Second, stopCh)                go wait.Until(rq.worker(rq.missingUsageQueue), time.Second, stopCh)        }                // 定期的进行全量的quotas计算。        go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), stopCh)        <-stopCh        glog.Infof("Shutting down ResourceQuotaController")        rq.queue.ShutDown()}

Worker

接下来的主要处理都交给了workers进行处理了,默认配置是有5个worker对queue进行处理,有5个worker对missingUsageQuota进行处理。下面来看看worker是怎么对Queue中的Item进行处理的。

pkg/controller/resourcequota/resource_quota_controller.go:199// worker runs a worker thread that just dequeues items, processes them, and marks them done.func (rq *ResourceQuotaController) worker(queue workqueue.RateLimitingInterface) func() {        workFunc := func() bool {                // 从queue中获取Key                key, quit := queue.Get()                if quit {                        return true                }                defer queue.Done(key)                                // 执行NewResourceQuotaController时注册的syncHandler(流程跳转到syncResourceQuotaFromKey)                 err := rq.syncHandler(key.(string))                ...        }        return func() {                for {                        if quit := workFunc(); quit {                                glog.Infof("resource quota controller worker shutting down")                                return                        }                }        }}

流程进入到syncResourceQuotaFromKey,下面看看它的实现:

pkg/controller/resourcequota/resource_quota_controller.go:247// syncResourceQuotaFromKey syncs a quota keyfunc (rq *ResourceQuotaController) syncResourceQuotaFromKey(key string) (err error) {        ...        obj, exists, err := rq.rqIndexer.GetByKey(key)        ...        quota := *obj.(*v1.ResourceQuota)        return rq.syncResourceQuota(quota)}

syncResourceQuotaFromKey根据从queue中获得的key,从rqIndexer中得到该Object,然后执行rq.syncResourceQuota(quota)。

pkg/controller/resourcequota/resource_quota_controller.go:268// syncResourceQuota runs a complete sync of resource quota status across all known kindsfunc (rq *ResourceQuotaController) syncResourceQuota(v1ResourceQuota v1.ResourceQuota) (err error) {        ...        newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry)        ...        // ensure set of used values match those that have hard constraints        hardResources := quota.ResourceNames(hardLimits)        used = quota.Mask(used, hardResources)        usage := api.ResourceQuota{                ObjectMeta: api.ObjectMeta{                        Name:            resourceQuota.Name,                        Namespace:       resourceQuota.Namespace,                        ResourceVersion: resourceQuota.ResourceVersion,                        Labels:          resourceQuota.Labels,                        Annotations:     resourceQuota.Annotations},                Status: api.ResourceQuotaStatus{                        Hard: hardLimits,                        Used: used,                },        }        dirty = dirty || !quota.Equals(usage.Status.Used, resourceQuota.Status.Used)        // there was a change observed by this controller that requires we update quota        if dirty {                v1Usage := &v1.ResourceQuota{}                if err := v1.Convert_api_ResourceQuota_To_v1_ResourceQuota(&usage, v1Usage, nil); err != nil {                        return err                }                _, err = rq.kubeClient.Core().ResourceQuotas(usage.Namespace).UpdateStatus(v1Usage)                return err        }        return nil}

syncResourceQuota中最关键的操作是: newUsage, err := quota.CalculateUsage(resourceQuota.Namespace, resourceQuota.Spec.Scopes, hardLimits, rq.registry) quota.CalculateUsage根据namespace, quota的Scope,hardLimits,registry对该Item(resourceQuota)进行CalculateUsage。

pkg/quota/resources.go:217// CalculateUsage calculates and returns the requested ResourceList usagefunc CalculateUsage(namespaceName string, scopes []api.ResourceQuotaScope, hardLimits api.ResourceList, registry Registry) (api.ResourceList, error) {        // find the intersection between the hard resources on the quota        // and the resources this controller can track to know what we can        // look to measure updated usage stats for        hardResources := ResourceNames(hardLimits)        potentialResources := []api.ResourceName{}        evaluators := registry.Evaluators()        for _, evaluator := range evaluators {                potentialResources = append(potentialResources, evaluator.MatchingResources(hardResources)...)        }        // NOTE: the intersection just removes duplicates since the evaluator match intersects wtih hard        matchedResources := Intersection(hardResources, potentialResources)        // sum the observed usage from each evaluator        newUsage := api.ResourceList{}        for _, evaluator := range evaluators {                // only trigger the evaluator if it matches a resource in the quota, otherwise, skip calculating anything                intersection := evaluator.MatchingResources(matchedResources)                if len(intersection) == 0 {                        continue                }                usageStatsOptions := UsageStatsOptions{Namespace: namespaceName, Scopes: scopes, Resources: intersection}                stats, err := evaluator.UsageStats(usageStatsOptions)                if err != nil {                        return nil, err                }                newUsage = Add(newUsage, stats.Used)        }        // mask the observed usage to only the set of resources tracked by this quota        // merge our observed usage with the quota usage status        // if the new usage is different than the last usage, we will need to do an update        newUsage = Mask(newUsage, matchedResources)        return newUsage, nil}

CalculateUsage中最重要的一步是循环registry中注册的所有Evaluators,执行对应Evaluator的UsageStats方法进资源使用统计。看到这里,你也许懵逼了,Evaluators又是个什么东西?

我们先来看看Registry和Evaluator的关系,以及Evaluator的定义。

pkg/quota/interfaces.go:62// Registry holds the list of evaluators associated to a particular group kindtype Registry interface {        // Evaluators returns the set Evaluator objects registered to a groupKind        Evaluators() map[schema.GroupKind]Evaluator}pkg/quota/interfaces.go:43// Evaluator knows how to evaluate quota usage for a particular group kindtype Evaluator interface {        // Constraints ensures that each required resource is present on item        Constraints(required []api.ResourceName, item runtime.Object) error        // GroupKind returns the groupKind that this object knows how to evaluate        GroupKind() schema.GroupKind        // Handles determines if quota could be impacted by the specified operation.        // If true, admission control must perform quota processing for the operation, otherwise it is safe to ignore quota.        Handles(operation admission.Operation) bool        // Matches returns true if the specified quota matches the input item        Matches(resourceQuota *api.ResourceQuota, item runtime.Object) (bool, error)        // MatchingResources takes the input specified list of resources and returns the set of resources evaluator matches.        MatchingResources(input []api.ResourceName) []api.ResourceName        // Usage returns the resource usage for the specified object        Usage(item runtime.Object) (api.ResourceList, error)        // UsageStats calculates latest observed usage stats for all objects        UsageStats(options UsageStatsOptions) (UsageStats, error)}

可见Evaluator就是一系列操作的集合,是一个Interface,而Registry就是资源类型到Evaluator的一个Map。

Kubernetes中定义了7种资源的Evaluator,都在pkg/quota/evaluator/core/*目录下,比如pods.go就是PodEvaluator的实现,里面实现了关键的UsageStats方法。除了PodEvaluator之外,其他的Evaluator的UsageStats实现,都是genericEvaluator来完成的,其代码在pkg/quota/generic/evaluator.go:177

具体Evaluator的代码分析,请读者自行完成。

下面我给出Worker的内部流程图,供大家参考:

###ReplenishmentController

rqController负责watch待sync的ResourceQuota,并将其加入到queue和missingUsageQueue中,而上面分析NewResourceQuotaController代码时提到: replenishmentController, err := options.ControllerFactory.NewController(controllerOptions) 负责replenishmentController的创建,那replenishmentController又是啥呢?我们有必要来看看replenishmentController的创建。

pkg/controller/resourcequota/replenishment_controller.go:131func (r *replenishmentControllerFactory) NewController(options *ReplenishmentControllerOptions) (result cache.ControllerInterface, err error) {        ...        switch options.GroupKind {        case api.Kind("Pod"):                if r.sharedInformerFactory != nil {                        result, err = controllerFor(api.Resource("pods"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{                                UpdateFunc: PodReplenishmentUpdateFunc(options),                                DeleteFunc: ObjectReplenishmentDeleteFunc(options),                        })                        break                }                result = informers.NewPodInformer(r.kubeClient, options.ResyncPeriod())        case api.Kind("Service"):                // TODO move to informer when defined                _, result = cache.NewInformer(                        &cache.ListWatch{                                ListFunc: func(options v1.ListOptions) (runtime.Object, error) {                                        return r.kubeClient.Core().Services(v1.NamespaceAll).List(options)                                },                                WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {                                        return r.kubeClient.Core().Services(v1.NamespaceAll).Watch(options)                                },                        },                        &v1.Service{},                        options.ResyncPeriod(),                        cache.ResourceEventHandlerFuncs{                                UpdateFunc: ServiceReplenishmentUpdateFunc(options),                                DeleteFunc: ObjectReplenishmentDeleteFunc(options),                        },                )        case api.Kind("ReplicationController"):                // TODO move to informer when defined                _, result = cache.NewInformer(                        &cache.ListWatch{                                ListFunc: func(options v1.ListOptions) (runtime.Object, error) {                                        return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).List(options)                                },                                WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {                                        return r.kubeClient.Core().ReplicationControllers(v1.NamespaceAll).Watch(options)                                },                        },                        &v1.ReplicationController{},                        options.ResyncPeriod(),                        cache.ResourceEventHandlerFuncs{                                DeleteFunc: ObjectReplenishmentDeleteFunc(options),                        },                )        case api.Kind("PersistentVolumeClaim"):                if r.sharedInformerFactory != nil {                        result, err = controllerFor(api.Resource("persistentvolumeclaims"), r.sharedInformerFactory, cache.ResourceEventHandlerFuncs{                                DeleteFunc: ObjectReplenishmentDeleteFunc(options),                        })                        break                }                // TODO (derekwaynecarr) remove me when we can require a sharedInformerFactory in all code paths...                _, result = cache.NewInformer(                        &cache.ListWatch{                                ListFunc: func(options v1.ListOptions) (runtime.Object, error) {                                        return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).List(options)                                },                                WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {                                        return r.kubeClient.Core().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)                                },                        },                        &v1.PersistentVolumeClaim{},                        options.ResyncPeriod(),                        cache.ResourceEventHandlerFuncs{                                DeleteFunc: ObjectReplenishmentDeleteFunc(options),                        },                )        case api.Kind("Secret"):                // TODO move to informer when defined                _, result = cache.NewInformer(                        &cache.ListWatch{                                ListFunc: func(options v1.ListOptions) (runtime.Object, error) {                                        return r.kubeClient.Core().Secrets(v1.NamespaceAll).List(options)                                },                                WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {                                        return r.kubeClient.Core().Secrets(v1.NamespaceAll).Watch(options)                                },                        },                        &v1.Secret{},                        options.ResyncPeriod(),                        cache.ResourceEventHandlerFuncs{                                DeleteFunc: ObjectReplenishmentDeleteFunc(options),                        },                )        case api.Kind("ConfigMap"):                // TODO move to informer when defined                _, result = cache.NewInformer(                        &cache.ListWatch{                                ListFunc: func(options v1.ListOptions) (runtime.Object, error) {                                        return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).List(options)                                },                                WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {                                        return r.kubeClient.Core().ConfigMaps(v1.NamespaceAll).Watch(options)                                },                        },                        &v1.ConfigMap{},                        options.ResyncPeriod(),                        cache.ResourceEventHandlerFuncs{                                DeleteFunc: ObjectReplenishmentDeleteFunc(options),                        },                )        default:                return nil, NewUnhandledGroupKindError(options.GroupKind)        }        return result, err}

整个代码结构非常清晰,就是根据不同的资源类型,返回对应的Controller。而每种资源的Controller的定义都是通过创建一个对应的Informer完成。Informer中注册对应的ResourceEventHandlerFuncs:UpdateFunc和DeleteFunc用来出watch的对象发生对应的change时需要调用的方法。

以Pod为例,看看Pod注册的UpdateFunc:PodReplenishmentUpdateFunc和DeleteFunc:ObjectReplenishmentDeleteFunc,你就知道replenishmentController是用来干啥的了。

pkg/controller/resourcequota/replenishment_controller.go:56// PodReplenishmentUpdateFunc will replenish if the old pod was quota tracked but the new is notfunc PodReplenishmentUpdateFunc(options *ReplenishmentControllerOptions) func(oldObj, newObj interface{}) {        return func(oldObj, newObj interface{}) {                oldPod := oldObj.(*v1.Pod)                newPod := newObj.(*v1.Pod)                if core.QuotaV1Pod(oldPod) && !core.QuotaV1Pod(newPod) {                        options.ReplenishmentFunc(options.GroupKind, newPod.Namespace, oldPod)                }        }}// ObjectReplenenishmentDeleteFunc will replenish on every deletefunc ObjectReplenishmentDeleteFunc(options *ReplenishmentControllerOptions) func(obj interface{}) {        return func(obj interface{}) {                metaObject, err := meta.Accessor(obj)                if err != nil {                        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)                        if !ok {                                glog.Errorf("replenishment controller could not get object from tombstone %+v, could take up to %v before quota is replenished", obj, options.ResyncPeriod())                                utilruntime.HandleError(err)                                return                        }                        metaObject, err = meta.Accessor(tombstone.Obj)                        if err != nil {                                glog.Errorf("replenishment controller tombstone contained object that is not a meta %+v, could take up to %v before quota is replenished", tombstone.Obj, options.ResyncPeriod())                                utilruntime.HandleError(err)                                return                        }                }                options.ReplenishmentFunc(options.GroupKind, metaObject.GetNamespace(), nil)        }}

在NewResourceQuotaController中创建replenishmentController时,已经指定了对应的ReplenishmentFunc为rq.replenishQuota,PodReplenishmentUpdateFunc和ObjectReplenishmentDeleteFunc最终都是调用ReplenishmentFunc(rq.replenishQuota)来进行quota recalculated。

pkg/controller/resourcequota/resource_quota_controller.go:330// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculatedfunc (rq *ResourceQuotaController) replenishQuota(groupKind schema.GroupKind, namespace string, object runtime.Object) {        ...        for i := range resourceQuotas {                resourceQuota := resourceQuotas[i].(*v1.ResourceQuota)                internalResourceQuota := &api.ResourceQuota{}                if err := v1.Convert_v1_ResourceQuota_To_api_ResourceQuota(resourceQuota, internalResourceQuota, nil); err != nil {                        glog.Error(err)                        continue                }                resourceQuotaResources := quota.ResourceNames(internalResourceQuota.Status.Hard)                if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {                        // 将该resourceQuota加入到队列queue                        rq.enqueueResourceQuota(resourceQuota)                }        }}

因此replenishmentController就是用来捕获对应资源的Update/Delete事件,将其对应的ResourceQuota加入到queue中,然后worker再对其进行重新计算Usage。

总结

  • Kubernetes Controller Manager在初始化Controllers时执行startResourceQuotaController启动创建ResourceQuotaController并执行其Run方法来启动ResourceQuotaController。

  • ResourceQuotaController中包括两个队列:

    • queue:用来存放待sync和recalculate的ResourceQuota

    • missingUsageQueue:用来存放那些丢失Usage信息的ResourceQuota

  • ResourceQuotaController中有两种Controller:

    • rqController:通过List/Watch对应的资源及变化,根据情况,将ResourceQuota加入到queue和missingUsageQueue。

    • replenishmentControllers:通过监控资源的Update/Delete操作,将ResourceQuota加入到queue。

  • ResourceQuotaController中存在一个Registry对象,用来存放各种资源的Evaluator,包括:

    • PodEvaluator

    • ConfigMapEvaluator

    • PersistentVolumeClaimEvaluator

    • ResourceQuotaEvaluator

    • ReplicationControllerEvaluator

    • ServiceEvaluator

    • SecretEvaluator

  • ResourceQuotaController中的replenishmentControllers包含以下replenishmentController:

    • PodReplenishController

    • ConfigMapReplenishController

    • PersistentVolumeClaimReplenishController

    • ReplicationControllerReplenishController

    • ServiceReplenishController

    • SecretReplenishController

  • ResourceQuotaController中默认存在5个worker对queue中的ResourceQuota Item进行处理。可通过kube-controller-manager的--concurrent-resource-quota-syncs配置。

  • ResourceQuotaController中默认存在5个worker对missingUsageQueue中的ResourceQuota Item进行处理。可通过kube-controller-manager的--concurrent-resource-quota-syncs配置。

  • ResourceQuotaController默认5min会做一次全量的quota usage同步。可通过kube-controller-manager的--resource-quota-sync-period

关于Kubernetes ResourceQuotaController内部实现原理及源码分析是怎样的问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注行业资讯频道了解更多相关知识。

0