千家信息网

How to Create a Node Controller

Published: 2024-12-01 Author: 千家信息网 editor

This article explains how the Node Controller is created. The walkthrough is straightforward and practical, so interested readers are welcome to follow along.

The NewNodeController Entry Point

When the Controller Manager starts, it launches a series of controllers. The Node Controller is one of the controllers started in the Controller Manager's StartControllers method; the corresponding creation code is shown below.

cmd/kube-controller-manager/app/controllermanager.go:455

```go
nodeController, err := nodecontroller.NewNodeController(
	sharedInformers.Core().V1().Pods(),
	sharedInformers.Core().V1().Nodes(),
	sharedInformers.Extensions().V1beta1().DaemonSets(),
	cloud,
	clientBuilder.ClientOrDie("node-controller"),
	s.PodEvictionTimeout.Duration,
	s.NodeEvictionRate,
	s.SecondaryNodeEvictionRate,
	s.LargeClusterSizeThreshold,
	s.UnhealthyZoneThreshold,
	s.NodeMonitorGracePeriod.Duration,
	s.NodeStartupGracePeriod.Duration,
	s.NodeMonitorPeriod.Duration,
	clusterCIDR,
	serviceCIDR,
	int(s.NodeCIDRMaskSize),
	s.AllocateNodeCIDRs,
	s.EnableTaintManager,
	utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
)
```

As the call shows, the Node Controller primarily List/Watches the following objects from sharedInformers:

  • Pods

  • Nodes

  • DaemonSets

In addition, note the following:

  • s.EnableTaintManager defaults to true, i.e. the Taint Manager is enabled by default; it can be set via --enable-taint-manager.

  • DefaultFeatureGate.Enabled(features.TaintBasedEvictions) defaults to false; it can be set to true by adding TaintBasedEvictions=true to --feature-gates. When true, Pod eviction operations on a Node are carried out by the TaintManager.

As a supplement, Kubernetes' default feature gates are defined in the following code:

pkg/features/kube_features.go:100

```go
var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureSpec{
	ExternalTrafficLocalOnly:                    {Default: true, PreRelease: utilfeature.Beta},
	AppArmor:                                    {Default: true, PreRelease: utilfeature.Beta},
	DynamicKubeletConfig:                        {Default: false, PreRelease: utilfeature.Alpha},
	DynamicVolumeProvisioning:                   {Default: true, PreRelease: utilfeature.Alpha},
	ExperimentalHostUserNamespaceDefaultingGate: {Default: false, PreRelease: utilfeature.Beta},
	ExperimentalCriticalPodAnnotation:           {Default: false, PreRelease: utilfeature.Alpha},
	AffinityInAnnotations:                       {Default: false, PreRelease: utilfeature.Alpha},
	Accelerators:                                {Default: false, PreRelease: utilfeature.Alpha},
	TaintBasedEvictions:                         {Default: false, PreRelease: utilfeature.Alpha},

	// inherited features from generic apiserver, relisted here to get a conflict if it is changed
	// unintentionally on either side:
	StreamingProxyRedirects: {Default: true, PreRelease: utilfeature.Beta},
}
```

The NewNodeController Definition

```go
func NewNodeController(
	podInformer coreinformers.PodInformer,
	nodeInformer coreinformers.NodeInformer,
	daemonSetInformer extensionsinformers.DaemonSetInformer,
	cloud cloudprovider.Interface,
	kubeClient clientset.Interface,
	podEvictionTimeout time.Duration,
	evictionLimiterQPS float32,
	secondaryEvictionLimiterQPS float32,
	largeClusterThreshold int32,
	unhealthyZoneThreshold float32,
	nodeMonitorGracePeriod time.Duration,
	nodeStartupGracePeriod time.Duration,
	nodeMonitorPeriod time.Duration,
	clusterCIDR *net.IPNet,
	serviceCIDR *net.IPNet,
	nodeCIDRMaskSize int,
	allocateNodeCIDRs bool,
	runTaintManager bool,
	useTaintBasedEvictions bool) (*NodeController, error) {

	...

	nc := &NodeController{
		cloud:              cloud,
		knownNodeSet:       make(map[string]*v1.Node),
		kubeClient:         kubeClient,
		recorder:           recorder,
		podEvictionTimeout: podEvictionTimeout,
		// Not configurable: "The maximum duration before a pod evicted from a node can be forcefully terminated".
		maximumGracePeriod:              5 * time.Minute,
		zonePodEvictor:                  make(map[string]*RateLimitedTimedQueue),
		zoneNotReadyOrUnreachableTainer: make(map[string]*RateLimitedTimedQueue),
		nodeStatusMap:                   make(map[string]nodeStatusData),
		nodeMonitorGracePeriod:          nodeMonitorGracePeriod,
		nodeMonitorPeriod:               nodeMonitorPeriod,
		nodeStartupGracePeriod:          nodeStartupGracePeriod,
		lookupIP:                        net.LookupIP,
		now:                             metav1.Now,
		clusterCIDR:                     clusterCIDR,
		serviceCIDR:                     serviceCIDR,
		allocateNodeCIDRs:               allocateNodeCIDRs,
		forcefullyDeletePod:             func(p *v1.Pod) error { return forcefullyDeletePod(kubeClient, p) },
		nodeExistsInCloudProvider:       func(nodeName types.NodeName) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
		evictionLimiterQPS:              evictionLimiterQPS,
		secondaryEvictionLimiterQPS:     secondaryEvictionLimiterQPS,
		largeClusterThreshold:           largeClusterThreshold,
		unhealthyZoneThreshold:          unhealthyZoneThreshold,
		zoneStates:                      make(map[string]zoneState),
		runTaintManager:                 runTaintManager,
		useTaintBasedEvictions:          useTaintBasedEvictions && runTaintManager,
	}

	...

	// Register enterPartialDisruptionFunc as ReducedQPSFunc; when the zone state is
	// "PartialDisruption", ReducedQPSFunc is invoked to setLimiterInZone.
	nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
	// Register enterFullDisruptionFunc as HealthyQPSFunc; when the zone state is
	// "FullDisruption", HealthyQPSFunc is invoked to setLimiterInZone.
	nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
	// Register computeZoneStateFunc as ComputeZoneState; during handleDisruption,
	// ComputeZoneState is invoked to compute the number of unhealthy nodes and the zone state.
	nc.computeZoneStateFunc = nc.ComputeZoneState

	// Register the PodInformer event handlers: Add, Update, Delete.
	// For Pod Add and Update events, the kubelet version on the Node is checked; if it is
	// lower than 1.1.0, forcefullyDeletePod calls the apiserver directly to delete that Pod
	// object from etcd.
	// For Pod Add, Update, and Delete events, if the TaintManager is enabled, the Tolerations
	// of the old and new Pod are compared; if they differ, the Pod's change is added to the
	// NoExecuteTaintManager's podUpdateQueue for the Taint Controller to process. For Delete
	// events, newPod is nil.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			nc.maybeDeleteTerminatingPod(obj)
			pod := obj.(*v1.Pod)
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(nil, pod)
			}
		},
		UpdateFunc: func(prev, obj interface{}) {
			nc.maybeDeleteTerminatingPod(obj)
			prevPod := prev.(*v1.Pod)
			newPod := obj.(*v1.Pod)
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(prevPod, newPod)
			}
		},
		DeleteFunc: func(obj interface{}) {
			pod, isPod := obj.(*v1.Pod)
			// We can get DeletedFinalStateUnknown instead of *v1.Node here and we need to handle that correctly. #34692
			if !isPod {
				deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
				if !ok {
					glog.Errorf("Received unexpected object: %v", obj)
					return
				}
				pod, ok = deletedState.Obj.(*v1.Pod)
				if !ok {
					glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
					return
				}
			}
			if nc.taintManager != nil {
				nc.taintManager.PodUpdated(pod, nil)
			}
		},
	})
	// returns true if the shared informer's store has synced.
	nc.podInformerSynced = podInformer.Informer().HasSynced

	// Register the NodeInformer event handlers: Add, Update, Delete.
	nodeEventHandlerFuncs := cache.ResourceEventHandlerFuncs{}
	if nc.allocateNodeCIDRs { // --allocate-node-cidrs: should CIDRs for Pods be allocated and set on the cloud provider.
		...
	} else {
		nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs{
			// For Node Add, Update, and Delete events, if the TaintManager is enabled, the
			// Taints of the old and new Node are compared; if they differ, the Node's change
			// is added to the NoExecuteTaintManager's nodeUpdateQueue for the Taint Controller
			// to process. For Delete events, newNode is nil.
			AddFunc: func(originalObj interface{}) {
				obj, err := api.Scheme.DeepCopy(originalObj)
				if err != nil {
					utilruntime.HandleError(err)
					return
				}
				node := obj.(*v1.Node)
				if nc.taintManager != nil {
					nc.taintManager.NodeUpdated(nil, node)
				}
			},
			UpdateFunc: func(oldNode, newNode interface{}) {
				node := newNode.(*v1.Node)
				prevNode := oldNode.(*v1.Node)
				if nc.taintManager != nil {
					nc.taintManager.NodeUpdated(prevNode, node)
				}
			},
			DeleteFunc: func(originalObj interface{}) {
				obj, err := api.Scheme.DeepCopy(originalObj)
				if err != nil {
					utilruntime.HandleError(err)
					return
				}
				node, isNode := obj.(*v1.Node)
				// We can get DeletedFinalStateUnknown instead of *v1.Node here and we need to handle that correctly. #34692
				if !isNode {
					deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
					if !ok {
						glog.Errorf("Received unexpected object: %v", obj)
						return
					}
					node, ok = deletedState.Obj.(*v1.Node)
					if !ok {
						glog.Errorf("DeletedFinalStateUnknown contained non-Node object: %v", deletedState.Obj)
						return
					}
				}
				if nc.taintManager != nil {
					nc.taintManager.NodeUpdated(node, nil)
				}
			},
		}
	}

	// Register NoExecuteTaintManager as the taintManager.
	if nc.runTaintManager {
		nc.taintManager = NewNoExecuteTaintManager(kubeClient)
	}

	nodeInformer.Informer().AddEventHandler(nodeEventHandlerFuncs)
	nc.nodeLister = nodeInformer.Lister()
	// returns true if the shared informer's nodeStore has synced.
	nc.nodeInformerSynced = nodeInformer.Informer().HasSynced

	// returns true if the shared informer's daemonSetStore has synced.
	nc.daemonSetStore = daemonSetInformer.Lister()
	nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced

	return nc, nil
}
```

Thus, creating a NodeController instance mainly performs the following work:

  • maximumGracePeriod - the maximum duration before a pod evicted from a node can be forcefully terminated. Not configurable; hard-coded to 5 minutes.

  • Register enterPartialDisruptionFunc as ReducedQPSFunc; when the zone state is "PartialDisruption", ReducedQPSFunc is invoked to setLimiterInZone.

  • Register enterFullDisruptionFunc as HealthyQPSFunc; when the zone state is "FullDisruption", HealthyQPSFunc is invoked to setLimiterInZone.

  • Register computeZoneStateFunc as ComputeZoneState; during handleDisruption, ComputeZoneState is invoked to compute the number of unhealthy nodes in the cluster and the zone state.

  • Register the **PodInformer** event handlers: Add, Update, Delete.

    • For Pod Add and Update events, the kubelet version on the Node is checked; if it is lower than 1.1.0, forcefullyDeletePod calls the apiserver directly to delete that Pod object from etcd.

    • For Pod Add, Update, and Delete events, if the TaintManager is enabled, the Tolerations of the old and new Pod are compared; if they differ, the Pod's change is added to the NoExecuteTaintManager's **podUpdateQueue** for the Taint Controller to process. For Delete events, newPod is nil.

  • Register podInformerSynced, which reports whether the shared informer's Pod store has synced.

  • Register the **NodeInformer** event handlers: Add, Update, Delete.

    • For Node Add, Update, and Delete events, if the TaintManager is enabled, the Taints of the old and new Node are compared; if they differ, the Node's change is added to the NoExecuteTaintManager's **nodeUpdateQueue** for the Taint Controller to process. For Delete events, newNode is nil.

  • Register NoExecuteTaintManager as the taintManager.

  • Register nodeInformerSynced, which reports whether the shared informer's Node store has synced.

  • Register daemonSetInformerSynced, which reports whether the shared informer's DaemonSet store has synced.

About ZoneState

ZoneState was mentioned above; how it is derived can be seen in the following code:

pkg/api/v1/types.go:3277

```go
const (
	// NodeReady means kubelet is healthy and ready to accept pods.
	NodeReady NodeConditionType = "Ready"
	// NodeOutOfDisk means the kubelet will not accept new pods due to insufficient free disk
	// space on the node.
	NodeOutOfDisk NodeConditionType = "OutOfDisk"
	// NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory.
	NodeMemoryPressure NodeConditionType = "MemoryPressure"
	// NodeDiskPressure means the kubelet is under pressure due to insufficient available disk.
	NodeDiskPressure NodeConditionType = "DiskPressure"
	// NodeNetworkUnavailable means that network for the node is not correctly configured.
	NodeNetworkUnavailable NodeConditionType = "NetworkUnavailable"
	// NodeInodePressure means the kubelet is under pressure due to insufficient available inodes.
	NodeInodePressure NodeConditionType = "InodePressure"
)
```

pkg/controller/node/nodecontroller.go:1149

```go
// This function is expected to get a slice of NodeReadyConditions for all Nodes in a given zone.
// The zone is considered:
// - fullyDisrupted if there're no Ready Nodes,
// - partiallyDisrupted if at least than nc.unhealthyZoneThreshold percent of Nodes are not Ready,
// - normal otherwise
func (nc *NodeController) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, zoneState) {
	readyNodes := 0
	notReadyNodes := 0
	for i := range nodeReadyConditions {
		if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
			readyNodes++
		} else {
			notReadyNodes++
		}
	}
	switch {
	case readyNodes == 0 && notReadyNodes > 0:
		return notReadyNodes, stateFullDisruption
	case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
		return notReadyNodes, statePartialDisruption
	default:
		return notReadyNodes, stateNormal
	}
}
```

There are three zone states:

  • FullDisruption: the number of Ready nodes is 0 and the number of NotReady nodes is greater than 0.

  • PartialDisruption: the number of NotReady nodes is greater than 2, and notReadyNodes/(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold, where nc.unhealthyZoneThreshold is set via --unhealthy-zone-threshold and defaults to 0.55.

  • Normal: any zone that is in neither of the above two states.

By now you should have a deeper understanding of how a Node Controller is created; feel free to try it out in practice!
