千家信息网

Kubernetes Endpoints Controller的源码解析

发表于:2024-12-12 作者:千家信息网编辑
千家信息网最后更新 2024年12月12日,本篇内容介绍了"Kubernetes Endpoints Controller的源码解析"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧
千家信息网最后更新 2024年12月12日Kubernetes Endpoints Controller的源码解析

本篇内容介绍了"Kubernetes Endpoints Controller的源码解析"的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!


Endpoints Controller相关的配置项

  • --concurrent-endpoint-syncs int32 Default: 5 The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load.

  • --leader-elect-resource-lock endpoints Default: "endpoints" The type of resource object that is used for locking during leader election. Supported options are endpoints (default) and configmaps.

Endpoints Controller Watch的GVK

  • Core/V1/Pods

  • Core/V1/Services

  • Core/V1/Endpoints

Endpoints Controller Event Handler

  • Add Service Event --> enqueueService

  • Update Service Event --> enqueueService(new)

  • Delete Service Event --> enqueueService

  • Add Pod Event --> addPod

  • Update Pod Event --> updatePod

  • Delete Pod Event --> deletePod

  • Add/Update/Delete Endpoints Event --> nil

Run Endpoints Controller

启动两类go协程:

  • 一类协程数为--concurrent-endpoint-syncs配置值(default 5),每个worker负责从service queue中pop service进行syncService同步,完成一次sync后等待1s再从service queue中pop一个service进行sync,如此反复。

  • 另一类协程只有一个协程,负责checkLeftoverEndpoints,只有启动时会执行一次。

// Run will not return until stopCh is closed. workers determines how many// endpoints will be handled in parallel.func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {        defer utilruntime.HandleCrash()        defer e.queue.ShutDown()        glog.Infof("Starting endpoint controller")        defer glog.Infof("Shutting down endpoint controller")        if !controller.WaitForCacheSync("endpoint", stopCh, e.podsSynced, e.servicesSynced, e.endpointsSynced) {                return        }        // workers = --concurrent-endpoint-syncs's value (default 5)        for i := 0; i < workers; i++ {                // workerLoopPeriod = 1s                go wait.Until(e.worker, e.workerLoopPeriod, stopCh)        }        go func() {                defer utilruntime.HandleCrash()                e.checkLeftoverEndpoints()        }()        <-stopCh}

checkLeftoverEndpoints

checkLeftoverEndpoints负责List所有当前集群中的endpoints并将它们对应的services添加到queue中,由workers进行syncService同步。

这是为了防止在controller-manager发生重启时时,用户删除了某些Services或者某些Endpoints还没删除干净,Endpoints Controller没有处理的情况下,在Endpoints Controller再次启动时能通过checkLeftoverEndpoints检测到那些孤立的endpionts(没有对应services),将虚构的Services重新加入到队列进行syncService操作,从而完成这些孤立endpoint的清理工作。

上面提到的虚构Services其实是把Endpoints的Key(namespace/name)作为Services的Key,因此这就是为什么要求Endpiont和Service的名字要一致的原因之一。

func (e *EndpointController) checkLeftoverEndpoints() {        list, err := e.endpointsLister.List(labels.Everything())        if err != nil {                utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))                return        }        for _, ep := range list {                if _, ok := ep.Annotations[resourcelock.LeaderElectionRecordAnnotationKey]; ok {                        // when there are multiple controller-manager instances,                        // we observe that it will delete leader-election endpoints after 5min                        // and cause re-election                        // so skip the delete here                        // as leader-election only have endpoints without service                        continue                }                key, err := keyFunc(ep)                if err != nil {                        utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))                        continue                }                e.queue.Add(key)        }}

另外,还需要注意一点,对于kube-controller-manager多实例HA部署时,各个contorller-manager endpoints是没有对应service的,这种情况下,我们不能把虚构的Service加入到队列触发这些"理应孤立"的endpoints被清理,因此我们给这些"理应孤立"的endpoints加上Annotation "control-plane.alpha.kubernetes.io/leader"以做跳过处理。

Endpoint Contoller的核心逻辑syncService

Service的Add/Update/Delete Event Handler都是将Service Key加入到Queue中,等待worker进行syncService处理:

  1. 根据queue中得到的service key(namespace/name)去indexer中获取对应的Service Object,如果没获取到,则调api删除同Key(namespace/name)的Endpoints Object进行清理工作,这对应到checkLeftoverEndpoints中描述到的那些孤立endpoints清理工作。

  2. 因为Service是通过LabelSelector进行Pod匹配,将匹配的Pods构建对应的Endpoints Subsets加入到Endpoints中,因此这里会先过滤掉那些没有LabelSelector的Services。

  3. 然后用Service的LabelSelector获取同namespace下的所有Pods。

  4. 检查service.Spec.PublishNotReadyAddresses是否为true,或者Service Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints"是否为true(/t/T/True/TRUE/1),如果为true,则表示tolerate Unready Endpoints,即Unready的Pods信息也会被加入该Service对应的Endpoints中。

    注意,Annotations "service.alpha.kubernetes.io/tolerate-unready-endpoints"在Kubernetes 1.13中将被弃用,后续只使用.Spec.PublishNotReadyAddresses Field。

  5. 接下来就是遍历前面获取到的Pods,用各个Pod的IP、ContainerPorts、HostName及Service的Port去构建Endpoints的Subsets,注意如下特殊处理:

    4)当tolerate Unready Endpoints为true(即使Pod not Ready)或者Pod isReady时,Pod对应的EndpointAddress也会被加入到(Ready)Addresses中。

    5)tolerate Unready Endpoints为false且Pod isNotReady情况下:

     - 当pod.Spec.RestartPolicy为Never,Pod Status.Phase为非结束状态(非Failed/Successed)时,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。 - 当pod.Spec.RestartPolicy为OnFailure, Pod Status.Phase为非Successed时,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。 - 其他情况下,Pod对应的EndpointAddress也会被加入到NotReadyAddresses中。


    1. 跳过没有pod.Status.PodIP为空的pod;

    2. 当tolerate Unready Endpoints为false时,跳过那些被标记删除(DeletionTimestamp != nil)的Pods;

    3. 对于Headless Service,因为没有Service Port,因此构建EndpointSubset时对应的Ports内容为空;

  6. 从indexer中获取service对应的Endpoints Object(currentEndpoints),如果从indexer中没有返回对应的Endpoints Object,则构建一个与该Service同名、同Labels的Endpoints对象(newEndpoints)。

  7. 如果currentEndpoints的ResourceVersion不为空,则对比currentEndpoints.Subsets、Labels与前面构建的Subsets、Service.Labels是否DeepEqual,如果是则说明不需要update,流程结束。

  8. 否则,就像currentEndpoints DeepCopy给newEndpoints,并用前面构建的Subsets和Services.Labels替换newEndpoints中对应内容。

  9. 如果currentEndpoints的ResourceVersion为空,则调用Create API去创建上一步的newEndpoints Object。如果currentEndpoints的ResourceVersion不为空,表示已经存在对应的Endpoints,则调用Update API用newEndpoints去更新该Endpoints。

  10. 流程结束。

Pod Event Hanlder

Add Pod

  1. 通过Services LabeleSelector与Pod Labels进行匹配的方法,将该Pod能匹配上的所有Services都找出来,然后将它们的Key(namespace/name)都加入到queue等待sync。

// When a pod is added, figure out what services it will be a member of and// enqueue them. obj must have *v1.Pod type.func (e *EndpointController) addPod(obj interface{}) {        pod := obj.(*v1.Pod)        services, err := e.getPodServiceMemberships(pod)        if err != nil {                utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err))                return        }        for key := range services {                e.queue.Add(key)        }}

Update Pod

  • 如果newPod.ResourceVersion等于oldPod.ResourceVersion,则跳过,不进行任何update。

  • 检查新老Pod的DeletionTimestamp、Ready Condition以及由PodIP,Hostname等建构的EndpointAddress是否发生变更,只要其中之一发生变更,podChangedFlag就为true。

  • 检查新老Pod Spec的Labels、HostName、Subdomain是否发生变更,只要其中之一发生变更,labelChangedFlag就为true。

  • 如果podChangedFlag和labelChangedFlag都为false,则跳过,不做任何update。

  • 通过Services LabeleSelector与Pod Labels进行匹配的方法,将newPod能匹配上的所有Services都找出来(services记录),如果labelChangedFlag为true,则根据LabelSelector匹配找出oldPod对应的oldServices:

    互相差值进行union集合的含义:services.Difference(oldServices).Union(oldServices.Difference(services))

    • 如果podChangedFlag为true,则将services和oldServices进行union集合,将集合内的所有Services Key都加入到queue中等待sync;

    • 如果podChangedFlag为false,则将services和oldServices的互相差值进行union集合,将集合内的所有Services Key都加入到queue中等待sync;

Delete Pod

  • 如果该pod还是个完整记录的pod,则跟addPod逻辑一样:通过Services LabeleSelector与Pod Labels进行匹配的方法,将该Pod能匹配上的所有Services都找出来,然后将它们的Key(namespace/name)都加入到queue等待sync。

  • 如果该pod是tombstone object(final state is unrecorded),则将其转换成v1.pod后,再调用addPod。相比正常的Pod,就是多了一步:从tombstone到v1.pod的转换。

// When a pod is deleted, enqueue the services the pod used to be a member of.// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.func (e *EndpointController) deletePod(obj interface{}) {        if _, ok := obj.(*v1.Pod); ok {                // Enqueue all the services that the pod used to be a member                // of. This happens to be exactly the same thing we do when a                // pod is added.                e.addPod(obj)                return        }        // If we reached here it means the pod was deleted but its final state is unrecorded.        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)        if !ok {                utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))                return        }        pod, ok := tombstone.Obj.(*v1.Pod)        if !ok {                utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a Pod: %#v", obj))                return        }        glog.V(4).Infof("Enqueuing services of deleted pod %s/%s having final state unrecorded", pod.Namespace, pod.Name)        e.addPod(pod)}

核心Struct

里面有几个struct,挺容易混淆的,简单用图表示下,方便比对:

总结

通过对Endpoints Controller的源码分析,我们了解了其中很多细节,比如对Service和Pod事件处理逻辑、对孤立Pod的处理方法、Pod Labels变更带来的影响等等,这对我们通过Watch Endpoints去写自己的Ingress组件对接公司内部的路由组件时是有帮助的。

"Kubernetes Endpoints Controller的源码解析"的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注网站,小编将为大家输出更多高质量的实用文章!

处理 情况 内容 方法 源码 就是 逻辑 检查 虚构 接下来 只有 差值 更多 核心 流程 知识 组件 队列 同步 工作 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 ado接口如何连接数据库 红河互联网科技 网络安全演讲比赛英文报告 酒店网络安全方案设计 黑龙江专业网络技术服务产品介绍 网络安全手抄报设计色彩 火线精英ol显示尝试登录服务器 局域网服务器如何在异地链接 关系数据库中主键是啥 数据库实验报告 华科 网站搭建一定要服务器吗 ppas数据库搭建ogg 虹口区媒体数据库服务商行业 服务器主频怎么看 智能工业网络技术是什么 流放者柯南服务器怎么连接 奉贤中学网络安全告家长书 怎么看论文所属数据库 大学网络安全与防范措施 阿里云主机数据库 焦点访谈关于网络安全的节目 成都网络安全儿童画 员工管理系统数据库 网络安全的表述正确的是 汕尾自主可控软件开发零售价 下载巅峰坦克加载服务器资源失败 服务器hba卡回收价格 服务器再美国 学习java用什么数据库 计算机为什么是服务器工作站
0