How to Watch for Changes to Custom Resources in Golang
The modern cloud-native landscape, dominated by Kubernetes, thrives on extensibility and automation. At the heart of this extensibility lie Custom Resources (CRs), which allow users to define their own API objects, effectively extending the Kubernetes API to manage application-specific infrastructure and workflows. However, merely defining these resources is only half the battle; the true power emerges when systems can actively monitor these custom definitions and react to their changes, orchestrating complex operations and maintaining desired states. This capability is paramount for building robust operators and controllers, which are the backbone of self-managing applications within Kubernetes.
In this comprehensive guide, we embark on a journey to demystify the process of watching for changes to Custom Resources using Golang, the preferred language for Kubernetes development. We will delve deep into the client-go library, the canonical way to interact with the Kubernetes API from Go, exploring its powerful components like informers, listers, indexers, and workqueues. Our aim is to equip you with the knowledge and practical insights to build sophisticated, event-driven controllers that can seamlessly observe and respond to the evolving state of your Custom Resources, ensuring your applications remain resilient, automated, and perfectly aligned with your operational needs. This isn't just about parsing events; it's about understanding the foundational architecture that enables Kubernetes to become a truly programmable platform, where your custom logic can intertwine gracefully with its core functionalities.
The Foundation: Understanding Custom Resources and Kubernetes Extensibility
Before diving into the specifics of Go programming, it's crucial to grasp the fundamental concepts of Kubernetes extensibility, particularly Custom Resources (CRs) and Custom Resource Definitions (CRDs). Kubernetes, by design, is a highly extensible platform. While it provides a rich set of built-in resource types like Pods, Deployments, and Services, real-world applications often require managing unique types of infrastructure or application components that don't neatly fit into these predefined categories. This is where CRDs come into play, offering a powerful mechanism to extend the Kubernetes API with your own domain-specific objects.
A Custom Resource Definition (CRD) is essentially a blueprint that tells the Kubernetes API server about a new, user-defined resource type. It's a standard Kubernetes resource itself, but its purpose is to define other resources. When you create a CRD, you're not creating an instance of a custom resource yet; instead, you're informing Kubernetes how to validate, store, and serve your custom objects. This includes defining its schema (what fields it can have), its scope (namespace-scoped or cluster-scoped), and its versioning. Once a CRD is registered with Kubernetes, you can then create instances of that custom resource, which are actual data objects that conform to the schema defined by the CRD. These custom resources behave just like native Kubernetes resources: they can be created, updated, deleted, and watched through the standard Kubernetes API. This uniformity is incredibly powerful, as it allows developers to manage diverse workloads using the same declarative principles and tools they already use for native Kubernetes objects, creating a consistent operational experience across the entire cluster.
The motivation behind using CRDs is multi-faceted. Firstly, they enable true native integration. Instead of managing external databases or configurations for application-specific settings, you can store this information directly within the Kubernetes API server, leveraging its robust data store (etcd), replication, and security model. Secondly, CRDs promote a declarative API where users describe their desired state, and a controller (which we will build) works to achieve that state. This shifts the paradigm from imperative commands to a more resilient, self-healing system. Thirdly, CRDs allow for greater specialization. For example, you could define a DatabaseCluster CRD to manage a group of database instances, abstracting away the underlying complexities of StatefulSets, Services, and PersistentVolumes into a single, high-level object. This greatly simplifies the user experience for application developers, who can then interact with these specialized abstractions rather than the intricate low-level details. Understanding this foundational layer is critical because our Golang controllers will specifically target and interact with these custom resource instances, listening for every subtle change to maintain their desired operational state.
The Role of Controllers and Operators in Kubernetes
With Custom Resources defined, the next logical step is to bring them to life through automation. This is where Kubernetes controllers and operators enter the scene. At their core, controllers are reconciliation loops. They continuously watch a specific type of resource (be it a native Kubernetes resource like a Deployment or a custom resource like our MyApplication) and ensure that the actual state of the cluster matches the desired state described in that resource's specification. This fundamental principle is what makes Kubernetes so powerful and self-healing: you declare what you want, and the system continuously works to make it so.
An operator is essentially a specialized type of controller that understands how to manage a particular application or service. It leverages CRDs to encapsulate domain-specific knowledge, automating the deployment, scaling, backup, and even failure recovery of complex applications. For instance, a database operator might watch a PostgresCluster custom resource. When a user defines a PostgresCluster with three replicas, the operator springs into action. It creates the necessary StatefulSets, Services, PersistentVolumeClaims, and even configures replication and failover mechanisms, all driven by the specifications within that custom resource. If a database instance fails, the operator detects the discrepancy between the desired state (three healthy replicas) and the actual state (two healthy, one failed) and takes corrective actions, like provisioning a new instance and re-attaching it to the cluster. This level of sophisticated automation is what transforms Kubernetes from a container orchestrator into an application platform.
The process of watching for changes to custom resources is absolutely central to how controllers and operators function. Without this ability, a controller would be blind to user requests, configuration updates, or failures within the cluster. Every action, every state change, every decision an operator makes stems from observing the current state of its managed resources and comparing it against the desired state. Our Golang code will implement this crucial observation mechanism, forming the "eyes and ears" of our controller within the Kubernetes ecosystem. We'll be building the very core of what allows an operator to react intelligently and automatically to the dynamic environment of a Kubernetes cluster, bridging the gap between a declarative API and the imperative actions required to achieve the desired system state.
Introducing client-go: The Golang Gateway to Kubernetes API
To interact with the Kubernetes API from Go, the official and most widely used library is client-go. This library provides a comprehensive set of tools and packages that allow Go applications to programmatically create, read, update, delete, and watch Kubernetes resources. It handles the complexities of API communication, authentication, serialization, and error handling, abstracting away much of the underlying HTTP and JSON interactions. client-go is the same library used by Kubernetes itself for its internal components, making it the de-facto standard for building robust and reliable Kubernetes integrations in Go.
At a high level, client-go offers several key functionalities:
- REST Client: A low-level client for direct HTTP calls to the Kubernetes API. While powerful, it requires manual handling of resource types, versions, and serialization.
- Typed Clientset: The most common way to interact with standard Kubernetes resources. It provides strongly typed interfaces for each resource (e.g., `CoreV1().Pods()`, `AppsV1().Deployments()`), making code safer and easier to write. These clients are generated from the Kubernetes API definitions.
- Dynamic Client: A versatile client for interacting with any Kubernetes resource, including Custom Resources, without needing generated Go types. It operates on `unstructured.Unstructured` objects, which are essentially Go maps representing the JSON structure of a resource. This is particularly useful for CRDs where code generation might not be feasible or desired, or for generic tools that need to work with arbitrary resource types (a short sketch follows the next paragraph).
- Informers: The core mechanism for watching changes efficiently, which will be our primary focus. Informers provide a shared cache and event-driven notifications for resource changes, drastically reducing API server load and simplifying controller logic.
- Scheme and Code Generation: Tools to help define the Go types for your Custom Resources and register them with Kubernetes' internal type system, enabling typed clients for CRDs.
The client-go library is not just a thin wrapper around the Kubernetes API; it's a sophisticated framework designed to address the challenges of building scalable and resilient controllers. It incorporates best practices for caching, event handling, and error management, which are crucial for applications that need to maintain a consistent view of the cluster state over long periods. When you write a controller in Go, client-go becomes your indispensable toolkit, providing all the necessary primitives to observe, react, and manage your Kubernetes resources effectively. Its robust design is why it forms the bedrock for virtually all Kubernetes operators and custom controllers developed in Go, enabling complex automation scenarios to be built upon a solid, well-tested foundation.
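Because the dynamic client needs no generated code, it is often the quickest way to experiment with Custom Resources. The following is a minimal sketch, assuming a CRD with group myapp.example.com, version v1, and resource myapps (the same shape we define later in this guide); it simply streams watch events:

```go
package main

import (
	"context"
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load a kubeconfig from the default location (~/.kube/config).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	dyn, err := dynamic.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}

	// Identify the custom resource purely by group/version/resource.
	gvr := schema.GroupVersionResource{Group: "myapp.example.com", Version: "v1", Resource: "myapps"}

	// Watch emits unstructured.Unstructured objects; no generated types needed.
	w, err := dyn.Resource(gvr).Namespace("default").Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.Fatal(err)
	}
	defer w.Stop()
	for event := range w.ResultChan() {
		log.Printf("event %s: %v", event.Type, event.Object.GetObjectKind().GroupVersionKind())
	}
}
```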
Setting Up Your Go Environment and Initial Project Structure
Before we write any code to watch for Custom Resources, we need to ensure our Go development environment is properly configured and lay out a basic project structure. This foundational step is crucial for maintainability and scalability, especially as your controller logic grows in complexity.
First, ensure you have Go installed (version 1.18 or newer is recommended). You can verify your installation by running go version in your terminal.
Next, we'll create a new Go module for our controller project. Navigate to your desired development directory and execute the following commands:
mkdir my-custom-resource-controller
cd my-custom-resource-controller
go mod init github.com/your-username/my-custom-resource-controller # Replace with your actual module path
Now, we need to install the client-go library. This will pull in all the necessary packages for interacting with the Kubernetes API.
go get k8s.io/client-go@kubernetes-1.29.0 # Use the Kubernetes version you are targeting
(Note: Replace kubernetes-1.29.0 with the version of Kubernetes your cluster is running or the version you intend to target. It's generally a good practice to match client-go versions closely with your Kubernetes cluster version for optimal compatibility).
Our project structure might look something like this:
my-custom-resource-controller/
├── main.go
├── pkg/
│ └── apis/
│ └── myapp/
│ └── v1/
│ ├── doc.go
│ ├── types.go
│ └── register.go
├── hack/
│ └── update-codegen.sh # For generating client code
├── go.mod
├── go.sum
└── Dockerfile # (Optional) For containerizing the controller
- `main.go`: The entry point of our controller, responsible for setting up the Kubernetes client, starting informers, and running the main reconciliation loop.
- `pkg/apis/myapp/v1/`: Houses the Go type definitions for our Custom Resource. We'll define the structs that represent our custom resource's `spec` and `status` here.
- `hack/update-codegen.sh`: Generates the `client-go` specific code (clientsets, informers, listers) for our Custom Resources. This automation saves us from writing a huge amount of boilerplate code and ensures type safety.
- `go.mod`, `go.sum`: Managed by Go modules, these files track our project's dependencies.
This structured approach not only helps in organizing code but also aligns with the conventions often seen in official Kubernetes projects and operators, making it easier for others to understand and contribute to your work. With this setup complete, we are now ready to define our Custom Resource and begin the fascinating process of generating the Go types that will allow us to interact with it programmatically through the Kubernetes API.
Defining Your Custom Resource and Generating Go Types
To watch for changes to a Custom Resource, our Go application needs to understand its structure. This means defining Go structs that mirror the spec and status fields of our Custom Resource. While we could use the dynamic client to work with unstructured map[string]interface{} types, using typed clients provides compile-time safety and a much more pleasant development experience. This involves two primary steps: defining the Custom Resource Definition (CRD) YAML and then generating the corresponding Go types and client code using client-go's codegen tools.
Step 1: Define the Custom Resource Definition (CRD)
Let's imagine we're building a controller to manage a custom application deployment. We'll define a MyApp custom resource. Create a file named config/crd/bases/myapp.example.com_myapps.yaml (this directory layout follows the controller-runtime convention, but any path works):
# config/crd/bases/myapp.example.com_myapps.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: myapps.myapp.example.com
spec:
group: myapp.example.com
names:
plural: myapps
singular: myapp
kind: MyApp
listKind: MyAppList
scope: Namespaced # Or Cluster
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
type: object
properties:
image:
type: string
description: The container image to deploy for the application.
replicas:
type: integer
minimum: 1
description: The number of desired application replicas.
port:
type: integer
minimum: 80
maximum: 65535
description: The port the application listens on.
required:
- image
- replicas
status:
type: object
properties:
availableReplicas:
type: integer
description: The number of available application replicas.
conditions:
type: array
items:
type: object
properties:
type:
type: string
status:
type: string
message:
type: string
lastTransitionTime:
type: string
format: date-time
This CRD defines a MyApp resource with a spec for image, replicas, and port, and a status to track availableReplicas and conditions. You would apply this CRD to your Kubernetes cluster using kubectl apply -f config/crd/bases/myapp.example.com_myapps.yaml.
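Once the CRD is registered, instances of MyApp can be created like any other resource. A minimal illustrative manifest (the name and values are placeholders) might look like this:

```yaml
apiVersion: myapp.example.com/v1
kind: MyApp
metadata:
  name: example-myapp
  namespace: default
spec:
  image: nginx:1.25
  replicas: 3
  port: 8080
```

Applying this with kubectl apply -f creates the very object our controller will later observe and reconcile.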
Step 2: Define Go Types for Your Custom Resource
Now, let's create the Go structs that correspond to our MyApp CRD. These types need special tags for code generation. Create pkg/apis/myapp/v1/types.go:
// pkg/apis/myapp/v1/types.go
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MyApp is the Schema for the myapps API
type MyApp struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MyAppSpec `json:"spec,omitempty"`
Status MyAppStatus `json:"status,omitempty"`
}
// MyAppSpec defines the desired state of MyApp
type MyAppSpec struct {
Image string `json:"image"`
Replicas int32 `json:"replicas"`
Port int32 `json:"port"`
}
// MyAppStatus defines the observed state of MyApp
type MyAppStatus struct {
AvailableReplicas int32 `json:"availableReplicas"`
Conditions []metav1.Condition `json:"conditions,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MyAppList contains a list of MyApp
type MyAppList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []MyApp `json:"items"`
}
Notice the special comments like +genclient and +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object. These are directives for the client-go code generation tools, telling them to generate a clientset for MyApp and to generate deep-copy methods for these types, which are crucial for safe concurrent access within Kubernetes controllers.
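These deep-copy methods are not cosmetic: informer caches are shared between readers, so mutating a cached object would corrupt the view for every consumer. The pattern, sketched here under that assumption and used later in the reconciler, is to copy before modifying:

```go
// Never mutate an object obtained from a lister or informer cache.
// Deep-copy it first, modify the copy, then write the copy back via the API.
myAppCopy := myApp.DeepCopy()
myAppCopy.Status.AvailableReplicas = 3
// _, err := client.MyappV1().MyApps(myAppCopy.Namespace).UpdateStatus(ctx, myAppCopy, metav1.UpdateOptions{})
```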
Also, create pkg/apis/myapp/v1/register.go to register our types with a Kubernetes Scheme, which is how Kubernetes maps Go types to API versions and kinds.
// pkg/apis/myapp/v1/register.go
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
const GroupName = "myapp.example.com"
const GroupVersion = "v1"
var (
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
AddToScheme = SchemeBuilder.AddToScheme
)
// SchemeGroupVersion is the group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
// addKnownTypes adds the set of types defined in this package to the supplied scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&MyApp{},
&MyAppList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
And pkg/apis/myapp/v1/doc.go:
// +k8s:deepcopy-gen=package
// +groupName=myapp.example.com
// Package v1 is the v1 version of the API.
package v1
This doc.go file contains package-level directives for code generation.
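The generated clientset wires this registration up for you, but if you ever assemble your own runtime.Scheme (for example, for a custom codec or another framework), the AddToScheme helper defined above is the entry point; a brief sketch:

```go
import (
	"k8s.io/apimachinery/pkg/runtime"

	myappv1 "github.com/your-username/my-custom-resource-controller/pkg/apis/myapp/v1"
)

func newScheme() (*runtime.Scheme, error) {
	scheme := runtime.NewScheme()
	// After this call the scheme maps myapp.example.com/v1, Kind=MyApp
	// to &myappv1.MyApp{} (and MyAppList accordingly).
	if err := myappv1.AddToScheme(scheme); err != nil {
		return nil, err
	}
	return scheme, nil
}
```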
Step 3: Generate Client Code
Now, for the magic! We'll use client-go's kube-codegen tools to generate the clientset, informers, and listers for our MyApp custom resource. First, get the codegen tools:
go get k8s.io/code-generator@kubernetes-1.29.0 # Match client-go version
Then, create a script hack/update-codegen.sh:
#!/bin/bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(
cd "${SCRIPT_ROOT}";
go mod download k8s.io/code-generator;
go mod verify;
echo "$(go env GOPATH)/pkg/mod/k8s.io/code-generator@$(go list -m -f '{{.Version}}' k8s.io/code-generator)"
)}
# generate the code with:
# --output-base because this script is executed from the root of the repo to generate the output for all apis
# --go-header-file path to boilerplate text file
bash "${CODEGEN_PKG}/generate-groups.sh" all \
github.com/your-username/my-custom-resource-controller/pkg/client github.com/your-username/my-custom-resource-controller/pkg/apis \
"myapp:v1" \
--output-base "$(dirname "${BASH_SOURCE[0]}")/../.." \
--go-header-file "${SCRIPT_ROOT}/hack/boilerplate.go.txt"
# Note: the "all" target above already includes deepcopy, client, informer,
# and lister generation, so no separate deepcopy invocation is needed.
Replace github.com/your-username/my-custom-resource-controller with your actual module path. Create an empty hack/boilerplate.go.txt or add a standard license header.
Now, run the script:
chmod +x hack/update-codegen.sh
./hack/update-codegen.sh
This command will generate several new directories and files under pkg/client (or similar paths, depending on your --output-base and module structure), including:
- `pkg/client/clientset/versioned`: Contains your typed clientset.
- `pkg/client/informers/externalversions`: Contains informers for your types.
- `pkg/client/listers/myapp/v1`: Contains listers for your types.
You'll also find deep-copy methods generated within pkg/apis/myapp/v1/zz_generated.deepcopy.go. These generated files are critical; they provide the strongly typed API to interact with your MyApp resources, making it possible to create, update, delete, and, most importantly, watch them with type safety and efficiency. This setup allows our Go controller to understand and manipulate MyApp objects just as natively as it would a Pod or Deployment.
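As a quick sanity check of the generated code, the typed clientset can be used directly. A minimal sketch, assuming the module path above and the generated MyappV1() group accessor:

```go
package main

import (
	"context"
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"

	myclientset "github.com/your-username/my-custom-resource-controller/pkg/client/clientset/versioned"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		log.Fatal(err)
	}
	client, err := myclientset.NewForConfig(config)
	if err != nil {
		log.Fatal(err)
	}
	// Items come back as strongly typed MyApp structs, not unstructured maps.
	list, err := client.MyappV1().MyApps("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.Fatal(err)
	}
	for _, app := range list.Items {
		log.Printf("MyApp %s: image=%s replicas=%d", app.Name, app.Spec.Image, app.Spec.Replicas)
	}
}
```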
Connecting to the Kubernetes API Server
Before we can start watching resources, our Go application needs to establish a connection to the Kubernetes API server. client-go provides convenient ways to achieve this, whether your controller is running inside a Kubernetes cluster or externally (e.g., during local development).
The core component for establishing this connection is rest.Config from k8s.io/client-go/rest. This struct holds all the necessary configuration parameters, such as the API server host, authentication credentials (e.g., service account token, kubeconfig path), and TLS configuration.
In-Cluster Configuration
When your controller runs as a Pod within a Kubernetes cluster, it typically uses a Service Account. Kubernetes automatically injects the Service Account's token and API server information (via environment variables and a mounted secret) into the Pod. client-go can detect and use this configuration automatically with rest.InClusterConfig(). This is the most secure and recommended way for production deployments.
package main
import (
"log"
"k8s.io/client-go/rest"
)
func main() {
// Creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
log.Fatalf("Error building in-cluster config: %v", err)
}
// Now you have a *rest.Config that can be used to create clientsets.
log.Printf("Successfully obtained in-cluster configuration.")
// ... proceed to create clients and informers
}
Out-of-Cluster Configuration (for Local Development)
During local development or when running your controller outside the cluster, you'll usually want to connect using your kubeconfig file. This file contains authentication details for your various Kubernetes clusters. client-go provides clientcmd.BuildConfigFromFlags to load this configuration.
package main
import (
"flag"
"log"
"os"
"path/filepath"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
var kubeconfig *string
if home := homeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
// Use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
log.Printf("Successfully obtained kubeconfig configuration.")
// ... proceed to create clients and informers
}
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // Windows
}
Creating Clientsets
Once you have a *rest.Config, you can create specific clients for interacting with Kubernetes resources. For our custom resource MyApp, we'll use the generated clientset.
package main
import (
"flag"
"log"
"os"
"path/filepath"
"k8s.io/client-go/kubernetes" // For built-in resources
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
// Import our generated clientset
myclientset "github.com/your-username/my-custom-resource-controller/pkg/client/clientset/versioned"
// Ensure you replace this with your actual module path
)
func main() {
// ... (kubeconfig or in-cluster config loading as above) ...
var config *rest.Config
var err error
if home := homeDir(); home != "" { // Assume out-of-cluster for example
kubeconfig := flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
flag.Parse()
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
} else {
config, err = rest.InClusterConfig()
}
if err != nil {
log.Fatalf("Error building kubeconfig or in-cluster config: %v", err)
}
// Create a standard Kubernetes clientset for built-in resources (e.g., Pods, Deployments)
kubeClientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating kubernetes clientset: %v", err)
}
// Create a clientset for our custom MyApp resource
myAppClientset, err := myclientset.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating myApp clientset: %v", err)
}
log.Printf("Successfully connected to Kubernetes API server. KubeClientset: %p, MyAppClientset: %p", kubeClientset, myAppClientset)
// Now we have clientsets ready to interact with the API.
}
This main function now successfully connects to the Kubernetes API server and initializes two critical clientsets: kubeClientset for standard resources and myAppClientset for our custom MyApp resources. These clientsets provide the programmatic interface to perform CRUD operations on resources, but more importantly for our goal, they are the foundation upon which we build informers to efficiently watch for changes. Establishing this reliable connection is the gateway to all subsequent interactions with the Kubernetes ecosystem, enabling our controller to become an active participant in managing the cluster's state.
The Power of Informers: Caching and Event-Driven Watching
Directly watching Kubernetes resources by making continuous API calls (GET requests in a loop) is inefficient and places a heavy burden on the API server. It also makes it difficult to maintain a consistent local view of the cluster state. This is where client-go's informers come to the rescue, offering a robust and highly efficient mechanism for observing resource changes. Informers are the cornerstone of any well-behaved Kubernetes controller, providing a shared, cached, and event-driven way to interact with the Kubernetes API.
What are Informers?
An informer acts as a local cache of Kubernetes resources and notifies your controller about changes (additions, updates, deletions) to those resources. Instead of polling the API server, an informer establishes a long-lived API watch connection. When a change occurs, the API server pushes an event to the informer, which then updates its local cache and dispatches the event to registered handlers in your controller.
The lifecycle of an informer involves several key stages:
- Initial List: When an informer starts, it first performs a `LIST` operation against the Kubernetes API server to retrieve all existing resources of a specific type. This populates its initial local cache.
- Continuous Watch: After the initial list, the informer establishes a `WATCH` connection. The API server then sends `ADDED`, `MODIFIED`, or `DELETED` events for any changes to that resource type.
- Local Cache Update: Upon receiving an event, the informer updates its internal cache (often implemented using an `Indexer`).
- Event Notification: Finally, the informer invokes registered event handlers in your controller, passing along the object that changed.
This design offers significant advantages:
- Reduced API Server Load: Instead of multiple controllers making separate `LIST` and `WATCH` calls, a single shared informer can serve many controllers. The initial `LIST` is expensive, but the subsequent `WATCH` events are much lighter.
- Performance and Latency: By maintaining a local cache, controllers can retrieve resource objects almost instantly without making network calls, drastically improving performance. Event-driven notifications ensure low latency in reacting to changes.
- Consistency: The informer ensures that the controller always has a consistent view of the resource state, even if events arrive out of order or if the API server's internal state lags slightly.
- Resource Versioning: Informers handle Kubernetes' `resourceVersion` mechanism transparently, ensuring that watches are restarted correctly if the API server indicates the watch has become stale.
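To appreciate what informers handle for you, here is a sketch of the low-level alternative: a bare watch through the typed clientset. It assumes the generated clientset and types from the previous section, and does none of the caching, resyncing, or reconnection an informer provides:

```go
// watchMyAppsRaw streams events directly from the API server.
// Unlike an informer, it has no cache and no automatic re-list/re-watch.
func watchMyAppsRaw(ctx context.Context, client myclientset.Interface) error {
	w, err := client.MyappV1().MyApps("default").Watch(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	defer w.Stop()
	for event := range w.ResultChan() {
		myApp, ok := event.Object.(*myappv1.MyApp)
		if !ok {
			continue // e.g., an error or bookmark event
		}
		log.Printf("%s: MyApp %s/%s", event.Type, myApp.Namespace, myApp.Name)
	}
	// When ResultChan closes (a stale watch), the caller must re-list and
	// re-watch from the last seen resourceVersion; informers automate this.
	return nil
}
```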
SharedInformerFactory: Managing Multiple Informers
In a typical controller, you might need to watch multiple types of resources (e.g., your MyApp custom resource, and also standard Kubernetes Deployments and Services that your MyApp controller manages). Creating and managing individual informers for each resource type can become cumbersome. This is where SharedInformerFactory (k8s.io/client-go/informers) comes in.
A SharedInformerFactory is a factory that can create and start informers for multiple resource types, all sharing a single underlying API connection where possible and a consistent local cache. It ensures that only one informer instance exists for a given resource type within the factory, even if multiple parts of your application request it. This further optimizes API server usage and simplifies informer management.
package main
import (
"context"
"flag"
"log"
"os"
"path/filepath"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/informers" // Standard informers
myclientset "github.com/your-username/my-custom-resource-controller/pkg/client/clientset/versioned"
myinformers "github.com/your-username/my-custom-resource-controller/pkg/client/informers/externalversions" // Our custom informers
// Ensure you replace this with your actual module path
)
func main() {
var config *rest.Config
var err error
// (kubeconfig or in-cluster config loading as above)
if home := homeDir(); home != "" {
kubeconfig := flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
flag.Parse()
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
} else {
config, err = rest.InClusterConfig()
}
if err != nil {
log.Fatalf("Error building kubeconfig or in-cluster config: %v", err)
}
// Create a standard Kubernetes clientset
kubeClientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating kubernetes clientset: %v", err)
}
// Create a clientset for our custom MyApp resource
myAppClientset, err := myclientset.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating myApp clientset: %v", err)
}
// Create a context for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// --- Standard Informer Factory (for built-in resources) ---
// The resync period (10 minutes here) makes the informer periodically
// re-deliver every cached object to the event handlers, even when no
// watch events occur. This helps recover from missed or mishandled events.
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClientset, time.Minute*10)
// Get an informer for Deployments
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
deploymentInformer.Informer().AddEventHandler(
	// Define event handlers for ADD, UPDATE, DELETE.
	// We'll elaborate on these in the next section; an empty
	// ResourceEventHandlerFuncs is a valid no-op placeholder.
	cache.ResourceEventHandlerFuncs{},
)
// --- Custom Resource Informer Factory (for MyApps) ---
myAppInformerFactory := myinformers.NewSharedInformerFactory(myAppClientset, time.Minute*10)
// Get an informer for MyApps (using our generated informer code)
myAppInformer := myAppInformerFactory.Myapp().V1().MyApps()
myAppInformer.Informer().AddEventHandler(
	// No-op placeholder handler for now
	cache.ResourceEventHandlerFuncs{},
)
// Start all informers in the factory. They will perform initial LIST and then WATCH.
go kubeInformerFactory.Start(ctx.Done())
go myAppInformerFactory.Start(ctx.Done())
// Wait for all caches to be synced. This is crucial:
// your controller should not process events until its cache is consistent.
// WaitForCacheSync returns a map of informer type to sync status.
for typ, ok := range kubeInformerFactory.WaitForCacheSync(ctx.Done()) {
	if !ok {
		log.Fatalf("Error syncing kube informer cache for %v", typ)
	}
}
for typ, ok := range myAppInformerFactory.WaitForCacheSync(ctx.Done()) {
	if !ok {
		log.Fatalf("Error syncing myApp informer cache for %v", typ)
	}
}
log.Println("Informers started and caches synced. Controller ready to process events.")
// ... continue with controller's main loop or reconciliation
select {} // Block forever to keep the program running
}
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // Windows
}
This extended main function now demonstrates how to instantiate both a standard SharedInformerFactory (for Deployments) and our custom MyApp SharedInformerFactory. Crucially, after creating the informers, we call Start(ctx.Done()) on each factory. This kicks off the listing and watching processes in separate goroutines. The subsequent WaitForCacheSync(ctx.Done()) call is paramount; it blocks until all the informers in the factory have populated their caches with the initial LIST data, ensuring that our controller doesn't try to process events or retrieve objects from an incomplete cache. This methodical approach ensures that our controller starts with a consistent view of the cluster, ready to react to changes.
Here's a table summarizing the key components and their roles in the informer lifecycle:
| Component | Description | Purpose |
|---|---|---|
| `rest.Config` | Configuration for connecting to the Kubernetes API server (e.g., host, authentication, TLS). | Establishes the secure and authenticated channel for all API interactions. |
| Clientset (Typed) | Strongly-typed client for CRUD operations on specific Kubernetes resource types (e.g., `kubernetes.Clientset` for built-in resources, `myclientset.Clientset` for CRDs). | Provides a programmatic interface to interact with resources, used internally by informers for `LIST` and `WATCH`. |
| `SharedInformerFactory` | A factory that creates and manages informers for multiple resource types, ensuring a single instance per type and shared caching. | Optimizes API server load, simplifies informer management, and provides a consistent view across different resource types. |
| Informer | A mechanism that maintains a local, up-to-date cache of Kubernetes resources and dispatches events (Add, Update, Delete) upon changes. | Decouples event observation from reconciliation logic, provides a low-latency, cached view of resources, and reduces API server stress. |
| Indexer | An internal component of the informer that stores cached objects and allows retrieval by key (namespace/name) and by arbitrary indices (e.g., by label selector, owner reference). | Enables efficient lookup of cached objects and supports complex query patterns, crucial for reconciliation logic. |
| Lister | A read-only interface to the informer's cache (`Indexer`). It allows controllers to retrieve objects from the local cache without hitting the API server. | Provides fast, local access to resource data, essential for comparing actual state with desired state in reconciliation loops. |
| `AddEventHandler` | Method to register callback functions (`AddFunc`, `UpdateFunc`, `DeleteFunc`) that are invoked when corresponding events occur for the watched resource. | The primary mechanism for the controller to be notified of changes and trigger its reconciliation logic. |
| `ctx.Done()` | A Go channel that closes when the associated `context.Context` is cancelled, signaling to `Start()` and `WaitForCacheSync()` that they should gracefully shut down. | Enables proper shutdown of informers and other long-running goroutines, critical for controller lifecycle management. |
| `WaitForCacheSync()` | Blocks until all informers within the factory have completed their initial `LIST` operations and populated their caches. | Ensures that the controller operates on a complete and consistent view of the cluster state from the very beginning, preventing race conditions or processing events against an outdated cache. |
| Resync Period | The interval at which an informer re-delivers every object in its cache to the event handlers as Update events, even if no watch events have occurred. | A safety mechanism that gives the controller a periodic chance to reconcile and recover from missed or mishandled events, preventing stale state. |
Understanding these components and their interactions is paramount to constructing robust and efficient Kubernetes controllers. The informer pattern is a powerful paradigm that underpins much of the automation within the Kubernetes ecosystem, making it possible to build reactive systems that effortlessly manage complex, dynamic states.
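To make the Lister and Indexer rows above concrete, here is a sketch of reading from the synced cache, assuming the myAppInformer handle created in the example above (labels comes from k8s.io/apimachinery/pkg/labels):

```go
// All of these reads are served from local memory, not the API server.
lister := myAppInformer.Lister()

// Fetch one object by namespace/name from the cache.
app, err := lister.MyApps("default").Get("example-myapp")
if err != nil {
	log.Printf("not found in cache: %v", err)
} else {
	log.Printf("cached MyApp %s wants %d replicas", app.Name, app.Spec.Replicas)
}

// Enumerate every cached MyApp matching a label selector.
apps, err := lister.List(labels.Everything())
if err == nil {
	log.Printf("%d MyApps in cache", len(apps))
}
```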
Handling Events with ResourceEventHandlerFuncs
With informers running and their caches synced, the next critical step is to actually react to the changes they detect. This is achieved by registering event handlers with the informer. client-go provides the ResourceEventHandlerFuncs struct (k8s.io/client-go/tools/cache) for this purpose, allowing you to define callback functions for Add, Update, and Delete events.
When an informer detects a change in a watched resource, it invokes the corresponding handler function you've registered, passing the affected resource object(s) as arguments. These handlers are where your controller's custom logic begins to respond to the cluster's evolving state.
Let's define our event handlers for the MyApp custom resource. In a real-world controller, these handlers wouldn't contain the full reconciliation logic directly. Instead, they would typically push the changed object's key (namespace/name) onto a workqueue for asynchronous processing. This separation is a crucial pattern for building resilient and performant controllers, which we'll explore in the next section. For now, we'll demonstrate simple log messages within the handlers to illustrate their invocation.
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"time"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache" // Required for ResourceEventHandlerFuncs
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue" // Will use this later, but import now
myclientset "github.com/your-username/my-custom-resource-controller/pkg/client/clientset/versioned"
myinformers "github.com/your-username/my-custom-resource-controller/pkg/client/informers/externalversions"
myappv1 "github.com/your-username/my-custom-resource-controller/pkg/apis/myapp/v1" // Import our types
// Ensure you replace this with your actual module path
)
// MyAppController demonstrates watching Custom Resources
type MyAppController struct {
kubeClientset kubernetes.Interface
myAppClientset myclientset.Interface
myAppInformer cache.SharedIndexInformer
workqueue workqueue.RateLimitingInterface // Will be used in the next section
}
// NewMyAppController creates a new instance of MyAppController
func NewMyAppController(
kubeClientset kubernetes.Interface,
myAppClientset myclientset.Interface,
myAppInformer cache.SharedIndexInformer,
workqueue workqueue.RateLimitingInterface,
) *MyAppController {
return &MyAppController{
kubeClientset: kubeClientset,
myAppClientset: myAppClientset,
myAppInformer: myAppInformer,
workqueue: workqueue,
}
}
// Run starts the controller's main logic
func (c *MyAppController) Run(ctx context.Context) error {
defer c.workqueue.ShutDown() // Ensure workqueue is shut down when context is cancelled
log.Println("Starting MyApp controller")
// Register event handlers for MyApp informer
c.myAppInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.handleAddMyApp,
UpdateFunc: c.handleUpdateMyApp,
DeleteFunc: c.handleDeleteMyApp,
})
// Start the informer and wait for cache sync (handled in main)
// In a real controller, you might start the worker goroutines here.
// Block until the context is cancelled
<-ctx.Done()
log.Println("Shutting down MyApp controller")
return nil
}
// handleAddMyApp processes an added MyApp resource
func (c *MyAppController) handleAddMyApp(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Printf("Error getting key for added MyApp: %v", err)
return
}
log.Printf("Added MyApp: %s", key)
// In a real controller, we'd add the key to the workqueue here.
// c.workqueue.Add(key)
}
// handleUpdateMyApp processes an updated MyApp resource
func (c *MyAppController) handleUpdateMyApp(oldObj, newObj interface{}) {
oldMyApp := oldObj.(*myappv1.MyApp)
newMyApp := newObj.(*myappv1.MyApp)
// Periodic resyncs deliver Update events even when nothing has changed.
// If the ResourceVersion is identical, the object itself has not changed
// at all, so there is nothing new to reconcile.
if oldMyApp.ResourceVersion == newMyApp.ResourceVersion {
return
}
key, err := cache.MetaNamespaceKeyFunc(newObj)
if err != nil {
log.Printf("Error getting key for updated MyApp: %v", err)
return
}
log.Printf("Updated MyApp: %s (Image: %s -> %s, Replicas: %d -> %d)",
key, oldMyApp.Spec.Image, newMyApp.Spec.Image, oldMyApp.Spec.Replicas, newMyApp.Spec.Replicas)
// c.workqueue.Add(key)
}
// handleDeleteMyApp processes a deleted MyApp resource
func (c *MyAppController) handleDeleteMyApp(obj interface{}) {
// Kubernetes often sends a "tombstone" object for deleted items if the watch connection
// breaks and then restarts. We need to handle this.
deletedObj, ok := obj.(cache.DeletedFinalStateUnknown)
if ok {
obj = deletedObj.Obj
}
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
log.Printf("Error getting key for deleted MyApp: %v", err)
return
}
log.Printf("Deleted MyApp: %s", key)
// c.workqueue.Add(key) // We might want to reconcile even on deletion to clean up associated resources
}
func main() {
var config *rest.Config
var err error
if home := homeDir(); home != "" {
kubeconfig := flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
flag.Parse()
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
} else {
config, err = rest.InClusterConfig()
}
if err != nil {
log.Fatalf("Error building kubeconfig or in-cluster config: %v", err)
}
kubeClientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating kubernetes clientset: %v", err)
}
myAppClientset, err := myclientset.NewForConfig(config)
if err != nil {
log.Fatalf("Error creating myApp clientset: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
myAppInformerFactory := myinformers.NewSharedInformerFactory(myAppClientset, time.Minute*10)
myAppInformer := myAppInformerFactory.Myapp().V1().MyApps()
// Initialize the rate-limited workqueue; the worker goroutines that
// consume it are added in the next section.
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
controller := NewMyAppController(kubeClientset, myAppClientset, myAppInformer.Informer(), queue)
go myAppInformerFactory.Start(ctx.Done())
for typ, ok := range myAppInformerFactory.WaitForCacheSync(ctx.Done()) {
	if !ok {
		log.Fatalf("Error syncing myApp informer cache for %v", typ)
	}
}
log.Println("Informers started and caches synced. Controller ready to process events.")
// Run the controller's event handling logic
if err := controller.Run(ctx); err != nil {
log.Fatalf("Error running controller: %v", err)
}
}
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // Windows
}
In this enhanced main.go, we've introduced a MyAppController struct to encapsulate our controller's dependencies and logic. The Run method orchestrates the registration of our handleAddMyApp, handleUpdateMyApp, and handleDeleteMyApp functions as event handlers. These functions receive the object that triggered the event. For UpdateFunc, it receives both the old and new states of the object, allowing for comparison to detect specific changes (e.g., in the spec). Notice how cache.MetaNamespaceKeyFunc(obj) is used to extract a unique string key (e.g., namespace/name) for the object; this key is vital for referencing objects in the workqueue and the informer's cache.
While the current handler implementations simply log messages, this setup forms the crucial "trigger" mechanism for our controller. Any time a MyApp resource is created, modified, or deleted, the corresponding handler is invoked, making our controller reactive to the dynamic state of our custom resources within Kubernetes. This reactive capability, fueled by the efficient informer pattern, is what empowers controllers to automate complex operational tasks and maintain desired application states effortlessly.
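The key helpers used by the handlers deserve a standalone look. A small self-contained sketch of the round-trip between an object and its namespace/name key:

```go
package main

import (
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/cache"

	myappv1 "github.com/your-username/my-custom-resource-controller/pkg/apis/myapp/v1"
)

func main() {
	app := &myappv1.MyApp{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "example-myapp"},
	}

	// MetaNamespaceKeyFunc flattens any object with metadata to "namespace/name".
	key, err := cache.MetaNamespaceKeyFunc(app)
	if err != nil {
		log.Fatal(err)
	}
	log.Println(key) // "default/example-myapp"

	// SplitMetaNamespaceKey reverses the mapping inside a worker.
	ns, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("namespace=%s name=%s", ns, name)
}
```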
Workqueues: Decoupling Event Handling from Reconciliation
Directly executing complex reconciliation logic within an informer's event handlers is a significant anti-pattern in controller development. Informer callbacks run synchronously and can block the informer's event stream, leading to missed events, stale caches, and overall controller instability. To address this, client-go introduces the concept of workqueue.RateLimitingInterface (often just called a "workqueue").
A workqueue acts as a buffer and a processing pipeline, effectively decoupling the rapid-fire event notifications from the potentially time-consuming and error-prone reconciliation process. When an informer's event handler is triggered, instead of performing the reconciliation immediately, it simply adds the unique key of the changed object (e.g., namespace/name) to the workqueue. A separate set of worker goroutines then concurrently pull items from this queue, perform the actual reconciliation, and handle retries if necessary.
The benefits of using a workqueue are numerous:
- Asynchronous Processing: Event handlers can quickly add items to the queue and return, allowing the informer to continue processing the event stream without interruption.
- Concurrency Control: You can run multiple worker goroutines against a single workqueue, enabling parallel processing of reconciliation requests.
- Idempotency: Reconciliation logic should be idempotent, meaning applying it multiple times yields the same result as applying it once. Workqueues help reinforce this by potentially re-adding items for retry.
- Error Handling and Retries: Workqueues provide built-in mechanisms for rate-limiting retries. If a reconciliation fails, the item can be re-added to the queue with a backoff delay, preventing tight loops of failure and giving the system time to recover.
- Deduping: If multiple events for the same object occur in quick succession, the workqueue automatically de-duplicates them, ensuring the reconciliation for that object is only processed once.
- Backpressure: If reconciliation is slow, the workqueue can build up, but it won't block the informer.
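Before wiring the queue into the controller, the add/get/done cycle is easiest to see in isolation. A minimal sketch of the semantics (the key string is arbitrary):

```go
package main

import (
	"log"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Rate-limited queue: failed items come back with exponential backoff.
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Producers (informer handlers) add object keys; pending duplicates collapse.
	queue.Add("default/example-myapp")
	queue.Add("default/example-myapp") // de-duplicated while still queued

	// A consumer (worker) pulls one item at a time.
	key, shutdown := queue.Get()
	if shutdown {
		return
	}

	if err := reconcile(key.(string)); err != nil {
		queue.AddRateLimited(key) // schedule a retry with backoff
	} else {
		queue.Forget(key) // clear the item's retry history
	}
	queue.Done(key) // always mark processing as finished
	queue.ShutDown()
}

func reconcile(key string) error {
	log.Printf("reconciling %s", key)
	return nil
}
```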
Integrating Workqueue into Our Controller
Let's modify our MyAppController to use a workqueue for processing MyApp resource changes.
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"path/filepath"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.sio/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue" // The workqueue package
"k8s.io/klog/v2" // For structured logging
myclientset "github.com/your-username/my-custom-resource-controller/pkg/client/clientset/versioned"
myinformers "github.com/your-username/my-custom-resource-controller/pkg/client/informers/externalversions"
myappv1 "github.com/your-username/my-custom-resource-controller/pkg/apis/myapp/v1"
)
// MyAppController demonstrates watching Custom Resources
type MyAppController struct {
kubeClientset kubernetes.Interface
myAppClientset myclientset.Interface
myAppInformer cache.SharedIndexInformer
myAppLister mylisters.MyAppLister // A typed lister for our custom resource, from the generated listers package
workqueue workqueue.RateLimitingInterface
}
// NewMyAppController creates a new instance of MyAppController
func NewMyAppController(
kubeClientset kubernetes.Interface,
myAppClientset myclientset.Interface,
myAppInformer cache.SharedIndexInformer,
myAppLister mylisters.MyAppLister, // Pass the lister
workqueue workqueue.RateLimitingInterface,
) *MyAppController {
c := &MyAppController{
kubeClientset: kubeClientset,
myAppClientset: myAppClientset,
myAppInformer: myAppInformer,
myAppLister: myAppLister,
workqueue: workqueue,
}
// Register event handlers
myAppInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.enqueueMyApp,
UpdateFunc: func(oldObj, newObj interface{}) { c.enqueueMyApp(newObj) },
DeleteFunc: c.enqueueMyApp, // For cleanup if the resource is deleted
})
return c
}
// enqueueMyApp adds the object's key to the workqueue.
func (c *MyAppController) enqueueMyApp(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
klog.Errorf("Error getting key for object: %v", err)
return
}
c.workqueue.Add(key)
}
// Run starts the controller's main logic
func (c *MyAppController) Run(ctx context.Context, workers int) error {
defer c.workqueue.ShutDown()
klog.Info("Starting MyApp controller")
// Wait for caches to be synced (handled in main function now)
// We only start workers after the caches are synced
for i := 0; i < workers; i++ {
go c.runWorker(ctx)
}
klog.Info("Started workers")
<-ctx.Done()
klog.Info("Shutting down MyApp controller workers")
return nil
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the workqueue.
func (c *MyAppController) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
// processNextWorkItem retrieves the next item from the workqueue and invokes the reconcile function.
func (c *MyAppController) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead calling
// EnqueueRateLimited to re-queue the item with a delay.
defer c.workqueue.Done(obj)
var key string
var ok bool
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually a string, we cannot get
// an object and thus just forget it.
c.workqueue.Forget(obj)
klog.Errorf("Expected string in workqueue but got %#v", obj)
return true
}
// Run the reconcile logic for the MyApp resource specified by the key.
if err := c.reconcile(ctx, key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
klog.Errorf("Error reconciling MyApp '%s': %v, requeueing...", key, err)
return true
}
// If no error occurs we Forget this item so it's not re-queued again.
c.workqueue.Forget(obj)
klog.Infof("Successfully reconciled MyApp '%s'", key)
return true
}
// reconcile is the main reconciliation logic for the MyApp controller.
// It fetches the latest state of the MyApp resource and ensures the desired state.
func (c *MyAppController) reconcile(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
klog.Errorf("invalid resource key: %s", key)
return nil // Don't requeue, invalid key
}
// Get the MyApp resource from the informer's cache.
// Using the lister is crucial here, as it fetches from the local cache.
myApp, err := c.myAppLister.MyApps(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
klog.Infof("MyApp '%s' in namespace '%s' no longer exists. Performing cleanup if necessary.", name, namespace)
// If the resource is deleted, perform cleanup of associated K8s resources.
// Example: Delete the Deployment and Service associated with this MyApp.
return c.cleanupExternalResources(ctx, namespace, name)
}
// For other errors, requeue the item.
return fmt.Errorf("error getting MyApp '%s' from lister: %w", key, err)
}
// Here's where your core reconciliation logic goes.
// Compare myApp.Spec with the actual state of Kubernetes resources (e.g., Deployments, Services).
// For instance, create/update a Deployment based on myApp.Spec.Image and myApp.Spec.Replicas.
// Then, create/update a Service exposing myApp.Spec.Port.
// Example: Log the desired state
klog.Infof("Reconciling MyApp '%s/%s': Image=%s, Replicas=%d, Port=%d",
myApp.Namespace, myApp.Name, myApp.Spec.Image, myApp.Spec.Replicas, myApp.Spec.Port)
// In a real controller, you would:
// 1. Get existing Deployment/Service (using kubeClientset and kubeInformerFactory.Apps().V1().Deployments().Lister())
// 2. Compare with desired state from myApp.Spec
// 3. Create/Update/Delete necessary resources (Deployment, Service) using kubeClientset
// 4. Update myApp.Status to reflect the actual state (e.g., availableReplicas)
// Example of updating status:
// myAppCopy := myApp.DeepCopy()
// myAppCopy.Status.AvailableReplicas = 1 // Or actual number from Deployment status
// _, err = c.myAppClientset.MyappV1().MyApps(myAppCopy.Namespace).UpdateStatus(ctx, myAppCopy, metav1.UpdateOptions{})
// if err != nil {
// return fmt.Errorf("failed to update MyApp status: %w", err)
// }
// For demonstration, let's just update the status to show it's "Ready" if it's not already
if !isMyAppReady(myApp) {
klog.Infof("Setting MyApp '%s/%s' status to Ready", myApp.Namespace, myApp.Name)
myAppCopy := myApp.DeepCopy()
myAppCopy.Status.AvailableReplicas = myAppCopy.Spec.Replicas // Assuming it's ready for demo
myAppCopy.Status.Conditions = []metav1.Condition{
{
Type: "Ready",
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: "Application is ready",
Reason: "DeploymentCreated",
},
}
_, err = c.myAppClientset.MyappV1().MyApps(myAppCopy.Namespace).UpdateStatus(ctx, myAppCopy, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("failed to update MyApp status: %w", err)
}
}
return nil // Reconciliation successful
}
// isMyAppReady checks if the MyApp's status indicates it's ready
func isMyAppReady(myApp *myappv1.MyApp) bool {
for _, condition := range myApp.Status.Conditions {
if condition.Type == "Ready" && condition.Status == metav1.ConditionTrue {
return true
}
}
return false
}
// cleanupExternalResources handles cleanup logic when a MyApp is deleted.
// This is a placeholder for actual cleanup logic.
func (c *MyAppController) cleanupExternalResources(ctx context.Context, namespace, name string) error {
klog.Infof("Performing cleanup for deleted MyApp '%s/%s'", namespace, name)
// Example: Delete any associated Deployments, Services, etc.
// If the MyApp provisioned resources in a cloud provider or integrated with
// a third-party platform, the cleanup logic would also involve making API
// calls to those external systems.
return nil
}
func main() {
klog.InitFlags(nil)             // Initialize klog flags
flag.Set("logtostderr", "true") // Log to stderr by default
// Define flags before parsing; parse exactly once.
var kubeconfig *string
if home := homeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
var config *rest.Config
var err error
if *kubeconfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig)
} else {
config, err = rest.InClusterConfig()
}
if err != nil {
klog.Fatalf("Error building kubeconfig or in-cluster config: %v", err)
}
kubeClientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Error creating kubernetes clientset: %v", err)
}
myAppClientset, err := myclientset.NewForConfig(config)
if err != nil {
klog.Fatalf("Error creating myApp clientset: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
myAppInformerFactory := myinformers.NewSharedInformerFactory(myAppClientset, time.Minute*10)
myAppInformer := myAppInformerFactory.Myapp().V1().MyApps()
// Create a rate limiting workqueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// Initialize the controller with the lister from the informer
controller := NewMyAppController(
kubeClientset,
myAppClientset,
myAppInformer.Informer(),
myAppInformer.Lister(), // Pass the lister here
queue,
)
klog.Info("Starting informers")
myAppInformerFactory.Start(ctx.Done()) // Start is non-blocking; it launches the informer goroutines itself
klog.Info("Waiting for informer caches to sync")
// WaitForCacheSync returns a map of informer type to sync result, not a single bool.
for informerType, ok := range myAppInformerFactory.WaitForCacheSync(ctx.Done()) {
	if !ok {
		klog.Fatalf("Error syncing cache for informer %v", informerType)
	}
}
klog.Info("Informer caches synced")
// Run the controller's event handling and worker logic
const numWorkers = 2 // You can adjust the number of worker goroutines
if err := controller.Run(ctx, numWorkers); err != nil {
klog.Fatalf("Error running controller: %v", err)
}
}
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // Windows
}
In this significantly updated code:
- Workqueue Initialization: In main, we create a workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()). DefaultControllerRateLimiter() provides exponential backoff for retries.
- enqueueMyApp handler: Our informer event handlers (AddFunc, UpdateFunc, DeleteFunc) now simply call c.enqueueMyApp(obj), which extracts the object's key and adds it to the workqueue. This is a very fast operation, preventing the informer from being blocked.
- runWorker and processNextWorkItem: The Run method now starts numWorkers goroutines, each executing runWorker. runWorker continuously calls processNextWorkItem, which pulls an item (a string key) from the workqueue.
- reconcile function: This is the core logic.
  - It fetches the MyApp object from the informer's local cache using c.myAppLister.MyApps(namespace).Get(name). This is a key optimization: we avoid an API call for every reconciliation, instead relying on the always-up-to-date cache.
  - It handles the case where the MyApp resource might no longer exist (errors.IsNotFound). This is crucial for cleanup operations when a resource is deleted.
  - It then contains the logic to compare the desired state (myApp.Spec) with the actual state of related Kubernetes resources (e.g., Deployments, Services). In a real controller, this would involve using kubeClientset to create, update, or delete those resources.
  - It also demonstrates how to update the Status field of the MyApp custom resource itself using c.myAppClientset.MyappV1().MyApps(myAppCopy.Namespace).UpdateStatus(...). Updating status is often done separately from spec changes to avoid race conditions.
- APIPark Integration: In cleanupExternalResources, a natural place arises to discuss managing external APIs. If our MyApp controller managed integrations with external services (e.g., a payment gateway, a messaging service, or an AI prediction API), the reconcile loop or cleanup would involve making calls to these external APIs. An API gateway like APIPark can centralize the management, integration, and security of such diverse external API connections, providing a unified platform irrespective of whether those external services are traditional REST endpoints or advanced AI models.
- Error Handling and Retries: If reconcile returns an error, processNextWorkItem calls c.workqueue.AddRateLimited(key), which re-adds the item to the queue with a backoff. If reconcile succeeds, c.workqueue.Forget(obj) is called, ensuring the item is not retried.
This architecture, combining informers with workqueues, represents the canonical pattern for building robust, scalable, and resilient Kubernetes controllers in Golang. It gracefully handles concurrency, error recovery, and API server load, making it the foundation for complex automation within the cloud-native ecosystem.
Advanced Considerations for Production-Ready Controllers
Building a basic controller is a great start, but creating a production-ready, highly available, and reliable Kubernetes operator requires attention to several advanced considerations. These practices ensure your controller can gracefully handle failures, scale effectively, and provide necessary insights into its operation.
Leader Election for High Availability
When deploying multiple replicas of your controller for high availability, you need a mechanism to ensure that only one instance is actively reconciling resources at any given time. Running multiple active controllers simultaneously would lead to race conditions, conflicting operations, and inconsistent states. This is solved through leader election.
Kubernetes provides built-in leader election mechanisms, typically implemented using a Lease object. client-go offers the leaderelection package (k8s.io/client-go/tools/leaderelection) to facilitate this. Multiple controller replicas attempt to acquire a lease. The one that succeeds becomes the leader and performs reconciliation, while others remain in a standby mode, ready to take over if the leader fails or disconnects.
Implementing leader election involves:
- Defining a Lease object in a specific namespace (e.g., kube-system or your controller's namespace).
- Using leaderelection.NewLeaderElector to configure the election process, including callbacks for OnStartedLeading, OnStoppedLeading, and OnNewLeader.
- Running the leader elector in a separate goroutine.
- Ensuring your main reconciliation loop only runs when your controller is the leader.
// Example snippet for leader election integration (not a full runnable main)
package main
import (
// ... (imports) ...
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)
// ... (MyAppController struct, NewMyAppController, enqueueMyApp, reconcile, etc.) ...
func main() {
// ... (config, clientset, informer setup) ...
// Context for leader election and overall application lifecycle
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Unique ID for this controller instance (e.g., Pod name)
id, err := os.Hostname()
if err != nil {
klog.Fatalf("Error getting hostname: %v", err)
}
// The lock configuration for leader election
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: "my-app-controller-leader-lock", // Unique name for your lock
Namespace: "kube-system", // Or your controller's namespace
},
Client: kubeClientset.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
}
leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second, // How long a lease is valid before another candidate may take over
RenewDeadline: 10 * time.Second, // The acting leader must renew its lease before this deadline elapses
RetryPeriod: 2 * time.Second, // How often candidates (and the leader) retry lock actions
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Info("LEADER: Started leading, running controller")
// Start your informers and controller worker loops here
myAppInformerFactory.Start(ctx.Done()) // Start is non-blocking
// WaitForCacheSync returns a map of informer type to sync result.
for informerType, ok := range myAppInformerFactory.WaitForCacheSync(ctx.Done()) {
	if !ok {
		klog.Fatalf("LEADER: Error syncing cache for %v", informerType)
	}
}
if err := controller.Run(ctx, 2); err != nil {
klog.Errorf("LEADER: Controller exited with error: %v", err)
}
},
OnStoppedLeading: func() {
klog.Info("LEADER: Stopped leading, controller shutting down")
cancel() // Signal overall shutdown if we lose leadership
},
OnNewLeader: func(identity string) {
if identity == id {
return // That's us!
}
klog.Infof("NEW LEADER: %s", identity)
},
},
ReleaseOnCancel: true, // Release lease when context is cancelled
Name: "my-app-controller",
})
if err != nil {
klog.Fatalf("Error creating leader elector: %v", err)
}
leaderElector.Run(ctx) // Blocks until context is cancelled or leadership stops
klog.Info("Controller main function exiting")
}
This snippet illustrates how leaderelection.NewLeaderElector is configured with callbacks. The OnStartedLeading callback is where your informers are started and your controller's main Run method (which starts workers) is invoked. This ensures that only the elected leader performs the actual work.
Contexts and Graceful Shutdown
Graceful shutdown is paramount for any long-running application, especially controllers that manage critical infrastructure. In Go, the context package (context.Context) is the idiomatic way to handle cancellation signals across goroutines.
Our controller uses context.WithCancel(context.Background()) to create a cancellable context. This context is then passed down to informers (via ctx.Done()) and worker goroutines. When cancel() is called (e.g., on SIGTERM or SIGINT, or if leadership is lost), all goroutines observing ctx.Done() can gracefully terminate, cleaning up resources like workqueues.
// In main.go (this snippet also requires the "os/signal" and "syscall" imports):
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // Ensure cancel is called on exit
// Handle OS signals for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // Capture Ctrl+C and Kubernetes termination signals
go func() {
<-sigChan // Block until a signal is received
klog.Info("OS signal received, initiating graceful shutdown...")
cancel() // Cancel the context
}()
// Pass ctx.Done() to informers
go myAppInformerFactory.Start(ctx.Done())
// Pass ctx to controller.Run()
if err := controller.Run(ctx, numWorkers); err != nil {
klog.Fatalf("Error running controller: %v", err)
}
Logging and Metrics
For troubleshooting and operational visibility, robust logging and metrics are indispensable.
- Logging: Use klog/v2 (which we integrated) for structured, Kubernetes-style logging. It supports various levels (Info, Warning, Error), verbosity flags, and output formats. Ensure your log messages are informative, including relevant object keys (namespace/name) and error details.
- Metrics: Expose Prometheus metrics from your controller. client-go provides some out-of-the-box metrics for informers and workqueues. You can also instrument your reconciliation logic with custom metrics (e.g., reconciliation_total, reconciliation_duration_seconds, errors_total) using client libraries like github.com/prometheus/client_golang/prometheus. This allows you to monitor your controller's health, performance, and workload in real-time.
// Example for exposing Prometheus metrics (in main.go)
import (
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Add this somewhere in your main function to start a metrics server
go func() {
http.Handle("/metrics", promhttp.Handler())
klog.Fatal(http.ListenAndServe(":8080", nil)) // Or another port
}()
klog.Info("Metrics server listening on :8080")
Testing Controllers
Testing Kubernetes controllers can be challenging due to their asynchronous and event-driven nature. Key strategies include:
- Unit Tests: Test individual components (e.g., reconciliation logic, helper functions) in isolation.
- Integration Tests: Test the interaction between your controller and a mock Kubernetes API server (e.g., k8s.io/client-go/kubernetes/fake) or a local KinD cluster. This allows you to simulate API events and verify your controller's reactions.
- End-to-End Tests: Deploy your controller and CRDs to a real (test) Kubernetes cluster and verify its behavior with actual custom resources.
The controller-runtime project (from the Kubernetes SIGs) offers an envtest package that spins up a lightweight Kubernetes API server without a full kubelet or etcd, providing an excellent environment for integration tests.
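To make the fake-clientset approach concrete, here is a minimal unit-test sketch; the resource names are illustrative, and for your custom resources you would use the fake package generated alongside your clientset:
// myapp_controller_test.go
package main
import (
	"context"
	"testing"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)
func TestDeploymentLookup(t *testing.T) {
	// Seed the fake clientset with the Deployment the controller expects to find.
	client := fake.NewSimpleClientset(&appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{Name: "my-app", Namespace: "default"},
	})
	// A real test would inject `client` into the controller; here we verify
	// the fake serves reads just like the real API server would.
	got, err := client.AppsV1().Deployments("default").Get(context.Background(), "my-app", metav1.GetOptions{})
	if err != nil {
		t.Fatalf("expected deployment to exist: %v", err)
	}
	if got.Name != "my-app" {
		t.Errorf("unexpected name: %q", got.Name)
	}
}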
By incorporating these advanced considerations, your Golang controller for Custom Resources moves beyond a proof-of-concept to a robust, fault-tolerant, and observable component ready for the demands of a production Kubernetes environment. This attention to detail is what differentiates resilient, self-managing systems from fragile, error-prone automation.
Higher-Level Frameworks: Controller-Runtime and Operator SDK
While client-go provides the fundamental building blocks for interacting with the Kubernetes API and building controllers, the boilerplate code for informers, workqueues, leader election, and metrics can become substantial. To simplify controller development and promote best practices, two prominent higher-level frameworks have emerged: controller-runtime and the Operator SDK.
These frameworks abstract away much of the low-level client-go machinery, allowing developers to focus more on the core reconciliation logic for their custom resources. They build upon client-go but provide a more structured and opinionated approach to controller development.
Controller-Runtime (sigs.k8s.io/controller-runtime)
controller-runtime is a set of libraries that provides higher-level APIs for building Kubernetes controllers. It's designed to be flexible and composable, offering a robust foundation for operators without dictating every aspect of their implementation. Key features include:
- Manager: A central component that coordinates multiple controllers, sharing informers, caches, and potentially a rest.Config and client. It simplifies the setup and lifecycle management of all controllers within an application.
- Reconciler interface: Promotes a clear, single Reconcile(context.Context, reconcile.Request) method for your controller logic. This method is called with a request containing the NamespacedName of the object that needs reconciliation.
- Builder API: Simplifies the process of creating controllers and wiring them to informers and event sources. It allows you to declare which resources your controller watches and how events from those resources (or related ones) should trigger reconciliation.
- Caching and Clients: Automatically handles shared caches (informers) and provides a unified client that reads from the cache when possible and falls back to the API server when necessary.
- Leader Election and Webhooks: Built-in support for leader election and easy integration for admission webhooks (Mutating and Validating Webhooks), which allow you to intercept and modify/validate resource creations/updates.
- Metrics and Health Checks: Provides sensible defaults and easy extensibility for Prometheus metrics and liveness/readiness probes.
Using controller-runtime significantly reduces the amount of manual client-go setup. Instead of managing SharedInformerFactory and workqueue directly, you configure a Manager that handles these details. Your controller then implements a simple Reconciler interface, focusing solely on the "what to do" rather than the "how to watch and queue."
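To make the contrast concrete, here is a minimal reconciler sketch; myappv1 stands in for your generated API package, and the module path is hypothetical:
// A minimal controller-runtime reconciler sketch (wired into a Manager elsewhere)
package main
import (
	"context"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"
	myappv1 "example.com/myapp/api/v1" // hypothetical generated types
)
type MyAppReconciler struct {
	client.Client
}
func (r *MyAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	var myApp myappv1.MyApp
	if err := r.Get(ctx, req.NamespacedName, &myApp); err != nil {
		// Not-found means the object was deleted; anything else is requeued with backoff.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	logger.Info("reconciling MyApp", "name", req.NamespacedName)
	// Compare myApp.Spec with actual cluster state and create/update owned resources here.
	return ctrl.Result{}, nil
}
func (r *MyAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// The builder wires the reconciler to a shared informer for MyApp.
	return ctrl.NewControllerManagedBy(mgr).
		For(&myappv1.MyApp{}).
		Complete(r)
}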
Operator SDK (operatorframework.io/operator-sdk)
The Operator SDK is a framework built on top of controller-runtime that provides tools and CLI commands to accelerate operator development. While controller-runtime is a library, Operator SDK is a complete development kit that helps with:
- Scaffolding: Generates a new operator project with a basic structure, Dockerfile, CRD templates, and pre-configured controller-runtime setup.
- Code Generation: Automates the generation of client code, informers, and listers for your custom resources, similar to what we did manually with kube-codegen.
- Lifecycle Management: Assists with packaging, deploying, and managing the lifecycle of your operator using Operator Lifecycle Manager (OLM).
- Testing: Provides utilities for testing operators locally and in CI/CD pipelines.
The Operator SDK is particularly valuable for new operator projects, as it sets up a robust foundation, adhering to best practices and providing a streamlined developer experience. If you are building a new operator from scratch, starting with the Operator SDK is often the most efficient path.
When to use which?
- Direct client-go: If you need extremely fine-grained control over every aspect of API interaction, are integrating into an existing, non-Kubernetes Go application, or are developing a very specialized, minimalist tool that only performs a simple watch. It's great for learning the core mechanics, as we have done.
- controller-runtime: For building full-fledged Kubernetes controllers and operators where you want the benefits of a robust framework, but still desire a good level of control over your reconciliation logic and dependencies. It's the underlying library for Operator SDK.
- Operator SDK: The best choice for starting new, production-grade Kubernetes operators. It bundles controller-runtime with a powerful CLI and scaffolding tools, significantly reducing initial setup time and promoting a consistent development workflow.
While this guide focused on the raw client-go approach to provide a deeper understanding of the underlying mechanisms, recognizing the existence and utility of controller-runtime and Operator SDK is crucial for any serious Kubernetes Go developer. They offer a powerful abstraction layer that allows you to build more sophisticated and maintainable controllers with less effort, allowing you to quickly focus on the unique domain logic that your custom resources define and the complex automation they enable.
Conclusion: Mastering Custom Resource Observation in Golang
The ability to watch for changes to Custom Resources is not merely a technical detail; it is the fundamental mechanism that unlocks the full potential of Kubernetes as an extensible and programmable platform. Through the meticulous dissection of client-go, we have journeyed from establishing a basic API connection to orchestrating complex, event-driven reconciliation loops. We've seen how CustomResourceDefinitions empower you to extend Kubernetes with your own domain-specific objects, transforming it into a control plane tailored to your application's unique needs.
The client-go library, with its robust informers, listers, and workqueues, provides the essential toolkit for building resilient and efficient controllers. Informers, acting as local, cached proxies for the Kubernetes API, drastically reduce server load and ensure low-latency responses to resource changes. Workqueues, in turn, provide a critical layer of decoupling, buffering events and enabling asynchronous, rate-limited, and retryable reconciliation. This architectural pattern, while initially appearing complex, is the cornerstone of scalable and fault-tolerant Kubernetes automation.
We've also touched upon critical considerations for moving from a basic implementation to a production-grade operator: embracing leader election for high availability, implementing graceful shutdown with Go contexts, and ensuring observability through comprehensive logging and metrics. Finally, we acknowledged the power of higher-level frameworks like controller-runtime and Operator SDK, which build upon client-go to further streamline operator development, making it even easier to implement the sophisticated automation that transforms Kubernetes into a truly self-managing system.
By mastering the art of watching for changes to Custom Resources in Golang, you gain the power to build controllers that are not just reactive but truly intelligent, capable of observing the subtle shifts in your cluster's state and orchestrating the precise actions required to maintain the desired operational reality. This skill is invaluable for anyone looking to extend, automate, and elevate their applications within the dynamic world of cloud-native infrastructure, truly making Kubernetes work for your specific requirements. Whether you're building a specialized microservice operator or an entire application platform, the principles and techniques outlined here will serve as your guiding light, empowering you to craft powerful, self-healing systems that effortlessly adapt to change.
Frequently Asked Questions (FAQ)
1. Why is it better to use informers and workqueues instead of directly polling the Kubernetes API or making direct CRUD calls in event handlers? Directly polling the Kubernetes API (GET requests in a loop) is inefficient, generates significant load on the API server, and can lead to stale data or missed events. Informers establish a long-lived WATCH connection, receiving push notifications for changes, which is far more efficient. They also maintain a local, in-memory cache, providing fast access to resource data without needing network calls. Direct CRUD (Create, Read, Update, Delete) calls in event handlers can block the informer's event stream, causing events to be missed and the cache to become stale. Workqueues decouple event handling from reconciliation. Event handlers quickly add object keys to a queue, allowing the informer to continue processing. Separate worker goroutines then pull from the queue, performing reconciliation asynchronously, with built-in rate-limiting and retry mechanisms for robustness. This architecture ensures responsiveness, scalability, and fault tolerance.
2. What are the key differences between a Clientset, a Lister, and an Informer? A Clientset (e.g., kubernetes.Clientset or our generated myclientset.Clientset) is used for direct API operations (Create, Get, Update, Delete) on Kubernetes resources. When you use a Clientset, you are making a network call to the Kubernetes API server. An Informer maintains a local, consistent cache of a specific resource type by continuously LISTing and WATCHing the API server. It dispatches event notifications when resources are added, updated, or deleted. It's the mechanism that keeps the local cache up-to-date and notifies your controller. A Lister is a read-only interface to an Informer's local cache. When you use a Lister (e.g., myAppInformer.Lister()), you retrieve objects from the local memory cache, not from the API server. This provides extremely fast access to resource data without network latency, which is crucial for the reconciliation loop to quickly compare desired and actual states.
3. What is the purpose of resourceVersion in Kubernetes objects, and how do informers handle it? resourceVersion is a string that represents the version of a Kubernetes object in the API server's database (etcd). Every time an object is modified, its resourceVersion is incremented. It's used to detect stale reads and ensure consistency. Informers transparently use resourceVersion for WATCH operations. When an informer establishes a WATCH connection, it specifies the resourceVersion from which it wants to start receiving events. If the API server detects that the watch has become stale (e.g., the requested resourceVersion is too old, indicating too many intermediate changes have occurred), it will respond with an error that causes the informer to restart its LIST and WATCH cycle from scratch, ensuring it always operates on a consistent and up-to-date view of the cluster state.
4. How does leader election prevent multiple instances of my controller from interfering with each other? Leader election is a mechanism that ensures only one instance of a replicated controller is "active" at any given time, performing the actual reconciliation work. When multiple controller replicas start, they all attempt to acquire a distributed lock (typically a Kubernetes Lease object). Only one instance can successfully acquire this lock and become the leader. The other instances become "followers" and remain idle, continuously trying to acquire the lock. If the leader fails (e.g., its Pod crashes), its lease expires, and one of the followers will successfully acquire the lock and take over leadership. This prevents race conditions, conflicting operations, and ensures that the controller's logic is applied consistently to your Custom Resources and other Kubernetes objects.
5. When should I consider using higher-level frameworks like controller-runtime or Operator SDK instead of pure client-go? You should consider controller-runtime or Operator SDK for most new, non-trivial Kubernetes controller or operator projects.
- controller-runtime is beneficial when you want to reduce boilerplate code, benefit from structured patterns (like the Reconciler interface), and leverage built-in features for leader election, webhooks, and shared caches, while still maintaining flexibility. It's a powerful library if you want to integrate controller functionality into an existing Go application or have specific, complex needs that require deeper customization.
- Operator SDK builds on controller-runtime and is ideal for starting a complete, production-grade operator from scratch. It provides scaffolding, code generation, and lifecycle management tools that accelerate development, enforce best practices, and simplify deployment and packaging (e.g., with Operator Lifecycle Manager). If your goal is to create a full-fledged operator for a specific application or service, the Operator SDK offers the most streamlined development experience.
- Pure client-go is best reserved for simple, highly specialized tools or for deep-dive learning into the core Kubernetes API interaction mechanics.
🚀 You can securely and efficiently call the OpenAI API on APIPark in just two steps:
Step 1: Deploy the APIPark AI gateway in 5 minutes.
APIPark is developed based on Golang, offering strong product performance and low development and maintenance costs. You can deploy APIPark with a single command line.
curl -sSO https://download.apipark.com/install/quick-start.sh; bash quick-start.sh
In my experience, you can see the successful deployment interface within 5 to 10 minutes. Then, you can log in to APIPark using your account.
Step 2: Call the OpenAI API.