Kubernetes Informer基本原理,你明白了吗?

本文分析 k8s controller 中 informer 启动的基本流程
不论是 k8s 自身组件 , 还是自己编写 controller , 都需要通过 apiserver 监听 etcd 事件来完成自己的控制循环逻辑 。
如何高效可靠进行事件监听,k8s 客户端工具包 client-go 提供了一个通用的 informer 包,通过 informer , 可以方便和高效的进行 controller 开发 。
informer 包提供了如下的一些功能:
1、本地缓存(store)
2、索引机制(indexer)
3、Handler 注册功能(eventHandler)
1、informer 架构整个 informer 机制架构如下图(图片源自 Client-go):

Kubernetes Informer基本原理,你明白了吗?

文章插图
图片
可以看到这张图分为上下两个部分,上半部分由 client-go 提供,下半部分则是需要自己实现的控制循环逻辑
本文主要分析上半部分的逻辑 , 包括下面几个组件:
1.1、Reflector:从图上可以看到 Reflector 是一个和 apiserver 交互的组件,通过 list 和 watch api 将资源对象压入队列
1.2、DeltaFifo:DeltaFifo的结构体示意如下:
type DeltaFIFO struct {...// We depend on the property that items in the set are in// the queue and vice versa, and that all Deltas in this// map have at least one Delta.items map[string]Deltasqueue []string...}主要分为两部分,fifo 和 delta
(1)fifo:先进先出队列
对应结构体中的 queue,结构体示例如下:
【Kubernetes Informer基本原理,你明白了吗?】[default/centos-fd77b5886-pfrgn, xxx, xxx](2)delta:对应结构体中的items,存储了资源对象并且携带了资源操作类型的一个 map,结构体示例如下:
map:{"default/centos-fd77b5886-pfrgn":[{Replaced &Pod{ObjectMeta: ${pod参数}], "xxx": [{},{}]}消费者从 queue 中 pop 出对象进行消费,并从 items 获取具体的消费操作(执行动作 Update/Deleted/Sync,和执行的对象 object spec)
1.3、Indexer:client-go 用来存储资源对象并自带索引功能的本地存储,deltaFIFO 中 pop 出的对象将存储到 Indexer 。
indexer 与 etcd 集群中的数据保持一致,从而 client-go 可以直接从本地缓存获取资源对象,减少 apiserver 和 etcd 集群的压力 。
2、一个基本例子func mAIn() {stopCh := make(chan struct{})defer close(stopCh)// (1)New a k8s clientsetmasterUrl := "172.27.32.110:8080"config, err := clientcmd.BuildConfigFromFlags(masterUrl, "")if err != nil {klog.Errorf("BuildConfigFromFlags err, err: %v", err)}clientset, err := k.NewForConfig(config)if err != nil {klog.Errorf("Get clientset err, err: %v", err)}// (2)New a sharedInformers factorysharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)// (3)Register a informer//f.informers[informerType] = informer,//the detail for informer is build in NewFilteredPodInformer()podInformer := sharedInformers.Core().V1().Pods().Informer()// (4)Register event handlerpodInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: func(obj interface{}) {mObj := obj.(v1.Object)klog.Infof("Get new obj: %v", mObj)klog.Infof("Get new obj name: %s", mObj.GetName())},})// (5)Start all informerssharedInformers.Start(stopCh)// (6)A cronjob for cache syncif !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) {klog.Infof("Cache sync fail!")}// (7)Use listerpodLister := sharedInformers.Core().V1().Pods().Lister()pods, err := podLister.List(labels.Everything())if err != nil {klog.Infof("err: %v", err)}klog.Infof("len(pods), %d", len(pods))for _, v := range pods {klog.Infof("pod: %s", v.Name)}<- stopChan}上面就是一个简单的 informer 的使用例子,整个过程如上述几个步骤,着重说一下(2)、(3)、(4)、(5)四个步骤
3、流程分析3.1、New a sharedInformers factorysharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync)factory := &sharedInformerFactory{client:client,namespace:v1.NamespaceAll,defaultResync:defaultResync,informers:make(map[reflect.Type]cache.SharedIndexInformer),startedInformers: make(map[reflect.Type]bool),customResync:make(map[reflect.Type]time.Duration),}这个过程就是创建一个 informer 的工厂 sharedInformerFactory,sharedInformerFactory 中有一个 informers 对象,里面是一个 informer 的 map,sharedInformerFactory 是为了防止过多的重复 informer 监听 apiserver , 导致 apiserver 压力过大,在同一个服务中,不同的 controller 使用同一个 informer
3.2、Register a informer这个过程主要是生成和注册 informer 到 sharedInformerFactory
podInformer := sharedInformers.Core().V1().Pods().Informer()func (f *podInformer) Informer() cache.SharedIndexInformer {return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)}### f.factory.InformerFor:### 注册 informer func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {...informer = newFunc(f.client, resyncPeriod)f.informers[informerType] = informerreturn informer}### f.defaultInformer:### 生成 informerfunc (f *podInformer) defaultInformer(client k.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)}func NewFilteredPodInformer(client k.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {return cache.NewSharedIndexInformer(&cache.ListWatch{ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.CoreV1().Pods(namespace).List(context.TODO(), options)},WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {if tweakListOptions != nil {tweakListOptions(&options)}return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)},},&corev1.Pod{},resyncPeriod,indexers,)}### cache.NewSharedIndexInformer:func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {realClock := &clock.RealClock{}sharedIndexInformer := &sharedIndexInformer{processor:&sharedProcessor{clock: realClock},indexer:NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),listerWatcher:lw,objectType:exampleObject,resyncCheckPeriod:defaultEventHandlerResyncPeriod,defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,cacheMutationDetector:NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),clock:realClock,}return sharedIndexInformer}


推荐阅读