本文发布于Cylon的收藏册,转载请著名原文链接~

本文是关于Kubernetes 4A解析的第三章

所有关于Kubernetes 4A部分代码上传至仓库 github.com/cylonchau/hello-k8s-4A

如有错别字或理解错误地方请多多担待,代码是以1.24进行整理,实验是以1.19环境进行,差别不大

BACKGROUND

admission controllers的特点

  • 可定制性:准入功能可针对不同的场景进行调整。
  • 可预防性:审计则是为了检测问题,而准入控制器可以预防问题发生
  • 可扩展性:在kubernetes自有的验证机制外,增加了另外的防线,弥补了RBAC仅能对资源提供安全保证。

下图,显示了用户操作资源的流程,可以看出 admission controllers 作用是在通过身份验证资源持久化之前起到拦截作用。在准入控制器的加入会使kubernetes增加了更高级的安全功能。

准入控制器阶段

图:Kubernetes API 请求的请求处理步骤图
Source:https://kubernetes.io/blog/2019/03/21/a-guide-to-kubernetes-admission-controllers/

这里找到一个大佬博客画的图,通过两张图可以很清晰的了解到admission webhook流程,与官方给出的不一样的地方在于,这里清楚地定位了kubernetes admission webhook 处于准入控制中,RBAC之后,push 之前。

img

图:Kubernetes API 请求的请求处理步骤图(详细)
Source:https://www.armosec.io/blog/kubernetes-admission-controller/

两种控制器有什么区别?

根据官方提供的说法是

Mutating controllers may modify related objects to the requests they admit; validating controllers may not

从结构图中也可以看出,validating 是在持久化之前,而 Mutating 是在结构验证前,根据这些特性我们可以使用 Mutating 修改这个资源对象内容(如增加验证的信息),在 validating 中验证是否合法。

composition of admission controllers

kubernetes中的 admission controllers 由两部分组成:

  • 内置在APIServer中的准入控制器 build-in li.st
  • 特殊的控制器;也是内置在APIServer中,但提供一些自定义的功能
    • MutatingAdmission
    • ValidatingAdmission

Mutating 控制器可以修改他们处理的资源对象,Validating 控制器不会。当在任何一个阶段中的任何控制器拒绝这个了请求,则会立即拒绝整个请求,并将错误返回。

admission webhook

由于准入控制器是内置在 kube-apiserver 中的,这种情况下就限制了admission controller的可扩展性。在这种背景下,kubernetes提供了一种可扩展的准入控制器 extensible admission controllers,这种行为叫做动态准入控制 Dynamic Admission Control,而提供这个功能的就是 admission webhook

admission webhook 通俗来讲就是 HTTP 回调,通过定义一个http server,接收准入请求并处理。用户可以通过kubernetes提供的两种类型的 admission webhookvalidating admission webhookmutating admission webhook。来完成自定义的准入策略的处理。

webhook 就是

注:从上面的流程图也可以看出,admission webhook 也是有顺序的。首先调用mutating webhook,然后会调用validating webhook。

如何使用准入控制器

使用条件:kubernetes v1.16 使用 admissionregistration.k8s.io/v1 ;kubernetes v1.9 使用 admissionregistration.k8s.io/v1beta1

如何在集群中开启准入控制器? :查看kube-apiserver 的启动参数 --enable-admission-plugins ;通过该参数来配置要启动的准入控制器,如 --enable-admission-plugins=NodeRestriction 多个准入控制器以 , 分割,顺序无关紧要。 反之使用 --disable-admission-plugins 参数可以关闭相应的准入控制器(Refer to apiserver opts)。

通过 kubectl 命令可以看到,当前kubernetes集群所支持准入控制器的版本

1
2
3
$ kubectl api-versions | grep admissionregistration.k8s.io/v1
admissionregistration.k8s.io/v1
admissionregistration.k8s.io/v1beta1

webhook工作原理

通过上面的学习,已经了解到了两种webhook的工作原理如下所示:

mutating webhook,会在持久化前拦截在 MutatingWebhookConfiguration 中定义的规则匹配的请求。MutatingAdmissionWebhook 通过向 mutating webhook 服务器发送准入请求来执行验证。

validaing webhook,会在持久化前拦截在 ValidatingWebhookConfiguration 中定义的规则匹配的请求。ValidatingAdmissionWebhook 通过将准入请求发送到 validating webhook server来执行验证。

那么接下来将从源码中看这个在这个工作流程中,究竟做了些什么?

资源类型

对于 1.9 版本之后,也就是 v1 版本 ,admission 被定义在 k8s.io\api\admissionregistration\v1\types.go ,大同小异,因为本地只有1.18集群,所以以这个讲解。

对于 Validating Webhook 来讲实现主要都在webhook中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type ValidatingWebhookConfiguration struct {
    // 每个api必须包含下列的metadata,这个是kubernetes规范,可以在注释中的url看到相关文档
	metav1.TypeMeta `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`
	// Webhooks在这里被表示为[]ValidatingWebhook,表示我们可以注册多个
	// +optional
	// +patchMergeKey=name
	// +patchStrategy=merge
	Webhooks []ValidatingWebhook `json:"webhooks,omitempty" patchStrategy:"merge" patchMergeKey:"name" protobuf:"bytes,2,rep,name=Webhooks"`
}

webhook,则是对这种类型的webhook提供的操作、资源等。对于这部分不做过多的注释了,因为这里本身为kubernetes API资源,官网有很详细的例子与说明。这里更多字段的意思的可以参考官方 doc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
type ValidatingWebhook struct {
	//  admission webhook的名词,Required
	Name string `json:"name" protobuf:"bytes,1,opt,name=name"`

	// ClientConfig 定义了与webhook通讯的方式 Required
	ClientConfig WebhookClientConfig `json:"clientConfig" protobuf:"bytes,2,opt,name=clientConfig"`

	// rule表示了webhook对于哪些资源及子资源的操作进行关注
	Rules []RuleWithOperations `json:"rules,omitempty" protobuf:"bytes,3,rep,name=rules"`

	// FailurePolicy 对于无法识别的value将如何处理,allowed/Ignore optional
	FailurePolicy *FailurePolicyType `json:"failurePolicy,omitempty" protobuf:"bytes,4,opt,name=failurePolicy,casttype=FailurePolicyType"`

	// matchPolicy 定义了如何使用“rules”列表来匹配传入的请求。
	MatchPolicy *MatchPolicyType `json:"matchPolicy,omitempty" protobuf:"bytes,9,opt,name=matchPolicy,casttype=MatchPolicyType"`
	NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty" protobuf:"bytes,5,opt,name=namespaceSelector"`
	SideEffects *SideEffectClass `json:"sideEffects" protobuf:"bytes,6,opt,name=sideEffects,casttype=SideEffectClass"`
	AdmissionReviewVersions []string `json:"admissionReviewVersions" protobuf:"bytes,8,rep,name=admissionReviewVersions"`
}

到这里了解了一个webhook资源的定义,那么这个如何使用呢?通过 Find Usages 找到一个 k8s.io/apiserver/pkg/admission/plugin/webhook/accessors.go 在使用它。这里没有注释,但在结构上可以看出,包含客户端与一系列选择器组成

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type mutatingWebhookAccessor struct {
	*v1.MutatingWebhook
	uid               string
	configurationName string

	initObjectSelector sync.Once
	objectSelector     labels.Selector
	objectSelectorErr  error

	initNamespaceSelector sync.Once
	namespaceSelector     labels.Selector
	namespaceSelectorErr  error

	initClient sync.Once
	client     *rest.RESTClient
	clientErr  error
}

accessor 因为包含了整个webhookconfig定义的一些动作(这里个人这么觉得)。

accessor.go 下面 有一个 GetRESTClient 方法 ,通过这里可以看出,这里做的就是使用根据 accessor 构造一个客户端。

1
2
3
4
5
6
func (m *mutatingWebhookAccessor) GetRESTClient(clientManager *webhookutil.ClientManager) (*rest.RESTClient, error) {
	m.initClient.Do(func() {
		m.client, m.clientErr = clientManager.HookClient(hookClientConfigForWebhook(m))
	})
	return m.client, m.clientErr
}

到这步骤已经没必要往下看了,因已经知道这里是请求webhook前的步骤了,下面就是何时请求了。

k8s.io\apiserver\pkg\admission\plugin\webhook\validating\dispatcher.go 下面有两个方法,Dispatch去请求我们自己定义的webhook

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
func (d *validatingDispatcher) Dispatch(ctx context.Context, attr admission.Attributes, o admission.ObjectInterfaces, hooks []webhook.WebhookAccessor) error {
	var relevantHooks []*generic.WebhookInvocation
	// Construct all the versions we need to call our webhooks
	versionedAttrs := map[schema.GroupVersionKind]*generic.VersionedAttributes{}
	for _, hook := range hooks {
		invocation, statusError := d.plugin.ShouldCallHook(hook, attr, o)
		if statusError != nil {
			return statusError
		}
		if invocation == nil {
			continue
		}
		relevantHooks = append(relevantHooks, invocation)
		// If we already have this version, continue
		if _, ok := versionedAttrs[invocation.Kind]; ok {
			continue
		}
		versionedAttr, err := generic.NewVersionedAttributes(attr, invocation.Kind, o)
		if err != nil {
			return apierrors.NewInternalError(err)
		}
		versionedAttrs[invocation.Kind] = versionedAttr
	}

	if len(relevantHooks) == 0 {
		// no matching hooks
		return nil
	}

	// Check if the request has already timed out before spawning remote calls
	select {
	case <-ctx.Done():
		// parent context is canceled or timed out, no point in continuing
		return apierrors.NewTimeoutError("request did not complete within requested timeout", 0)
	default:
	}

	wg := sync.WaitGroup{}
	errCh := make(chan error, len(relevantHooks))
	wg.Add(len(relevantHooks))
    // 循环所有相关的注册的hook
	for i := range relevantHooks {
		go func(invocation *generic.WebhookInvocation) {
			defer wg.Done()
            // invacation 中有一个 Accessor,Accessor注册了一个相关的webhookconfig
            // 也就是我们 kubectl -f 注册进来的那个webhook的相关配置
			hook, ok := invocation.Webhook.GetValidatingWebhook()
			if !ok {
				utilruntime.HandleError(fmt.Errorf("validating webhook dispatch requires v1.ValidatingWebhook, but got %T", hook))
				return
			}
			versionedAttr := versionedAttrs[invocation.Kind]
			t := time.Now()
            // 调用了callHook去请求我们自定义的webhook
			err := d.callHook(ctx, hook, invocation, versionedAttr)
			ignoreClientCallFailures := hook.FailurePolicy != nil && *hook.FailurePolicy == v1.Ignore
			rejected := false
			if err != nil {
				switch err := err.(type) {
				case *webhookutil.ErrCallingWebhook:
					if !ignoreClientCallFailures {
						rejected = true
						admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionCallingWebhookError, 0)
					}
				case *webhookutil.ErrWebhookRejection:
					rejected = true
					admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionNoError, int(err.Status.ErrStatus.Code))
				default:
					rejected = true
					admissionmetrics.Metrics.ObserveWebhookRejection(hook.Name, "validating", string(versionedAttr.Attributes.GetOperation()), admissionmetrics.WebhookRejectionAPIServerInternalError, 0)
				}
			}
			admissionmetrics.Metrics.ObserveWebhook(time.Since(t), rejected, versionedAttr.Attributes, "validating", hook.Name)
			if err == nil {
				return
			}

			if callErr, ok := err.(*webhookutil.ErrCallingWebhook); ok {
				if ignoreClientCallFailures {
					klog.Warningf("Failed calling webhook, failing open %v: %v", hook.Name, callErr)
					utilruntime.HandleError(callErr)
					return
				}

				klog.Warningf("Failed calling webhook, failing closed %v: %v", hook.Name, err)
				errCh <- apierrors.NewInternalError(err)
				return
			}

			if rejectionErr, ok := err.(*webhookutil.ErrWebhookRejection); ok {
				err = rejectionErr.Status
			}
			klog.Warningf("rejected by webhook %q: %#v", hook.Name, err)
			errCh <- err
		}(relevantHooks[i])
	}
	wg.Wait()
	close(errCh)

	var errs []error
	for e := range errCh {
		errs = append(errs, e)
	}
	if len(errs) == 0 {
		return nil
	}
	if len(errs) > 1 {
		for i := 1; i < len(errs); i++ {
			// TODO: merge status errors; until then, just return the first one.
			utilruntime.HandleError(errs[i])
		}
	}
	return errs[0]
}

callHook 可以理解为真正去请求我们自定义的webhook服务的动作

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
func (d *validatingDispatcher) callHook(ctx context.Context, h *v1.ValidatingWebhook, invocation *generic.WebhookInvocation, attr *generic.VersionedAttributes) error {
   if attr.Attributes.IsDryRun() {
      if h.SideEffects == nil {
         return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: fmt.Errorf("Webhook SideEffects is nil")}
      }
      if !(*h.SideEffects == v1.SideEffectClassNone || *h.SideEffects == v1.SideEffectClassNoneOnDryRun) {
         return webhookerrors.NewDryRunUnsupportedErr(h.Name)
      }
   }

   uid, request, response, err := webhookrequest.CreateAdmissionObjects(attr, invocation)
   if err != nil {
      return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
   }
   // 发生请求,可以看到,这里从上面的讲到的地方获取了一个客户端
   client, err := invocation.Webhook.GetRESTClient(d.cm)
   if err != nil {
      return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
   }
   trace := utiltrace.New("Call validating webhook",
      utiltrace.Field{"configuration", invocation.Webhook.GetConfigurationName()},
      utiltrace.Field{"webhook", h.Name},
      utiltrace.Field{"resource", attr.GetResource()},
      utiltrace.Field{"subresource", attr.GetSubresource()},
      utiltrace.Field{"operation", attr.GetOperation()},
      utiltrace.Field{"UID", uid})
   defer trace.LogIfLong(500 * time.Millisecond)

   // 这里设置超时,超时时长就是在yaml资源清单中设置的那个值
   if h.TimeoutSeconds != nil {
      var cancel context.CancelFunc
      ctx, cancel = context.WithTimeout(ctx, time.Duration(*h.TimeoutSeconds)*time.Second)
      defer cancel()
   }
   // 直接用post请求我们自己定义的webhook接口
   r := client.Post().Body(request)

   // if the context has a deadline, set it as a parameter to inform the backend
   if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
      // compute the timeout
      if timeout := time.Until(deadline); timeout > 0 {
         // if it's not an even number of seconds, round up to the nearest second
         if truncated := timeout.Truncate(time.Second); truncated != timeout {
            timeout = truncated + time.Second
         }
         // set the timeout
         r.Timeout(timeout)
      }
   }

   if err := r.Do(ctx).Into(response); err != nil {
      return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
   }
   trace.Step("Request completed")

   result, err := webhookrequest.VerifyAdmissionResponse(uid, false, response)
   if err != nil {
      return &webhookutil.ErrCallingWebhook{WebhookName: h.Name, Reason: err}
   }

   for k, v := range result.AuditAnnotations {
      key := h.Name + "/" + k
      if err := attr.Attributes.AddAnnotation(key, v); err != nil {
         klog.Warningf("Failed to set admission audit annotation %s to %s for validating webhook %s: %v", key, v, h.Name, err)
      }
   }
   if result.Allowed {
      return nil
   }
   return &webhookutil.ErrWebhookRejection{Status: webhookerrors.ToStatusErr(h.Name, result.Result)}
}

走到这里基本上对 admission webhook 有了大致的了解,可以知道这个操作是由 apiserver 完成的。下面就实际操作下自定义一个webhook。

这里还有两个概念,就是请求参数 AdmissionRequest 和相应参数 AdmissionResponse,这些可以在 callHook 中看到,这两个参数被定义在 k8s.io\api\admission\v1\types.go ;这两个参数也就是我们在自定义 webhook 时需要处理接收到的body的结构,以及我们响应内容数据结构。

如何编写一个自定义的admission webhook

通过上面的学习了解到了,自定义的webhook就是做为kubernetes提供给用户两种admission controller来验证自定义业务的一个中间件 admission webhook。本质上他是一个HTTP Server,用户可以使用任何语言来完成这部分功能。当然,如果涉及到需要对kubernetes集群资源操作的话,还是建议使用kubernetes官方提供了SDK的编程语言来完成自定义的webhook。

那么完成一个自定义admission webhook需要两个步骤:

  • 将相关的webhook config注册给kubernetes,也就是让kubernetes知道你的webhook
  • 准备一个http server来处理 apiserver发过来验证的信息

注:这里使用go net/http包,本身不区分方法处理HTTP的何种请求,如果用其他框架实现的,如django,需要指定对应方法需要为POST

向kubernetes注册webhook对象

kubernetes提供的两种类型可自定义的准入控制器,和其他资源一样,可以利用资源清单,动态配置那些资源要被adminssion webhook处理。 kubernetes将这种形式抽象为两种资源:

  • ValidatingWebhookConfiguration

  • MutatingWebhookConfiguration

ValidatingAdmission

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: "pod-policy.example.com"
webhooks:
- name: "pod-policy.example.com"
  rules:
  - apiGroups:   [""] # 拦截资源的Group "" 表示 core。"*" 表示所有。
    apiVersions: ["v1"] # 拦截资源的版本
    operations:  ["CREATE"] # 什么请求下拦截
    resources:   ["pods"]  # 拦截什么资源
    scope:       "Namespaced" # 生效的范围,cluster还是namespace "*"表示没有范围限制。
  clientConfig: # 我们部署的webhook服务,
    service: # service是在cluster-in模式下
      namespace: "example-namespace"
      name: "example-service"
      port: 443 # 服务的端口
      path: "/validate" # path是对应用于验证的接口
    # caBundle是提供给 admission webhook CA证书  
    caBundle: "Ci0tLS0tQk...<base64-encoded PEM bundle containing the CA that signed the webhook's serving certificate>...tLS0K"
  admissionReviewVersions: ["v1", "v1beta1"]
  sideEffects: None
  timeoutSeconds: 5 # 1-30s直接,表示请求api的超时时间

MutatingAdmission

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: "valipod-policy.example.com"
webhooks:
- name: "valipod-policy.example.com"
  rules:
    - apiGroups:   ["apps"] # 拦截资源的Group "" 表示 core。"*" 表示所有。
      apiVersions: ["v1"] # 拦截资源的版本
      operations:  ["CREATE"] # 什么请求下拦截
      resources:   ["deployments"]  # 拦截什么资源
      scope:       "Namespaced" # 生效的范围,cluster还是namespace "*"表示没有范围限制。
  clientConfig: # 我们部署的webhook服务,
    url: "https://10.0.0.1:81/validate" # 这里是外部模式
    #      service: # service是在cluster-in模式下
    #        namespace: "default"
    #        name: "admission-webhook"
    #        port: 81 # 服务的端口
    #        path: "/mutate" # path是对应用于验证的接口
    # caBundle是提供给 admission webhook CA证书
    caBundle: "Ci0tLS0tQk...<base64-encoded PEM bundle containing the CA that signed the webhook's serving certificate>...tLS0K"
  admissionReviewVersions: ["v1"]
  sideEffects: None
  timeoutSeconds: 5 # 1-30s直接,表示请求api的超时时间

注:对于webhook,也可以引入外部的服务,并非必须部署到集群内部

对于外部服务来讲,需要 clientConfig 中的 service , 更换为 url ; 通过 url 参数可以将一个外部的服务引入

1
2
3
4
5
6
7
8
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
...
webhooks:
- name: my-webhook.example.com
  clientConfig:
    url: "https://my-webhook.example.com:9443/my-webhook-path"
  ...

注:这里的url规则必须准守下列形式:

  • scheme://host:port/path
  • 使用了url 时,这里不应填写集群内的服务
  • scheme 必须是 https,不能为http,这就意味着,引入外部时也需要
  • 配置时使用了,?xx=xx 的参数也是不被允许的(官方说法是这样的,通过源码学习了解到因为会发送特定的请求体,所以无需管参数)

更多的配置可以参考kubernetes官方提供的 doc

准备一个webhook

让我们编写我们的 webhook server。将创建两个钩子,/mutate/validate

  • /mutate 将在创建deployment资源时,基于版本,给资源加上注释 webhook.example.com/allow: true
  • /validate 将对 /mutate 增加的 allow:true 那么则继续,否则拒绝。

这里为了方便,全部写在一起了,实际上不符合软件的设计。在kubernetes代码库中也提供了一个webhook server,可以参考他这个webhook server来学习具体要做什么

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
package main

import (
	"context"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"os"
	"os/signal"
	"strings"
	"syscall"

	v1admission "k8s.io/api/admission/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"

	appv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/klog"
)

type patch struct {
	Op    string            `json:"op"`
	Path  string            `json:"path"`
	Value map[string]string `json:"value"`
}

func serve(w http.ResponseWriter, r *http.Request) {

	var body []byte
	if data, err := ioutil.ReadAll(r.Body); err == nil {
		body = data
	}
	klog.Infof(fmt.Sprintf("receive request: %v....", string(body)[:130]))
	if len(body) == 0 {
		klog.Error(fmt.Sprintf("admission request body is empty"))
		http.Error(w, fmt.Errorf("admission request body is empty").Error(), http.StatusBadRequest)
		return
	}
	var admission v1admission.AdmissionReview
	codefc := serializer.NewCodecFactory(runtime.NewScheme())
	decoder := codefc.UniversalDeserializer()
	_, _, err := decoder.Decode(body, nil, &admission)

	if err != nil {
		msg := fmt.Sprintf("Request could not be decoded: %v", err)
		klog.Error(msg)
		http.Error(w, msg, http.StatusBadRequest)
		return
	}

	if admission.Request == nil {
		klog.Error(fmt.Sprintf("admission review can't be used: Request field is nil"))
		http.Error(w, fmt.Errorf("admission review can't be used: Request field is nil").Error(), http.StatusBadRequest)
		return
	}

	switch strings.Split(r.RequestURI, "?")[0] {
	case "/mutate":
		req := admission.Request
		var admissionResp v1admission.AdmissionReview
		admissionResp.APIVersion = admission.APIVersion
		admissionResp.Kind = admission.Kind
		klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v Operation=%v",
			req.Kind.Kind, req.Namespace, req.Name, req.UID, req.Operation)
		switch req.Kind.Kind {
		case "Deployment":
			var (
				respstr []byte
				err     error
				deploy  appv1.Deployment
			)
			if err = json.Unmarshal(req.Object.Raw, &deploy); err != nil {
				respStructure := v1admission.AdmissionResponse{Result: &metav1.Status{
					Message: fmt.Sprintf("could not unmarshal resouces review request: %v", err),
					Code:    http.StatusInternalServerError,
				}}
				klog.Error(fmt.Sprintf("could not unmarshal resouces review request: %v", err))
				if respstr, err = json.Marshal(respStructure); err != nil {
					klog.Error(fmt.Errorf("could not unmarshal resouces review response: %v", err))
					http.Error(w, fmt.Errorf("could not unmarshal resouces review response: %v", err).Error(), http.StatusInternalServerError)
					return
				}
				http.Error(w, string(respstr), http.StatusBadRequest)
				return
			}

			current_annotations := deploy.GetAnnotations()
			pl := []patch{}
			for k, v := range current_annotations {
				pl = append(pl, patch{
					Op:   "add",
					Path: "/metadata/annotations",
					Value: map[string]string{
						k: v,
					},
				})
			}
			pl = append(pl, patch{
				Op:   "add",
				Path: "/metadata/annotations",
				Value: map[string]string{
					deploy.Name + "/Allow": "true",
				},
			})

			annotationbyte, err := json.Marshal(pl)

			if err != nil {
				http.Error(w, err.Error(), http.StatusInternalServerError)
				return
			}
			respStructure := &v1admission.AdmissionResponse{
				UID:     req.UID,
				Allowed: true,
				Patch:   annotationbyte,
				PatchType: func() *v1admission.PatchType {
					t := v1admission.PatchTypeJSONPatch
					return &t
				}(),
				Result: &metav1.Status{
					Message: fmt.Sprintf("could not unmarshal resouces review request: %v", err),
					Code:    http.StatusOK,
				},
			}
			admissionResp.Response = respStructure

			klog.Infof("sending response: %s....", admissionResp.Response.String()[:130])
			respByte, err := json.Marshal(admissionResp)
			if err != nil {
				klog.Errorf("Can't encode response messages: %v", err)
				http.Error(w, err.Error(), http.StatusInternalServerError)
			}
			klog.Infof("prepare to write response...")
			w.Header().Set("Content-Type", "application/json")
			if _, err := w.Write(respByte); err != nil {
				klog.Errorf("Can't write response: %v", err)
				http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
			}

		default:
			klog.Error(fmt.Sprintf("unsupport resouces review request type"))
			http.Error(w, "unsupport resouces review request type", http.StatusBadRequest)
		}

	case "/validate":
		req := admission.Request
		var admissionResp v1admission.AdmissionReview
		admissionResp.APIVersion = admission.APIVersion
		admissionResp.Kind = admission.Kind
		klog.Infof("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v Operation=%v",
			req.Kind.Kind, req.Namespace, req.Name, req.UID, req.Operation)
		var (
			deploy  appv1.Deployment
			respstr []byte
		)
		switch req.Kind.Kind {
		case "Deployment":
			if err = json.Unmarshal(req.Object.Raw, &deploy); err != nil {
				respStructure := v1admission.AdmissionResponse{Result: &metav1.Status{
					Message: fmt.Sprintf("could not unmarshal resouces review request: %v", err),
					Code:    http.StatusInternalServerError,
				}}
				klog.Error(fmt.Sprintf("could not unmarshal resouces review request: %v", err))
				if respstr, err = json.Marshal(respStructure); err != nil {
					klog.Error(fmt.Errorf("could not unmarshal resouces review response: %v", err))
					http.Error(w, fmt.Errorf("could not unmarshal resouces review response: %v", err).Error(), http.StatusInternalServerError)
					return
				}
				http.Error(w, string(respstr), http.StatusBadRequest)
				return
			}
		}
		al := deploy.GetAnnotations()
		respStructure := v1admission.AdmissionResponse{
			UID: req.UID,
		}
		if al[fmt.Sprintf("%s/Allow", deploy.Name)] == "true" {
			respStructure.Allowed = true
			respStructure.Result = &metav1.Status{
				Code: http.StatusOK,
			}
		} else {
			respStructure.Allowed = false
			respStructure.Result = &metav1.Status{
				Code: http.StatusForbidden,
				Reason: func() metav1.StatusReason {
					return metav1.StatusReasonForbidden
				}(),
				Message: fmt.Sprintf("the resource %s couldn't to allow entry.", deploy.Kind),
			}
		}

		admissionResp.Response = &respStructure

		klog.Infof("sending response: %s....", admissionResp.Response.String()[:130])
		respByte, err := json.Marshal(admissionResp)
		if err != nil {
			klog.Errorf("Can't encode response messages: %v", err)
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
		klog.Infof("prepare to write response...")
		w.Header().Set("Content-Type", "application/json")
		if _, err := w.Write(respByte); err != nil {
			klog.Errorf("Can't write response: %v", err)
			http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
		}
	}
}

func main() {
	var (
		cert, key string
	)

	if cert = os.Getenv("TLS_CERT"); len(cert) == 0 {
		cert = "./tls/tls.crt"
	}

	if key = os.Getenv("TLS_KEY"); len(key) == 0 {
		key = "./tls/tls.key"
	}

	ca, err := tls.LoadX509KeyPair(cert, key)
	if err != nil {
		klog.Error(err.Error())
		return
	}

	server := &http.Server{
		Addr: ":81",
		TLSConfig: &tls.Config{
			Certificates: []tls.Certificate{
				ca,
			},
		},
	}

	httpserver := http.NewServeMux()

	httpserver.HandleFunc("/validate", serve)
	httpserver.HandleFunc("/mutate", serve)
	httpserver.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
		klog.Info(fmt.Sprintf("%s %s", r.RequestURI, "pong"))
		fmt.Fprint(w, "pong")
	})
	server.Handler = httpserver

	go func() {
		if err := server.ListenAndServeTLS("", ""); err != nil {
			klog.Errorf("Failed to listen and serve webhook server: %v", err)
		}
	}()

	klog.Info("starting serve.")
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	<-signalChan

	klog.Infof("Got shut signal, shutting...")
	if err := server.Shutdown(context.Background()); err != nil {
		klog.Errorf("HTTP server Shutdown: %v", err)
	}
}

对应的Dockerfile

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
FROM golang:alpine AS builder
MAINTAINER cylon
WORKDIR /admission
COPY ./ /admission
ENV GOPROXY https://goproxy.cn,direct
RUN \
    sed -i 's/dl-cdn.alpinelinux.org/mirrors.ustc.edu.cn/g' /etc/apk/repositories && \
    apk add upx  && \
    GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -ldflags "-s -w" -o webhook main.go && \
    upx -1 webhook && \
    chmod +x webhook

FROM alpine AS runner
WORKDIR /go/admission
COPY --from=builder /admission/webhook .
VOLUME ["/admission"]

集群内部部署所需的资源清单

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
apiVersion: v1
kind: Service
metadata:
  name: admission-webhook
  labels:
    app: admission-webhook
spec:
  ports:
    - port: 81
      targetPort: 81
  selector:
    app: simple-webhook
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: simple-webhook
  name: simple-webhook
spec:
  replicas: 1
  selector:
    matchLabels:
      app: simple-webhook
  template:
    metadata:
      labels:
        app: simple-webhook
    spec:
      containers:
        - image: cylonchau/simple-webhook:v0.0.2
          imagePullPolicy: IfNotPresent
          name: webhook
          command: ["./webhook"]
          env:
            - name: "TLS_CERT"
              value: "./tls/tls.crt"
            - name: "TLS_KEY"
              value: "./tls/tls.key"
            - name: NS_NAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.namespace
          ports:
            - containerPort: 81
          volumeMounts:
            - name: tlsdir
              mountPath: /go/admission/tls
              readOnly: true
      volumes:
        - name: tlsdir
          secret:
            secretName: webhook
---
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  name: "pod-policy.example.com"
webhooks:
  - name: "pod-policy.example.com"
    rules:
      - apiGroups:   ["apps"] # 拦截资源的Group "" 表示 core。"*" 表示所有。
        apiVersions: ["v1"] # 拦截资源的版本
        operations:  ["CREATE"] # 什么请求下拦截
        resources:   ["deployments"]  # 拦截什么资源
        scope:       "Namespaced" # 生效的范围,cluster还是namespace "*"表示没有范围限制。
    clientConfig: # 我们部署的webhook服务,
      url: "https://10.0.0.1:81/mutate"
#      service: # service是在cluster-in模式下
#        namespace: "default"
#        name: "admission-webhook"
#        port: 81 # 服务的端口
#        path: "/mutate" # path是对应用于验证的接口
      # caBundle是提供给 admission webhook CA证书
      caBundle: Put you CA (base64 encode) in here
    admissionReviewVersions: ["v1"]
    sideEffects: None
    timeoutSeconds: 5 # 1-30s直接,表示请求api的超时时间
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  name: "valipod-policy.example.com"
webhooks:
- name: "valipod-policy.example.com"
  rules:
    - apiGroups:   ["apps"] # 拦截资源的Group "" 表示 core。"*" 表示所有。
      apiVersions: ["v1"] # 拦截资源的版本
      operations:  ["CREATE"] # 什么请求下拦截
      resources:   ["deployments"]  # 拦截什么资源
      scope:       "Namespaced" # 生效的范围,cluster还是namespace "*"表示没有范围限制。
  clientConfig: # 我们部署的webhook服务,
    #      service: # service是在cluster-in模式下
    #        namespace: "default"
    #        name: "admission-webhook"
    #        port: 81 # 服务的端口
    #        path: "/mutate" # path是对应用于验证的接口
    # caBundle是提供给 admission webhook CA证书
    caBundle: Put you CA (base64 encode) in here
  admissionReviewVersions: ["v1"]
  sideEffects: None
  timeoutSeconds: 5 # 1-30s直接,表示请求api的超时时间

这里需要主义的问题

证书问题

如果需要 cluster-in ,那么则需要对对应webhookconfig资源配置 service ;如果使用的是外部部署,则需要配置对应访问地址,如:“https://xxxx:port/method”

这两种方式的证书均需要对应的 subjectAltNamecluster-in 模式 需要对应service名称,如,至少包含serviceName.NS.svc 这一个域名。

下面就是证书类问题的错误

1
Failed calling webhook, failing closed pod-policy.example.com: failed calling webhook "pod-policy.example.com": Post https://admission-webhook.default.svc:81/mutate?timeout=5s: x509: certificate signed by unknown authority (possibly because of "crypto/rsa: verification error" while trying to verify candidate authority certificate "admission-webhook-ca")

相应信息问题

上面我们了解到的APIServer是去发出 v1admission.AdmissionReview 也就是 Request 和 Response类型的,所以,为了更清晰的表示出问题所在,需要对响应格式中的 ReasonMessage 配置,这也就是我们在客户端看到的报错信息。

1
2
3
4
5
6
7
&metav1.Status{
    Code: http.StatusForbidden,
    Reason: func() metav1.StatusReason {
        return metav1.StatusReasonForbidden
    }(),
    Message: fmt.Sprintf("the resource %s couldn't to allow entry.", deploy.Kind),
}

通过上面的设置用户可以看到下列错误

1
2
$ kubectl apply -f nginx.yaml 
Error from server (Forbidden): error when creating "nginx.yaml": admission webhook "valipod-policy.example.com" denied the request: the resource Deployment couldn't to allow entry.

注:必须的参数还包含,UID,allowed,这两个是必须的,上面阐述的只是对用户友好的提示信息

下面的报错就是对相应格式设置错误

1
Error from server (InternalError): error when creating "nginx.yaml": Internal error occurred: failed calling webhook "pod-policy.example.com": the server rejected our request for an unknown reason

相应信息版本问题

相应信息也需要指定一个版本,这个与请求来的结构中拿即可

1
2
admissionResp.APIVersion = admission.APIVersion
admissionResp.Kind = admission.Kind

下面是没有为对应相应信息配置对应KV的值出现的报错

1
Error from server (InternalError): error when creating "nginx.yaml": Internal error occurred: failed calling webhook "pod-policy.example.com": expected webhook response of admission.k8s.io/v1, Kind=AdmissionReview, got /, Kind=

关于patch

kubernetes中patch使用的是特定的规范,如 jsonpatch

kubernetes当前唯一支持的 patchTypeJSONPatch。 有关更多详细信息,请参见 JSON patch

对于 jsonpatch 是一个固定的类型,在go中必须定义其结构体

1
2
3
4
5
{
	"op": "add", // 做什么操作
	"path": "/spec/replicas", // 操作的路径
	"value": 3 // 对应添加的key value
}

下面就是字符串类型设置为布尔型产生的报错

1
Error from server (InternalError): error when creating "nginx.yaml": Internal error occurred: v1.Deployment.ObjectMeta: v1.ObjectMeta.Annotations: ReadString: expects " or n, but found t, error found in #10 byte of ...|t/Allow":true},"crea|..., bigger context ...|tadata":{"annotations":{"nginx-deployment/Allow":true},"creationTimestamp":null,"managedFields":[{"m|..

准备证书

Ubuntu

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
touch ./demoCAindex.txt
touch ./demoCA/serial 
touch ./demoCA/crlnumber
echo 01 > ./demoCA/serial
mkdir ./demoCA/newcerts

openssl genrsa -out cakey.pem 2048

openssl req -new \
	-x509 \
	-key cakey.pem \
	-out cacert.pem \
	-days 3650 \
	-subj "/CN=admission webhook ca"

openssl genrsa -out tls.key 2048

openssl req -new \
	-key tls.key \
	-subj "/CN=admission webhook client" \
	-reqexts webhook \
	-config <(cat /etc/ssl/openssl.cnf \
	<(printf "[webhook]\nsubjectAltName=DNS: admission-webhook, DNS: admission-webhook.default.svc, DNS: admission-webhook.default.svc.cluster.local, IP:10.0.0.1,  IP:10.0.0.4")) \
	-out tls.csr

sed -i 's/= match/= optional/g' /etc/ssl/openssl.cnf

openssl ca \
	-in tls.csr \
	-cert cacert.pem \
	-keyfile cakey.pem \
	-out tls.crt \
	-days 300 \
	-extensions webhook \
	-extfile <(cat /etc/ssl/openssl.cnf \
    <(printf "[webhook]\nsubjectAltName=DNS: admission-webhook, DNS: admission-webhook.default.svc, DNS: admission-webhook.default.svc.cluster.local, IP:10.0.0.1,  IP:10.0.0.4"))

CentOS

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
touch /etc/pki/CA/index.txt
touch /etc/pki/CA/serial # 下一个要颁发的编号 16进制
touch /etc/pki/CA/crlnumber
echo 01 > /etc/pki/CA/serial

openssl req -new \
	-x509 \
	-key cakey.pem \
	-out cacert.pem \
	-days 3650 \
	-subj "/CN=admission webhook ca"

openssl genrsa -out tls.key 2048

openssl req -new \
	-key tls.key \
	-subj "/CN=admission webhook client" \
	-reqexts webhook \
	-config <(cat /etc/pki/tls/openssl.cnf \
	<(printf "[webhook]\nsubjectAltName=DNS: admission-webhook, DNS: admission-webhook.default.svc, DNS: admission-webhook.default.svc.cluster.local, IP:10.0.0.1,  IP:10.0.0.4")) \
	-out tls.csr

sed -i 's/= match/= optional/g' /etc/ssl/openssl.cnf

openssl ca \
	-in tls.csr \
	-cert cacert.pem \
	-keyfile cakey.pem \
	-out tls.crt \
	-days 300 \
	-extensions webhook \
	-extfile <(cat /etc/pki/tls/openssl.cnf \
    <(printf "[webhook]\nsubjectAltName=DNS: admission-webhook, DNS: admission-webhook.default.svc, DNS: admission-webhook.default.svc.cluster.local, IP:10.0.0.1,  IP:10.0.0.4"))

通过部署测试结果

可以看到我们自己注入的 annotation nginx-deployment/Allow: true,在该示例中,仅为演示过程,而不是真的策略,实际环境中可以根据情况进行定制自己的策略。

结果可以看出,当在 mutating 中不通过,即缺少对应的 annotation 标签 , 则 validating 会不允许准入

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
$ kubectl describe deploy nginx-deployment
Name:                   nginx-deployment
Namespace:              default
CreationTimestamp:      Mon, 11 Jul 2022 20:25:16 +0800
Labels:                 <none>
Annotations:            deployment.kubernetes.io/revision: 1
                        nginx-deployment/Allow: true
Selector:               app=nginx
Replicas:               1 desired | 1 updated | 1 total | 1 available | 0 unavailable
StrategyType:           RollingUpdate
MinReadySeconds:        0
RollingUpdateStrategy:  25% max unavailable, 25% max surge
Pod Template:
  Labels:  app=nginx
  Containers:
   nginx:
    Image:        nginx:1.14.2

Reference

extensible admission controllers

K8S client-go Patch example

admission controllers response

a guide to kubernetes admission controllers

本文发布于Cylon的收藏册,转载请著名原文链接~

链接:深入理解Kubernetes 4A - Admission Control源码解析

版权:本作品采用「署名-非商业性使用-相同方式共享 4.0 国际」 许可协议进行许可。