Chi-Sheng Liu

Posted on • Originally published at chishengliu.com

How to Write a Kubernetes Operator Using client-go

What is client-go?

client-go is the official Go client for Kubernetes, responsible for interacting with the Kubernetes API server through its REST API. client-go is a general-purpose client: it is not limited to writing operators, and even kubectl is implemented on top of it. More specialized frameworks for writing operators, such as controller-runtime, kubebuilder, and operator-sdk, will be introduced later in this series.

Introduction to Sample Controller Mechanism

sample-controller is an official Kubernetes example operator implemented using client-go.

To understand the code, we need to first understand how the operator we write interacts with client-go. The explanation here is a simplified version of the official documentation.

[Diagram: client-go controller interaction]

The diagram above comes from the official documentation and also appears in many tutorials online.

The upper part of the diagram shows the internal components of client-go. It looks complicated, with terms like Reflector, Informer, and Indexer, but for now you only need to understand the Informer. Its main job is to notify us when the status of a resource changes. Why not simply query the Kubernetes API server directly? Because API calls are expensive, the Informer maintains a local Informer Cache that serves reads and greatly reduces the number of requests to the API server.
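To make the caching concrete, here is a minimal, self-contained sketch (not taken from sample-controller; the kubeconfig path and object names are placeholders) that reads Deployments through a lister backed by the informer cache instead of hitting the API server on every lookup:

package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Load the default kubeconfig from $HOME/.kube/config.
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    clientset := kubernetes.NewForConfigOrDie(cfg)

    // A shared informer factory with a 30-second resync period.
    factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
    lister := factory.Apps().V1().Deployments().Lister()

    // Start the informers and wait for their caches to fill.
    stopCh := make(chan struct{})
    defer close(stopCh)
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    // This Get is served from the local Informer Cache, not the API server.
    d, err := lister.Deployments("default").Get("my-deployment")
    if err != nil {
        fmt.Println("lookup failed:", err)
        return
    }
    fmt.Println("cached deployment has", *d.Spec.Replicas, "replicas")
}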

The lower part shows what we need to write ourselves:

  • Resource Event Handlers: When Informer notifies us of a change in a resource’s status, we decide what to do, which usually means putting its key (namespace + name) into the workqueue.
  • Workqueue: This stores the keys of all objects waiting to be processed. Our operator continuously retrieves items from the workqueue and tries to bring the cluster to the desired state. If it fails, the object's key may need to be added back to the workqueue for a later retry, as the sketch after this list shows.
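Here is a minimal sketch of that workqueue pattern, using the typed rate-limiting queue from recent client-go versions (the same queue type sample-controller uses); the namespace and name are placeholders:

package main

import (
    "fmt"

    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

func main() {
    // A rate-limited queue keyed by namespace/name references.
    queue := workqueue.NewTypedRateLimitingQueue(
        workqueue.DefaultTypedControllerRateLimiter[cache.ObjectName]())

    // An event handler would normally do this when a resource changes.
    queue.Add(cache.ObjectName{Namespace: "default", Name: "example-foo"})

    // A worker retrieves a key and marks it Done when processing finishes.
    key, shutdown := queue.Get()
    if shutdown {
        return
    }
    defer queue.Done(key)
    fmt.Println("processing", key)

    // On success, Forget clears the key's retry backoff; on failure we
    // would call queue.AddRateLimited(key) to retry later.
    queue.Forget(key)
}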

Sample Controller Codebase Walkthrough

Defining the CRD

In register.go, the GroupName is defined, and in v1alpha1/types.go, the type for the CRD is defined. You can see that it defines a Foo resource as follows:

// Foo is a specification for a Foo resource
type Foo struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   FooSpec   `json:"spec"`
    Status FooStatus `json:"status"`
}

// FooSpec is the spec for a Foo resource
type FooSpec struct {
    DeploymentName string `json:"deploymentName"`
    Replicas       *int32 `json:"replicas"`
}

// FooStatus is the status for a Foo resource
type FooStatus struct {
    AvailableReplicas int32 `json:"availableReplicas"`
}

Apart from the basic TypeMeta and ObjectMeta, it defines Spec and Status. Spec is where users can input data, defining the "desired state of the resource." Status is where our Operator writes values, representing the "current state of the resource."

The Sample Controller uses Kubernetes' code-generator to generate typed clients, informers, listers, and deep-copy functions for the CRD. So whenever you modify types.go, you need to run ./hack/update-codegen.sh to regenerate the code.
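For reference, code-generator is driven by marker comments placed above the type definitions in types.go. The Foo type carries markers like these, which tell the generators to produce a typed client and a runtime.Object-compliant DeepCopy implementation:

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// Foo is a specification for a Foo resource
type Foo struct {
    ...
}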

Program Entrypoint

Next, look at main.go, the entry point of the program. It is quite simple; pay attention to these lines:

kubeClient, err := kubernetes.NewForConfig(cfg)
exampleClient, err := clientset.NewForConfig(cfg)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
controller := NewController(ctx, kubeClient, exampleClient,
    kubeInformerFactory.Apps().V1().Deployments(),
    exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

// Start the informer factories so their caches begin syncing;
// without this, the informers never receive any events.
kubeInformerFactory.Start(ctx.Done())
exampleInformerFactory.Start(ctx.Done())

controller.Run(ctx, 2)

Basically, it creates clients and informers for both Kubernetes built-in resources and our custom Foo resource, passes them to NewController, starts the informer factories, and finally calls controller.Run.

Main Logic

Now, let’s examine the main part: controller.go.

fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.enqueueFoo,
    UpdateFunc: func(old, new interface{}) {
        controller.enqueueFoo(new)
    },
})

deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: controller.handleObject,
    UpdateFunc: func(old, new interface{}) {
        newDepl := new.(*appsv1.Deployment)
        oldDepl := old.(*appsv1.Deployment)
        if newDepl.ResourceVersion == oldDepl.ResourceVersion {
            // Periodic resync will send update events for all known Deployments.
            // Two different versions of the same Deployment will always have different RVs.
            return
        }
        controller.handleObject(new)
    },
    DeleteFunc: controller.handleObject,
})

This part registers the event handlers we talked about earlier: AddFunc, UpdateFunc, and DeleteFunc. When the informer detects a change in a resource, it calls the corresponding function. Notice that fooInformer simply calls enqueueFoo, while deploymentInformer calls handleObject.

func (c *Controller) enqueueFoo(obj interface{}) {
    if objectRef, err := cache.ObjectToName(obj); err != nil {
        utilruntime.HandleError(err)
        return
    } else {
        c.workqueue.Add(objectRef)
    }
}

enqueueFoo simply adds the key of the Foo object to the workqueue. Next, let's look at handleObject:

func (c *Controller) handleObject(obj interface{}) {
    ...
    if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
        // If this object is not owned by a Foo, we should not do anything more
        // with it.
        if ownerRef.Kind != "Foo" {
            return
        }

        foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
        if err != nil {
            logger.V(4).Info("Ignore orphaned object", "object", klog.KObj(object), "foo", ownerRef.Name)
            return
        }

        c.enqueueFoo(foo)
        return
    }
}

This is part of the handleObject function. It checks whether the owner of the Deployment is a Foo. If not, we ignore it; if it is, we add the corresponding Foo's key to the workqueue. This relates to a concept called OwnerReference: certain objects in Kubernetes are owned by others, and by default, when the owner is deleted, the objects it owns are garbage-collected as well. For example, a ReplicaSet is the owner of its Pods, so when the ReplicaSet is deleted, the Pods it manages are deleted too. This is also why fooInformer has no DeleteFunc handler: when a Foo is deleted, we want its corresponding Deployment deleted as well, and since the Deployment's owner is already set to the Foo, it is deleted automatically without further handling.
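Where does that owner reference come from? It is set when the controller first creates the Deployment. Here is the relevant part of newDeployment, abridged from sample-controller (the Deployment's Spec is elided):

// Abridged from sample-controller's newDeployment. The OwnerReferences
// entry is what ties the Deployment to its Foo.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      foo.Spec.DeploymentName,
            Namespace: foo.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                // NewControllerRef marks the Foo as the managing controller,
                // so the garbage collector deletes this Deployment when the
                // Foo is deleted.
                *metav1.NewControllerRef(foo,
                    samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
            },
        },
        // Spec elided; see the full function in sample-controller.
    }
}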

func (c *Controller) Run(ctx context.Context, workers int) error {
    ...
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    }
    ...
}

func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextWorkItem(ctx) {
    }
}

Run is the entry point called from main.go. The code elided here waits for the informer caches to sync before any workers start; Run then launches several goroutines running runWorker. runWorker is simply an infinite loop that calls processNextWorkItem.

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    objRef, shutdown := c.workqueue.Get()

    ...

    // Run the syncHandler, passing it the structured reference to the object to be synced.
    err := c.syncHandler(ctx, objRef)
    if err == nil {
        c.workqueue.Forget(objRef)
        logger.Info("Successfully synced", "objectName", objRef)
        return true
    }
    utilruntime.HandleErrorWithContext(ctx, err, "Error syncing;

 requeuing for later retry", "objectReference", objRef)
    c.workqueue.AddRateLimited(objRef)
    return true
}

This is a portion of processNextWorkItem. First, it retrieves an object key from the workqueue (the elided code also defers workqueue.Done, which tells the queue that processing of this item has finished). It then calls syncHandler to handle the key. On success, Forget clears the key's retry history; otherwise, the error is reported and the key is re-queued with AddRateLimited for a later retry.

func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
    ...

    // Get the Foo resource with this namespace/name
    foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)

    ...

    deploymentName := foo.Spec.DeploymentName

    ...

    // Get the deployment with the name specified in Foo.spec
    deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
    // If the resource doesn't exist, we'll create it
    if errors.IsNotFound(err) {
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(context.TODO(), newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
    }

    if err != nil {
        return err
    }

    // If the Deployment is not controlled by this Foo resource, we should log
    // a warning to the event recorder and return error msg.
    if !metav1.IsControlledBy(deployment, foo) {
        msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
        c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
        return fmt.Errorf("%s", msg)
    }

    // If this number of the replicas on the Foo resource is specified, and the
    // number does not equal the current desired replicas on the Deployment, we
    // should update the Deployment resource.
    if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
        logger.V(4).Info("Update deployment resource", "currentReplicas", *foo.Spec.Replicas, "desiredReplicas", *deployment.Spec.Replicas)
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(context.TODO(), newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
    }

    if err != nil {
        return err
    }

    // Finally, we update the status block of the Foo resource to reflect the
    // current state of the world
    err = c.updateFooStatus(foo, deployment)
    if err != nil {
        return err
    }

    c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    return nil
}

Finally, this is a portion of syncHandler. This is where the actual reconciliation logic lives: adjusting the cluster to match the desired state declared by the user in Spec. In this case, the desired state is that the Deployment named in Spec exists and that its replica count matches the declared value.
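The status update at the end looks roughly like this, abridged from sample-controller's updateFooStatus:

func (c *Controller) updateFooStatus(foo *samplev1alpha1.Foo, deployment *appsv1.Deployment) error {
    // Never modify objects from the informer cache; work on a deep copy.
    fooCopy := foo.DeepCopy()
    fooCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
    // UpdateStatus writes only the status subresource, so nothing other
    // than the resource's status can change here.
    _, err := c.sampleclientset.SamplecontrollerV1alpha1().Foos(foo.Namespace).UpdateStatus(context.TODO(), fooCopy, metav1.UpdateOptions{FieldManager: FieldManager})
    return err
}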

Conclusion

After going through this, you may feel that I’ve only covered a small part of the Sample Controller code. That’s because client-go is a rather low-level library, and there are some downsides to using it for writing operators:

  • Even though the custom logic itself is small, we still have to write a fair amount of boilerplate.
  • When watching multiple resources, we must declare informers, listers, and other repetitive plumbing for each one. In the Sample Controller, both fooInformer and deploymentInformer are declared, and managing many resources this way becomes cumbersome.

These drawbacks have led to the development of other frameworks that are more specialized for writing operators, such as controller-runtime, kubebuilder, and operator-sdk. Stay tuned for future articles in this series to learn about these frameworks.
