K8s client go source code analysis (2)

Four coffee beans 2021-01-23 10:28:29
k8s client source code analysis

Brief introduction: Cloud Native Community activity --- Kubernetes source analysis, phase 1, week 2.

This is week 2 of phase 1 of the K8s source code workshop. The learning content is the Informer mechanism, which is the topic of this article.

The host of the seminar was very busy this week, so this session was postponed to the end of next week. That's how things go: there is always a chance that a plan gets disrupted by something else, but as long as we can eventually return to the main line, it's not a problem. It's like participating in open source: the first contribution is just the beginning; what matters is sticking with it, and that is often the most important part.

Okay, on to the main content.

The topics of this article:

  1. Overview of the Informer mechanism's architecture design
  2. Understanding Reflector
  3. Understanding DeltaFIFO
  4. Understanding Indexer

Wherever specific resources are involved, this article uses the Deployment resource as the example.

Overview of the Informer mechanism's architecture design

Below is a data flow chart I drew based on my understanding, showing the overall flow of data from a global perspective.

The dotted lines represent methods in the code.

Let's start with a conclusion :

When data is obtained through the Informer mechanism, all objects of the corresponding resource are fetched from the Kubernetes API Server only during initialization. After that, data is received only as the API Server pushes it through the Watch mechanism; the client never actively pulls from the API Server again, and reads are served directly from the local cache, which reduces the load on the API Server.

The Watch mechanism is implemented on top of HTTP chunked transfer and maintains a long-lived connection. This is the first optimization point: it reduces the amount of data requested. The second optimization point is the SharedInformer, which allows consumers of the same resource to share one Informer; for example, the v1 Deployment and the v1beta1 Deployment can share a single Informer at the same time.

As you can see in the diagram above, the Informer is divided into three parts, which can be understood as three logical roles.

The Reflector mainly fetches data from the API Server and puts it into the DeltaFIFO queue, acting as the producer.

The SharedInformer mainly takes data out of the DeltaFIFO queue and distributes it, acting as the consumer.

Finally, the Indexer is the storage component that serves as the local cache.

Understanding Reflector

For the Reflector, it is enough to understand three places: Run, ListAndWatch, and watchHandler.

The source code is located in tools/cache/reflector.go:

// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
// Start with Run; the upper-layer caller is the Run method in controller.go
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.Until(func() {
		// Execute ListAndWatch once after startup
		if err := r.ListAndWatch(stopCh); err != nil {
			utilruntime.HandleError(err)
		}
	}, r.period, stopCh)
}
// ListAndWatch first lists all items and gets the resource version at the time of call,
// and then uses the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	// ...
	// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
	// list request will return the full response.
	pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
		// This calls each resource's ListFunc; for example, for the v1 Deployment
		// it calls the ListFunc in informers/apps/v1/deployment.go
		return r.listerWatcher.List(opts)
	}))
	if r.WatchListPageSize != 0 {
		pager.PageSize = r.WatchListPageSize
	}
	// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
	// This is where all objects of the resource are requested from the API Server once
	list, err = pager.List(context.Background(), options)
	if err != nil {
		return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
	}
	initTrace.Step("Objects listed")
	listMetaInterface, err := meta.ListAccessor(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
	}
	resourceVersion = listMetaInterface.GetResourceVersion()
	initTrace.Step("Resource version extracted")
	items, err := meta.ExtractList(list)
	if err != nil {
		return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
	}
	initTrace.Step("Objects extracted")
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
	}
	initTrace.Step("SyncWith done")
	initTrace.Step("Resource version updated")
	// ...
	// watchHandler handles the watch and puts the data into the DeltaFIFO
	if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
		if err != errorStopRequested {
			switch {
			case apierrs.IsResourceExpired(err):
				klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
			default:
				klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
			}
		}
		return nil
	}
	// ...
}

That is all for data production; there are just two sources:

  1. On initialization, data is requested from the API Server
  2. Afterwards, data pushed via Watch is received continuously

Understanding DeltaFIFO

Let's look at the data structures first:

type DeltaFIFO struct {
	items map[string]Deltas
	queue []string
	// ...
}

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

type DeltaType string

// Change type definitions
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)

Here queue stores object IDs, and items is a map keyed by object ID whose value is the list of events (Deltas) for that object.

You can picture the data structure like this: on the left is the key, on the right is an array, and each element of the array consists of a type and an obj.

As its name suggests, DeltaFIFO is a first-in-first-out queue storing Delta data. It is essentially a transfer station, moving data from one place to another.

The main things to look at are queueActionLocked, Pop, and Resync.

The queueActionLocked method:

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	// ... compute the object's id via f.KeyOf(obj) ...
	newDeltas := append(f.items[id], Delta{actionType, obj})
	// Deduplicate
	newDeltas = dedupDeltas(newDeltas)
	if len(newDeltas) > 0 {
		if _, exists := f.items[id]; !exists {
			f.queue = append(f.queue, id)
		}
		f.items[id] = newDeltas
		// Wake up Pop
		f.cond.Broadcast()
	}
	return nil
}

The Pop method:

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		for len(f.queue) == 0 {
			// Block until f.cond.Broadcast() is called
			f.cond.Wait()
		}
		// Take out the first element
		id := f.queue[0]
		f.queue = f.queue[1:]
		item, ok := f.items[id]
		if !ok {
			// Item may have been deleted subsequently
			continue
		}
		delete(f.items, id)
		// This process func can be found in processLoop() in controller.go;
		// it is set up in Run in shared_informer.go and ultimately executes
		// the HandleDeltas method in shared_informer.go
		err := process(item)
		// If processing fails, put the item back in the queue
		if e, ok := err.(ErrRequeue); ok {
			f.addIfNotPresent(id, item)
			err = e.Err
		}
		return item, err
	}
}

The Resync mechanism:

A quick summary: each resync takes the data from the local cache (the Indexer) and puts it back into DeltaFIFO so that the processing logic runs again.

Resync is set up in the resyncChan() method of reflector.go, and its timed invocation is started in the ListAndWatch method of reflector.go:

go func() {
	// Start the timed resync task
	resyncCh, cleanup := r.resyncChan()
	defer func() {
		cleanup() // Call the last one written into cleanup
	}()
	for {
		select {
		case <-resyncCh:
		case <-stopCh:
			return
		case <-cancelCh:
			return
		}
		// On each tick, this call ends up executing the Resync() method in delta_fifo.go
		if r.ShouldResync == nil || r.ShouldResync() {
			klog.V(4).Infof("%s: forcing resync", r.name)
			if err := r.store.Resync(); err != nil {
				resyncerrc <- err
				return
			}
		}
		cleanup()
		resyncCh, cleanup = r.resyncChan()
	}
}()
func (f *DeltaFIFO) Resync() error {
	f.lock.Lock()
	defer f.lock.Unlock()
	// Get all keys from the cache
	keys := f.knownObjects.ListKeys()
	for _, k := range keys {
		if err := f.syncKeyLocked(k); err != nil {
			return err
		}
	}
	return nil
}

func (f *DeltaFIFO) syncKeyLocked(key string) error {
	// Get the corresponding object from the cache
	obj, exists, err := f.knownObjects.GetByKey(key)
	// ... error and existence handling elided ...
	// Put it back into the queue so the processing logic runs again
	if err := f.queueActionLocked(Sync, obj); err != nil {
		return fmt.Errorf("couldn't queue object: %v", err)
	}
	return nil
}

Understanding SharedInformer as the consumer

The main thing to look at is the HandleDeltas method, which consumes the messages, distributes the data, and stores it in the cache:

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Added, Updated:
			isSync := d.Type == Sync
			// Check whether the object is already in the Indexer cache;
			// if it is, update the cached object
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				// Distribute the data to the listeners
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				// Not in the Indexer cache yet; insert the object into the cache
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
			}
		case Deleted:
			// ... delete from the indexer and distribute a deleteNotification ...
		}
	}
	return nil
}

Understanding Indexer

This part will not be covered in much depth, because I think the most important aspect of the Informer mechanism is the data flow described above. That doesn't mean the data storage doesn't matter; the point is to sort out the overall idea first. I will cover the storage part in detail later.

The Indexer uses the threadSafeMap in threadsafe_store.go to store data. It is a thread-safe map with indexing capability; the data is kept only in memory, and every operation on it takes a lock.

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
	lock     sync.RWMutex
	items    map[string]interface{}
	indexers Indexers
	indices  Indices
}

The Indexer also has index-related functionality, which I will not discuss for now.

Example code

package main

import (
	"flag"
	"fmt"
	"path/filepath"
	"time"

	v1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
)

func main() {
	var err error
	var config *rest.Config
	var kubeconfig *string
	if home := homedir.HomeDir(); home != "" {
		kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[optional] absolute path to the kubeconfig file")
	} else {
		kubeconfig = flag.String("kubeconfig", filepath.Join("/tmp", "config"), "absolute path to the kubeconfig file")
	}
	flag.Parse()
	// Initialize the rest.Config object: try in-cluster config first,
	// then fall back to the kubeconfig file
	if config, err = rest.InClusterConfig(); err != nil {
		if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
			panic(err)
		}
	}
	// Create the Clientset object
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	// Initialize a SharedInformerFactory with resync set to once every
	// 60 seconds, which triggers UpdateFunc
	informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*60)
	// Monitor Deployments; to use the v1beta1 Deployment resource instead:
	// informerFactory.Apps().V1beta1().Deployments()
	deployInformer := informerFactory.Apps().V1().Deployments()
	// Create the Informer (like registering with the factory, so on start
	// it will List & Watch the corresponding resource)
	informer := deployInformer.Informer()
	// Create the Lister for Deployments
	deployLister := deployInformer.Lister()
	// Register event handlers to process event data
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    onAdd,
		UpdateFunc: onUpdate,
		DeleteFunc: onDelete,
	})
	stopper := make(chan struct{})
	defer close(stopper)
	// Start the informers and wait for the initial sync of the local cache
	informerFactory.Start(stopper)
	informerFactory.WaitForCacheSync(stopper)
	// List all Deployments in the default namespace from the local cache
	deployments, err := deployLister.Deployments("default").List(labels.Everything())
	if err != nil {
		panic(err)
	}
	for idx, deploy := range deployments {
		fmt.Printf("%d -> %s\n", idx+1, deploy.Name)
	}
	<-stopper
}

func onAdd(obj interface{}) {
	deploy := obj.(*v1.Deployment)
	fmt.Println("add a deployment:", deploy.Name)
}

func onUpdate(old, new interface{}) {
	oldDeploy := old.(*v1.Deployment)
	newDeploy := new.(*v1.Deployment)
	fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)
}

func onDelete(obj interface{}) {
	deploy := obj.(*v1.Deployment)
	fmt.Println("delete a deployment:", deploy.Name)
}

In the example code above, the program pulls the Deployment data once after startup, then lists the Deployment resources in the default namespace from the local cache and prints them, and after that resyncs the Deployment resources every 60 seconds.


Why Resync?

During this week's session a student raised a question that also puzzled me when I first saw it: since Resync goes from the local cache back into the local cache (it starts and ends in the same place), why take the data out and run it through the whole process again? I couldn't understand it at the time, so I looked at it from another angle.

Data comes from the API Server, is processed, and is put into the cache, but the processing does not always succeed; in other words, errors can occur, and Resync effectively acts as a retry mechanism.

You can try this in practice: deploy a stateful service whose storage uses a LocalPV (or substitute something you are familiar with). The pod will fail to start because the storage directory does not exist. Then, after the pod has failed to start, create the corresponding directory; after a while the pod starts successfully. This is one scenario as I understand it.

Summary:

The Informer mechanism is the cornerstone of communication between components in Kubernetes. Understanding it thoroughly is very helpful. I am still deepening my own understanding, and you are welcome to share yours.

This article is by Four coffee beans; please credit the source when reprinting.
