Golang: How to Watch for Changes to Custom Resources


In the rapidly evolving landscape of cloud-native development, Kubernetes has emerged as the de facto operating system for the cloud, orchestrating containers with unparalleled efficiency. A cornerstone of Kubernetes' extensibility is its Custom Resource Definition (CRD) mechanism, allowing users to define their own API objects, extending the Kubernetes API beyond its built-in types. While defining these custom resources (CRs) is straightforward, the real power unlocks when applications can dynamically react to their creation, updates, or deletions. This is where the art of "watching for changes" comes into play, particularly when building sophisticated operators and controllers in Go.

This comprehensive guide will delve deep into the methodologies and best practices for developing Go-based applications that reliably and efficiently observe and respond to changes in Custom Resources. We will explore the fundamental components of the client-go library, dissect the event-driven architecture, and provide a detailed, step-by-step implementation, ensuring you gain mastery of building resilient and scalable Kubernetes operators.

The Foundation: Understanding Kubernetes Custom Resources

Before we embark on the journey of watching, it's crucial to grasp what Custom Resources are and why they are so vital to the Kubernetes ecosystem. At its core, Kubernetes is a declarative system. You describe the desired state of your applications and infrastructure using YAML or JSON manifests, and Kubernetes works tirelessly to make the actual state match your desired state. While Kubernetes provides a rich set of built-in resources like Pods, Deployments, Services, and Ingresses, complex applications often require domain-specific abstractions that these native types cannot fully capture.

This is precisely where Custom Resources step in. A Custom Resource Definition (CRD) is a powerful Kubernetes object that allows cluster administrators to define new, entirely custom resource types. Once a CRD is registered with the Kubernetes API server, users can create instances of these custom resources, just like they would create a Pod or a Deployment. These custom resources become first-class citizens in the Kubernetes API, meaning they can be managed with kubectl, stored in etcd, and watched for changes.

Why are Custom Resources Indispensable?

The utility of Custom Resources spans a multitude of use cases, fundamentally transforming how we build and operate applications on Kubernetes:

  1. Domain-Specific Abstractions: They enable developers to model complex application configurations or infrastructure components directly within the Kubernetes API. For instance, you might define a DatabaseCluster CR to represent a high-availability database setup, encapsulating its replicas, storage, and backup policies. Or, a CDNConfig CR could manage content delivery network settings for a specific service.
  2. Operator Pattern Enablement: Custom Resources are the bedrock of the Kubernetes Operator pattern. An Operator is a method of packaging, deploying, and managing a Kubernetes application. Operators extend the Kubernetes API and automate the lifecycle of applications beyond simple stateless deployments. They achieve this by watching for changes to specific Custom Resources and then performing complex, application-specific actions to reconcile the desired state (defined by the CR) with the actual state. This includes provisioning external resources, scaling, backups, and upgrades.
  3. Simplified Management for End-Users: By providing higher-level abstractions, CRs simplify the interaction for application developers or platform engineers. Instead of dealing with numerous low-level Kubernetes objects (Deployments, Services, ConfigMaps, Secrets, PVCs), they can interact with a single, intuitive Custom Resource that represents their application's specific needs. For example, instead of configuring an Ingress, a Service, and a Pod template, a developer might simply create an ApplicationRoute CR.
  4. Extensibility and Ecosystem Integration: CRDs allow third-party tools and services to integrate seamlessly with Kubernetes. An external api gateway or a secret management system, for instance, might expose its configurations or resources as Custom Resources within Kubernetes, enabling a unified control plane. This is especially relevant in contexts where custom configurations are required for an api service to be exposed securely and efficiently.

Consider a scenario where you're building a sophisticated api gateway for your microservices infrastructure. Instead of manually configuring routing rules, security policies, and rate limits for each api, you could define a GatewayConfig Custom Resource. This CR would encapsulate all the necessary parameters for a specific API endpoint, such as its upstream service, path matching rules, authentication requirements, and traffic shaping policies. An operator designed to watch these GatewayConfig CRs would then translate these declarative definitions into the actual configuration for the underlying api gateway. This approach brings immense flexibility, automation, and a single source of truth for your API configurations. For enterprises looking for robust, open-source solutions to manage their APIs, especially in AI-driven environments, platforms like APIPark offer comprehensive api gateway and API management capabilities, allowing for unified control over various API services, including those defined by custom resources. By using Custom Resources to define how APIs are exposed and managed, developers can integrate APIPark's powerful features directly into their Kubernetes-native workflows, benefiting from its quick integration of 100+ AI models, unified API format for AI invocation, and end-to-end API lifecycle management, all orchestrated through familiar Kubernetes constructs.

The Core Problem: How to Detect Changes Efficiently?

Once you have defined your Custom Resources and instances of them are flowing into your Kubernetes cluster, the next logical step is to build an application that can react to their changes. This is not a trivial task, especially in a dynamic and distributed environment like Kubernetes. Several approaches exist, each with its own trade-offs:

  1. Direct Polling: The simplest, yet often least efficient, method is to periodically query the Kubernetes API server for the current state of a resource. This involves making GET requests to the /apis/<group>/<version>/<plural> endpoint at regular intervals.
    • Pros: Easy to implement for basic needs.
    • Cons:
      • Inefficiency: Generates unnecessary load on the API server, especially when many resources are polled frequently.
      • Latency: Changes are only detected on the next poll interval, leading to potential delays in reaction.
      • Resource Inconsistency: It's hard to guarantee that you haven't missed intermediate changes between polls.
  2. Long Polling/Watch API: Kubernetes provides a dedicated "watch" api endpoint that allows clients to establish a persistent connection with the API server. When a change occurs to a watched resource, the API server pushes an event (ADD, UPDATE, DELETE) over this connection.
    • Pros: Real-time event delivery, much more efficient than polling, significantly lower latency.
    • Cons:
      • Connection Management: Clients need to handle connection drops, retries, and resuming watches from the correct resource version.
      • State Management: If a watch connection drops for an extended period, the client might miss events. Reconciling the local state with the actual cluster state after a reconnection can be complex.
      • Scalability: While better than polling, a large number of independent watch connections for the same resource type can still strain the API server and etcd.

These challenges highlight the need for a more robust and scalable solution, which leads us to the client-go library's powerful informer pattern.

The Golang client-go Library: Your Gateway to Kubernetes

The client-go library is the official Go client for interacting with the Kubernetes API. It provides high-level abstractions over raw HTTP calls, simplifying tasks like authentication, resource serialization/deserialization, and event watching. For building Kubernetes operators and controllers, client-go is an indispensable tool.

Key Components of client-go for Watching Resources

client-go introduces a robust pattern for watching resources, centered around the concept of "informers." Informers abstract away the complexities of the watch api, connection management, and local caching, providing a reliable and efficient way for your controller to receive notifications about resource changes.

The informer pattern typically involves three core components working in harmony:

  1. Reflector: This component is responsible for watching a specific resource type. It establishes a long-lived watch connection to the Kubernetes API server and, when the connection drops, it re-establishes it and performs a "list" operation to ensure its local cache is up-to-date. The Reflector also handles the initial list operation to populate the cache. It continuously monitors the Kubernetes API server for ADD, UPDATE, and DELETE events for the resources it's configured to watch. It intelligently manages resource versions to ensure it doesn't miss events or process duplicates.
  2. DeltaFIFO / Store: The Reflector feeds the events it receives into a queue, often a DeltaFIFO (First-In, First-Out queue that tracks changes or "deltas"). The DeltaFIFO then pushes these events into a local, in-memory cache, known as the Store. This Store serves as a consistent, up-to-date replica of the watched resources from the API server. This local cache is crucial because it allows controllers to retrieve resource objects quickly without constantly hitting the API server, significantly reducing api server load and improving performance.
  3. Controller (Your Application Logic): Your actual controller logic interacts with the Store and registers event handlers with the informer. When an event (ADD, UPDATE, DELETE) occurs and is processed by the Reflector and DeltaFIFO, the informer invokes the corresponding event handler in your controller. Your controller then typically adds the key (namespace/name) of the affected resource to a work queue, which decouples event reception from event processing, preventing backpressure on the informer and api server.

This entire mechanism is encapsulated within client-go's SharedInformerFactory.

SharedInformerFactory: The Orchestrator

The SharedInformerFactory is the central component that orchestrates the creation and management of informers for multiple resource types within your application.

  • Sharing: Its primary benefit is "sharing." If multiple controllers within your application need to watch the same resource type (e.g., both a Pod scaler and a network policy controller need to know about Pods), they can all use the same informer provided by the SharedInformerFactory. This means only one Reflector and one Watch api connection are established to the API server for that resource type, drastically reducing resource consumption and api server load compared to each controller maintaining its own informer.
  • Starting and Stopping: The SharedInformerFactory provides methods to Start() all managed informers concurrently and to WaitForCacheSync() to ensure all caches are populated before your controllers begin processing events, preventing race conditions where controllers might try to retrieve non-existent resources from an empty cache.
  • Typed vs. Dynamic Informers:
    • Typed Informers: For built-in Kubernetes resources (Pods, Deployments) and Custom Resources for which you have generated Go types (using tools like controller-gen), client-go provides strongly typed informers. These return actual Go structs (e.g., *v1.Pod or *v1alpha1.MyCustomResource), offering compile-time safety and easier field access.
    • Dynamic Informers: When you need to watch Custom Resources for which you don't have generated Go types (perhaps because they are defined by a third-party, or you want to build a generic controller), client-go offers dynamic.SharedInformerFactory. This returns unstructured runtime.Unstructured objects, requiring manual type assertion or JSON marshaling/unmarshaling to access fields. While less type-safe, it offers maximum flexibility.

Interacting with the Kubernetes API: Clients

Beyond informers, client-go provides various client types for making direct API calls (GET, POST, PUT, DELETE, PATCH) to the Kubernetes API server:

  1. Clientset: The kubernetes.Clientset is the primary client for interacting with built-in Kubernetes resources. It provides strongly typed methods for each resource, like clientset.AppsV1().Deployments("namespace").Get(...).
  2. CRD Client: For strongly typed Custom Resources, once you've generated Go types, you'll create a dedicated client for your CRD. This client is similar to the Clientset but specific to your custom api group and version. It also offers type-safe operations.
  3. Dynamic Client: The dynamic.Interface is a powerful, generic client that can interact with any resource in Kubernetes, whether built-in or custom, without needing generated Go types. It operates on runtime.Unstructured objects, making it ideal for generic controllers or when resource types are not known at compile time. This is particularly useful for building api gateway components that need to adapt to various custom api definitions.
  4. RESTClient: The lowest-level client in client-go, rest.RESTClient allows you to make raw HTTP requests to the Kubernetes API, providing maximum control but requiring more manual handling of URLs, verbs, and serialization. This is generally not recommended for day-to-day controller development unless specific, highly custom api interactions are required.

For watching Custom Resources, the combination of SharedInformerFactory (potentially for your custom resource types) and your specific CRD client (for operations like updating the CR's status) will be your most frequent companions.


Step-by-Step Implementation: Building a Golang Operator to Watch Custom Resources

Let's walk through building a simple Kubernetes operator in Go that watches for changes to a custom resource named MyResource. This operator will demonstrate how to define a CRD, generate Go types, set up informers, and handle events.

Our hypothetical MyResource will have a single string field in its spec and a status field to indicate its processing state.

1. Project Setup and Dependencies

First, initialize your Go module and add necessary client-go dependencies.

mkdir my-operator
cd my-operator
go mod init github.com/your-org/my-operator
go get k8s.io/client-go@v0.29.0 # Use a stable version suitable for your K8s cluster
go get sigs.k8s.io/controller-runtime@v0.17.0 # Optional: manager and other controller boilerplate
go install sigs.k8s.io/controller-tools/cmd/controller-gen@v0.14.0 # Installs the controller-gen binary for generating types and CRDs

The controller-gen tool is crucial for generating Kubernetes-specific code, including CRD definitions and Go types from your struct definitions.

2. Define Your Custom Resource Definition (CRD)

Create a directory for your API types, e.g., api/v1alpha1/.

mkdir -p api/v1alpha1

Now, define your MyResource Go struct in api/v1alpha1/myresource_types.go:

package v1alpha1

import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status

// MyResource is the Schema for the myresources API
type MyResource struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   MyResourceSpec   `json:"spec,omitempty"`
    Status MyResourceStatus `json:"status,omitempty"`
}

// MyResourceSpec defines the desired state of MyResource
type MyResourceSpec struct {
    Message string `json:"message,omitempty"`
}

// MyResourceStatus defines the observed state of MyResource
type MyResourceStatus struct {
    Phase   string `json:"phase,omitempty"`
    Message string `json:"message,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// MyResourceList contains a list of MyResource
type MyResourceList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []MyResource `json:"items"`
}

Annotations Explained:
  • +genclient: tells the code generators to produce a typed client for this resource.
  • +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object: generates DeepCopy methods, essential for safe concurrency in client-go caches and other components.
  • +kubebuilder:subresource:status: enables the /status subresource on the generated CRD; add it if, like this controller, you update status via UpdateStatus.
  • metav1.TypeMeta and metav1.ObjectMeta: standard Kubernetes metadata fields.

3. Generate Code: CRD, DeepCopy, and Client

Now, use controller-gen to generate the necessary boilerplate code. Before running it, two supporting files are needed: a doc.go that defines the +groupName marker for the API group, and a Makefile (or //go:generate directives) that invokes the generators. controller-gen will then produce zz_generated.deepcopy.go with the deep-copy methods. First, tidy your module dependencies:

go mod tidy

Create api/v1alpha1/doc.go:

// Package v1alpha1 contains API Schema definitions for the example v1alpha1 API group
// +kubebuilder:object:generate=true
// +groupName=example.com
package v1alpha1

Next, wire up the generation commands. Either add //go:generate directives to a Go file (e.g., the main.go we create later) or, more conventionally, create a Makefile:

# Makefile
.PHONY: generate manifests

generate:
    controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
    controller-gen crd:crdVersions=v1 output:crd:dir=config/crd paths="./..."

manifests: generate

And create a dummy hack/boilerplate.go.txt (or copy from kubernetes/kubernetes repo):

/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

Run make generate. This will:
  • Generate api/v1alpha1/zz_generated.deepcopy.go.
  • Generate the CRD YAML manifest in config/crd/example.com_myresources.yaml.

Note that controller-gen does not generate typed clientsets, informers, or listers. The pkg/generated/... packages imported in the next section are produced separately with k8s.io/code-generator (its client-gen, lister-gen, and informer-gen tools, typically driven by the kube_codegen.sh script that ships with that module).

You can now apply this CRD to your cluster: kubectl apply -f config/crd/example.com_myresources.yaml.
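With the CRD registered, a minimal instance of MyResource looks like this (the message value is arbitrary):

```yaml
apiVersion: example.com/v1alpha1
kind: MyResource
metadata:
  name: demo
  namespace: default
spec:
  message: "hello from my first custom resource"
```

Apply it with kubectl apply -f myresource.yaml, and the operator built below will pick it up.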

4. Implement the Controller Logic

This is the heart of your operator. We'll define a Controller struct, a Run method, and a syncHandler function.

main.go: Entry point for our operator.

package main

import (
    "context"
    "flag"
    "fmt"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/your-org/my-operator/pkg/controller"
    "github.com/your-org/my-operator/pkg/generated/clientset/versioned"
    "github.com/your-org/my-operator/pkg/generated/informers/externalversions"

    kubeinformers "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog/v2"
)

func main() {
    klog.InitFlags(nil)
    defer klog.Flush()

    var masterURL string
    var kubeconfig string
    flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
    flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
    flag.Parse()

    // 1. Build Kubernetes client configuration
    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
    if err != nil {
        klog.Fatalf("Error building kubeconfig: %s", err.Error())
    }

    // 2. Create standard Kubernetes clientset
    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }

    // 3. Create our Custom Resource clientset
    myResourceClient, err := versioned.NewForConfig(cfg)
    if err != nil {
        klog.Fatalf("Error building custom resource clientset: %s", err.Error())
    }

    // 4. Set up informers
    // Standard K8s informers (e.g., Pods, Deployments)
    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    // Custom Resource informers (MyResource)
    myResourceInformerFactory := externalversions.NewSharedInformerFactory(myResourceClient, time.Second*30)

    // Create and run our controller
    myController := controller.NewController(
        kubeClient,
        myResourceClient,
        kubeInformerFactory.Apps().V1().Deployments(), // Example: If our controller needs to watch Deployments
        myResourceInformerFactory.Example().V1alpha1().MyResources(),
    )

    // Context for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Set up OS signal handler for graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigChan
        klog.Info("Received termination signal, shutting down controller...")
        cancel()
    }()

    // Start all informers
    klog.Info("Starting informers...")
    kubeInformerFactory.Start(ctx.Done())
    myResourceInformerFactory.Start(ctx.Done())

    // Wait for caches to sync. The factory's WaitForCacheSync returns a map of
    // informer type to sync result, so every entry must be checked.
    for informerType, ok := range kubeInformerFactory.WaitForCacheSync(ctx.Done()) {
        if !ok {
            klog.Fatalf("Failed to sync kube informer cache for %v", informerType)
        }
    }
    for informerType, ok := range myResourceInformerFactory.WaitForCacheSync(ctx.Done()) {
        if !ok {
            klog.Fatalf("Failed to sync custom resource informer cache for %v", informerType)
        }
    }
    klog.Info("Informers caches synced successfully.")

    // Run the controller
    if err = myController.Run(2, ctx.Done()); err != nil { // 2 workers
        klog.Fatalf("Error running controller: %s", err.Error())
    }
    klog.Info("Controller gracefully stopped.")
}

pkg/controller/controller.go: Contains the core controller logic.

package controller

import (
    "context"
    "fmt"
    "time"

    "github.com/your-org/my-operator/api/v1alpha1"
    clientset "github.com/your-org/my-operator/pkg/generated/clientset/versioned"
    myresourceinformer "github.com/your-org/my-operator/pkg/generated/informers/externalversions/example/v1alpha1"
    myresourcelister "github.com/your-org/my-operator/pkg/generated/listers/example/v1alpha1"

    appsv1 "k8s.io/api/apps/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    appsinformers "k8s.io/client-go/informers/apps/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"
)

const controllerAgentName = "my-resource-controller"

const (
    // SuccessSynced is used as part of the Event 'reason' field
    SuccessSynced = "Synced"
    // MessageResourceSynced is the message used for Events when a resource
    // is synced successfully
    MessageResourceSynced = "MyResource synced successfully"
)

// Controller is the controller for MyResource
type Controller struct {
    kubeclientset    kubernetes.Interface
    myresourceclientset clientset.Interface

    deploymentsLister appsv1.DeploymentLister
    deploymentsSynced cache.InformerSynced
    myresourcesLister myresourcelister.MyResourceLister
    myresourcesSynced cache.InformerSynced

    // workqueue is a rate limited work queue. This is used to queue work to be
    // processed instead of performing it immediately. This allows us to group
    // together several similar events and process them batch-wise. It also ensures
    // clients are not blocked by long-running operations.
    workqueue workqueue.RateLimitingInterface
    // recorder record.EventRecorder // for emitting Kubernetes events (optional but good practice)
}

// NewController returns a new MyResource controller
func NewController(
    kubeclientset kubernetes.Interface,
    myresourceclientset clientset.Interface,
    deploymentInformer appsinformers.DeploymentInformer,
    myresourceInformer myresourceinformer.MyResourceInformer) *Controller {

    // Add MyResource to the Kubernetes client scheme (needed for event recorder)
    utilruntime.Must(v1alpha1.AddToScheme(scheme.Scheme))
    klog.V(4).Info("Creating event broadcaster")
    // eventBroadcaster := record.NewBroadcaster()
    // eventBroadcaster.StartLogging(klog.Infof)
    // eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    // recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    controller := &Controller{
        kubeclientset:    kubeclientset,
        myresourceclientset: myresourceclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        myresourcesLister: myresourceInformer.Lister(),
        myresourcesSynced: myresourceInformer.Informer().HasSynced,
        workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MyResources"),
        // recorder:          recorder,
    }

    klog.Info("Setting up event handlers")
    // Set up an event handler for when MyResource resources change
    myresourceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueMyResource,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueMyResource(new)
        },
        DeleteFunc: controller.enqueueMyResource, // Reconcile on delete as well
    })

    // If our controller also watches other resources (e.g., Deployments that it manages)
    // deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    //  AddFunc: controller.handleObject,
    //  UpdateFunc: func(old, new interface{}) {
    //      newDepl := new.(*appsv1.Deployment)
    //      oldDepl := old.(*appsv1.Deployment)
    //      if newDepl.ResourceVersion == oldDepl.ResourceVersion {
    //          return // Objects are equal, no need to process
    //      }
    //      controller.handleObject(new)
    //  },
    //  DeleteFunc: controller.handleObject,
    // })

    return controller
}

// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until ctx.Done() is closed,
// at which point it will shutdown the workqueue and wait for workers to finish.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()

    klog.Info("Starting MyResource controller")

    // Wait for the caches to be synced before starting workers
    klog.Info("Waiting for informer caches to sync")
    if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.myresourcesSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    klog.Info("Starting workers")
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    klog.Info("Started workers")
    <-stopCh
    klog.Info("Shutting down workers")

    return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message or
// event from the workqueue.
func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}

// processNextWorkItem will read a single item from the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()

    if shutdown {
        return false
    }

    // We wrap this block in a func so we can defer c.workqueue.Done.
    err := func(obj interface{}) error {
        defer c.workqueue.Done(obj)
        var key string
        var ok bool
        // We expect strings to come off the workqueue. These are of the
        // form namespace/name. We do this as the delayed nature of the
        // workqueue means the items in the informer cache may actually be
        // more up to date than when the item was initially put onto the
        // workqueue.
        if key, ok = obj.(string); !ok {
            // As the item in the workqueue is actually invalid, we call
            // Forget to ensure it does not get re-queued.
            c.workqueue.Forget(obj)
            utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
            return nil
        }
        // Run the syncHandler, passing the resource key to be processed.
        if err := c.syncHandler(key); err != nil {
            // Put the item back on the workqueue to handle any transient errors.
            c.workqueue.AddRateLimited(key)
            return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
        }
        // If no error occurs we Forget this item so it does not get requeued
        // no matter what.
        c.workqueue.Forget(obj)
        klog.V(4).Infof("Successfully synced '%s'", key)
        return nil
    }(obj)

    if err != nil {
        utilruntime.HandleError(err)
        return true
    }

    return true
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the MyResource resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
    // Convert the namespace/name string into a distinct namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
        return nil
    }

    // Get the MyResource resource with this namespace/name
    myresource, err := c.myresourcesLister.MyResources(namespace).Get(name)
    if err != nil {
        // The MyResource resource may no longer exist, in which case we stop
        // processing.
        if errors.IsNotFound(err) {
            klog.V(4).Infof("MyResource '%s' in work queue no longer exists", key)
            // Here you would typically perform cleanup actions for deleted resources
            return nil
        }
        return err
    }

    klog.Infof("Processing MyResource: %s/%s with message: %s", myresource.Namespace, myresource.Name, myresource.Spec.Message)

    // Here is where your main reconciliation logic goes.
    // For demonstration, we'll just log and update the status.
    // In a real operator, you would create/update/delete other Kubernetes resources
    // (e.g., Deployments, Services) based on myresource.Spec.

    // Example: If MyResource.Spec.Message is "fail", simulate an error
    if myresource.Spec.Message == "fail" {
        c.updateMyResourceStatus(myresource, "Failed", "Processing failed as requested.")
        return fmt.Errorf("simulated failure for MyResource '%s'", key)
    }

    // Update the status of the MyResource
    err = c.updateMyResourceStatus(myresource, "Processed", fmt.Sprintf("Message '%s' handled successfully.", myresource.Spec.Message))
    if err != nil {
        return err
    }

    // c.recorder.Event(myresource, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    return nil
}

func (c *Controller) updateMyResourceStatus(myresource *v1alpha1.MyResource, phase, message string) error {
    // NEVER modify objects from the store. It's a read-only cache.
    // Instead, copy the object, modify the copy, and send it to the API server.
    myresourceCopy := myresource.DeepCopy()
    myresourceCopy.Status.Phase = phase
    myresourceCopy.Status.Message = message

    // If the CRD defines the /status subresource, prefer UpdateStatus so
    // that status updates cannot clobber concurrent spec changes.
    _, err := c.myresourceclientset.ExampleV1alpha1().MyResources(myresource.Namespace).UpdateStatus(context.TODO(), myresourceCopy, metav1.UpdateOptions{})
    if err != nil {
        return fmt.Errorf("error updating status for MyResource '%s': %v", myresource.Name, err)
    }
    return nil
}

// enqueueMyResource takes a MyResource resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed objects which may be mutated afterwards.
func (c *Controller) enqueueMyResource(obj interface{}) {
    var key string
    var err error
    if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}

// handleObject will take any resource implementing metav1.Object and attempt
// to find the MyResource resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that MyResource resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
func (c *Controller) handleObject(obj interface{}) {
    object, ok := obj.(metav1.Object)
    if !ok {
        // The object may arrive wrapped in a tombstone when the watch missed
        // the delete event; recover the last known state from it.
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
            return
        }
        object, ok = tombstone.Obj.(metav1.Object)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
            return
        }
        klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
    }
    klog.V(4).Infof("Processing object: %s", object.GetName())
    if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
        // Only react to objects owned by a MyResource.
        if ownerRef.Kind != "MyResource" {
            return
        }
        myresource, err := c.myresourcesLister.MyResources(object.GetNamespace()).Get(ownerRef.Name)
        if err != nil {
            klog.V(4).Infof("Ignoring orphaned object '%s' of MyResource '%s'", object.GetName(), ownerRef.Name)
            return
        }
        c.enqueueMyResource(myresource)
    }
}

Key Components and Patterns Explained in the Controller:

  • NewController: This constructor initializes the controller.
    • It creates a workqueue.RateLimitingInterface. This queue is crucial for client-go controllers. Instead of processing events directly in the AddFunc/UpdateFunc/DeleteFunc, we add the resource's namespace/name key to this queue. This decouples event reception from processing, prevents blocking the informer, and allows for rate-limiting, retries, and concurrent processing.
    • It registers event handlers (AddFunc, UpdateFunc, DeleteFunc) with the myresourceInformer. These handlers simply call enqueueMyResource, which pushes the resource's key to the workqueue.
  • Run: This method starts the controller's main loop.
    • It calls cache.WaitForCacheSync to ensure that all informer caches (both for built-in and custom resources) are fully populated before starting the workers. This prevents controllers from trying to fetch non-existent resources from an empty cache immediately after startup.
    • It starts a configurable number of worker goroutines (threadiness) that continuously call processNextWorkItem.
  • processNextWorkItem: This function pulls an item (a resource key) from the workqueue and passes it to syncHandler. It handles successful processing by calling workqueue.Forget(obj) and transient errors by calling workqueue.AddRateLimited(key) to re-add the item to the queue with a backoff.
  • syncHandler: This is the core reconciliation logic.
    • It retrieves the MyResource object from the informer's local cache using the myresourcesLister. Using the lister is critical: it queries the local cache, not the API server, providing fast access and reducing API server load.
    • It checks for NotFound errors, indicating the resource was deleted.
    • It then performs the actual logic based on the myresource.Spec. In this example, it simply logs and updates the MyResource's status field via the myresourceclientset. In a real operator, this is where you would create, update, or delete other Kubernetes resources (e.g., a Deployment for an application, a Service for exposing an API, or a ConfigMap for API gateway configurations) to match the desired state declared in myresource.Spec.
    • Crucially, when updating a resource, especially its status, you should always work on a DeepCopy() of the object retrieved from the lister. The objects in the informer's cache are read-only and should not be mutated directly.
  • updateMyResourceStatus: A helper function to update the status of MyResource. It uses the /status subresource if available (recommended for status updates).
  • enqueueMyResource: A simple helper that gets the namespace/name key of a resource and adds it to the workqueue.

5. Building and Running the Operator

Build your Go operator:

go build -o my-operator .

You can then run it locally (if configured to connect to your cluster via kubeconfig) or containerize it for deployment to Kubernetes.

Example Dockerfile for your operator:

# Build stage: an official Go toolchain image is required to compile the binary
FROM golang:1.22-alpine AS builder

WORKDIR /src
# Copy go.mod and go.sum to download dependencies
COPY go.mod go.sum ./
RUN go mod download

# Copy the rest of the source code
COPY . .

# Generate code and manifests (if not done locally)
# RUN make generate

# Build the operator binary
RUN CGO_ENABLED=0 go build -o /my-operator -ldflags="-s -w" ./main.go

# Final image
FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /my-operator /my-operator
USER nonroot

ENTRYPOINT ["/my-operator"]

Build the Docker image:

docker build -t your-registry/my-operator:v1.0.0 .
docker push your-registry/my-operator:v1.0.0

6. Deploying the Operator to Kubernetes

You'll need a Deployment, a Service Account, and RBAC (Role-Based Access Control) permissions for your operator.

config/rbac/role.yaml: Define the necessary permissions.

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: my-operator-role
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["pods", "services", "configmaps", "secrets"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["example.com"] # Your custom API group
    resources: ["myresources"]
    verbs: ["get", "list", "watch", "update", "patch"] # patch for status updates
  - apiGroups: ["example.com"]
    resources: ["myresources/status"] # for status subresource
    verbs: ["get", "update", "patch"]
  - apiGroups: ["example.com"]
    resources: ["myresources/finalizers"] # if you implement finalizers
    verbs: ["update"]

config/rbac/role_binding.yaml: Bind the role to a service account.

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: my-operator-rolebinding
subjects:
  - kind: ServiceAccount
    name: my-operator-sa
    namespace: default # Or your operator's namespace
roleRef:
  kind: ClusterRole
  name: my-operator-role
  apiGroup: rbac.authorization.k8s.io

config/rbac/service_account.yaml: Create a service account.

apiVersion: v1
kind: ServiceAccount
metadata:
  name: my-operator-sa
  namespace: default # Or your operator's namespace

config/deployment.yaml: Your operator's Deployment.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-operator
  namespace: default # Or your operator's namespace
spec:
  replicas: 1
  selector:
    matchLabels:
      app: my-operator
  template:
    metadata:
      labels:
        app: my-operator
    spec:
      serviceAccountName: my-operator-sa
      containers:
        - name: my-operator
          image: your-registry/my-operator:v1.0.0 # Replace with your image
          imagePullPolicy: Always
          # Add resource limits, probes as appropriate for production

Apply all these manifests:

kubectl apply -f config/rbac/service_account.yaml
kubectl apply -f config/rbac/role.yaml
kubectl apply -f config/rbac/role_binding.yaml
kubectl apply -f config/crd/example.com_myresources.yaml
kubectl apply -f config/deployment.yaml

Now, create an instance of MyResource:

# my-resource-instance.yaml
apiVersion: example.com/v1alpha1
kind: MyResource
metadata:
  name: my-test-resource
  namespace: default
spec:
  message: "Hello from Custom Resource!"

Apply it:

kubectl apply -f my-resource-instance.yaml

Watch your operator's logs: kubectl logs -f deployment/my-operator. You should see it processing the MyResource. You can also inspect the status of your CR: kubectl get myresource my-test-resource -o yaml.

Try updating the message:

kubectl patch myresource my-test-resource --type='json' -p='[{"op": "replace", "path": "/spec/message", "value": "Updated message!"}]'

And deleting it:

kubectl delete myresource my-test-resource

Your operator should log events for each action.

Table: Comparing Kubernetes API Interaction Methods

Feature / Method | Direct client-go API Calls (e.g., clientset.Get()) | client-go Informers (via SharedInformerFactory)
Primary Use Case | One-off operations, imperative actions, specific queries | Continuous observation, event-driven reconciliation
API Server Load | High for frequent operations | Low, primarily one watch connection per resource type
Event Detection | Immediate (if using the Watch API directly, but complex) | Near real-time via event handlers
Local State Management | None (client maintains no local cache) | Comprehensive local, in-memory cache
Concurrency Safety | Managed by caller | Cache is read-only; DeepCopy for modifications
Fault Tolerance | Requires manual reconnection and state reconciliation | Built-in re-list and re-watch mechanisms
Complexity | Simpler for one-off; complex for continuous watching | Higher initial setup; simpler for ongoing event processing
Consistency | Eventual, based on API calls | Strong eventual consistency (with cache sync)
Scalability | Poor for many clients watching same resources | Excellent due to shared informers and local caches

This table clearly illustrates why informers are the preferred pattern for building robust Kubernetes controllers that need to react to changes efficiently and reliably.

Best Practices and Considerations

Building a production-ready Kubernetes operator requires more than just basic watching. Here are critical best practices:

  1. Idempotency: Your syncHandler must be idempotent. This means applying the same desired state multiple times should always result in the same actual state, without causing unintended side effects. Kubernetes guarantees at-least-once delivery of events, so your syncHandler might be called multiple times for the same change.
  2. Error Handling and Retries: Implement robust error handling. Transient errors (e.g., network issues, temporary API server unavailability) should trigger retries using the rate-limiting work queue. Permanent errors should be logged, and potentially reflected in the Custom Resource's status field, but not endlessly retried. utilruntime.HandleError is a good way to log errors without crashing your controller.
  3. Status Subresource: Always update the status of your Custom Resource using the /status subresource if your CRD defines it. This separates status updates from spec updates, improving concurrency and reducing conflicts. Your CRD should include subresources: { status: {} }.
  4. Resource Ownership and Garbage Collection: When your operator creates dependent resources (like Deployments, Services, ConfigMaps) based on a Custom Resource, establish OwnerReference relationships. This allows Kubernetes' garbage collector to automatically clean up dependent resources when the owner CR is deleted. This also helps with the handleObject logic mentioned earlier.
  5. Graceful Shutdown: Use context.Context and OS signal handling to ensure your operator shuts down cleanly, allowing ongoing processing to complete and informers to stop gracefully.
  6. Resource Limits and Requests: For your operator's Deployment, define appropriate CPU and memory requests and limits to ensure stable operation and prevent resource starvation or excessive consumption within the cluster.
  7. Testing:
    • Unit Tests: Test individual functions and logic components.
    • Integration Tests: Test your controller's interaction with a mock Kubernetes API server or a local kind cluster. controller-runtime provides excellent test frameworks for this.
    • End-to-End Tests: Deploy your operator and CRDs to a test cluster and verify its behavior by creating/updating/deleting CRs and observing resulting cluster changes.
  8. Observability (Logging, Metrics, Tracing):
    • Logging: Use structured logging (e.g., klog/v2 with --v=level) to provide clear insights into your operator's actions and state. Log significant events, errors, and reconciliation cycles.
    • Metrics: Expose Prometheus metrics from your operator to monitor its health, work queue depth, reconciliation times, and API call latencies. controller-runtime includes excellent helpers for this.
    • Tracing: Integrate with a distributed tracing system to understand the flow of requests and operations across your operator and the Kubernetes API server.
  9. Security:
    • Least Privilege: Grant your operator's Service Account only the minimum necessary RBAC permissions.
    • Secrets Management: Handle sensitive data (e.g., API keys for external services) using Kubernetes Secrets, mounted as files or environment variables, and avoid hardcoding.
    • Image Security: Use trusted base images for your Dockerfiles and scan your images for vulnerabilities.
  10. Finalizers (for complex cleanup): If deleting a Custom Resource requires complex external cleanup actions (e.g., de-provisioning a cloud database, deleting an API entry from an external API gateway), implement finalizers. A finalizer prevents a resource from being fully deleted until your operator explicitly removes the finalizer after completing its cleanup tasks.
  11. Webhooks (Validating and Mutating): For more advanced control over Custom Resources, consider implementing ValidatingAdmissionWebhooks (to enforce schema compliance beyond CRD validation, or complex business rules) and MutatingAdmissionWebhooks (to inject default values or modify a CR before it's persisted).

Advanced Concepts and Future Directions

The world of Kubernetes operators is constantly evolving. As your needs grow, you might explore:

  • Operator Frameworks: While client-go provides the primitives, frameworks like Kubebuilder and Operator SDK abstract away much of the boilerplate code and provide scaffolds for building robust operators, including webhooks, metrics, and testing utilities. They significantly reduce the development time for complex operators.
  • Context for Reconciliation: Passing context.Context through your reconciliation logic is crucial for enabling cancellation signals, especially in complex operations involving external services or long-running tasks.
  • External Dependencies and Idempotent APIs: When your operator interacts with external APIs or infrastructure (like cloud providers, databases, or external API gateways such as APIPark for advanced API management), always ensure these interactions are idempotent. Design your syncHandler to check the current state of external resources before attempting modifications, minimizing side effects from repeated calls.

Conclusion

Mastering the art of watching for changes to Custom Resources in Golang is a cornerstone skill for anyone building sophisticated, cloud-native applications on Kubernetes. By leveraging client-go's SharedInformerFactory, workqueue pattern, and a deep understanding of reconciliation loops, developers can create powerful, resilient, and highly automated operators. These operators not only extend Kubernetes' capabilities but also bring a declarative, GitOps-friendly approach to managing any aspect of your infrastructure and applications, from database clusters to intricate API gateway configurations.

The API gateway pattern, for example, becomes incredibly powerful when coupled with Custom Resources. Imagine defining your entire API landscape, including security policies, routing, rate limiting, and analytics hooks, directly in Kubernetes YAML files as Custom Resources. An operator would then watch these CRs and dynamically configure an underlying API gateway to reflect these declarations, achieving a truly automated and self-managing API infrastructure. This convergence of Kubernetes' extensibility with robust API management platforms like APIPark represents the frontier of modern application orchestration, simplifying the deployment and governance of complex service meshes and AI integrations.

By following the detailed steps and best practices outlined in this guide, you are now equipped to build your own sophisticated Kubernetes controllers, transforming abstract resource definitions into tangible, automated actions, and ultimately, building a more intelligent and responsive cloud-native environment.

Frequently Asked Questions (FAQs)

1. What is the main difference between direct API calls and using Informers in client-go? Direct API calls (e.g., clientset.Get(), clientset.Create()) are imperative, one-off requests to the Kubernetes API server. They are suitable for specific actions. Informers, on the other hand, are a declarative, event-driven mechanism. They establish a continuous watch connection to the API server, maintain a local cache of resources, and notify your application via event handlers (Add, Update, Delete) when changes occur. Informers are highly efficient, reduce API server load, and abstract away complexities like connection management and state reconciliation, making them ideal for building controllers that react to ongoing changes.

2. Why is a work queue used in Kubernetes controllers, and what is AddRateLimited? A work queue (workqueue.RateLimitingInterface) is used to decouple event reception from event processing in a controller. Instead of immediately processing an event in the informer's event handler, the resource's key (namespace/name) is added to the work queue. This allows the informer to quickly process new events without being blocked by lengthy reconciliation logic, preventing backpressure on the API server. AddRateLimited adds an item to the work queue with an exponential backoff. If syncHandler encounters a transient error, AddRateLimited ensures the item is retried after increasing delays, preventing busy-looping on temporary failures and reducing API server load during outages.

3. How does a controller handle resource deletion when using Informers? When a Custom Resource (or any watched resource) is deleted, the informer's DeleteFunc event handler is triggered. This handler typically adds the deleted resource's key to the work queue. In the syncHandler, when attempting to retrieve the resource from the informer's local cache using the lister (myresourcesLister.Get(name)), it will return an IsNotFound error. This is the signal for the controller to perform any necessary cleanup actions related to the deleted resource (e.g., delete dependent Kubernetes objects, de-provision external resources, clean up API gateway configurations).

4. What are OwnerReferences and why are they important for operators? OwnerReferences are a Kubernetes metadata field that establishes a parent-child relationship between resources. When an operator creates secondary resources (e.g., a Deployment) based on a primary Custom Resource (e.g., MyResource), it should set the MyResource as the owner of the Deployment. This is important for two main reasons: a. Garbage Collection: Kubernetes' garbage collector can automatically delete dependent (child) resources when their owner (parent) resource is deleted, simplifying cleanup. b. Controller Logic: Controllers can use OwnerReferences to identify which primary Custom Resource an orphaned or changed secondary resource belongs to, enabling them to correctly reconcile the state.

5. How can I ensure my operator's logic is idempotent? Idempotency means that applying the same operation multiple times yields the same result as applying it once. For operator syncHandlers, this is crucial because Kubernetes guarantees at-least-once delivery of events, meaning your syncHandler might be called repeatedly for the same desired state. To ensure idempotency: a. Check Current State First: Before creating, updating, or deleting a resource, always check its current state. Only perform an action if the current state doesn't match the desired state (e.g., don't create a Deployment if one with the correct configuration already exists). b. Use Create-Or-Update (Upsert) Logic: Instead of separate create and update calls, use a logic that attempts to get a resource; if it doesn't exist, create it; otherwise, update the existing one to match the desired spec. c. Reflect Status: Update the Custom Resource's status to reflect the actual state of the cluster, providing clear feedback on the reconciliation process. This also helps in identifying discrepancies.

🚀 You can securely and efficiently call the OpenAI API on APIPark in just two steps:

Step 1: Deploy the APIPark AI gateway in 5 minutes.

APIPark is built with Go, offering strong performance and low development and maintenance costs. You can deploy APIPark with a single command.

curl -sSO https://download.apipark.com/install/quick-start.sh; bash quick-start.sh

In my experience, you can see the successful deployment interface within 5 to 10 minutes. Then, you can log in to APIPark using your account.


Step 2: Call the OpenAI API.
