Overview

根据 Kubernetes 文档对 Controller 的描述,Controller 是 Kubernetes 中负责协调(reconcile)的组件。按照这一设计模式,Controller 会持续监听对象(如 Pod)的实际状态与期望状态,并不断地将当前状态向期望状态同步。

Writing Controllers

go
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package main

import (
	"flag"
	"fmt"
	"os"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog"
)

// Controller bundles the three pieces the worker loop needs: a local object
// cache, the informer that keeps that cache up to date, and a work queue of
// object keys waiting to be processed.
type Controller struct {
	// lister is the indexed cache that processWrapper looks keys up in.
	lister     cache.Indexer
	// controller is the informer started by Run; its HasSynced gates the workers.
	controller cache.Controller
	// queue holds the keys pulled by processItem and retried by handleError.
	queue      workqueue.RateLimitingInterface
}

// NewController returns a Controller that reads objects from lister, is fed
// by the given informer controller, and takes its work items from queue.
func NewController(lister cache.Indexer, controller cache.Controller, queue workqueue.RateLimitingInterface) *Controller {
	c := new(Controller)
	c.lister = lister
	c.controller = controller
	c.queue = queue
	return c
}

// processItem takes one key off the work queue and processes it.
//
// It returns false only when the queue has been shut down, which tells the
// worker loop to exit; in every other case it returns true so the loop keeps
// draining the queue.
func (c *Controller) processItem() bool {
	item, quit := c.queue.Get()
	if quit {
		return false
	}
	// Done must always be called so the queue knows this item is no longer
	// being worked on and may be handed out again if it was re-added.
	defer c.queue.Done(item)
	fmt.Println(item)

	key, ok := item.(string)
	if !ok {
		// A non-string item can never be processed; forget it so the rate
		// limiter does not keep state for it, instead of panicking the worker.
		c.queue.Forget(item)
		klog.Errorf("expected string key in queue but got %#v", item)
		return true
	}

	if err := c.processWrapper(key); err != nil {
		c.handleError(key)
		return true
	}

	// Success: clear the retry history so a later failure of the same key
	// starts with a fresh retry budget and backoff.
	c.queue.Forget(key)
	return true
}

// handleError decides what to do with a key whose processing failed: it is
// re-queued with rate limiting until it has been requeued three times, after
// which it is forgotten and a drop message is logged.
func (c *Controller) handleError(key string) {
	if retries := c.queue.NumRequeues(key); retries >= 3 {
		// Retry budget exhausted: clear the rate limiter state and give up.
		c.queue.Forget(key)
		klog.Infof("Drop Object %s in queue", key)
		return
	}

	c.queue.AddRateLimited(key)
}

// processWrapper looks key up in the local cache and prints the Pod's name.
//
// A key that is no longer in the cache is logged and treated as success. An
// error is returned when the cache lookup fails or when the cached object is
// not a *v1.Pod, so the caller can apply its retry policy.
func (c *Controller) processWrapper(key string) error {
	item, exists, err := c.lister.GetByKey(key)
	if err != nil {
		klog.Error(err)
		return err
	}
	if !exists {
		// item is nil in this branch, so report the key that was looked up.
		klog.Infof("item %s not exists in cache.", key)
		return nil
	}

	pod, ok := item.(*v1.Pod)
	if !ok {
		return fmt.Errorf("unexpected object type %T in cache for key %q", item, key)
	}
	fmt.Println(pod.GetName())
	return nil
}

// Run starts the informer, waits for its cache to sync, then launches
// threadiness worker goroutines and blocks until stopCh is closed.
//
// The queue is shut down when Run returns, which makes processItem report
// false and lets the workers exit.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()
	klog.Infof("Starting custom controller")

	go c.controller.Run(stopCh)

	// Workers must not start before the cache has been populated.
	if synced := cache.WaitForCacheSync(stopCh, c.controller.HasSynced); !synced {
		utilruntime.HandleError(fmt.Errorf("sync failed."))
		return
	}

	// worker drains the queue until processItem reports shutdown.
	worker := func() {
		for {
			if !c.processItem() {
				return
			}
		}
	}
	for started := 0; started < threadiness; started++ {
		// wait.Until re-invokes worker one second after it returns, until stopCh closes.
		go wait.Until(worker, time.Second, stopCh)
	}

	<-stopCh
	klog.Info("Stopping custom controller")
}

func main() {
	var (
		k8sconfig  *string //使用kubeconfig配置文件进行集群权限认证
		restConfig *rest.Config
		err        error
	)
	if home := homedir.HomeDir(); home != "" {
		k8sconfig = flag.String("kubeconfig", fmt.Sprintf("%s/.kube/config", home), "kubernetes auth config")
	}
	k8sconfig = k8sconfig
	flag.Parse()
	if _, err := os.Stat(*k8sconfig); err != nil {
		panic(err)
	}

	if restConfig, err = rest.InClusterConfig(); err != nil {
		// 这里是从masterUrl 或者 kubeconfig传入集群的信息,两者选一
		restConfig, err = clientcmd.BuildConfigFromFlags("", *k8sconfig)
		if err != nil {
			panic(err)
		}
	}
	restset, err := kubernetes.NewForConfig(restConfig)
	lister := cache.NewListWatchFromClient(restset.CoreV1().RESTClient(), "pods", "default", fields.Everything())
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	indexer, controller := cache.NewIndexerInformer(lister, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fmt.Println("add ", obj.(*v1.Pod).GetName())
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}

		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			fmt.Println("update", newObj.(*v1.Pod).GetName())
			if newObj.(*v1.Pod).Status.Conditions[0].Status == "True" {
				fmt.Println("update: the Initialized Status", newObj.(*v1.Pod).Status.Conditions[0].Status)
			} else {
				fmt.Println("update: the Initialized Status ", newObj.(*v1.Pod).Status.Conditions[0].Status)
				fmt.Println("update: the Initialized Reason ", newObj.(*v1.Pod).Status.Conditions[0].Reason)
			}

			if len(newObj.(*v1.Pod).Status.Conditions) > 1 {
				if newObj.(*v1.Pod).Status.Conditions[1].Status == "True" {
					fmt.Println("update: the Ready Status", newObj.(*v1.Pod).Status.Conditions[1].Status)
				} else {
					fmt.Println("update: the Ready Status ", newObj.(*v1.Pod).Status.Conditions[1].Status)
					fmt.Println("update: the Ready Reason ", newObj.(*v1.Pod).Status.Conditions[1].Reason)
				}

				if newObj.(*v1.Pod).Status.Conditions[2].Status == "True" {
					fmt.Println("update: the PodCondition Status", newObj.(*v1.Pod).Status.Conditions[2].Status)
				} else {
					fmt.Println("update: the PodCondition Status ", newObj.(*v1.Pod).Status.Conditions[2].Status)
					fmt.Println("update: the PodCondition Reason ", newObj.(*v1.Pod).Status.Conditions[2].Reason)
				}

				if newObj.(*v1.Pod).Status.Conditions[3].Status == "True" {
					fmt.Println("update: the PodScheduled Status", newObj.(*v1.Pod).Status.Conditions[3].Status)
				} else {
					fmt.Println("update: the PodScheduled Status ", newObj.(*v1.Pod).Status.Conditions[3].Status)
					fmt.Println("update: the PodScheduled Reason ", newObj.(*v1.Pod).Status.Conditions[3].Reason)
				}
			}

		},
		DeleteFunc: func(obj interface{}) {
			fmt.Println("delete ", obj.(*v1.Pod).GetName(), "Status ", obj.(*v1.Pod).Status.Phase)
			// 上面是事件函数的处理,下面是对workqueue的操作
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	c := NewController(indexer, controller, queue)
	stopCh := make(chan struct{})
	stopCh1 := make(chan struct{})
	c.Run(1, stopCh)
	defer close(stopCh)
	<-stopCh1
}

通过日志可以看出,Pod create后的步骤大概为4步:

  • Initialized:初始化好后状态为Pending
  • PodScheduled:然后调度
  • PodCondition:即 ContainersReady 条件(示例代码按下标将其标记为 PodCondition)
  • Ready
text
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
add  netbox
default/netbox
netbox
update netbox status Pending to Pending
update: the Initialized Status True
update netbox status Pending to Pending
update: the Initialized Status True
update: the Ready Status  False
update: the Ready Reason  ContainersNotReady
update: the PodCondition Status  False
update: the PodCondition Reason  ContainersNotReady
update: the PodScheduled Status True


update netbox status Pending to Running
update: the Initialized Status True
update: the Ready Status True
update: the PodCondition Status True
update: the PodScheduled Status True

大致上与 kubectl describe pod 看到的内容也相似

text
1
2
3
4
5
default-scheduler  Successfully assigned default/netbox to master-machine
  Normal  Pulling    85s   kubelet            Pulling image "cylonchau/netbox"
  Normal  Pulled     30s   kubelet            Successfully pulled image "cylonchau/netbox"
  Normal  Created    30s   kubelet            Created container netbox
  Normal  Started    30s   kubelet            Started container netbox

Reference

controllers.md