How to Watch for Changes to Custom Resources in Golang
In the rapidly evolving landscape of cloud-native computing, Kubernetes has emerged as the de facto operating system for managing containerized workloads. Its extensibility, powered by Custom Resource Definitions (CRDs), has unlocked unprecedented possibilities for domain-specific automation and infrastructure management. However, merely defining a custom resource is only half the battle; the true power lies in building controllers that actively watch for changes to these resources and react intelligently. This detailed guide delves into the intricacies of leveraging Golang and the client-go library to effectively monitor and respond to alterations in your Kubernetes Custom Resources, laying the foundation for powerful, self-healing, and automated systems.
The Foundation: Understanding Custom Resources and the Imperative for Dynamic Observation
Kubernetes, at its core, operates on a declarative model. You describe the desired state of your applications and infrastructure, and the control plane works tirelessly to bring the actual state in line with your declarations. While Kubernetes provides a rich set of built-in resources like Pods, Deployments, and Services, the introduction of Custom Resource Definitions (CRDs) revolutionized its extensibility. CRDs allow users to define their own Kubernetes API objects, effectively extending the Kubernetes API without modifying the core code. This capability is paramount for modeling domain-specific concepts, such as a "DatabaseInstance," "CDNConfig," or "WorkflowTrigger," directly within the Kubernetes ecosystem.
A Custom Resource (CR) is an instance of a CRD. For instance, if you define a DatabaseInstance CRD, then my-prod-db and dev-test-db would be individual Custom Resources of that type. These custom resources become first-class citizens in Kubernetes, benefiting from its robust API server, authentication, authorization (RBAC), and validation mechanisms.
The immediate challenge, once you've defined a CRD and created CRs, is to make something useful happen. Merely existing, these custom resources don't perform any actions. This is where the concept of a "controller" or "operator" comes into play. A Kubernetes controller is a control loop that watches the state of your cluster and makes changes to move the current state closer to the desired state. For custom resources, a custom controller performs this reconciliation logic. It observes DatabaseInstance resources, for example, and then provisions actual database instances in a cloud provider or on-premises, configures them, and updates the status field of the DatabaseInstance CR to reflect its current state.
The critical requirement for any effective controller is the ability to watch for changes. Without this capability, a controller would either have to constantly poll the Kubernetes API server, which is inefficient and resource-intensive, or remain oblivious to state changes, rendering it ineffective. Imagine a controller managing a DatabaseInstance: if a user updates the requested storage size, the controller must be immediately aware of this change to scale up the underlying database. If a DatabaseInstance is deleted, the controller must de-provision the actual database. This dynamic, event-driven reaction is the cornerstone of robust and responsive Kubernetes operators.
Direct polling, where a controller periodically fetches a list of all resources and compares it to a previously known state, is fraught with issues. It generates unnecessary load on the API server, introduces latency in reacting to changes, and can lead to complex race conditions if not carefully managed. Furthermore, identifying granular changes (e.g., which field was updated) becomes a non-trivial task. This is precisely why Kubernetes provides sophisticated mechanisms for event-driven watching, which we will explore in detail using Golang's client-go library.
Architectural Deep Dive: Kubernetes API and the Client-Go Ecosystem
At the heart of Kubernetes lies the API Server, the central nervous system of the cluster. All internal components (kubelet, kube-scheduler, kube-controller-manager) and external clients (kubectl, custom controllers) communicate with the API Server via a RESTful API. This API is the single source of truth for the cluster's desired and actual state.
For developers building Kubernetes applications and controllers in Golang, the client-go library is the official and most comprehensive toolkit. It provides a type-safe way to interact with the Kubernetes API Server, abstracting away the complexities of HTTP requests, JSON serialization/deserialization, and authentication. client-go is not a monolithic library; it's a collection of packages, each serving a specific purpose:
- k8s.io/client-go/kubernetes: provides clientsets for interacting with standard, built-in Kubernetes resources (e.g., Pods, Deployments, Services).
- k8s.io/client-go/dynamic: allows generic interaction with any Kubernetes resource, including Custom Resources, without requiring their Go types to be known at compile time. It operates using unstructured.Unstructured objects, which is incredibly flexible and often used for generic CRD controllers.
- k8s.io/client-go/rest: contains core types for Kubernetes REST clients, including the configuration (rest.Config) for connecting to the API Server.
- k8s.io/client-go/tools/clientcmd: handles loading Kubernetes configuration from kubeconfig files, crucial for out-of-cluster development.
- k8s.io/client-go/informers: the pivotal package for efficiently watching resources. It provides a powerful caching and eventing mechanism that offloads much of the complexity of state synchronization.
Before diving into informers, a crucial first step for any client-go application is establishing a connection to the Kubernetes API Server. This involves obtaining a rest.Config object, which encapsulates the API server address, authentication credentials, and TLS configuration.
In-Cluster Configuration: When your controller runs inside a Kubernetes cluster (e.g., as a Pod), it typically leverages the service account token mounted into its container. client-go automatically detects this environment:
import (
"fmt"
"k8s.io/client-go/rest"
)
func getConfigInCluster() (*rest.Config, error) {
config, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("failed to get in-cluster config: %w", err)
}
return config, nil
}
Out-of-Cluster Configuration: For local development or debugging, your controller needs to connect to a remote cluster using your kubeconfig file. This usually involves specifying the path to the kubeconfig file or relying on its default location (~/.kube/config):
import (
"flag"
"fmt"
"path/filepath"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func getConfigOutOfCluster() (*rest.Config, error) {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
return nil, fmt.Errorf("failed to get out-of-cluster config: %w", err)
}
return config, nil
}
Once you have a rest.Config, you can create specific clientsets, for example:
import (
"fmt"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func createClients(config *rest.Config) (kubernetes.Interface, dynamic.Interface, error) {
// Create standard clientset for built-in resources
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, nil, fmt.Errorf("failed to create kubernetes clientset: %w", err)
}
// Create dynamic client for custom resources
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, nil, fmt.Errorf("failed to create dynamic clientset: %w", err)
}
return kubeClient, dynamicClient, nil
}
The dynamic.Interface is particularly powerful for working with Custom Resources as it allows you to interact with objects generically using unstructured.Unstructured maps, without needing pre-generated Go types for every CRD you might encounter. This flexibility is often preferred for general-purpose controllers or when you don't want to regenerate code for every CRD change.
The Heart of Watching: Understanding Informers for Efficient Event Handling
As established, direct polling of the Kubernetes API Server for changes is inefficient and prone to issues. The solution provided by client-go is the Informer. An informer is a sophisticated mechanism designed to efficiently watch a specific Kubernetes resource type for changes, maintain an in-memory cache of these objects, and deliver events to registered handlers. It significantly reduces the load on the API Server and simplifies controller development by providing a reliable, event-driven stream of changes.
An informer essentially combines two fundamental API operations, List and Watch:
1. List: initially, the informer performs a List operation to fetch all existing objects of the specified type. This populates its local cache.
2. Watch: after the initial List, the informer establishes a persistent Watch connection to the API Server. Any subsequent changes (creation, update, deletion) to objects of that type are streamed to the informer.
If the watch connection breaks (due to network issues, API Server restarts, etc.), the informer intelligently re-establishes the connection. More importantly, it performs a new List operation to resynchronize its cache and ensure no events were missed during the downtime. This robust re-listing and re-watching mechanism guarantees eventual consistency between the informer's cache and the API Server's state.
Key Components of an Informer
To truly appreciate the power of informers, it's helpful to understand their internal architecture:
- Reflector: the component directly responsible for interacting with the Kubernetes API Server. It performs the List operation to initially populate the cache and then maintains a Watch connection. When the Watch connection breaks, the Reflector handles the re-listing and re-watching logic, ensuring the event stream is continuous. It uses the resourceVersion from the last known object to request only changes since that point.
- DeltaFIFO: a thread-safe queue that stores "deltas" (changes) received from the Reflector. When the Reflector receives an Add, Update, or Delete event, it puts a Delta object representing that change into the DeltaFIFO. The DeltaFIFO is crucial for preventing event loss during short processing delays and for handling the List and Watch results seamlessly. It deduplicates events and ensures that if an object is updated multiple times before the controller processes it, only the latest version (or a sequence of meaningful deltas) is presented.
- Indexer: a thread-safe, in-memory store (a local cache) that holds the actual Kubernetes objects. It is populated by processing events from the DeltaFIFO. The Indexer allows controllers to quickly retrieve the latest state of an object without repeatedly querying the API Server, and it supports indexing objects by arbitrary fields (e.g., by namespace or by labels), making it efficient to query subsets of objects. This local cache is a significant performance optimization, drastically reducing the load on the API Server.
- Controller (within the informer context): this internal controller (not to be confused with your custom application controller) orchestrates the Reflector and DeltaFIFO. It pops items from the DeltaFIFO, processes them (applying changes to the Indexer), and then dispatches the Add, Update, or Delete event to any registered ResourceEventHandler functions.
Shared Informer Factories
For controllers that manage multiple types of resources or for applications where several internal components need to watch the same resource type, client-go offers SharedInformerFactory. Instead of each component creating its own dedicated informer (which would lead to multiple List and Watch connections for the same resource type, causing redundant API Server load and memory consumption), a SharedInformerFactory provides a single entry point.
When you request an informer for a specific resource type from a SharedInformerFactory, it creates and manages a single underlying informer for that type. All components requesting that resource type will receive a pointer to the same informer and its cache. This optimizes resource usage both on the client side (memory, network connections) and on the Kubernetes API Server.
SharedInformerFactory is the recommended way to use informers in most production-grade controllers. It simplifies the overall architecture and ensures efficient resource watching across your application.
Implementing a Custom Resource Watcher in Golang: A Step-by-Step Guide
Building a custom resource watcher involves several interconnected steps, from defining your custom resource to setting up the informer and processing events. We'll walk through this process, focusing on best practices and the client-go components.
Step 1: Define Your Custom Resource Types
First, you need to define the Go structs that represent your Custom Resource. These structs typically reside in a types.go file within your project's api/<version> directory. A Custom Resource typically consists of TypeMeta (for API version and kind), ObjectMeta (for name, namespace, labels, annotations), Spec (the desired state), and Status (the actual observed state).
Let's imagine a custom resource called MyService within the example.com group, version v1alpha1.
// api/v1alpha1/types.go
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MyService is the Schema for the myservices API
type MyService struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MyServiceSpec `json:"spec,omitempty"`
Status MyServiceStatus `json:"status,omitempty"`
}
// MyServiceSpec defines the desired state of MyService
type MyServiceSpec struct {
Image string `json:"image"`
Replicas int32 `json:"replicas"`
Port int32 `json:"port"`
// Add other fields relevant to your service
}
// MyServiceStatus defines the observed state of MyService
type MyServiceStatus struct {
ReadyReplicas int32 `json:"readyReplicas"`
Phase string `json:"phase"` // e.g., "Pending", "Running", "Failed"
// Add other status fields
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MyServiceList contains a list of MyService
type MyServiceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []MyService `json:"items"`
}
The comments +genclient and +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object are directives for Kubernetes code generators: controller-gen (from kubernetes-sigs/controller-tools) generates deep copy methods and CRD manifests from your Go types, while the generators in k8s.io/code-generator produce typed clientsets, informers, and listers. Using these tools is highly recommended for any non-trivial controller.
Step 2: Generate Client-Go Boilerplate (Recommended)
While you can interact with CRs using the dynamic client and unstructured.Unstructured objects, for a more type-safe and integrated experience, especially for larger projects, generating client-go code is the standard approach. Tools like controller-gen (part of the kubernetes-sigs/controller-tools project) and the generators in k8s.io/code-generator automate this:
- Define your CRD YAML manifest: this describes your MyService CRD to Kubernetes.
- Run the generators, which produce:
- zz_generated.deepcopy.go: deep copy methods for your Go types.
- Clientsets: type-safe clients for your custom resource.
- Informers and listers: for efficient watching and caching specific to your MyService type.
For the scope of this guide, we will primarily demonstrate using the dynamic client and dynamicinformer to keep the example self-contained, but be aware of controller-gen for production-grade setups.
Step 3: Create a Kubernetes Client for Your CRD
As discussed, we'll use the dynamic.Interface for generic CRD interaction. This avoids the need for pre-generated client-sets for our MyService type, making the example more universal for any CRD.
import (
"fmt"
"os"
"path/filepath"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func getDynamicClient() (dynamic.Interface, error) {
config, err := rest.InClusterConfig()
if err != nil {
// Fallback to kubeconfig for local development
// In a real application, you might use clientcmd.BuildConfigFromFlags
// with proper flag parsing.
fmt.Println("Running outside cluster, attempting to load kubeconfig...")
kubeconfigPath := os.Getenv("KUBECONFIG") // Or hardcode ~/.kube/config
if kubeconfigPath == "" {
kubeconfigPath = filepath.Join(homedir.HomeDir(), ".kube", "config")
}
config, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return nil, fmt.Errorf("error creating rest config from kubeconfig: %w", err)
}
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("error creating dynamic client: %w", err)
}
return dynamicClient, nil
}
Step 4: Set Up the Shared Informer Factory
We'll use dynamicinformer.NewFilteredDynamicSharedInformerFactory because it works with unstructured.Unstructured objects and can be easily configured to watch specific namespaces.
import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
)
func createDynamicInformerFactory(dynamicClient dynamic.Interface) dynamicinformer.DynamicSharedInformerFactory {
// Resync period: How often the informer's cache should be resynced
// In practice, a long period like 10 hours is fine, as most changes come via watch events.
resyncPeriod := 10 * time.Hour
// Filter by namespace if needed. Use metav1.NamespaceAll to watch all namespaces.
// In a real controller, you might scope this to the controller's own namespace.
namespace := metav1.NamespaceAll
// Create a dynamic shared informer factory.
// The TweakListOptions can be used to add label selectors, field selectors etc.
factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
dynamicClient,
resyncPeriod,
namespace,
nil, // No TweakListOptions for this example
)
return factory
}
Step 5: Obtain the Informer for Your Custom Resource
You need to specify the Group, Version, and Resource (GVR) of your Custom Resource to the factory. The resource name should be the plural form used in your CRD. For our MyService example, assuming its plural name is myservices:
import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/dynamic/dynamicinformer"
// ... other imports
)
func getCRDInformer(factory dynamicinformer.DynamicSharedInformerFactory) cache.SharedIndexInformer {
// Define the GroupVersionResource (GVR) for your Custom Resource
// This must match your CRD's group, version, and plural resource name.
gvr := schema.GroupVersionResource{
Group: "example.com",
Version: "v1alpha1",
Resource: "myservices", // Plural name of your CR
}
// Get the informer for your specific GVR
// The factory will create it if it doesn't exist.
informer := factory.ForResource(gvr).Informer()
return informer
}
Step 6: Register Event Handlers
This is where your controller defines its reactions to Add, Update, and Delete events. You provide callback functions that will be invoked when these events occur. Inside these handlers, you'll typically add the affected object to a work queue for asynchronous processing.
import (
"fmt"
"k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
// ... other imports
)
// Simplified event handler for demonstration
// In a real controller, you would push the key to a workqueue.
func registerEventHandlers(informer cache.SharedIndexInformer) {
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
fmt.Println("Error: object is not an Unstructured on AddFunc")
return
}
fmt.Printf("MyService ADDED: %s/%s\n", unstructuredObj.GetNamespace(), unstructuredObj.GetName())
// In a real controller, you'd add this object's key to a workqueue
// e.g., controller.workqueue.Add(cache.MetaNamespaceKeyFunc(unstructuredObj))
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldUnstructured, ok := oldObj.(*unstructured.Unstructured)
if !ok {
fmt.Println("Error: old object is not an Unstructured on UpdateFunc")
return
}
newUnstructured, ok := newObj.(*unstructured.Unstructured)
if !ok {
fmt.Println("Error: new object is not an Unstructured on UpdateFunc")
return
}
// Often, you'd check for specific changes in Spec or annotations here
fmt.Printf("MyService UPDATED: %s/%s -> ResourceVersion: %s\n",
newUnstructured.GetNamespace(), newUnstructured.GetName(),
newUnstructured.GetResourceVersion())
// Add to workqueue
},
DeleteFunc: func(obj interface{}) {
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
// Handle DeletedFinalStateUnknown objects
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
fmt.Println("Error: object is not Unstructured or DeletedFinalStateUnknown on DeleteFunc")
return
}
unstructuredObj, ok = tombstone.Obj.(*unstructured.Unstructured)
if !ok {
fmt.Println("Error: tombstone object is not Unstructured on DeleteFunc")
return
}
}
fmt.Printf("MyService DELETED: %s/%s\n", unstructuredObj.GetNamespace(), unstructuredObj.GetName())
// Add to workqueue
},
})
}
In a full-fledged controller, the event handlers don't directly perform heavy logic. Instead, they extract a unique identifier for the object (often its namespace/name key) and add it to a rate-limiting work queue. This decouples event reception from event processing, allows for error retries, and prevents blocking the informer's event loop.
Step 7: Start the Informer and Wait for Sync
Once handlers are registered, you need to start the informer factory and ensure its caches are synchronized. factory.Start(stopCh) initiates all informers managed by the factory in goroutines. cache.WaitForCacheSync is a critical function that blocks until all caches managed by the informer factory are populated with the initial List results. This prevents your controller from acting on an incomplete view of the cluster state. The stop channel (a <-chan struct{}, typically obtained from a context.Context via ctx.Done()) is used for graceful shutdown.
import (
"context"
"fmt"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
)
func runInformer(ctx context.Context, factory dynamicinformer.DynamicSharedInformerFactory, informer cache.SharedIndexInformer) {
// Start all informers in the factory.
factory.Start(ctx.Done())
// Wait for the cache to be synced. This is crucial!
// Without this, your handlers might fire before the cache is fully populated,
// leading to inconsistencies or errors.
fmt.Println("Waiting for MyService informer caches to sync...")
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
fmt.Println("Error: MyService informer caches failed to sync")
// Potentially exit or handle this critical error
return
}
fmt.Println("MyService informer caches synced successfully.")
// Keep the main goroutine alive until context is cancelled
<-ctx.Done()
fmt.Println("Informer stopped.")
}
Step 8: Implement the Reconciliation Logic (Controller Pattern)
The core logic of your controller resides in a reconciliation loop. This loop continuously pulls items from the work queue, fetches the latest object from the informer's local cache, compares the desired state (from MyService.Spec) with the actual state (by checking other Kubernetes resources like Deployments/Services or external APIs), and takes corrective actions. After performing actions, the controller updates the MyService.Status field to reflect the current state.
This is where your business logic comes alive. For a MyService controller, this might involve:
1. Creating/updating Deployments: to run the specified image with replicas.
2. Creating/updating Services: to expose the Deployment internally or externally on port.
3. Configuring external resources: such as a cloud load balancer or a database instance.
4. Error handling: if an action fails, the item is re-queued with a backoff.
5. Garbage collection: deleting dependent resources when a MyService is deleted.
// A very simplified representation of a Controller
type Controller struct {
informer cache.SharedIndexInformer
workqueue workqueue.RateLimitingInterface
dynamicClient dynamic.Interface
// Add other clients you need (e.g., kubernetes.Interface)
}
func NewController(
informer cache.SharedIndexInformer,
dynamicClient dynamic.Interface,
) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// Register event handlers that push keys to the workqueue
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj) // Get "namespace/name" key
queue.Add(key)
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(newObj)
queue.Add(key)
},
DeleteFunc: func(obj interface{}) {
key, _ := cache.MetaNamespaceKeyFunc(obj)
queue.Add(key)
},
})
return &Controller{
informer: informer,
workqueue: queue,
dynamicClient: dynamicClient,
}
}
func (c *Controller) Run(ctx context.Context, workers int) {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()
fmt.Println("Starting MyService controller workers")
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, ctx.Done())
}
<-ctx.Done()
fmt.Println("Shutting down MyService controller workers")
}
func (c *Controller) runWorker() {
for c.processNextItem() {}
}
func (c *Controller) processNextItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
defer c.workqueue.Done(obj)
err := c.syncHandler(obj.(string)) // obj is the key "namespace/name"
c.handleErr(err, obj)
return true
}
func (c *Controller) syncHandler(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil // Don't retry invalid keys
}
// Get the MyService object from the informer's cache
obj, exists, err := c.informer.GetIndexer().GetByKey(key)
if err != nil {
return fmt.Errorf("error fetching object with key %s from store: %w", key, err)
}
if !exists {
fmt.Printf("MyService %s/%s no longer exists, cleaning up...\n", namespace, name)
// Perform cleanup (delete associated Deployments, Services, etc.)
return nil
}
// Cast to unstructured.Unstructured (since we're using dynamic client)
myService := obj.(*unstructured.Unstructured)
fmt.Printf("Reconciling MyService %s/%s (ResourceVersion: %s)\n",
myService.GetNamespace(), myService.GetName(), myService.GetResourceVersion())
// --- YOUR RECONCILIATION LOGIC GOES HERE ---
// Example: Read spec, create/update K8s Deployments/Services, update MyService status
//
// Update MyService status to reflect the actual state
// For example:
// currentStatus := myService.Object["status"].(map[string]interface{})
// currentStatus["phase"] = "Running" // Or "Provisioning", "Failed"
// currentStatus["readyReplicas"] = 1 // Or actual count
// myService.Object["status"] = currentStatus
// _, err = c.dynamicClient.Resource(myServiceGVR).Namespace(namespace).UpdateStatus(ctx, myService, metav1.UpdateOptions{})
// if err != nil {
// return fmt.Errorf("failed to update status for MyService %s/%s: %w", namespace, name, err)
// }
fmt.Printf("MyService %s/%s reconciled successfully.\n", namespace, name)
return nil
}
func (c *Controller) handleErr(err error, obj interface{}) {
if err == nil {
c.workqueue.Forget(obj) // Item was processed successfully, remove from queue
return
}
if c.workqueue.NumRequeues(obj) < 5 { // Max retries
fmt.Printf("Error processing %v (will retry): %v\n", obj, err)
c.workqueue.AddRateLimited(obj) // Re-add with exponential backoff
return
}
c.workqueue.Forget(obj) // Too many retries, give up on this item
runtime.HandleError(fmt.Errorf("dropping MyService %q out of the workqueue: %v", obj, err))
}
This controller structure, utilizing a rate-limiting work queue and a reconciliation loop, is the canonical pattern for building robust Kubernetes operators. It ensures that events are processed reliably, even in the face of temporary errors or high event rates.
Putting It All Together: A main.go Example
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"k8s.io/client-go/util/workqueue"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
)
// MyService GVR - Must match your CRD
var myServiceGVR = schema.GroupVersionResource{
Group: "example.com",
Version: "v1alpha1",
Resource: "myservices", // Plural name of your CR
}
// Controller struct (as defined in Step 8)
type Controller struct {
informer cache.SharedIndexInformer
workqueue workqueue.RateLimitingInterface
dynamicClient dynamic.Interface
}
func NewController(
informer cache.SharedIndexInformer,
dynamicClient dynamic.Interface,
) *Controller {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
queue.Add(key)
},
UpdateFunc: func(oldObj, newObj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", newObj, err))
return
}
queue.Add(key)
},
DeleteFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
queue.Add(key)
},
})
return &Controller{
informer: informer,
workqueue: queue,
dynamicClient: dynamicClient,
}
}
func (c *Controller) Run(ctx context.Context, workers int) {
	defer runtime.HandleCrash()
	defer c.workqueue.ShutDown()
	fmt.Println("Starting MyService controller workers")

	// Start the informer's watch loop.
	go c.informer.Run(ctx.Done())

	// Wait for the cache to be synced before starting workers.
	fmt.Println("Waiting for MyService informer caches to sync...")
	if !cache.WaitForCacheSync(ctx.Done(), c.informer.HasSynced) {
		runtime.HandleError(fmt.Errorf("failed to sync MyService informer cache"))
		return
	}
	fmt.Println("MyService informer caches synced successfully.")

	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, ctx.Done())
	}

	<-ctx.Done()
	fmt.Println("Shutting down MyService controller workers")
}

func (c *Controller) runWorker() {
	for c.processNextItem() {
	}
}

func (c *Controller) processNextItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	defer c.workqueue.Done(obj)
	err := c.syncHandler(obj.(string)) // obj is the key "namespace/name"
	c.handleErr(err, obj)
	return true
}
func (c *Controller) syncHandler(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
		return nil // Don't retry invalid keys
	}
	obj, exists, err := c.informer.GetIndexer().GetByKey(key)
	if err != nil {
		return fmt.Errorf("error fetching object with key %s from store: %w", key, err)
	}
	if !exists {
		fmt.Printf("MyService %s/%s no longer exists, cleaning up associated resources...\n", namespace, name)
		// Implement cleanup logic here (e.g., delete Deployment, Service)
		return nil
	}
	myService := obj.(*unstructured.Unstructured)
	fmt.Printf("Reconciling MyService %s/%s (ResourceVersion: %s) with spec: %+v\n",
		myService.GetNamespace(), myService.GetName(), myService.GetResourceVersion(), myService.Object["spec"])
	// --- YOUR RECONCILIATION LOGIC ---
	// This is where you would interact with the Kubernetes API to create/update/delete
	// other resources (e.g., Deployments, Services) based on myService.Object["spec"].
	//
	// For example, if MyService's spec includes an 'image' and 'replicas',
	// you would ensure a Deployment exists with those properties.
	//
	// After your controller provisions the necessary infrastructure and services,
	// you might consider how these services are exposed to consumers.
	// [APIPark](https://apipark.com/) is an open-source AI gateway and API management platform
	// that simplifies the management, integration, and deployment of both AI and REST services.
	// It could be used to expose the services managed by your `MyService` controller,
	// providing features like authentication, traffic control, and monitoring,
	// making your custom resources' capabilities accessible in a controlled and scalable manner.
	//
	// Don't forget to update the status of your MyService CR after reconciliation!
	// Example status update (pass a context into syncHandler, or use context.TODO()):
	//
	// statusMap, ok := myService.Object["status"].(map[string]interface{})
	// if !ok || statusMap == nil {
	//     statusMap = make(map[string]interface{})
	// }
	// statusMap["readyReplicas"] = 1 // Example
	// statusMap["phase"] = "Running"
	// myService.Object["status"] = statusMap
	//
	// _, err = c.dynamicClient.Resource(myServiceGVR).Namespace(namespace).UpdateStatus(ctx, myService, metav1.UpdateOptions{})
	// if err != nil {
	//     runtime.HandleError(fmt.Errorf("failed to update status for MyService %s/%s: %w", namespace, name, err))
	//     return err // Requeue on status update failure
	// }
	fmt.Printf("MyService %s/%s reconciled successfully.\n", namespace, name)
	return nil
}
func (c *Controller) handleErr(err error, obj interface{}) {
	if err == nil {
		c.workqueue.Forget(obj)
		return
	}
	if c.workqueue.NumRequeues(obj) < 5 {
		fmt.Printf("Error processing %v (will retry): %v\n", obj, err)
		c.workqueue.AddRateLimited(obj)
		return
	}
	c.workqueue.Forget(obj)
	runtime.HandleError(fmt.Errorf("dropping MyService %q out of the workqueue: %v", obj, err))
}
func main() {
	// Graceful shutdown context
	ctx, cancel := context.WithCancel(context.Background())
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		fmt.Println("Received shutdown signal, initiating graceful shutdown...")
		cancel()
	}()

	// 1. Get Kubernetes REST config
	var config *rest.Config
	var err error
	kubeconfig := flag.String("kubeconfig", "", "Path to a kubeconfig file (only for out-of-cluster development)")
	flag.Parse()
	if *kubeconfig != "" {
		config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
	} else {
		config, err = rest.InClusterConfig()
		if err != nil {
			fmt.Println("Could not get in-cluster config, falling back to default kubeconfig path.")
			kubeconfigPath := filepath.Join(homedir.HomeDir(), ".kube", "config")
			config, err = clientcmd.BuildConfigFromFlags("", kubeconfigPath)
		}
	}
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to get Kubernetes config: %v\n", err)
		os.Exit(1)
	}

	// 2. Create Dynamic Client
	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create dynamic client: %v\n", err)
		os.Exit(1)
	}

	// 3. Create Dynamic Shared Informer Factory
	resyncPeriod := 10 * time.Hour // Long resync period; the watch handles most changes
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(
		dynamicClient,
		resyncPeriod,
		metav1.NamespaceAll, // Watch all namespaces; pass a namespace string to scope down
		nil,                 // No TweakListOptions
	)

	// 4. Get Informer for MyService CRD
	myServiceInformer := factory.ForResource(myServiceGVR).Informer()

	// 5. Create and Run Controller. Run blocks until the context is
	// canceled by the signal handler, so no extra wait is needed here.
	controller := NewController(myServiceInformer, dynamicClient)
	controller.Run(ctx, 2) // Run with 2 worker goroutines

	fmt.Println("Controller gracefully shut down.")
}
This main.go structure demonstrates the full lifecycle: obtaining configuration, creating clients, setting up the informer factory, getting the specific informer, instantiating your custom controller with the informer, and running it with graceful shutdown.
Best Practices and Advanced Topics for Robust Controllers
Developing Kubernetes controllers involves more than just watching resources; it requires careful consideration of robustness, scalability, and security.
Error Handling and Retries
The workqueue.RateLimitingInterface is your best friend here. It provides automatic exponential backoff for failed reconciliation attempts, preventing a failing reconciliation from hammering the API server or constantly retrying a problematic item immediately. By calling c.workqueue.AddRateLimited(key) on errors and c.workqueue.Forget(key) on success, you ensure resilient processing.
Resource Version and Conflict Resolution
Every Kubernetes object has a resourceVersion field. When you update an object, you should include the resourceVersion of the object you last read. If the object has been modified by another client in the interim (meaning its resourceVersion is different), the API server will return a conflict error. Your controller should be prepared to handle apierrors.IsConflict(err) errors by re-fetching the latest object from the informer's cache (or the API server), re-evaluating its state, and retrying the update.
Garbage Collection and Owner References
When your controller creates other Kubernetes resources (like Deployments, Services) based on a custom resource, it's crucial to establish ownership. By setting the custom resource as the OwnerReference for the created resources, Kubernetes' built-in garbage collector will automatically delete the dependent resources when the custom resource itself is deleted. This prevents orphaned resources and simplifies cleanup logic.
// Example of setting an owner reference (simplified; `deployment` is the
// typed Deployment object your controller is about to create)
ownerRef := *metav1.NewControllerRef(myService, myServiceGVR.GroupVersion().WithKind("MyService"))
deployment.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ownerRef}
Namespaces and Scoping
Controllers can be designed to be cluster-scoped (watching resources across all namespaces) or namespace-scoped (watching only in a single, designated namespace). The NewFilteredDynamicSharedInformerFactory allows you to specify a namespace argument (metav1.NamespaceAll for cluster-scope or a specific namespace string). Be mindful of the RBAC permissions required for each scope. Cluster-scoped controllers require broader permissions.
Context and Graceful Shutdown
Using context.Context throughout your controller's goroutines is essential for managing lifecycles and enabling graceful shutdown. When a termination signal (like SIGTERM or SIGINT) is received, you should cancel the root context. This signal propagates, allowing long-running goroutines (like the informer's Run loop and worker goroutines) to detect the cancellation and clean up resources before exiting. This prevents abrupt termination and potential data corruption.
Testing Your Controller
Thorough testing is paramount.
- Unit Tests: Test individual functions and reconciliation logic in isolation, using mock clients where necessary.
- Integration Tests: Run your controller against a real (or kind/minikube) Kubernetes cluster. Use a test framework like envtest from sigs.k8s.io/controller-runtime/pkg/envtest to spin up a minimal API server and etcd for testing.
- End-to-End (E2E) Tests: Deploy your controller and create/update/delete your custom resources, asserting that the expected changes occur in the cluster.
Performance Considerations
- Efficient Event Processing: The work queue pattern naturally handles bursty events. Avoid doing heavy computation directly in event handlers.
- Lister for Reads: Always read objects from the informer's local cache (via its GetIndexer() and Get() or List() methods) during reconciliation. Avoid direct API server reads for objects you are watching, as this defeats the purpose of the cache and adds unnecessary load.
- Resource Throttling: If your controller interacts with external APIs or systems, implement proper rate limiting and circuit breakers to avoid overwhelming those services.
Security: RBAC for Controllers
Your controller Pod will run with a Service Account. You must define appropriate Role-Based Access Control (RBAC) rules (Roles/ClusterRoles and RoleBindings/ClusterRoleBindings) that grant this Service Account only the minimum necessary permissions to:
- get, list, watch, create, update, patch, and delete your custom resources.
- get, list, watch, create, update, patch, and delete any standard Kubernetes resources it manages (e.g., Deployments, Services, Pods).
- get namespaces (if cluster-scoped).
Failing to apply the principle of least privilege can create significant security vulnerabilities.
Table: Comparison of Kubernetes Resource Interaction Methods
To summarize the benefits of informers, let's compare different approaches to interacting with Kubernetes resources:
| Feature / Method | Direct client-go List | Direct client-go Watch | Informer (ListAndWatch with Cache) |
|---|---|---|---|
| API Calls Made | GET (full list) | GET (long-polling stream) | Initial GET (list), then GET (watch stream) |
| State Management | None; client responsible for tracking state | None; client receives raw events, responsible for state | Local cache (Indexer) maintained by informer |
| Efficiency | Low (full list on each check, high API Server load) | Better (event-driven), but client must handle re-connection & state manually | High (event-driven, auto-relist, shared cache, low API Server load) |
| API Server Load | High | Moderate (continuous watch, dedicated connection per client) | Low (single watch stream per resource type, shared cache reduces GETs) |
| Event Loss Risk | High (if polling interval is long, changes can be missed) | Moderate (on connection drops, unless client re-establishes carefully) | Low (DeltaFIFO and auto-relist logic ensure eventual consistency) |
| Latency to React | High (depends on polling interval) | Low (near real-time) | Low (near real-time, events processed from cache) |
| Complexity for Client | Low (simple List call) | Medium (manual stream processing, re-connection, state tracking) | High initial setup (factory, informer, handlers); low day-to-day event processing |
| Ideal Use Case | One-off queries, diagnostic tools, kubectl get | Simple scripts reacting to a few events, not for continuous state management | Robust controllers, operators, complex applications requiring consistent state |
The table clearly illustrates why informers are the preferred and almost universally adopted method for building sophisticated, reliable, and performant Kubernetes controllers in Golang. They abstract away a significant amount of boilerplate and complex error handling, allowing developers to focus on the core reconciliation logic of their custom resources.
Conclusion
The ability to watch for changes to Custom Resources is fundamental to unlocking the full power of Kubernetes extensibility. By diligently observing the desired state expressed through your custom objects and reacting dynamically, you can build self-managing, intelligent systems that automate complex operational tasks. Golang, with its robust client-go library and the sophisticated informer pattern, provides an elegant and efficient toolkit for this endeavor.
Throughout this guide, we've dissected the architecture of client-go informers, explored their core components like the Reflector, DeltaFIFO, and Indexer, and provided a step-by-step blueprint for implementing a custom resource watcher. From defining your custom types to establishing connection configurations, registering event handlers, and orchestrating the reconciliation loop, we've covered the essential elements. We also touched upon crucial best practices, including error handling, garbage collection with owner references, and the importance of RBAC, all designed to ensure your controllers are not only functional but also resilient, secure, and scalable.
Embracing this event-driven, cache-backed approach with informers will significantly reduce the complexity of your Kubernetes controllers, enhance their performance, and pave the way for a more automated and self-healing cloud-native infrastructure. As Kubernetes continues to evolve, mastering these core principles will remain a vital skill for any developer looking to build the next generation of cloud-native applications and platforms.
Frequently Asked Questions (FAQs)
1. What's the primary benefit of using an informer over direct Kubernetes API calls (List/Watch)?
The primary benefit of using an informer is efficiency and robustness. Informers maintain a local, in-memory cache of Kubernetes objects, significantly reducing the load on the Kubernetes API Server by minimizing direct GET requests. They also handle complex logic like maintaining a persistent Watch connection, re-listing on watch breaks, and ensuring event delivery order via DeltaFIFO. This allows your controller to react to changes in near real-time without constantly polling, leading to better performance, lower resource consumption, and simpler controller logic than a manual List and Watch implementation.
2. How do I handle updates and deletes in my Kubernetes controller?
Updates and deletes are handled by registering UpdateFunc and DeleteFunc callbacks with the informer. When an object is updated or deleted, these functions are triggered, and your controller typically adds the unique key (namespace/name) of the affected object to a rate-limiting work queue. The reconciliation loop then processes this key: if the object exists in the cache, it's an update; if it doesn't, it indicates a deletion, prompting cleanup of any associated resources. Note that on deletion the informer may deliver a cache.DeletedFinalStateUnknown tombstone instead of the object itself, which is why DeleteFunc handlers typically use cache.DeletionHandlingMetaNamespaceKeyFunc to extract the key. The DeltaFIFO within the informer ensures that even if an object is updated multiple times rapidly, the work queue receives a consolidated or ordered sequence of events, helping to avoid race conditions.
3. Can I watch custom resources in specific namespaces only?
Yes, you can. When creating a dynamicinformer.NewFilteredDynamicSharedInformerFactory (or any SharedInformerFactory), you provide a namespace argument. If you set this to metav1.NamespaceAll (an empty string), the informer will watch resources across all namespaces in the cluster. If you provide a specific namespace string (e.g., "my-app-namespace"), the informer will only watch for changes to custom resources within that particular namespace. This is crucial for scoping controllers to specific tenants or application environments.
4. What is a SharedInformerFactory and why is it useful?
A SharedInformerFactory is a mechanism in client-go that allows multiple components within a single application to share the same underlying informer for a given resource type. Instead of each component creating its own dedicated informer (which would lead to redundant API Server connections, caches, and memory usage), the factory creates and manages a single informer instance. When multiple parts of your application request an informer for the same resource, they all receive a pointer to this shared instance. This significantly optimizes resource utilization on both the client side and the Kubernetes API Server, making it the recommended approach for building complex, multi-functional controllers.
5. What are the common pitfalls to avoid when developing Kubernetes controllers in Go?
Common pitfalls include:
- Forgetting cache.WaitForCacheSync: Not waiting for the informer's cache to synchronize before starting worker goroutines can lead to your controller acting on an incomplete view of the cluster state.
- Direct API server reads in reconciliation: Always retrieve objects from the informer's local cache during reconciliation; direct API server calls defeat the purpose of the cache and add unnecessary load.
- Lack of OwnerReference: Failing to set OwnerReference for dependent resources created by your controller can lead to orphaned resources when the custom resource is deleted.
- Inadequate RBAC: Granting excessive permissions to your controller's Service Account violates the principle of least privilege and creates security vulnerabilities.
- Blocking event handlers: Performing heavy computation or long-running operations directly in AddFunc, UpdateFunc, or DeleteFunc can block the informer's event loop, causing event backlogs. Always push items to a work queue for asynchronous processing.
- Ignoring resourceVersion for updates: Not using the resourceVersion when updating objects can lead to lost updates due to optimistic locking conflicts.
You can securely and efficiently call the OpenAI API on APIPark in just two steps:
Step 1: Deploy the APIPark AI gateway in 5 minutes.
APIPark is developed based on Golang, offering strong product performance and low development and maintenance costs. You can deploy APIPark with a single command line.
curl -sSO https://download.apipark.com/install/quick-start.sh; bash quick-start.sh
In my experience, you can see the successful deployment interface within 5 to 10 minutes. Then, you can log in to APIPark using your account.
Step 2: Call the OpenAI API.