Golang: How to Watch for Changes to Custom Resources
In the rapidly evolving landscape of cloud-native development, Kubernetes has emerged as the de facto operating system for the cloud, orchestrating containers with unparalleled efficiency. A cornerstone of Kubernetes' extensibility is its Custom Resource Definition (CRD) mechanism, which allows users to define their own API objects, extending the Kubernetes API beyond its built-in types. While defining these custom resources (CRs) is straightforward, their real power is unlocked when applications can dynamically react to their creation, updates, and deletions. This is where the art of "watching for changes" comes into play, particularly when building sophisticated operators and controllers in Go.
This comprehensive guide will delve deep into the methodologies and best practices for developing Go-based applications that reliably and efficiently observe and respond to changes in Custom Resources. We will explore the fundamental components of the client-go library, dissect the event-driven architecture, and provide a detailed, step-by-step implementation, ensuring you gain a mastery over building resilient and scalable Kubernetes operators.
The Foundation: Understanding Kubernetes Custom Resources
Before we embark on the journey of watching, it's crucial to grasp what Custom Resources are and why they are so vital to the Kubernetes ecosystem. At its core, Kubernetes is a declarative system. You describe the desired state of your applications and infrastructure using YAML or JSON manifests, and Kubernetes works tirelessly to make the actual state match your desired state. While Kubernetes provides a rich set of built-in resources like Pods, Deployments, Services, and Ingresses, complex applications often require domain-specific abstractions that these native types cannot fully capture.
This is precisely where Custom Resources step in. A Custom Resource Definition (CRD) is a powerful Kubernetes object that allows cluster administrators to define new, entirely custom resource types. Once a CRD is registered with the Kubernetes API server, users can create instances of these custom resources, just like they would create a Pod or a Deployment. These custom resources become first-class citizens in the Kubernetes API, meaning they can be managed with kubectl, stored in etcd, and watched for changes.
Why are Custom Resources Indispensable?
The utility of Custom Resources spans a multitude of use cases, fundamentally transforming how we build and operate applications on Kubernetes:
- Domain-Specific Abstractions: They enable developers to model complex application configurations or infrastructure components directly within the Kubernetes API. For instance, you might define a `DatabaseCluster` CR to represent a high-availability database setup, encapsulating its replicas, storage, and backup policies. Or, a `CDNConfig` CR could manage content delivery network settings for a specific service.
- Operator Pattern Enablement: Custom Resources are the bedrock of the Kubernetes Operator pattern. An Operator is a method of packaging, deploying, and managing a Kubernetes application. Operators extend the Kubernetes API and automate the lifecycle of applications beyond simple stateless deployments. They achieve this by watching for changes to specific Custom Resources and then performing complex, application-specific actions to reconcile the desired state (defined by the CR) with the actual state. This includes provisioning external resources, scaling, backups, and upgrades.
- Simplified Management for End-Users: By providing higher-level abstractions, CRs simplify the interaction for application developers or platform engineers. Instead of dealing with numerous low-level Kubernetes objects (Deployments, Services, ConfigMaps, Secrets, PVCs), they can interact with a single, intuitive Custom Resource that represents their application's specific needs. For example, instead of configuring an Ingress, a Service, and a Pod template, a developer might simply create an `ApplicationRoute` CR.
- Extensibility and Ecosystem Integration: CRDs allow third-party tools and services to integrate seamlessly with Kubernetes. An external API gateway or a secret management system, for instance, might expose its configurations or resources as Custom Resources within Kubernetes, enabling a unified control plane. This is especially relevant in contexts where custom configuration is required for an API service to be exposed securely and efficiently.
Consider a scenario where you're building a sophisticated API gateway for your microservices infrastructure. Instead of manually configuring routing rules, security policies, and rate limits for each API, you could define a `GatewayConfig` Custom Resource. This CR would encapsulate all the necessary parameters for a specific API endpoint, such as its upstream service, path-matching rules, authentication requirements, and traffic-shaping policies. An operator designed to watch these `GatewayConfig` CRs would then translate these declarative definitions into the actual configuration for the underlying gateway. This approach brings flexibility, automation, and a single source of truth for your API configurations. API management platforms such as APIPark can fit into this model by exposing gateway and API lifecycle configuration as Custom Resources, so that API management integrates into Kubernetes-native workflows.
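As an illustration, a hypothetical `GatewayConfig` manifest might look like the following. The group, version, and field names here are invented for the example:

```yaml
apiVersion: gateway.example.com/v1alpha1
kind: GatewayConfig
metadata:
  name: orders-api
  namespace: default
spec:
  upstream:
    service: orders        # backing Service for this route
    port: 8080
  path: /api/v1/orders     # path-matching rule
  authentication:
    type: jwt              # authentication requirement
  rateLimit:
    requestsPerSecond: 100 # traffic-shaping policy
```

An operator watching these objects would reconcile each one into concrete gateway configuration.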
The Core Problem: How to Detect Changes Efficiently?
Once you have defined your Custom Resources and instances of them are flowing into your Kubernetes cluster, the next logical step is to build an application that can react to their changes. This is not a trivial task, especially in a dynamic and distributed environment like Kubernetes. Several approaches exist, each with its own trade-offs:
- Direct Polling: The simplest, yet least efficient, method is to periodically query the Kubernetes API server for the current state of a resource. This involves making `GET` requests to the `/apis/<group>/<version>/<plural>` endpoint at regular intervals.
  - Pros: Easy to implement for basic needs.
  - Cons:
    - Inefficiency: Generates unnecessary load on the API server, especially when polling multiple resources frequently.
    - Latency: Changes are only detected on the next poll interval, leading to potential delays in reaction.
    - Resource Inconsistency: It's hard to guarantee that you haven't missed intermediate changes between polls.
- Long Polling / Watch API: Kubernetes provides a dedicated "watch" API endpoint that allows clients to establish a persistent connection with the API server. When a change occurs to a watched resource, the API server pushes an event (`ADDED`, `MODIFIED`, `DELETED`) over this connection.
  - Pros: Real-time event delivery, much more efficient than polling, significantly lower latency.
  - Cons:
    - Connection Management: Clients need to handle connection drops, retries, and resuming watches from the correct resource version.
    - State Management: If a watch connection drops for an extended period, the client might miss events. Reconciling the local state with the actual cluster state after a reconnection can be complex.
    - Scalability: While better than polling, a large number of independent watch connections for the same resource type can still strain the API server and `etcd`.
These challenges highlight the need for a more robust and scalable solution, which leads us to the client-go library's powerful informer pattern.
The Golang client-go Library: Your Gateway to Kubernetes
Golang's client-go library is the official client library for interacting with the Kubernetes API from Go applications. It provides high-level abstractions over the raw HTTP API calls, simplifying tasks like authentication, resource serialization/deserialization, and event watching. For building Kubernetes operators and controllers, client-go is an indispensable tool.
Key Components of client-go for Watching Resources
client-go introduces a robust pattern for watching resources, centered around the concept of "informers." Informers abstract away the complexities of the watch API, connection management, and local caching, providing a reliable and efficient way for your controller to receive notifications about resource changes.
The informer pattern typically involves three core components working in harmony:
- Reflector: This component is responsible for watching a specific resource type. It performs an initial "list" operation to populate the cache, then establishes a long-lived watch connection to the Kubernetes API server; when the connection drops, it re-establishes it and re-lists to bring its local cache back up to date. It continuously monitors the API server for `ADD`, `UPDATE`, and `DELETE` events for the resources it's configured to watch, and it intelligently manages resource versions to ensure it doesn't miss events or process duplicates.
- DeltaFIFO / Store: The Reflector feeds the events it receives into a queue, typically a `DeltaFIFO` (a first-in, first-out queue that tracks changes, or "deltas"). The `DeltaFIFO` then pushes these events into a local, in-memory cache known as the `Store`. This `Store` serves as a consistent, up-to-date replica of the watched resources from the API server. The local cache is crucial because it allows controllers to retrieve resource objects quickly without constantly hitting the API server, significantly reducing API server load and improving performance.
- Controller (Your Application Logic): Your actual controller logic interacts with the `Store` and registers event handlers with the informer. When an event (`ADD`, `UPDATE`, `DELETE`) occurs and is processed by the Reflector and DeltaFIFO, the informer invokes the corresponding event handler in your controller. Your controller then typically adds the key (`namespace/name`) of the affected resource to a work queue, which decouples event reception from event processing, preventing backpressure on the informer and API server.
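As a rough mental model — a toy illustration, not the actual client-go implementation — the DeltaFIFO-to-Store flow can be pictured as a queue of keyed deltas applied, in order, to a thread-safe map:

```go
package main

import (
	"fmt"
	"sync"
)

// delta pairs an event type with an object key, loosely mirroring what a
// DeltaFIFO hands to the informer's processing loop.
type delta struct {
	eventType string // "ADD", "UPDATE", "DELETE"
	key       string // "namespace/name"
	obj       any
}

// store is a minimal thread-safe cache kept in sync by applying deltas,
// playing the role of the informer's local Store.
type store struct {
	mu    sync.RWMutex
	items map[string]any
}

func newStore() *store { return &store{items: map[string]any{}} }

// apply updates the cache for one delta, as the informer does per event.
func (s *store) apply(d delta) {
	s.mu.Lock()
	defer s.mu.Unlock()
	switch d.eventType {
	case "ADD", "UPDATE":
		s.items[d.key] = d.obj
	case "DELETE":
		delete(s.items, d.key)
	}
}

// get is the cheap local read that spares the API server a round trip.
func (s *store) get(key string) (any, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.items[key]
	return v, ok
}

func main() {
	s := newStore()
	for _, d := range []delta{
		{"ADD", "default/a", "v1"},
		{"UPDATE", "default/a", "v2"},
		{"DELETE", "default/a", nil},
	} {
		s.apply(d)
	}
	_, ok := s.get("default/a")
	fmt.Println(ok) // false: the object was deleted
}
```

The real DeltaFIFO additionally coalesces deltas per key and supports resync, but the shape of the data flow is the same.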
This entire mechanism is encapsulated within client-go's SharedInformerFactory.
SharedInformerFactory: The Orchestrator
The SharedInformerFactory is the central component that orchestrates the creation and management of informers for multiple resource types within your application.
- Sharing: Its primary benefit is "sharing." If multiple controllers within your application need to watch the same resource type (e.g., both a Pod scaler and a network policy controller need to know about Pods), they can all use the same informer provided by the `SharedInformerFactory`. This means only one Reflector and one watch API connection are established to the API server for that resource type, drastically reducing resource consumption and API server load compared to each controller maintaining its own informer.
- Starting and Stopping: The `SharedInformerFactory` provides methods to `Start()` all managed informers concurrently and to `WaitForCacheSync()` to ensure all caches are populated before your controllers begin processing events, preventing race conditions where controllers might try to retrieve non-existent resources from an empty cache.
- Typed vs. Dynamic Informers:
  - Typed Informers: For built-in Kubernetes resources (Pods, Deployments) and Custom Resources for which you have generated Go types (using tools like `controller-gen`), `client-go` provides strongly typed informers. These return actual Go structs (e.g., `*v1.Pod` or `*v1alpha1.MyCustomResource`), offering compile-time safety and easier field access.
  - Dynamic Informers: When you need to watch Custom Resources for which you don't have generated Go types (perhaps because they are defined by a third party, or you want to build a generic controller), `client-go` offers a dynamic shared informer factory (`dynamicinformer.NewDynamicSharedInformerFactory`). This returns `*unstructured.Unstructured` objects, requiring manual traversal of the underlying maps (or JSON marshaling/unmarshaling) to access fields. While less type-safe, it offers maximum flexibility.
Interacting with the Kubernetes API: Clients
Beyond informers, client-go provides various client types for making direct API calls (GET, POST, PUT, DELETE, PATCH) to the Kubernetes API server:
- Clientset: The
kubernetes.Clientsetis the primary client for interacting with built-in Kubernetes resources. It provides strongly typed methods for each resource, likeclientset.AppsV1().Deployments("namespace").Get(...). - CRD Client: For strongly typed Custom Resources, once you've generated Go types, you'll create a dedicated client for your CRD. This client is similar to the
Clientsetbut specific to your customapigroup and version. It also offers type-safe operations. - Dynamic Client: The
dynamic.Interfaceis a powerful, generic client that can interact with any resource in Kubernetes, whether built-in or custom, without needing generated Go types. It operates onruntime.Unstructuredobjects, making it ideal for generic controllers or when resource types are not known at compile time. This is particularly useful for buildingapi gatewaycomponents that need to adapt to various customapidefinitions. - RESTClient: The lowest-level client in
client-go,rest.RESTClientallows you to make raw HTTP requests to the Kubernetes API, providing maximum control but requiring more manual handling of URLs, verbs, and serialization. This is generally not recommended for day-to-day controller development unless specific, highly customapiinteractions are required.
For watching Custom Resources, the combination of SharedInformerFactory (potentially for your custom resource types) and your specific CRD client (for operations like updating the CR's status) will be your most frequent companions.
Step-by-Step Implementation: Building a Golang Operator to Watch Custom Resources
Let's walk through building a simple Kubernetes operator in Go that watches for changes to a custom resource named MyResource. This operator will demonstrate how to define a CRD, generate Go types, set up informers, and handle events.
Our hypothetical MyResource will have a single string field in its spec and a status field to indicate its processing state.
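Expressed as a manifest, an instance of this resource would look something like this (using the `example.com/v1alpha1` group and version adopted throughout this guide):

```yaml
apiVersion: example.com/v1alpha1
kind: MyResource
metadata:
  name: demo
  namespace: default
spec:
  message: "hello from my operator" # the single spec field the operator reacts to
```

The operator we build will watch objects of this shape and write back a `status.phase` and `status.message`.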
1. Project Setup and Dependencies
First, initialize your Go module and add necessary client-go dependencies.
mkdir my-operator
cd my-operator
go mod init github.com/your-org/my-operator
go get k8s.io/client-go@v0.29.0 # Use a stable version suitable for your K8s cluster
go get sigs.k8s.io/controller-runtime@v0.17.0 # Optional: useful boilerplate for managers and controllers
go install sigs.k8s.io/controller-tools/cmd/controller-gen@v0.14.0 # Installs the controller-gen binary for generating types and CRDs
The controller-gen tool is crucial for generating Kubernetes-specific code, including CRD definitions and Go types from your struct definitions.
2. Define Your Custom Resource Definition (CRD)
Create a directory for your API types, e.g., api/v1alpha1/.
mkdir -p api/v1alpha1
Now, define your MyResource Go struct in api/v1alpha1/myresource_types.go:
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:subresource:status

// MyResource is the Schema for the myresources API
type MyResource struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MyResourceSpec `json:"spec,omitempty"`
Status MyResourceStatus `json:"status,omitempty"`
}
// MyResourceSpec defines the desired state of MyResource
type MyResourceSpec struct {
Message string `json:"message,omitempty"`
}
// MyResourceStatus defines the observed state of MyResource
type MyResourceStatus struct {
Phase string `json:"phase,omitempty"`
Message string `json:"message,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MyResourceList contains a list of MyResource
type MyResourceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []MyResource `json:"items"`
}
Annotations Explained:
- `+genclient`: tells the code generators (specifically `client-gen` from `k8s.io/code-generator`) to generate a client for this resource type.
- `+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object`: generates `DeepCopy` methods, essential for safe concurrency in `client-go` caches and other components.
- `metav1.TypeMeta` and `metav1.ObjectMeta`: standard Kubernetes metadata fields.
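To see why the generated `DeepCopy` methods matter, consider what happens when controller code mutates an object that still lives in the informer's shared cache: every other consumer of that cache sees the corruption. The sketch below hand-writes the pattern on a simplified struct (the real methods are generated, not written like this):

```go
package main

import "fmt"

// Resource stands in for an object held in the informer's shared cache;
// Labels is the kind of reference-typed field that makes a shallow copy unsafe.
type Resource struct {
	Labels map[string]string
	Spec   struct{ Message string }
}

// DeepCopy returns a fully independent copy, so callers can mutate it
// freely without corrupting the shared cache entry.
func (r *Resource) DeepCopy() *Resource {
	out := *r // value fields (Spec) are copied here
	out.Labels = make(map[string]string, len(r.Labels))
	for k, v := range r.Labels {
		out.Labels[k] = v
	}
	return &out
}

func main() {
	cached := &Resource{Labels: map[string]string{"app": "demo"}}
	working := cached.DeepCopy()
	working.Labels["app"] = "changed" // safe: does not touch `cached`
	fmt.Println(cached.Labels["app"]) // demo
}
```

This is exactly the rule the controller below follows before calling `UpdateStatus`: never modify objects retrieved from a lister; copy first.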
3. Generate Code: CRD, DeepCopy, and Client
Now generate the necessary boilerplate code. From your project root, first tidy the module:

go mod tidy

Note that `go generate ./...` only has an effect once `//go:generate` directives exist; in this guide the generation is driven by the Makefile shown below instead.
You'll need a zz_generated.deepcopy.go file for deep copy methods and a doc.go to define +groupName for the API.
Create api/v1alpha1/doc.go:
// Package v1alpha1 contains API Schema definitions for the example v1alpha1 API group
// +kubebuilder:object:generate=true
// +groupName=example.com
package v1alpha1
Now wire up the generation step. You can add a `//go:generate` directive to a Go file (e.g., `main.go` later) or, more conveniently, create a Makefile:
# Makefile
.PHONY: generate manifests
generate:
controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."
controller-gen crd:crdVersions=v1 output:crd:dir=config/crd paths="./..."
manifests: generate
And create a dummy hack/boilerplate.go.txt (or copy from kubernetes/kubernetes repo):
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
Run `make generate`. This will:
- Generate `api/v1alpha1/zz_generated.deepcopy.go`.
- Generate the CRD YAML manifest in `config/crd/example.com_myresources.yaml`.

Note that `controller-gen` does not produce typed clientsets, informers, or listers. The `pkg/generated/` packages imported in the following steps are produced by `client-gen`, `informer-gen`, and `lister-gen` from `k8s.io/code-generator`; see that repository's scripts (e.g., `generate-groups.sh`) for the canonical invocation.
You can now apply this CRD to your cluster: kubectl apply -f config/crd/example.com_myresources.yaml.
4. Implement the Controller Logic
This is the heart of your operator. We'll define a Controller struct, a Run method, and a syncHandler function.
main.go: Entry point for our operator.
package main
import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/your-org/my-operator/pkg/controller"
	"github.com/your-org/my-operator/pkg/generated/clientset/versioned"
	"github.com/your-org/my-operator/pkg/generated/informers/externalversions"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/klog/v2"
)
func main() {
klog.InitFlags(nil)
defer klog.Flush()
var masterURL string
var kubeconfig string
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.Parse()
// 1. Build Kubernetes client configuration
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}
// 2. Create standard Kubernetes clientset
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}
// 3. Create our Custom Resource clientset
myResourceClient, err := versioned.NewForConfig(cfg)
if err != nil {
klog.Fatalf("Error building custom resource clientset: %s", err.Error())
}
// 4. Set up informers
// Standard K8s informers (e.g., Pods, Deployments)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
// Custom Resource informers (MyResource)
myResourceInformerFactory := externalversions.NewSharedInformerFactory(myResourceClient, time.Second*30)
// Create and run our controller
myController := controller.NewController(
kubeClient,
myResourceClient,
kubeInformerFactory.Apps().V1().Deployments(), // Example: If our controller needs to watch Deployments
myResourceInformerFactory.Example().V1alpha1().MyResources(),
)
// Context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Set up OS signal handler for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
klog.Info("Received termination signal, shutting down controller...")
cancel()
}()
// Start all informers
klog.Info("Starting informers...")
kubeInformerFactory.Start(ctx.Done())
myResourceInformerFactory.Start(ctx.Done())
// Wait for caches to sync
if !kubeInformerFactory.WaitForCacheSync(ctx.Done()) {
klog.Fatalf("Failed to sync kube informers cache")
}
if !myResourceInformerFactory.WaitForCacheSync(ctx.Done()) {
klog.Fatalf("Failed to sync custom resource informers cache")
}
klog.Info("Informers caches synced successfully.")
// Run the controller
if err = myController.Run(2, ctx.Done()); err != nil { // 2 workers
klog.Fatalf("Error running controller: %s", err.Error())
}
klog.Info("Controller gracefully stopped.")
}
pkg/controller/controller.go: Contains the core controller logic.
package controller
import (
	"context"
	"fmt"
	"time"

	"github.com/your-org/my-operator/api/v1alpha1"
	clientset "github.com/your-org/my-operator/pkg/generated/clientset/versioned"
	myresourceinformer "github.com/your-org/my-operator/pkg/generated/informers/externalversions/example/v1alpha1"
	myresourcelister "github.com/your-org/my-operator/pkg/generated/listers/example/v1alpha1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	appsinformers "k8s.io/client-go/informers/apps/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	appslisters "k8s.io/client-go/listers/apps/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
)

const controllerAgentName = "my-resource-controller"

const (
	// SuccessSynced is used as part of the Event 'reason' field
	SuccessSynced = "Synced"
	// MessageResourceSynced is the message used for Events when a resource
	// is synced successfully
	MessageResourceSynced = "MyResource synced successfully"
)

// Controller is the controller for MyResource
type Controller struct {
	kubeclientset       kubernetes.Interface
	myresourceclientset clientset.Interface

	// DeploymentLister lives in k8s.io/client-go/listers/apps/v1, not in the
	// API types package k8s.io/api/apps/v1.
	deploymentsLister appslisters.DeploymentLister
	deploymentsSynced cache.InformerSynced

	myresourcesLister myresourcelister.MyResourceLister
	myresourcesSynced cache.InformerSynced

	// workqueue is a rate-limited work queue. This is used to queue work to be
	// processed instead of performing it immediately. This allows us to group
	// together several similar events and process them batch-wise. It also ensures
	// clients are not blocked by long-running operations.
	workqueue workqueue.RateLimitingInterface
	// recorder record.EventRecorder // for emitting Kubernetes events (optional but good practice)
}
// NewController returns a new MyResource controller
func NewController(
kubeclientset kubernetes.Interface,
myresourceclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
myresourceInformer myresourceinformer.MyResourceInformer) *Controller {
// Add MyResource to the Kubernetes client scheme (needed for event recorder)
utilruntime.Must(v1alpha1.AddToScheme(scheme.Scheme))
klog.V(4).Info("Creating event broadcaster")
// eventBroadcaster := record.NewBroadcaster()
// eventBroadcaster.StartLogging(klog.Infof)
// eventBroadcaster.StartRecordingToSink(&v1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
// recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
controller := &Controller{
kubeclientset: kubeclientset,
myresourceclientset: myresourceclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
myresourcesLister: myresourceInformer.Lister(),
myresourcesSynced: myresourceInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MyResources"),
// recorder: recorder,
}
klog.Info("Setting up event handlers")
// Set up an event handler for when MyResource resources change
myresourceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueMyResource,
UpdateFunc: func(old, new interface{}) {
controller.enqueueMyResource(new)
},
DeleteFunc: controller.enqueueMyResource, // Reconcile on delete as well
})
// If our controller also watches other resources (e.g., Deployments that it manages)
// deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// AddFunc: controller.handleObject,
// UpdateFunc: func(old, new interface{}) {
// newDepl := new.(*appsv1.Deployment)
// oldDepl := old.(*appsv1.Deployment)
// if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// return // Objects are equal, no need to process
// }
// controller.handleObject(new)
// },
// DeleteFunc: controller.handleObject,
// })
return controller
}
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until ctx.Done() is closed,
// at which point it will shutdown the workqueue and wait for workers to finish.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
klog.Info("Starting MyResource controller")
// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.myresourcesSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
klog.Info("Starting workers")
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
klog.Info("Started workers")
<-stopCh
klog.Info("Shutting down workers")
return nil
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message or
// event from the workqueue.
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem will read a single item from the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date than when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget to ensure it does not get re-queued.
c.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing the resource key to be processed.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// If no error occurs we Forget this item so it does not get requeued
// no matter what.
c.workqueue.Forget(obj)
klog.V(4).Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the MyResource resource
// with the current status of the resource.
func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
// Get the MyResource resource with this namespace/name
myresource, err := c.myresourcesLister.MyResources(namespace).Get(name)
if err != nil {
// The MyResource resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
klog.V(4).Infof("MyResource '%s' in work queue no longer exists", key)
// Here you would typically perform cleanup actions for deleted resources
return nil
}
return err
}
klog.Infof("Processing MyResource: %s/%s with message: %s", myresource.Namespace, myresource.Name, myresource.Spec.Message)
// Here is where your main reconciliation logic goes.
// For demonstration, we'll just log and update the status.
// In a real operator, you would create/update/delete other Kubernetes resources
// (e.g., Deployments, Services) based on myresource.Spec.
// Example: If MyResource.Spec.Message is "fail", simulate an error
if myresource.Spec.Message == "fail" {
c.updateMyResourceStatus(myresource, "Failed", "Processing failed as requested.")
return fmt.Errorf("simulated failure for MyResource '%s'", key)
}
// Update the status of the MyResource
err = c.updateMyResourceStatus(myresource, "Processed", fmt.Sprintf("Message '%s' handled successfully.", myresource.Spec.Message))
if err != nil {
return err
}
// c.recorder.Event(myresource, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}
func (c *Controller) updateMyResourceStatus(myresource *v1alpha1.MyResource, phase, message string) error {
// NEVER modify objects from the store. It's a read-only cache.
// Instead, copy the object, modify the copy, and send it to the API server.
myresourceCopy := myresource.DeepCopy()
myresourceCopy.Status.Phase = phase
myresourceCopy.Status.Message = message
	// If the CRD enables the /status subresource, use UpdateStatus so that
	// status updates are validated and versioned separately from the spec.
_, err := c.myresourceclientset.ExampleV1alpha1().MyResources(myresource.Namespace).UpdateStatus(context.TODO(), myresourceCopy, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating status for MyResource '%s': %v", myresource.Name, err)
}
return nil
}
// enqueueMyResource takes a MyResource resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed objects which may be mutated afterwards.
func (c *Controller) enqueueMyResource(obj interface{}) {
	var key string
	var err error
	// DeletionHandlingMetaNamespaceKeyFunc also copes with the
	// cache.DeletedFinalStateUnknown tombstones that DeleteFunc may receive.
	if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
		utilruntime.HandleError(err)
		return
	}
	c.workqueue.Add(key)
}
// handleObject will take any resource implementing metav1.Object and attempt
// to find the MyResource resource that 'owns' it. It does this by looking at the
// objects metadata.ownerReferences field for an appropriate OwnerReference.
// It then enqueues that MyResource resource to be processed. If the object does not
// have an appropriate OwnerReference, it will simply be skipped.
// func (c *Controller) handleObject(obj interface{}) {
// object, ok := obj.(metav1.Object)
// if !ok {
// tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
// if !ok {
// utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
// return
// }
// object, ok = tombstone.Obj.(metav1.Object)
// if !ok {
// utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
// return
// }
// klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
// }
// klog.V(4).Infof("Processing object: %s", object.GetName())
// if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
// // ... logic to find parent MyResource and enqueue ...
// }
// }
Key Components and Patterns Explained in the Controller:
- `NewController`: This constructor initializes the controller.
  - It creates a `workqueue.RateLimitingInterface`. This queue is crucial for `client-go` controllers. Instead of processing events directly in the `AddFunc`/`UpdateFunc`/`DeleteFunc`, we add the resource's `namespace/name` key to this queue. This decouples event reception from processing, prevents blocking the informer, and allows for rate-limiting, retries, and concurrent processing.
  - It registers event handlers (`AddFunc`, `UpdateFunc`, `DeleteFunc`) with the `myresourceInformer`. These handlers simply call `enqueueMyResource`, which pushes the resource's key to the `workqueue`.
- `Run`: This method starts the controller's main loop.
  - It calls `cache.WaitForCacheSync` to ensure that all informer caches (both for built-in and custom resources) are fully populated before starting the workers. This prevents controllers from trying to fetch non-existent resources from an empty cache immediately after startup.
  - It starts a configurable number of worker goroutines (`threadiness`) that continuously call `processNextWorkItem`.
- `processNextWorkItem`: This function pulls an item (a resource key) from the `workqueue` and passes it to `syncHandler`. It handles successful processing by calling `workqueue.Forget(obj)` and transient errors by calling `workqueue.AddRateLimited(key)` to re-add the item to the queue with a backoff.
- `syncHandler`: This is the core reconciliation logic.
  - It retrieves the `MyResource` object from the informer's local cache using the `myresourcesLister`. Using the lister is critical: it queries the local cache, not the API server, providing fast access and reducing API server load.
  - It checks for `NotFound` errors, indicating the resource was deleted.
  - It then performs the actual logic based on the `myresource.Spec`. In this example, it simply logs and updates the `MyResource`'s `status` field via the `myresourceclientset`. In a real operator, this is where you would create, update, or delete other Kubernetes resources (e.g., a Deployment for an application, a Service for exposing an API, or a ConfigMap for API gateway configurations) to match the desired state declared in `myresource.Spec`.
  - Crucially, when updating a resource, especially its `status`, you should always work on a `DeepCopy()` of the object retrieved from the lister. The objects in the informer's cache are read-only and should not be mutated directly.
- `updateMyResourceStatus`: A helper function to update the status of `MyResource`. It uses the `/status` subresource if available (recommended for status updates).
- `enqueueMyResource`: A simple helper that gets the `namespace/name` key of a resource and adds it to the `workqueue`.
5. Building and Running the Operator
Build your Go operator:
go build -o my-operator .
You can then run it locally (if configured to connect to your cluster via kubeconfig) or containerize it for deployment to Kubernetes.
Example Dockerfile for your operator:
# Use a minimal base image
FROM golang:1.22-alpine AS builder
WORKDIR /src
# Copy go.mod and go.sum to download dependencies
COPY go.mod go.sum ./
RUN go mod download
# Copy the rest of the source code
COPY . .
# Generate code and manifests (if not done locally)
# RUN make generate
# Build the operator binary
RUN CGO_ENABLED=0 go build -o /my-operator -ldflags="-s -w" .
# Final image
FROM gcr.io/distroless/static:nonroot
WORKDIR /
COPY --from=builder /my-operator /my-operator
USER nonroot
ENTRYPOINT ["/my-operator"]
Build the Docker image:
docker build -t your-registry/my-operator:v1.0.0 .
docker push your-registry/my-operator:v1.0.0
6. Deploying the Operator to Kubernetes
You'll need a Deployment, a Service Account, and RBAC (Role-Based Access Control) permissions for your operator.
config/rbac/role.yaml: Define the necessary permissions.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: my-operator-role
rules:
- apiGroups: [""] # "" indicates the core API group
resources: ["pods", "services", "configmaps", "secrets"]
verbs: ["get", "list", "watch", "create", "update", "delete"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list", "watch", "create", "update", "delete"]
- apiGroups: ["example.com"] # Your custom API group
resources: ["myresources"]
verbs: ["get", "list", "watch", "update", "patch"] # patch for status updates
- apiGroups: ["example.com"]
resources: ["myresources/status"] # for status subresource
verbs: ["get", "update", "patch"]
- apiGroups: ["example.com"]
resources: ["myresources/finalizers"] # if you implement finalizers
verbs: ["update"]
config/rbac/role_binding.yaml: Bind the role to a service account.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: my-operator-rolebinding
subjects:
- kind: ServiceAccount
name: my-operator-sa
namespace: default # Or your operator's namespace
roleRef:
kind: ClusterRole
name: my-operator-role
apiGroup: rbac.authorization.k8s.io
config/rbac/service_account.yaml: Create a service account.
apiVersion: v1
kind: ServiceAccount
metadata:
name: my-operator-sa
namespace: default # Or your operator's namespace
config/deployment.yaml: Your operator's Deployment.
apiVersion: apps/v1
kind: Deployment
metadata:
name: my-operator
namespace: default # Or your operator's namespace
spec:
replicas: 1
selector:
matchLabels:
app: my-operator
template:
metadata:
labels:
app: my-operator
spec:
serviceAccountName: my-operator-sa
containers:
- name: my-operator
image: your-registry/my-operator:v1.0.0 # Replace with your image
imagePullPolicy: Always
# Add resource limits, probes as appropriate for production
Apply all these manifests:
kubectl apply -f config/rbac/service_account.yaml
kubectl apply -f config/rbac/role.yaml
kubectl apply -f config/rbac/role_binding.yaml
kubectl apply -f config/crd/example.com_myresources.yaml
kubectl apply -f config/deployment.yaml
Now, create an instance of MyResource:
# my-resource-instance.yaml
apiVersion: example.com/v1alpha1
kind: MyResource
metadata:
name: my-test-resource
namespace: default
spec:
message: "Hello from Custom Resource!"
kubectl apply -f my-resource-instance.yaml
Watch your operator's logs: kubectl logs -f deployment/my-operator. You should see it processing the MyResource. You can also inspect the status of your CR: kubectl get myresource my-test-resource -o yaml.
Try updating the message:
kubectl patch myresource my-test-resource --type='json' -p='[{"op": "replace", "path": "/spec/message", "value": "Updated message!"}]'
And deleting it:
kubectl delete myresource my-test-resource
Your operator should log events for each action.
Table: Comparing Kubernetes API Interaction Methods
| Feature / Method | Direct `client-go` API Calls (e.g., `clientset.Get()`) | `client-go` Informers (via `SharedInformerFactory`) |
|---|---|---|
| Primary Use Case | One-off operations, imperative actions, specific queries | Continuous observation, event-driven reconciliation |
| API Server Load | High for frequent operations | Low, primarily one watch connection per resource type |
| Event Detection | Immediate (if using the Watch API directly, but complex) | Near real-time via event handlers |
| Local State Management | None (client maintains no local cache) | Comprehensive local, in-memory cache |
| Concurrency Safety | Managed by caller | Cache is read-only, `DeepCopy` for modifications |
| Fault Tolerance | Requires manual reconnection and state reconciliation | Built-in re-list and re-watch mechanisms |
| Complexity | Simpler for one-off; complex for continuous watching | Higher initial setup; simpler for ongoing event processing |
| Consistency | Eventual, based on API calls | Strong eventual consistency (with cache sync) |
| Scalability | Poor for many clients watching same resources | Excellent due to shared informers and local caches |
This table clearly illustrates why informers are the preferred pattern for building robust Kubernetes controllers that need to react to changes efficiently and reliably.
Best Practices and Considerations
Building a production-ready Kubernetes operator requires more than just basic watching. Here are critical best practices:
- Idempotency: Your `syncHandler` must be idempotent. This means applying the same desired state multiple times should always result in the same actual state, without causing unintended side effects. Kubernetes guarantees at-least-once delivery of events, so your `syncHandler` might be called multiple times for the same change.
- Error Handling and Retries: Implement robust error handling. Transient errors (e.g., network issues, temporary API server unavailability) should trigger retries using the rate-limiting work queue. Permanent errors should be logged, and potentially reflected in the Custom Resource's `status` field, but not endlessly retried. `utilruntime.HandleError` is a good way to log errors without crashing your controller.
- Status Subresource: Always update the `status` of your Custom Resource using the `/status` subresource if your CRD defines it. This separates status updates from `spec` updates, improving concurrency and reducing conflicts. Your CRD should include `subresources: { status: {} }`.
- Resource Ownership and Garbage Collection: When your operator creates dependent resources (like Deployments, Services, ConfigMaps) based on a Custom Resource, establish `OwnerReference` relationships. This allows Kubernetes' garbage collector to automatically clean up dependent resources when the owner CR is deleted. This also helps with the `handleObject` logic mentioned earlier.
- Graceful Shutdown: Use `context.Context` and OS signal handling to ensure your operator shuts down cleanly, allowing ongoing processing to complete and informers to stop gracefully.
- Resource Limits and Requests: For your operator's Deployment, define appropriate CPU and memory requests and limits to ensure stable operation and prevent resource starvation or excessive consumption within the cluster.
- Testing:
  - Unit Tests: Test individual functions and logic components.
  - Integration Tests: Test your controller's interaction with a mock Kubernetes API server or a local `kind` cluster. `controller-runtime` provides excellent test frameworks for this.
  - End-to-End Tests: Deploy your operator and CRDs to a test cluster and verify its behavior by creating/updating/deleting CRs and observing the resulting cluster changes.
- Observability (Logging, Metrics, Tracing):
  - Logging: Use structured logging (e.g., `klog/v2` with `--v=level`) to provide clear insights into your operator's actions and state. Log significant events, errors, and reconciliation cycles.
  - Metrics: Expose Prometheus metrics from your operator to monitor its health, work queue depth, reconciliation times, and API call latencies. `controller-runtime` includes excellent helpers for this.
  - Tracing: Integrate with a distributed tracing system to understand the flow of requests and operations across your operator and the Kubernetes API server.
- Security:
  - Least Privilege: Grant your operator's Service Account only the minimum necessary RBAC permissions.
  - Secrets Management: Handle sensitive data (e.g., API keys for external services) using Kubernetes Secrets, mounted as files or environment variables, and avoid hardcoding.
  - Image Security: Use trusted base images for your Dockerfiles and scan your images for vulnerabilities.
- Finalizers (for complex cleanup): If deleting a Custom Resource requires complex external cleanup actions (e.g., de-provisioning a cloud database, deleting an API entry from an external API gateway), implement finalizers. A finalizer prevents a resource from being fully deleted until your operator explicitly removes the finalizer after completing its cleanup tasks.
- Webhooks (Validating and Mutating): For more advanced control over Custom Resources, consider implementing ValidatingAdmissionWebhooks (to enforce schema compliance beyond CRD validation, or complex business rules) and MutatingAdmissionWebhooks (to inject default values or modify a CR before it's persisted).
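Mechanically, a finalizer is just an entry in the object's `metadata.finalizers` string slice that the operator adds when it first sees the resource and removes once cleanup succeeds. The helpers below are a minimal, dependency-free sketch of that slice manipulation (`containsFinalizer`, `removeFinalizer`, and the finalizer name are illustrative; `controller-runtime` ships similar utilities):

```go
package main

import "fmt"

// myFinalizer is a hypothetical finalizer name for this example.
const myFinalizer = "example.com/myresource-cleanup"

// containsFinalizer reports whether f is present in the object's
// metadata.finalizers slice.
func containsFinalizer(finalizers []string, f string) bool {
	for _, x := range finalizers {
		if x == f {
			return true
		}
	}
	return false
}

// removeFinalizer returns a copy of the slice without f. The operator
// calls this (and then updates the object) after external cleanup
// succeeds, which finally lets the API server delete the resource.
func removeFinalizer(finalizers []string, f string) []string {
	var out []string
	for _, x := range finalizers {
		if x != f {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	finalizers := []string{myFinalizer, "other.io/protect"}
	fmt.Println(containsFinalizer(finalizers, myFinalizer)) // true
	finalizers = removeFinalizer(finalizers, myFinalizer)
	fmt.Println(finalizers) // [other.io/protect]
}
```

In a real controller you would check `DeletionTimestamp` in `syncHandler`: if it is set and your finalizer is still present, run the cleanup, strip the finalizer, and update the object.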
Advanced Concepts and Future Directions
The world of Kubernetes operators is constantly evolving. As your needs grow, you might explore:
- Operator Frameworks: While `client-go` provides the primitives, frameworks like Kubebuilder and Operator SDK abstract away much of the boilerplate code and provide scaffolds for building robust operators, including webhooks, metrics, and testing utilities. They significantly reduce the development time for complex operators.
- Context for Reconciliation: Passing `context.Context` through your reconciliation logic is crucial for enabling cancellation signals, especially in complex operations involving external services or long-running tasks.
- External Dependencies and Idempotent APIs: When your operator interacts with external APIs or infrastructure (like cloud providers, databases, or external API gateways such as APIPark for advanced API management), always ensure these interactions are idempotent. Design your `syncHandler` to check the current state of external resources before attempting modifications, minimizing side effects from repeated calls.
Conclusion
Mastering the art of watching for changes to Custom Resources in Golang is a cornerstone skill for anyone building sophisticated, cloud-native applications on Kubernetes. By leveraging client-go's SharedInformerFactory, workqueue pattern, and a deep understanding of reconciliation loops, developers can create powerful, resilient, and highly automated operators. These operators not only extend Kubernetes' capabilities but also bring a declarative, GitOps-friendly approach to managing any aspect of your infrastructure and applications, from database clusters to intricate api gateway configurations.
The api gateway pattern, for example, becomes incredibly powerful when coupled with Custom Resources. Imagine defining your entire api landscape, including security policies, routing, rate limiting, and analytics hooks, directly in Kubernetes YAML files as Custom Resources. An operator would then watch these CRs and dynamically configure an underlying api gateway to reflect these declarations, achieving a truly automated and self-managing api infrastructure. This convergence of Kubernetes' extensibility with robust api management platforms like APIPark represents the frontier of modern application orchestration, simplifying the deployment and governance of complex service meshes and AI integrations.
By following the detailed steps and best practices outlined in this guide, you are now equipped to build your own sophisticated Kubernetes controllers, transforming abstract resource definitions into tangible, automated actions, and ultimately, building a more intelligent and responsive cloud-native environment.
Frequently Asked Questions (FAQs)
1. What is the main difference between direct API calls and using Informers in client-go? Direct API calls (e.g., clientset.Get(), clientset.Create()) are imperative, one-off requests to the Kubernetes API server. They are suitable for specific actions. Informers, on the other hand, are a declarative, event-driven mechanism. They establish a continuous watch connection to the API server, maintain a local cache of resources, and notify your application via event handlers (Add, Update, Delete) when changes occur. Informers are highly efficient, reduce API server load, and abstract away complexities like connection management and state reconciliation, making them ideal for building controllers that react to ongoing changes.
2. Why is a work queue used in Kubernetes controllers, and what is AddRateLimited? A work queue (workqueue.RateLimitingInterface) is used to decouple event reception from event processing in a controller. Instead of immediately processing an event in the informer's event handler, the resource's key (namespace/name) is added to the work queue. This allows the informer to quickly process new events without being blocked by lengthy reconciliation logic, preventing backpressure on the API server. AddRateLimited adds an item to the work queue with an exponential backoff. If syncHandler encounters a transient error, AddRateLimited ensures the item is retried after increasing delays, preventing busy-looping on temporary failures and reducing API server load during outages.
3. How does a controller handle resource deletion when using Informers? When a Custom Resource (or any watched resource) is deleted, the informer's DeleteFunc event handler is triggered. This handler typically adds the deleted resource's key to the work queue. In the syncHandler, when attempting to retrieve the resource from the informer's local cache using the lister (myresourcesLister.Get(name)), it will return an IsNotFound error. This is the signal for the controller to perform any necessary cleanup actions related to the deleted resource (e.g., delete dependent Kubernetes objects, de-provision external resources, clean up api gateway configurations).
4. What are OwnerReferences and why are they important for operators? OwnerReferences are a Kubernetes metadata field that establishes a parent-child relationship between resources. When an operator creates secondary resources (e.g., a Deployment) based on a primary Custom Resource (e.g., MyResource), it should set the MyResource as the owner of the Deployment. This is important for two main reasons: a. Garbage Collection: Kubernetes' garbage collector can automatically delete dependent (child) resources when their owner (parent) resource is deleted, simplifying cleanup. b. Controller Logic: Controllers can use OwnerReferences to identify which primary Custom Resource an orphaned or changed secondary resource belongs to, enabling them to correctly reconcile the state.
5. How can I ensure my operator's logic is idempotent? Idempotency means that applying the same operation multiple times yields the same result as applying it once. For operator syncHandlers, this is crucial because Kubernetes guarantees at-least-once delivery of events, meaning your syncHandler might be called repeatedly for the same desired state. To ensure idempotency: a. Check Current State First: Before creating, updating, or deleting a resource, always check its current state. Only perform an action if the current state doesn't match the desired state (e.g., don't create a Deployment if one with the correct configuration already exists). b. Use Create-Or-Update (Upsert) Logic: Instead of separate create and update calls, use a logic that attempts to get a resource; if it doesn't exist, create it; otherwise, update the existing one to match the desired spec. c. Reflect Status: Update the Custom Resource's status to reflect the actual state of the cluster, providing clear feedback on the reconciliation process. This also helps in identifying discrepancies.
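The create-or-update pattern from point (b) can be shown without any Kubernetes dependencies. The sketch below uses a map as a toy stand-in for the API server (`store` and `createOrUpdate` are hypothetical names): the write happens only when observed state differs from desired state, so repeated calls are harmless.

```go
package main

import "fmt"

// store is a toy stand-in for the Kubernetes API: a map from resource
// name to its current "spec" (reduced to a string for illustration).
type store map[string]string

// createOrUpdate is idempotent: it writes only when the observed state
// differs from the desired state, and reports whether a change was made.
func createOrUpdate(s store, name, desired string) bool {
	if current, ok := s[name]; ok && current == desired {
		return false // already in the desired state; nothing to do
	}
	s[name] = desired
	return true
}

func main() {
	s := store{}
	fmt.Println(createOrUpdate(s, "my-deployment", "replicas=2")) // true  (created)
	fmt.Println(createOrUpdate(s, "my-deployment", "replicas=2")) // false (no-op on replay)
	fmt.Println(createOrUpdate(s, "my-deployment", "replicas=3")) // true  (updated)
}
```

Calling the function twice with the same desired state performs exactly one write, which is the property an at-least-once event stream requires of your `syncHandler`.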
You can securely and efficiently call the OpenAI API on APIPark in just two steps:
Step 1: Deploy the APIPark AI gateway in 5 minutes.
APIPark is developed based on Golang, offering strong product performance and low development and maintenance costs. You can deploy APIPark with a single command line.
curl -sSO https://download.apipark.com/install/quick-start.sh; bash quick-start.sh

In my experience, you can see the successful deployment interface within 5 to 10 minutes. Then, you can log in to APIPark using your account.

Step 2: Call the OpenAI API.

