如何从零开始编写一个Kubernetes CRD

Kubernetes的自定义资源定义(Custom Resource Definition,CRD)是一种扩展Kubernetes API的机制,允许在不改变代码的情况下管理自定义对象。CRD是Kubernetes v1.7+引入的,用于替代在v1.8中被移除的ThirdPartyResources (TPR)。

在使用CRD扩展Kubernetes API时,通常需要一个控制器来处理新资源的创建和进一步处理。Kubernetes官方的sample-controller项目提供了一个实现CRD控制器的例子,包括注册新的自定义资源类型(Foo),创建/获取/列出新类型的资源,以及处理创建/更新/删除事件。

在编写CRD控制器之前,建议使用Kubernetes提供的代码生成工具来生成必要的客户端,通知器,列表器和深度复制函数。这个过程可以通过官方项目提供的代码生成脚本来简化。代码生成工具只需要一个shell脚本调用和一些代码注释,就可以生成必要的代码,减少错误和工作量

以下是一个简单的Go代码示例,用于创建一个CRD:

package main

import (
    apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
    apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
    // 创建一个API扩展客户端
    clientset, err := apiextensionsclient.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    // 定义一个新的CRD对象
    crd := &apiextensionsv1beta1.CustomResourceDefinition{
        ObjectMeta: metav1.ObjectMeta{Name: "foos.samplecontroller.k8s.io"},
        Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
            Group:   "samplecontroller.k8s.io",
            Version: "v1alpha1",
            Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
                Plural:     "foos",
                Singular:   "foo",
                Kind:       "Foo",
                ShortNames: []string{"foo"},
            },
            Scope: apiextensionsv1beta1.NamespaceScoped,
        },
    }

    // 使用API扩展客户端创建CRD
    _, err = clientset.ApiextensionsV1beta1().CustomResourceDefinitions().Create(crd)
    if err != nil {
        panic(err.Error())
    }
}

这段代码首先创建了一个API扩展客户端,然后定义了一个新的CRD对象,最后使用API扩展客户端创建了这个CRD。这个CRD定义了一个名为”Foo”的新资源类型,属于”samplecontroller.k8s.io”这个API组,版本为”v1alpha1”,在命名空间范围内有效。

在Kubernetes中,CRD(CustomResourceDefinition)本身只是一个数据模式,而CRD控制器负责实现所需的功能。控制器会监听CRD实例(以及关联的资源)的CRUD事件,然后执行相应的业务逻辑。

apiVersion: apiextensions.k8s.io/v1beta1
  kind: CustomResourceDefinition
  metadata:
    # name must match the spec fields below, and be in the form: <plural>.<group>
    name: crontabs.stable.example.com
  spec:
    # group name to use for REST API: /apis/<group>/<version>
    group: stable.example.com
    # list of versions supported by this CustomResourceDefinition
    version: v1beta1
    # either Namespaced or Cluster
    scope: Namespaced
    names:
      # plural name to be used in the URL: /apis/<group>/<version>/<plural>
      plural: crontabs
      # singular name to be used as an alias on the CLI and for display
      singular: crontab
      # kind is normally the CamelCased singular type. Your resource manifests use this.
      kind: CronTab
      # shortNames allow shorter string to match your resource on the CLI
      shortNames:
      - ct

通过kubectl create -f crd.yaml可以创建一个CRD。

下面是一个简单的Go代码示例,用于创建一个CRD的控制器

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/fields"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/api/core/v1"
)

type Controller struct {
    indexer  cache.Indexer
    queue    workqueue.RateLimitingInterface
    informer cache.Controller
}

func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
    return &Controller{
        informer: informer,
        indexer:  indexer,
        queue:    queue,
    }
}

func (c *Controller) processNextItem() bool {
    // Wait until there is a new item in the working queue
    key, quit := c.queue.Get()
    if quit {
        return false
    }
    // Tell the queue that we are done with processing this key. This unblocks the key for other workers
    // This allows safe parallel processing because two CRDs with the same key are never processed in
    // parallel.
    defer c.queue.Done(key)

    // Invoke the method containing the business logic
    err := c.syncToStdout(key.(string))
    // Handle the error if something went wrong during the execution of the business logic
    c.handleErr(err, key)
    return true
}

func (c *Controller) syncToStdout(key string) error {
    obj, exists, err := c.indexer.GetByKey(key)
    if err != nil {
        fmt.Errorf("Fetching object with key %s from store failed with %v", key, err)
        return err
    }

    if !exists {
        // Below we will warm up our cache with a CronTab, so that we will see a delete for one CronTab
        fmt.Printf("CronTab %s does not exist anymore\n", key)
    } else {
        // Note that you also have to check the uid if you have a local controlled resource, which
        // is dependent on the actual instance, to detect that a CronTab was recreated with the same name
        fmt.Printf("Sync/Add/Update for CronTab %s\n", obj.(*v1.Pod).GetName())
    }
    return nil
}

func (c *Controller) handleErr(err error, key interface{}) {
    if err == nil {
        // Forget about the #AddRateLimited history of the key on every successful synchronization.
        // This ensures that future processing of updates for this key is not delayed because of
        // an outdated error history.
        c.queue.Forget(key)
        return
    }

    // This controller retries 5 times if something goes wrong. After that, it stops trying.
    if c.queue.NumRequeues(key) < 5 {
        fmt.Errorf("Error syncing CronTab %v: %v", key, err)

        // Re-enqueue the key rate limited. Based on the rate limiter在Kubernetes中,CRD(CustomResourceDefinition)本身只是一个数据模式,而CRD控制器负责实现所需的功能[[2](https://www.sobyte.net/post/2022-03/k8s-crd-controller/)]。控制器会监听CRD实例(以及关联的资源)的CRUD事件,然后执行相应的业务逻辑。

Tags: Kubernetes
Share: X (Twitter) Facebook LinkedIn