千家信息网

如何实例化一个Taint Manager

发表于:2025-02-03 作者:千家信息网编辑
千家信息网最后更新 2025年02月03日,本篇内容主要讲解"如何实例化一个Taint Manager",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何实例化一个Taint Manager"吧!Ne
千家信息网最后更新 2025年02月03日如何实例化一个Taint Manager

本篇内容主要讲解"如何实例化一个Taint Manager",感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习"如何实例化一个Taint Manager"吧!

NewNoExecuteTaintManager

  • PodInformer添加Event Handler时,通过调用taintManager.PodUpdated(oldPod *v1.Pod, newPod *v1.Pod)往tc.podUpdateQueue添加updateItem。

  • NodeInformer添加Event Handler时,通过调用taintManager.NodeUpdated(oldNode *v1.Node, newNode *v1.Node)往tc.nodeUpdateQueue添加updateItem。

  • 当创建NodeController时,如果runTaintManager为true(通过kube-controller-manager的--enable-taint-manager中指定,默认为true),则会通过NewNoExecuteTaintManager来实例化一个Taint Manager。

pkg/controller/node/nodecontroller.go:195func NewNodeController(..) (*NodeController, error) {        ...    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{                AddFunc: func(obj interface{}) {                        ...                        if nc.taintManager != nil {                                nc.taintManager.PodUpdated(nil, pod)                        }                },                ...        }        ...        } else {                nodeEventHandlerFuncs = cache.ResourceEventHandlerFuncs{                        AddFunc: func(originalObj interface{}) {                                ...                                if nc.taintManager != nil {                                        nc.taintManager.NodeUpdated(nil, node)                                }                        },                        ...                }        }        ...        if nc.runTaintManager {                nc.taintManager = NewNoExecuteTaintManager(kubeClient)        }    ...        return nc, nil}

因此,创建NodeController时已经配置了监听pod和node的事件,并会将相关数据发送到tc.podUpdateQueue和tc.nodeUpdateQueue,然后由Taint Manager从中取出数据进行处理。在此之前,我们先来看看NewNoExecuteTaintManager是如何实例化一个Taint Manager的。

pkg/controller/node/taint_controller.go:152func NewNoExecuteTaintManager(c clientset.Interface) *NoExecuteTaintManager {        ...        tm := &NoExecuteTaintManager{                client:            c,                recorder:          recorder,                                // taintedNodes记录每个Node对应的Taint信息。                taintedNodes:      make(map[string][]v1.Taint),                                // nodeUpdateQueue中取出的updateItem会发送到nodeUpdateChannel,Tait Manager从该Channel中取出对应的node update info。                nodeUpdateChannel: make(chan *nodeUpdateItem, nodeUpdateChannelSize),                                // podUpdateQueue中取出的updateItem会发送到podUpdateChannel,Tait Manager从该Channel中取出对应的pod update info。                podUpdateChannel:  make(chan *podUpdateItem, podUpdateChannelSize),                // Node Controller监听到的node update info会发送到nodeUpdateQueue。                nodeUpdateQueue: workqueue.New(),                                // Node Controller监听到的pod update info会发送到podUpdateQueue。                podUpdateQueue:  workqueue.New(),        }                // CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute deletePodHandler.        tm.taintEvictionQueue = CreateWorkerQueue(deletePodHandler(c, tm.emitPodDeletionEvent))        return tm}

相关的代码分析见里面的代码注释。需要强调的是,我们在这里给tm.taintEvictionQueue注册了函数deletePodHandler,用来通过Taint Eviction时删除pod时调用。Taint Manager Run的时候会通过tc.taintEvictionQueue.AddWork()时创建Worker来执行deletePodHandler

func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {        return func(args *WorkArgs) error {                ns := args.NamespacedName.Namespace                name := args.NamespacedName.Name                glog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())                if emitEventFunc != nil {                        emitEventFunc(args.NamespacedName)                }                var err error                                // 按照失败重试5次,每次间隔10s的重试机制,调用apiserver的api删除对应的Pod。                for i := 0; i < retries; i++ {                        err = c.Core().Pods(ns).Delete(name, &metav1.DeleteOptions{})                        if err == nil {                                break                        }                        time.Sleep(10 * time.Millisecond)                }                return err        }}

Run

在Kubernetes Node Controller源码分析之执行篇中提到,在Node Controller Run的时候,如果runTaintManager为true,则会调用nc.taintManager.Run启动Taint Manager loop。

pkg/controller/node/nodecontroller.go:550func (nc *NodeController) Run() {        go func() {                ...                if nc.runTaintManager {                        go nc.taintManager.Run(wait.NeverStop)                }                ...        }()}

接下来,我们来看Taint Manager的Run方法。Node Controller启动的Taint Manager实例其实就是NoExecuteTaintManager,其对应的Run方法代码如下。

pkg/controller/node/taint_controller.go:179// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {        glog.V(0).Infof("Starting NoExecuteTaintManager")                // Functions that are responsible for taking work items out of the workqueues and putting them into channels.        // 从tc.nodeUpdateQueue中获取updateItem,并发送到tc.nodeUpdateChannel。        go func(stopCh <-chan struct{}) {                for {                        item, shutdown := tc.nodeUpdateQueue.Get()                        if shutdown {                                break                        }                        nodeUpdate := item.(*nodeUpdateItem)                        select {                        case <-stopCh:                                break                        case tc.nodeUpdateChannel <- nodeUpdate:                        }                }        }(stopCh)    // 从tc.podUpdateQueue中获取updateItem,并发送到tc.podUpdateChannel。        go func(stopCh <-chan struct{}) {                for {                        item, shutdown := tc.podUpdateQueue.Get()                        if shutdown {                                break                        }                        podUpdate := item.(*podUpdateItem)                        select {                        case <-stopCh:                                break                        case tc.podUpdateChannel <- podUpdate:                        }                }        }(stopCh)        // When processing events we want to prioritize Node updates over Pod updates,        // as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -        // we don't want user (or system) to wait until PodUpdate queue is drained before it can        // start evicting Pods from tainted Nodes.        for {                select {                case <-stopCh:                        break                                        // 从tc.nodeUpdateChannel获取nodeUpdate数据,然后invoke tc.handleNodeUpdate进行处理。                case nodeUpdate := <-tc.nodeUpdateChannel:                        tc.handleNodeUpdate(nodeUpdate)                                        // 从tc.podUpdateChannel获取podUpdate数据,在invoke tc.handlePodUpdate进行处理之前,先确保tc.nodeUpdateQueue中的数据已经被处理完。                case podUpdate := <-tc.podUpdateChannel:                                // If we found a Pod update we need to empty Node queue first.                priority:                        for {                                select {                                case nodeUpdate := <-tc.nodeUpdateChannel:                                        tc.handleNodeUpdate(nodeUpdate)                                default:                                        break priority                                }                        }                                                // After Node queue is emptied we process podUpdate.                        tc.handlePodUpdate(podUpdate)                }        }}

可见, Run方法中分别从对应的queue中取出数据,然后调用tc.handleNodeUpdatetc.handlePodUpdate进行处理。

// pkg/controller/node/taint_controller.go:365func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate *nodeUpdateItem) {        // Delete        // 如果nodeUpdate.newNode == nil,则表明该Node被删除了,那么将该Node的Taints信息从tc.taintedNodes缓存中删除。        if nodeUpdate.newNode == nil {                node := nodeUpdate.oldNode                glog.V(4).Infof("Noticed node deletion: %#v", node.Name)                tc.taintedNodesLock.Lock()                defer tc.taintedNodesLock.Unlock()                delete(tc.taintedNodes, node.Name)                return        }                // Create or Update        // 如果是Node Create或者Node Update Event,则更新tc.taintedNodes缓存中记录的该Node的Taints信息。        glog.V(4).Infof("Noticed node update: %#v", nodeUpdate)        node := nodeUpdate.newNode        taints := nodeUpdate.newTaints        func() {                tc.taintedNodesLock.Lock()                defer tc.taintedNodesLock.Unlock()                glog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)                if len(taints) == 0 {                        delete(tc.taintedNodes, node.Name)                } else {                        tc.taintedNodes[node.Name] = taints                }        }()                // 然后,获取该Node上所有pods list。        pods, err := getPodsAssignedToNode(tc.client, node.Name)        if err != nil {                glog.Errorf(err.Error())                return        }        if len(pods) == 0 {                return        }                        // Short circuit, to make this controller a bit faster.        // 如果该Node上的Taints被删除了,则取消所有该node上的pod evictions。        if len(taints) == 0 {                glog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)                for i := range pods {                        tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})                }                return        }    // 否则,调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。        now := time.Now()        for i := range pods {                pod := &pods[i]                podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}                tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)        }}

handleNodeUpdate的逻辑为:

  • 如果nodeUpdate.newNode == nil,则表明该Node被删除了,那么将该Node的Taints信息从tc.taintedNodes缓存中删除。

  • 如果是Node Create或者Node Update Event,则更新tc.taintedNodes缓存中记录的该Node的Taints信息。

    • 获取该Node上所有pods list。

    • 如果该Node上的Taints被删除了,则取消所有该node上的pod evictions。

    • 否则,遍历pods list中的每个pod,分别调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。

// pkg/controller/node/taint_controller.go:334func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate *podUpdateItem) {        // Delete        // 如果podUpdate.newPod == nil,则表明该Pod被删除了,那么取消该Pod Evictions。        if podUpdate.newPod == nil {                pod := podUpdate.oldPod                podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}                glog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)                tc.cancelWorkWithEvent(podNamespacedName)                return        }                // Create or Update        // 如果是Pod Create或者Pod Update Event,则取出该pod的node上的Taints info。        pod := podUpdate.newPod        podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}        glog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)        nodeName := pod.Spec.NodeName        if nodeName == "" {                return        }        taints, ok := func() ([]v1.Taint, bool) {                tc.taintedNodesLock.Lock()                defer tc.taintedNodesLock.Unlock()                taints, ok := tc.taintedNodes[nodeName]                return taints, ok        }()        // It's possible that Node was deleted, or Taints were removed before, which triggered        // eviction cancelling if it was needed.        if !ok {                return        }                // 然后,调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。        tc.processPodOnNode(podNamespacedName, nodeName, podUpdate.newTolerations, taints, time.Now())}

handlePodUpdate的逻辑为:

  • 如果podUpdate.newPod == nil,则表明该Pod被删除了,那么取消该Pod Evictions。

  • 如果是Pod Create或者Pod Update Event,则取出该pod的node上的Taints info。

    • 如果node上的Taints info信息为空,表明Taints info被删除了或者Node被删除了,那么就不需要处理该node上的pod eviction了,流程结束。

    • 否则,调用tc.processPodOnNode根据Node Taints info和Pod Tolerations info处理该Node上的Pod Eviction。

因此,不管是handlePodUpdate还是handleNodeUpdate,最终都是通过processPodOnNode来处理Pod Eviction的。

pkg/controller/node/taint_controller.go:295func (tc *NoExecuteTaintManager) processPodOnNode(        podNamespacedName types.NamespacedName,        nodeName string,        tolerations []v1.Toleration,        taints []v1.Taint,        now time.Time,) {    // 如果该node的taints info为空,则取消Taint Eviction Pods。        if len(taints) == 0 {                tc.cancelWorkWithEvent(podNamespacedName)        }                // 对比node的taints info和pod tolerations info,判断出node的taints是否都能被pod所能容忍。        allTolerated, usedTolerations := v1.GetMatchingTolerations(taints, tolerations)                // 如果不是全部都能容忍,那么调用立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。        if !allTolerated {                glog.V(2).Infof("Not all taints are tolerated after update for Pod %v on %v", podNamespacedName.String(), nodeName)                // We're canceling scheduled work (if any), as we're going to delete the Pod right away.                tc.cancelWorkWithEvent(podNamespacedName)                tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), time.Now(), time.Now())                return        }                // 否则,取pod的所有tolerations的TolerationSeconds的最小值作为minTolerationTime。如果某个Toleration没有设置TolerationSeconds,则表示0,如果设置的值为负数,则用0替代。        minTolerationTime := getMinTolerationTime(usedTolerations)        // getMinTolerationTime returns negative value to denote infinite toleration.        if minTolerationTime < 0 {                glog.V(4).Infof("New tolerations for %v tolerate forever. Scheduled deletion won't be cancelled if already scheduled.", podNamespacedName.String())                return        }        startTime := now        triggerTime := startTime.Add(minTolerationTime)                // 从tc.taintEvictionQueue中获取Worker-scheduledEviction        scheduledEviction := tc.taintEvictionQueue.GetWorkerUnsafe(podNamespacedName.String())                // 如果获取到不为空的scheduledEviction,则判断worker创建时间加上minTolerationTime是否达到触发时间要求,如果没达到,则不进行Taint Pod Eviction,流程结束。        if scheduledEviction != nil {                startTime = scheduledEviction.CreatedAt                if startTime.Add(minTolerationTime).Before(triggerTime) {                        return                } else {                        tc.cancelWorkWithEvent(podNamespacedName)                }        }                // 如果达到触发时间要求,则取消worker,并立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。        tc.taintEvictionQueue.AddWork(NewWorkArgs(podNamespacedName.Name, podNamespacedName.Namespace), startTime, triggerTime)}

processPodOnNode的逻辑为:

  • 如果该node的taints info为空,则取消Taint Eviction Pods。

  • 对比node的taints info和pod tolerations info,判断出node的taints是否都能被pod所能容忍。

  • 如果不是全部都能容忍,那么调用立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。

  • 否则,取pod的所有tolerations的TolerationSeconds的最小值作为minTolerationTime。如果某个Toleration没有设置TolerationSeconds,表示不作驱逐。

    • 如果获取到不为空的scheduledEviction,则判断worker创建时间加上minTolerationTime是否达到触发时间要求,如果没达到,则不进行Taint Pod Eviction,流程结束。

    • 如果达到触发时间要求,则取消worker,并立刻调用AddWork来创建worker,启动tc.taintEvictionQueue注册的deletePodHandler来删除该pod。

    • 如果minTolerationTime小于0,则永远容忍,流程结束。

    • 从tc.taintEvictionQueue中获取Worker-scheduledEviction。

到此,相信大家对"如何实例化一个Taint Manager"有了更深的了解,不妨来实际操作一番吧!这里是网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

0