How to Analyze the Informer Mechanism Through Source Code








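  • client-go:

    • Reflector: talks to the Kubernetes APIServer and uses ListWatch to sync the data stored in etcd into the local cache (the Delta FIFO queue).

    • Informer: builds indexes over the locally cached data and invokes the ResourceEventHandlers registered beforehand.

    • Indexer: builds the indexes; underneath, objects are stored in a thread-safe map. The default key for each resource is <namespace>/<name>.

  • Custom Controller

    • Informer reference: when writing a custom controller, you create an Informer object that watches your own resource.

    • ResourceEventHandler: registers handlers for the relevant events; when data arrives, the Informer invokes the corresponding callbacks.

    • ProcessItem: takes data from the Workqueue and hands it to the handler for processing.

    • Workqueue: a work-queue utility; every controller needs one. Events triggered from the event handler are first put into the work queue, and the controller's ProcessItem then takes them out for processing.

    • The Workqueue is a de-duplicating queue: besides the items list, it keeps a processing set and a dirty set.

    • Multiple events for the same resource object are de-duplicated once they are enqueued.

    • The same resource object is never processed by more than one worker at a time. See Learning Concurrent Reconciling for details.

    • A controller should query resource objects from the Informer's cache rather than calling kube-apiserver directly.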
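The de-duplication behaviour described above can be modelled in a few lines. The following is a minimal sketch, not the client-go implementation: the type `dedupQueue` and its methods are hypothetical names chosen for illustration, and `Get` returns immediately instead of blocking so that the example stays deterministic.

```go
package main

import (
	"fmt"
	"sync"
)

// dedupQueue is a toy model of a de-duplicating work queue
// (hypothetical type, written for illustration only).
type dedupQueue struct {
	mu         sync.Mutex
	items      []string            // keys waiting for a worker, in arrival order
	dirty      map[string]struct{} // keys that still need processing
	processing map[string]struct{} // keys currently held by a worker
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{
		dirty:      map[string]struct{}{},
		processing: map[string]struct{}{},
	}
}

// Add marks key as dirty. The key is queued only if it is neither
// already waiting nor currently being processed.
func (q *dedupQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, waiting := q.dirty[key]; waiting {
		return // duplicate event for a key that is already pending
	}
	q.dirty[key] = struct{}{}
	if _, busy := q.processing[key]; busy {
		return // a worker holds the key; Done will queue it again
	}
	q.items = append(q.items, key)
}

// Get hands the oldest waiting key to a worker; ok is false when nothing waits.
func (q *dedupQueue) Get() (key string, ok bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return "", false
	}
	key = q.items[0]
	q.items = q.items[1:]
	q.processing[key] = struct{}{}
	delete(q.dirty, key)
	return key, true
}

// Done releases key. If it was re-added while in progress, it is queued again.
func (q *dedupQueue) Done(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.processing, key)
	if _, again := q.dirty[key]; again {
		q.items = append(q.items, key)
	}
}

// Len reports how many keys are waiting for a worker.
func (q *dedupQueue) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}

func main() {
	q := newDedupQueue()
	q.Add("default/foo")
	q.Add("default/foo") // second event for the same object
	fmt.Println("waiting after two adds:", q.Len())

	key, _ := q.Get()
	q.Add(key) // event arrives while a worker holds the key
	fmt.Println("waiting while in progress:", q.Len())

	q.Done(key)
	fmt.Println("waiting after Done:", q.Len())
}
```

In this sketch, the dirty set is what collapses repeated events, and the processing set is what keeps a key away from a second worker until the first one calls `Done`.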


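  • Client-go

    • When the custom controller starts, the Informer calls the Reflector to run List & Watch, which syncs data and registers for watch events.

    • The Reflector first performs a full sync through List, moving data from etcd (via the APIServer) into the local Delta FIFO queue. The Reflector is the component that actually connects to the Kubernetes APIServer.

    • The Reflector then Watches from the latest ResourceVersion. Anything not yet synced is filled in at this point (new data may have been added after the List completed, so something could otherwise be missed).

    • When a user creates a custom resource, the Reflector's Watch observes it and puts it into the local Delta cache.

    • The Controller in the cache package runs processLoop for the Informer through wait.Until with a 1s period; processLoop Pops data from the (Delta) queue and hands it to the Informer's HandleDeltas.

    • HandleDeltas calls the Indexer to build indexes, and the data ends up in a local thread-safe map.

    • The event is then distributed: every listener is notified and calls the user's registered ResourceEventHandler.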

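  • Custom Controller

    • The custom ResourceEventHandler applies any filtering and finally adds the item to the Workqueue. The work queue stores only the key, not the object itself.

    • Once a key has been added, the ProcessItem method Pops it and passes it to the Handle Object method.

    • Handle Object uses the key to fetch the actual object through the Indexer reference and then runs the business logic. Note: always read from the cache here; calling the Kubernetes API directly hurts performance.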
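To make the "queue holds keys, cache holds objects" split concrete, here is a minimal sketch under stated assumptions: `object`, `keyOf`, and `store` are hypothetical names, and the store is a plain mutex-protected map standing in for the Indexer rather than the real client-go type.

```go
package main

import (
	"fmt"
	"sync"
)

// object is a stand-in for a Kubernetes resource (hypothetical type).
type object struct {
	Namespace string
	Name      string
	Replicas  int
}

// keyOf builds the <namespace>/<name> key; objects without a namespace
// are keyed by name alone.
func keyOf(o object) string {
	if o.Namespace == "" {
		return o.Name
	}
	return o.Namespace + "/" + o.Name
}

// store is a thread-safe map from key to object, playing the role of the
// local cache in this sketch.
type store struct {
	mu    sync.RWMutex
	items map[string]object
}

func newStore() *store { return &store{items: map[string]object{}} }

// Add inserts or overwrites the object stored under its key.
func (s *store) Add(o object) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.items[keyOf(o)] = o
}

// GetByKey returns the cached object for key, if any.
func (s *store) GetByKey(key string) (object, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	o, ok := s.items[key]
	return o, ok
}

func main() {
	cache := newStore()
	cache.Add(object{Namespace: "default", Name: "foo", Replicas: 1})

	// The work queue only remembers the key.
	queue := []string{"default/foo"}

	// The object changes while the key is still waiting in the queue.
	cache.Add(object{Namespace: "default", Name: "foo", Replicas: 3})

	// The worker resolves the key against the cache and sees the latest state.
	for _, key := range queue {
		if o, ok := cache.GetByKey(key); ok {
			fmt.Printf("%s -> replicas=%d\n", key, o.Replicas)
		}
	}
}
```

Because only the key travels through the queue, the worker in this sketch reads whatever the cache holds at processing time (here `replicas=3`), not a stale copy captured when the event fired.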


  • The main function initializes the relevant structures and starts the Informers

        //https://github.com/kubernetes/sample-controller/blob/master/main.go#L62
        kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
        exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

        controller := NewController(kubeClient, exampleClient,
            kubeInformerFactory.Apps().V1().Deployments(),
            exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

        // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
        // Start method is non-blocking and runs all registered informers in a dedicated goroutine.
        kubeInformerFactory.Start(stopCh)
        exampleInformerFactory.Start(stopCh)

        if err = controller.Run(2, stopCh); err != nil {
            klog.Fatalf("Error running controller: %s", err.Error())
        }


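  • The client-go part: build the SharedInformerFactory and start it, which implements ListAndWatch; analyzed in part one

  • The custom controller part: its main job is to wait for events and respond to them; analyzed in part two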



  • Call Start to initialize the requested informers

        // Start initializes all requested informers.
        func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
            f.lock.Lock()
            defer f.lock.Unlock()

            for informerType, informer := range f.informers {
                if !f.startedInformers[informerType] {
                    go informer.Run(stopCh)
                    f.startedInformers[informerType] = true
                }
            }
        }

  • Call Run to actually start the work

        //client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:189
        func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
            defer utilruntime.HandleCrash()

            fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

            cfg := &Config{
                // Register the DeltaFIFO queue
                Queue: fifo,
                // Register the listerWatcher, which later connects to the APIServer
                ListerWatcher:    s.listerWatcher,
                ObjectType:       s.objectType,
                FullResyncPeriod: s.resyncCheckPeriod,
                RetryOnError:     false,
                // Checks whether a Resync is needed; it adds the listeners that need a Resync to the set to be synced
                ShouldResync: s.processor.shouldResync,
                // Register the method that builds indexes and distributes events
                Process: s.HandleDeltas,
            }
            ......
            // Start the method that checks the cache for mutations
            wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
            // Start the processor, which receives event notifications and calls back the user's registered ResourceEventHandlers
            wg.StartWithChannel(processorStopCh, s.processor.run)
            ......
            // Run the internal controller
            s.controller.Run(stopCh)
        }

  • Start the built-in controller

        // Run begins processing items, and will continue until a value is sent down stopCh.
        // It's an error to call Run more than once.
        // Run blocks; call via go.
        func (c *controller) Run(stopCh <-chan struct{}) {
            ......
            // Call the reflector's Run to start ListAndWatch and connect to the APIServer
            //client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:219
            wg.StartWithChannel(stopCh, r.Run)

            // Start a timer with a one-second period that calls processLoop to read data
            wait.Until(c.processLoop, time.Second, stopCh)
        }

  • ListAndWatch: connecting to the APIServer

        //k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/reflector.go:121
        func (r *Reflector) Run(stopCh <-chan struct{}) {
            wait.Until(func() {
                if err := r.ListAndWatch(stopCh); err != nil {
                    utilruntime.HandleError(err)
                }
            }, r.period, stopCh)
        }

        //k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/reflector.go:159
        // ListAndWatch first lists all items and get the resource version at the moment of call,
        // and then use the resource version to watch.
        // It returns error if ListAndWatch didn't even try to initialize watch.
        func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
            ......
            // Explicitly set "0" as resource version - it's fine for the List()
            // to be served from cache and potentially be delayed relative to
            // etcd contents. Reflector framework will catch up via Watch() eventually.
            options := metav1.ListOptions{ResourceVersion: "0"}

            if err := func() error {
                ......
                go func() {
                    .....
                    // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
                    // list request will return the full response.
                    pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
                        // This is where the Reflector actually connects to Kubernetes and fetches data
                        return r.listerWatcher.List(opts)
                    }))
                    ......
                    // Execute the list and fetch the data
                    list, err = pager.List(context.Background(), options)
                    close(listCh)
                }()
                ......
                // Resolve the list's metadata accessor
                listMetaInterface, err := meta.ListAccessor(list)
                ......
                resourceVersion = listMetaInterface.GetResourceVersion()
                // Extract the items from the list
                items, err := meta.ExtractList(list)
                ......
                // Replace the full contents of the local queue (Delta FIFO Queue) with the listed data
                if err := r.syncWith(items, resourceVersion); err != nil {
                    return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
                }
                initTrace.Step("SyncWith done")
                r.setLastSyncResourceVersion(resourceVersion)
                initTrace.Step("Resource version updated")
                return nil
            }(); err != nil {
                return err
            }
            ......
            for {
                ......
                // Obtain the Watch object
                w, err := r.listerWatcher.Watch(options)
                ......
                // Start handling the watched data
                if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
                    ......
                }
            }
        }

        //k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/reflector.go:319
        // watchHandler watches w and keeps *resourceVersion up to date.
        func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
            ......
        loop:
            for {
                select {
                case <-stopCh:
                    return errorStopRequested
                case err := <-errc:
                    return err
                case event, ok := <-w.ResultChan(): // This channel fires when the Watch receives data
                    ......
                    // Access the object's metadata through the received event
                    meta, err := meta.Accessor(event.Object)
                    ......
                    newResourceVersion := meta.GetResourceVersion()
                    // Act according to the type of the received event
                    switch event.Type {
                    case watch.Added:
                        err := r.store.Add(event.Object)
                        ......
                    case watch.Modified:
                        err := r.store.Update(event.Object)
                        ......
                    case watch.Deleted:
                        // TODO: Will any consumers need access to the "last known
                        // state", which is passed in event.Object? If so, may need
                        // to change this.
                        err := r.store.Delete(event.Object)
                        ......
                    case watch.Bookmark:
                        // A `Bookmark` means watch has synced here, just update the resourceVersion
                    default:
                        utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
                    }
                    *resourceVersion = newResourceVersion
                    // Record the latest version to Watch from
                    r.setLastSyncResourceVersion(newResourceVersion)
                    eventCount++
                }
            }
            ......
        }

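    The three Reflector snippets above cover, in order:

    • Calling ListAndWatch periodically

    • Running the List and Watch logic

    • Handling the watched data

    Together, these steps move the data that Kubernetes stores in etcd into the controller's local cache. The next step is to process the data held in the Delta FIFO Queue.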
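The handoff from List to Watch through the resource version can be illustrated with a small model. This is a sketch under stated assumptions, not how the APIServer or Reflector are implemented: `fakeServer`, `mirror`, and their methods are hypothetical, versions are plain integers, and the "watch" is a synchronous replay of an event log rather than a stream. It runs on a single goroutine, so no locking is shown.

```go
package main

import "fmt"

// event is one change recorded by the toy server.
type event struct {
	Type    string // "ADDED", "MODIFIED" or "DELETED"
	Key     string
	Version int
}

// fakeServer is a hypothetical stand-in for the APIServer: it keeps the
// current objects, a monotonically increasing version, and an event log.
type fakeServer struct {
	version int
	objects map[string]bool
	log     []event
}

func newFakeServer() *fakeServer { return &fakeServer{objects: map[string]bool{}} }

// apply records a change and bumps the version.
func (s *fakeServer) apply(typ, key string) {
	s.version++
	if typ == "DELETED" {
		delete(s.objects, key)
	} else {
		s.objects[key] = true
	}
	s.log = append(s.log, event{Type: typ, Key: key, Version: s.version})
}

// List returns a snapshot of all keys plus the version it corresponds to.
func (s *fakeServer) List() (map[string]bool, int) {
	snapshot := make(map[string]bool, len(s.objects))
	for k := range s.objects {
		snapshot[k] = true
	}
	return snapshot, s.version
}

// EventsAfter returns every event newer than version, oldest first.
func (s *fakeServer) EventsAfter(version int) []event {
	var out []event
	for _, e := range s.log {
		if e.Version > version {
			out = append(out, e)
		}
	}
	return out
}

// mirror is the local copy kept by the toy reflector.
type mirror struct {
	keys        map[string]bool
	lastVersion int
}

// list replaces the local copy with a full snapshot.
func (m *mirror) list(s *fakeServer) { m.keys, m.lastVersion = s.List() }

// watch applies everything that happened after the last known version.
func (m *mirror) watch(s *fakeServer) {
	for _, e := range s.EventsAfter(m.lastVersion) {
		switch e.Type {
		case "ADDED", "MODIFIED":
			m.keys[e.Key] = true
		case "DELETED":
			delete(m.keys, e.Key)
		}
		m.lastVersion = e.Version
	}
}

func main() {
	srv := newFakeServer()
	srv.apply("ADDED", "default/a")
	srv.apply("ADDED", "default/b")

	m := &mirror{}
	m.list(srv) // full sync at version 2

	// Changes that land after the List completed.
	srv.apply("ADDED", "default/c")
	srv.apply("DELETED", "default/a")

	m.watch(srv) // catches up from version 2
	fmt.Println(len(m.keys), m.lastVersion)
}
```

The point of the sketch is that watching from the version returned by List closes the gap between the snapshot and later changes: after `watch`, the mirror holds `default/b` and `default/c` at version 4.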

  • Process the data in the Delta queue: build indexes and distribute events

        func (c *controller) processLoop() {
            for {
                // Read data from the Delta queue and call the HandleDeltas method configured earlier to process it
                //vendor/k8s.io/client-go/tools/cache/controller.go:150
                obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
                ......
            }
        }

        //client-go@v2.0.0-alpha.0.0.20180910083459-2cefa64ff137+incompatible/tools/cache/shared_informer.go:344
        func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
            s.blockDeltas.Lock()
            defer s.blockDeltas.Unlock()

            // from oldest to newest
            for _, d := range obj.(Deltas) {
                switch d.Type {
                // Dispatch according to the event type
                case Sync, Added, Updated:
                    isSync := d.Type == Sync
                    s.cacheMutationDetector.AddObject(d.Object)
                    // First check whether the index already holds the object
                    if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                        // If it exists, update the indexed data
                        if err := s.indexer.Update(d.Object); err != nil {
                            return err
                        }
                        // Distribute an update event to the listeners
                        s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
                    } else {
                        // If it does not exist, add the new object to the index
                        if err := s.indexer.Add(d.Object); err != nil {
                            return err
                        }
                        // Distribute an add event to the listeners
                        s.processor.distribute(addNotification{newObj: d.Object}, isSync)
                    }
                case Deleted:
                    // For a delete, remove the object from the index first
                    if err := s.indexer.Delete(d.Object); err != nil {
                        return err
                    }
                    // Distribute a delete event to the listeners
                    s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
                }
            }
            return nil
        }


        //k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:453
        func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
            ......
            // Distribute the notification to the corresponding listeners
            if sync {
                for _, listener := range p.syncingListeners {
                    listener.add(obj)
                }
            } else {
                for _, listener := range p.listeners {
                    listener.add(obj)
                }
            }
        }

        // Send on the add channel, which connects through to the user-defined ResourceEventHandler
        func (p *processorListener) add(notification interface{}) {
            p.addCh <- notification
        }


        //https://github.com/kubernetes/sample-controller/blob/master/controller.go#L116
        // Call the Informer's AddEventHandler method to register a ResourceEventHandler and define how items are enqueued
        fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: controller.enqueueFoo,
            UpdateFunc: func(old, new interface{}) {
                // Add the item to the work queue; filtering can also be done here
                controller.enqueueFoo(new)
            },
        })

        // Add to the work queue
        func (c *Controller) enqueueFoo(obj interface{}) {
            ......
            // Add the received key to the work queue
            c.workqueue.Add(key)
        }

        //k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:326
        func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
            s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
        }

        //k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:347
        func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
            ......
            // Build the processorListener object
            listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

            if !s.started {
                // Register the listener
                s.processor.addListener(listener)
                return
            }
            ......
            // If the informer is already started, also send the existing objects to this listener as add notifications so it can handle them promptly
            s.processor.addListener(listener)
            for _, item := range s.indexer.List() {
                listener.add(addNotification{newObj: item})
            }
        }

        //k8s.io/client-go@v0.0.0-20190918200256-06eb1244587a/tools/cache/shared_informer.go:437
        func (p *sharedProcessor) addListener(listener *processorListener) {
            ......
            if p.listenersStarted {
                // Run the listener so it starts handling received data, for example by calling back the user's EventHandler
                // Periodically call the user's handler to process it
                p.wg.Start(listener.run)
                p.wg.Start(listener.pop)
            }
        }

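    The snippets above cover, in order:

    • Reading data from the Delta queue through processLoop

    • HandleDeltas processing the events it reads

    • Distributing events

    • Registering a ResourceEventHandler in the custom controller

    • Adding the user's handler and starting to process the received data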
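The index-then-distribute step can be condensed into a small model. This is a minimal sketch, assuming string objects and synchronous callbacks: `delta`, `listener`, and `toyInformer` are hypothetical names, and the real code hands notifications to listeners over channels instead of calling them directly.

```go
package main

import "fmt"

// delta is one queued change (hypothetical, simplified type).
type delta struct {
	Type string // "Sync", "Added", "Updated" or "Deleted"
	Key  string
	Obj  string
}

// listener receives a notification kind ("add", "update", "delete") and a key.
type listener func(kind, key string)

// toyInformer keeps a local index and a list of registered listeners.
type toyInformer struct {
	indexer   map[string]string
	listeners []listener
}

// distribute notifies every registered listener.
func (inf *toyInformer) distribute(kind, key string) {
	for _, l := range inf.listeners {
		l(kind, key)
	}
}

// handleDeltas applies deltas oldest to newest: it updates the index first
// and then tells the listeners what happened.
func (inf *toyInformer) handleDeltas(deltas []delta) {
	for _, d := range deltas {
		switch d.Type {
		case "Sync", "Added", "Updated":
			_, exists := inf.indexer[d.Key]
			inf.indexer[d.Key] = d.Obj
			if exists {
				inf.distribute("update", d.Key)
			} else {
				inf.distribute("add", d.Key)
			}
		case "Deleted":
			delete(inf.indexer, d.Key)
			inf.distribute("delete", d.Key)
		}
	}
}

func main() {
	var seen []string
	inf := &toyInformer{indexer: map[string]string{}}
	inf.listeners = append(inf.listeners, func(kind, key string) {
		seen = append(seen, kind+":"+key)
	})

	inf.handleDeltas([]delta{
		{Type: "Added", Key: "default/foo", Obj: "v1"},
		{Type: "Updated", Key: "default/foo", Obj: "v2"},
		{Type: "Deleted", Key: "default/foo", Obj: "v2"},
	})
	fmt.Println(seen)
}
```

Note that in this sketch, as in the HandleDeltas excerpt, the notification kind depends on whether the key is already indexed, not on the delta type alone: an "Added" delta for a key that already exists is reported as an update.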

Custom Controller code analysis


  • The custom controller sets up timers that run the business logic

        //https://github.com/kubernetes/sample-controller/blob/7e92736cc38f37632d2b53e31b9a966e7a91c24a/controller.go#L150
        func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
            ......
            // Wait for the caches to be synced before starting workers
            if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
                return fmt.Errorf("failed to wait for caches to sync")
            }

            klog.Info("Starting workers")
            // Launch two workers to process Foo resources
            for i := 0; i < threadiness; i++ {
                go wait.Until(c.runWorker, time.Second, stopCh)
            }
            ......
        }

        // runWorker is a long-running function that will continually call the
        // processNextWorkItem function in order to read and process a message on the
        // workqueue.
        func (c *Controller) runWorker() {
            // Loop, running the business logic, until processNextWorkItem returns false
            for c.processNextWorkItem() {
            }
        }

  • Take an item from the queue, call syncHandler to process it, and remove it from the work queue when done

        //https://github.com/kubernetes/sample-controller/blob/7e92736cc38f37632d2b53e31b9a966e7a91c24a/controller.go#L186
        func (c *Controller) processNextWorkItem() bool {
            // Read from the work queue. The user-defined ResourceEventHandler put the item there earlier; here it is taken out for processing.
            obj, shutdown := c.workqueue.Get()

            if shutdown {
                return false
            }

            // We wrap this block in a func so we can defer c.workqueue.Done.
            err := func(obj interface{}) error {
                // We call Done here so the workqueue knows we have finished
                // processing this item. We also must remember to call Forget if we
                // do not want this work item being re-queued. For example, we do
                // not call Forget if a transient error occurs, instead the item is
                // put back on the workqueue and attempted again after a back-off
                // period.
                defer c.workqueue.Done(obj)
                var key string
                var ok bool
                // We expect strings to come off the workqueue. These are of the
                // form namespace/name. We do this as the delayed nature of the
                // workqueue means the items in the informer cache may actually be
                // more up to date than when the item was initially put onto the
                // workqueue.
                if key, ok = obj.(string); !ok {
                    // As the item in the workqueue is actually invalid, we call
                    // Forget here else we'd go into a loop of attempting to
                    // process a work item that is invalid.
                    c.workqueue.Forget(obj)
                    utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
                    return nil
                }
                // Run the syncHandler, passing it the namespace/name string of the
                // Foo resource to be synced.
                if err := c.syncHandler(key); err != nil {
                    // Put the item back on the workqueue to handle any transient errors.
                    c.workqueue.AddRateLimited(key)
                    return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
                }
                // Finally, if no error occurs we Forget this item so it does not
                // get queued again until another change happens.
                c.workqueue.Forget(obj)
                klog.Infof("Successfully synced '%s'", key)
                return nil
            }(obj)

            if err != nil {
                utilruntime.HandleError(err)
                return true
            }

            return true
        }

        // syncHandler compares the actual state with the desired, and attempts to
        // converge the two. It then updates the Status block of the Foo resource
        // with the current status of the resource.
        func (c *Controller) syncHandler(key string) error {
            .....
        }




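  • The main function builds and starts the Informer objects and also starts its own controller, whose main logic polls the work queue for data and processes it; when there is no data, it blocks at the point where it fetches from the queue.

  • Building the Informer involves these main steps:

    • Call the reflector to run ListAndWatch: fetch the full data set once (List), then watch the latest versions of all resources of interest (Watch), storing the results in the Delta FIFO Queue.

    • Call the built-in controller to take data out of the Delta queue, build the data indexes, and distribute notifications to the user's registered ResourceEventHandlers.

  • The custom ResourceEventHandler processes each event according to its type (for example, filtering) and then adds it to the custom controller's work queue.

  • Once an item is in the work queue, the custom controller's polling loop resumes: it takes the item out, processes it, and removes it on success.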
