(2)消费—c.processLoop:消费逻辑就是从 DeltaFifo pop 出对象,然后做两件事情:(1)触发前面注册的 eventhandler (2)更新本地索引缓存 indexer,保持数据和 etcd 一致
func (c *controller) processLoop() {for {obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))}}### Queue.Pop:## Queue.Pop是一个带有处理函数的pod方法,首先先看Pod逻辑,即为deltaFifo的pop方法:func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {for {// 无限循环for len(f.queue) == 0 {f.cond.Wait()// 阻塞直到生产端broadcast方法通知}id := f.queue[0]item, ok := f.items[id]delete(f.items, id)err := process(item)// 执行处理方法if e, ok := err.(ErrRequeue); ok {f.addIfNotPresent(id, item)// 如果处理失败的重新加入到fifo中重新处理err = e.Err}return item, err}}### c.config.Process:## c.config.Process是在初始化controller的时候赋值的,即为前面的s.HandleDeltas### s.HandleDeltas:func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {s.blockDeltas.Lock()defer s.blockDeltas.Unlock()// from oldest to newestfor _, d := range obj.(Deltas) {switch d.Type {case Sync, Replaced, Added, Updated:s.cacheMutationDetector.AddObject(d.Object)if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {if err := s.indexer.Update(d.Object); err != nil {return err}isSync := falseswitch {case d.Type == Sync:// Sync events are only propagated to listeners that requested resyncisSync = truecase d.Type == Replaced:if accessor, err := meta.Accessor(d.Object); err == nil {if oldAccessor, err := meta.Accessor(old); err == nil {// Replaced events that didn't change resourceVersion are treated as resync events// and only propagated to listeners that requested resyncisSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()}}}s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)} else {if err := s.indexer.Add(d.Object); err != nil {return err}s.processor.distribute(addNotification{newObj: d.Object}, false)}case Deleted:if err := s.indexer.Delete(d.Object); err != nil {return err}s.processor.distribute(deleteNotification{oldObj: d.Object}, false)}}return nil}
可以看到上面主要执行两部分逻辑:
s.processor.distribute#### s.processor.distribute:### 例如新增通知:s.processor.distribute(addNotification{newObj: d.Object}, false)### 其中addNotification就是add类型的通知,后面会通过notification结构体的类型来执行不同的eventHandlerfunc (p *sharedProcessor) distribute(obj interface{}, sync bool) {p.listenersLock.RLock()defer p.listenersLock.RUnlock()if sync {for _, listener := range p.syncingListeners {listener.add(obj)}} else {for _, listener := range p.listeners {listener.add(obj)}}}func (p *processorListener) add(notification interface{}) {p.addCh <- notification// 新增notification到addCh}
这里 p.addCh 对应到前面说的关注对象 p.addCh,processorListener 收到 addCh 信号之后传递给 nextCh,然后通过 notification 结构体的类型来执行不同的 eventHandler
s.indexer 的增删改:这个就是本地数据的缓存和索引,自定义控制逻辑里面会通过 indexer 获取操作对象的具体参数,这里就不展开细讲了 。
4、总结至此一个 informer 的 client-go 部分的流程就走完了,可以看到启动 informer 主要流程就是:
1、Reflector ListAndWatch:
(1)通过一个 reflector run 起来一个带有 list 和 watch api 的 client
(2)list 到的 pod 列表通过 DeltaFifo 存储 , 并更新最新的 ResourceVersion
(3)继续监听 pod,监听到的 pod 操作事件继续存储到 DeltaFifo 中
2、DeltaFifo 生产和消费:
(1)生产:list and watch 到的事件生产压入队列 DeltaFifo
(2)消费:执行注册的 eventHandler,并更新本地 indexer
所以 informer 本质其实就是一个通过 deltaFifo 建立生产消费机制,并且带有本地缓存和索引 , 以及可以注册回调事件的 apiServer 的客户端库 。
5、参考
- https://Github.com/kube.NETes/sample-controller/tree/master
- https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html
推荐阅读
- 选择空调的基本常识 选择空调的基本常识知乎
- 为什么理发店使用的洗发水,超市基本没有卖?说出来你可能不信
- 狼人杀基本游戏规则
- 亚文化有哪些基本特点 亚文化有哪些
- 显微镜的重点知识 显微镜使用的基本常识考题
- 基本彩妆步骤 基本彩妆步骤包括
- 离心泵维修必备的基本技能有哪些
- 掌握这些基本电脑技巧,轻松驾驭电脑操作!
- Kubernetes 100个常用命令!
- 一文读懂Kubernetes部署策略