How to Watch Custom Resource Changes in Golang
In the dynamic and ever-evolving landscape of cloud-native applications, Kubernetes has emerged as the de facto operating system for the data center. Its extensible architecture, powered by Custom Resource Definitions (CRDs), allows users to extend the Kubernetes API with their own resource types, transforming it into a highly specialized control plane tailored to specific domain needs. These custom resources (CRs) are fundamental to building sophisticated operators and controllers that manage complex applications, provision infrastructure, or automate operational tasks within the Kubernetes ecosystem. However, the true power of CRDs is unlocked when applications can react instantaneously to changes in these custom resources. For developers leveraging the robustness and performance of Golang, mastering the art of watching and responding to custom resource changes is not just a best practice; it is an indispensable skill for building resilient, self-healing, and intelligent cloud-native systems.
The ability to monitor, detect, and act upon modifications, additions, or deletions of custom resources is at the heart of the operator pattern. An operator, essentially a domain-specific controller, continuously observes the desired state of a custom resource and works to reconcile it with the actual state of the system. Without an efficient and reliable mechanism for watching these changes, an operator would be relegated to polling, a resource-intensive and often slow approach that undermines the reactive nature of Kubernetes. Golang, with its powerful client-go library, offers a sophisticated and production-ready toolkit for interacting with the Kubernetes API, including advanced patterns for watching resources. This article embarks on a comprehensive journey, delving deep into the mechanisms, best practices, and practical implementation details of watching custom resource changes in Golang. We will explore everything from the foundational concepts of Kubernetes extensibility to the intricate workings of client-go informers, equipping you with the knowledge to build powerful and responsive Kubernetes controllers.
Understanding Custom Resources in Kubernetes: Extending the Control Plane
Before diving into the specifics of watching changes, it's crucial to solidify our understanding of Custom Resources and their role within the Kubernetes architecture. Kubernetes, at its core, is a declarative system where users define their desired state, and the control plane works tirelessly to achieve and maintain that state. While Kubernetes provides a rich set of built-in resources like Pods, Deployments, Services, and StatefulSets, these are not always sufficient for expressing the full complexity of modern applications or infrastructure components. This is where Custom Resource Definitions (CRDs) come into play, offering a powerful mechanism to extend the Kubernetes API without modifying the core source code.
A Custom Resource Definition (CRD) is itself a Kubernetes API resource that defines a new kind of resource. When you create a CRD, you're essentially telling Kubernetes, "Hey, I'm introducing a new object type with this name, this version, and this schema." Once the CRD is registered with the API server, you can then create instances of that new resource type, which are known as Custom Resources (CRs). These CRs behave just like any other native Kubernetes object: you can create, update, delete, and watch them using kubectl or programmatically via the Kubernetes API. For example, if you're building a database operator, you might define a DatabaseCluster CRD, allowing users to declare their desired database configuration (e.g., number of replicas, storage size, database engine version) as a Kubernetes object. A corresponding controller would then watch for DatabaseCluster CRs and provision/manage the actual database instances accordingly. This declarative approach simplifies operations, provides a single control plane for diverse workloads, and promotes consistency.
The extensibility offered by CRDs has revolutionized how complex applications are deployed and managed on Kubernetes. Instead of writing external scripts or using traditional infrastructure-as-code tools that operate outside the Kubernetes context, developers can now model their application-specific concepts directly within Kubernetes. This brings numerous benefits, including leveraging Kubernetes' inherent capabilities for scaling, self-healing, and scheduling, as well as providing a consistent management experience for both native and custom workloads. The schema definition within a CRD is particularly important, as it enforces structural validation for the custom resources created from it, ensuring that users provide valid configurations. This not only prevents malformed resources from being created but also guides users in defining their desired state correctly. CRDs empower the Kubernetes API to be truly universal, capable of orchestrating virtually any type of workload or service, whether a simple web application or a sophisticated machine learning pipeline.
The Core Concept of Watching Resources: An Event-Driven Paradigm
The Kubernetes control plane operates on an event-driven paradigm, a fundamental design choice that underpins its reactivity and self-healing capabilities. Instead of constantly polling the state of every resource, Kubernetes clients, including controllers and operators, can "watch" resources and receive real-time notifications whenever a change occurs. This elegant mechanism is far more efficient and responsive than polling, as it eliminates unnecessary requests and allows clients to react instantly to changes in the desired or actual state of the cluster.
At its core, the Kubernetes API server exposes a watch endpoint for every resource type. When a client initiates a watch request for a specific resource (or all resources of a certain type), the API server establishes a persistent connection. Through this connection, the server streams a sequence of events to the client whenever an object matching the watch criteria is added, updated, or deleted. These events are the lifeblood of any Kubernetes controller, informing it of the current state and prompting it to take action.
There are three primary types of events that a watch stream can deliver:
- ADDED: Signifies that a new resource matching the watch criteria has been created in the cluster. For example, if you're watching `Deployment` resources, an `ADDED` event is triggered when a new `Deployment` is successfully created. A controller would typically respond by initiating the provisioning of underlying resources, such as Pods, based on the new Deployment's specification.
- MODIFIED: Also often referred to as an `UPDATED` event, this indicates that an existing resource has been changed. This could be anything from a change in a Pod's image version, an update to a Service's port, or, most importantly for our discussion, an alteration to the `spec` or `status` of a Custom Resource. When a controller receives a `MODIFIED` event, it needs to compare the new state with its internal understanding of the desired state and reconcile any differences, perhaps by scaling up replicas or updating configurations.
- DELETED: Signals that a resource has been removed from the Kubernetes cluster. Upon receiving a `DELETED` event for a resource it manages, a controller's responsibility is typically to clean up any associated resources that were created or managed as a result of that resource's existence. For instance, if a `DatabaseCluster` custom resource is deleted, its controller would tear down the corresponding database instances, storage volumes, and any related network configurations.
The watch mechanism is designed to be robust. Clients specify a resourceVersion in their watch request. This resourceVersion is a unique identifier that represents the state of a resource at a specific point in time. By including resourceVersion in a watch request, a client tells the API server, "Send me all events that occurred after this resourceVersion." This allows clients to pick up from where they left off after a disconnection, preventing missed events and ensuring eventual consistency. If a client's resourceVersion is too old or the watch connection breaks for an extended period, the API server might return a "resource too old" error. In such cases, the client typically needs to re-list all resources to get the current state and then restart the watch from the latest resourceVersion. This ensures that controllers always have an up-to-date view of the cluster state, making them highly resilient to transient network issues or API server restarts.
Golang Client-Go Library for Kubernetes Interaction
When developing Kubernetes controllers, operators, or any application that needs to interact with the Kubernetes API in Golang, the client-go library is the official and indispensable toolkit. It provides a robust, idiomatic, and production-ready way to communicate with the Kubernetes API server, handle authentication, manage resources, and, critically for our topic, watch for changes. client-go is not just a thin wrapper around HTTP calls; it implements many complex patterns necessary for reliable interaction with Kubernetes, such as retry mechanisms, rate limiting, and sophisticated caching with informers.
To begin using client-go in a Golang project, you typically add it as a dependency:
go get k8s.io/client-go@kubernetes-VERSION
Replace kubernetes-VERSION with the Kubernetes version your cluster is running on or the version you intend to develop against (e.g., v0.28.3). It's generally recommended to align your client-go version with your target Kubernetes API server version for compatibility.
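For instance, a go.mod pinned against a v0.28.x cluster might look like the following (the module name and exact patch versions are illustrative; the key point is that `k8s.io/api`, `k8s.io/apimachinery`, and `k8s.io/client-go` should share the same minor version):

```
module my-controller

go 1.21

require (
	k8s.io/api v0.28.3
	k8s.io/apimachinery v0.28.3
	k8s.io/client-go v0.28.3
)
```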
Once installed, the first step is to configure how your application authenticates with the Kubernetes API server. client-go provides two primary methods for configuration:
In-cluster (Running inside Kubernetes): When your application (e.g., a controller or operator) runs as a Pod inside the Kubernetes cluster, it uses the service account credentials provided by Kubernetes. This is the recommended and most secure way for applications within the cluster to authenticate. client-go automatically detects and uses these in-cluster configurations.

```go
import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func getClientInCluster() (*kubernetes.Clientset, error) {
	// Create the in-cluster config from the Pod's service account
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	// Create the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	return clientset, nil
}
```
Out-of-cluster (Local Development): During local development, your application typically runs outside the Kubernetes cluster. In this scenario, client-go can read your kubeconfig file (usually located at ~/.kube/config). This file contains the credentials and cluster connection details that kubectl also uses.

```go
import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func getClientOutOfCluster() (*kubernetes.Clientset, error) {
	// Path to your kubeconfig file
	kubeconfig := "/path/to/your/kubeconfig" // or use clientcmd.NewDefaultClientConfigLoadingRules()

	// Build config from the kubeconfig file
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, err
	}
	// Create the clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	return clientset, nil
}
```
After obtaining a Clientset, you can interact with standard Kubernetes resources. The Clientset provides access to various API groups (e.g., CoreV1(), AppsV1(), RbacV1()). Each API group offers methods for operations like Get, List, Create, Update, Delete, and Watch on specific resource types. For example, to list all Pods in the default namespace:
import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

func listPods(clientset *kubernetes.Clientset) {
	pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		fmt.Printf("Error listing pods: %v\n", err)
		return
	}
	for _, pod := range pods.Items {
		fmt.Printf("Pod Name: %s\n", pod.Name)
	}
}
While direct Get, List, Create, Update, Delete operations are straightforward, directly using the low-level Watch API can be challenging, particularly for building robust, production-grade controllers. client-go introduces the Informer pattern to address these complexities, providing a higher-level, more efficient, and resilient way to watch resources, which we will explore in detail in the subsequent sections.
Watching Standard Resources with client-go: From Low-Level to Informers
When it comes to watching resources in Kubernetes using client-go, there are essentially two paths: the low-level Watch API and the higher-level, more sophisticated Informer pattern. While understanding the low-level Watch API provides foundational knowledge, the Informer pattern is the overwhelmingly preferred method for building production-ready controllers due to its efficiency, resilience, and convenience.
The Low-Level Watch API
The kubernetes.Interface (specifically clientset.CoreV1().Pods(), for example) exposes a Watch method that directly corresponds to the Kubernetes API server's watch endpoint. This method returns a watch.Interface, which provides a channel that streams watch.Event objects. Each watch.Event contains a Type (Added, Modified, Deleted, Error) and the Object itself.
Here's a simplified example of how one might use the low-level Watch API for Pods:
import (
	"context"
	"fmt"
	"log"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
)

func watchPodsLowLevel(clientset *kubernetes.Clientset) {
	// The clientset is obtained as shown in the previous section.
	watcher, err := clientset.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		log.Fatalf("Error watching pods: %v", err)
	}
	defer watcher.Stop()
	fmt.Println("Watching Pods in default namespace...")
	for event := range watcher.ResultChan() {
		switch event.Type {
		case watch.Added:
			fmt.Printf("Pod Added: %s\n", event.Object.(*corev1.Pod).Name)
		case watch.Modified:
			fmt.Printf("Pod Modified: %s\n", event.Object.(*corev1.Pod).Name)
		case watch.Deleted:
			fmt.Printf("Pod Deleted: %s\n", event.Object.(*corev1.Pod).Name)
		case watch.Error:
			log.Printf("Watch error: %v", event.Object)
		}
	}
}
Challenges with Low-Level Watch
While seemingly straightforward, directly using the low-level Watch API in a production controller comes with several significant challenges:
- Initial State Synchronization: The `Watch` API only provides events after the watch connection is established. To ensure a controller has a complete view of the cluster state, it must first perform a `List` operation to retrieve all existing resources and then immediately start watching from the `resourceVersion` obtained from that list. This "List-then-Watch" pattern (sketched just after this list) needs careful implementation to avoid race conditions where events might be missed between the list and the watch.
- Disconnected Watches and Resynchronization: Network disruptions, API server restarts, or `resourceVersion` staleness can cause a watch connection to terminate. A robust controller must detect these disconnections, gracefully handle errors (e.g., "resource too old"), and re-establish the watch, potentially performing another full `List` operation to resynchronize its state. This retry logic, exponential backoff, and state reconciliation are complex to implement correctly.
- Caching and Performance: Repeatedly fetching resources from the API server for every operation or event is inefficient and puts undue strain on the API server. For controllers that need to perform frequent lookups or maintain a consistent view of the cluster state, a local cache of resources is essential. The low-level `Watch` API offers no built-in caching.
- Event Buffering and Order Guarantees: The raw watch channel might not always guarantee event order if events are buffered or replayed. Handling out-of-order events, or ensuring that processing occurs in a consistent sequence for a given resource, requires additional logic.
- Rate Limiting: Kubernetes API servers have rate limits. A controller performing frequent `List` operations or reconnecting aggressively after disconnections can hit these limits, impacting its stability and the overall cluster.
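Here is a minimal sketch of that List-then-Watch handshake (assuming an initialized `clientset` as in the earlier examples; `handleEvent` is a hypothetical stand-in for your event-processing logic):

```go
// List once, then watch from the exact resourceVersion the list returned,
// so no events are missed between the List and the Watch.
list, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
if err != nil {
	log.Fatalf("initial list failed: %v", err)
}
watcher, err := clientset.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{
	ResourceVersion: list.ResourceVersion,
})
if err != nil {
	// A "resource too old" error here means the version has been compacted:
	// re-list to get a fresh state, then watch again from the new version.
	log.Fatalf("watch failed: %v", err)
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
	handleEvent(event) // hypothetical handler
}
```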
Introduction to the Informer Pattern: The Preferred Way
To address the complexities of the low-level Watch API, client-go provides the Informer pattern. An Informer is a higher-level abstraction that elegantly handles the List-then-Watch pattern, caching, resynchronization, and event delivery, making it the cornerstone of most Kubernetes controllers. It acts as a resilient, self-healing, and efficient proxy between your controller's business logic and the Kubernetes API server.
Why Informers are superior:
- Automatic List-then-Watch: Informers automatically perform the initial `List` to populate their cache and then establish a `Watch` to keep the cache up to date. They manage the `resourceVersion` internally.
- Built-in Caching: Informers maintain a local, read-only cache of resources. This significantly reduces the load on the API server, as your controller can perform `Get` operations directly from the local cache instead of making repeated API calls. This cache is highly optimized for fast lookups.
- Resilience and Resynchronization: Informers handle disconnections and "resource too old" errors, automatically re-establishing watches and resynchronizing their cache when necessary. They implement robust retry logic.
- Efficient Event Delivery: Informers buffer events and deliver them through user-defined `EventHandler` functions, abstracting away the complexities of raw watch channels. They also perform periodic resynchronizations (by default, every 10 hours) to ensure the cache eventually converges with the API server, even if some events were missed.
- Shared Informers: For large controllers or operators managing multiple resource types, the `SharedInformerFactory` allows multiple controllers to share a single informer instance for a given resource type. This means only one `List` and one `Watch` operation is performed per resource type, even if multiple parts of your application need to watch that resource, further reducing API server load.
In essence, using Informers allows developers to focus on the core business logic of their controllers – what to do when a resource is added, updated, or deleted – rather than getting bogged down in the intricacies of API interaction, error handling, and state management. They provide a reliable foundation upon which robust and scalable Kubernetes controllers are built.
Deep Dive into Informers: The Backbone of Kubernetes Controllers
Informers are arguably the most critical component of client-go for anyone building Kubernetes controllers. They provide a robust and efficient mechanism for observing changes in Kubernetes resources and maintaining a consistent local cache, dramatically simplifying controller development. To truly appreciate their power, we need to understand their internal architecture and how their various components work together.
What is an Informer? Its Purpose and Benefits
An Informer is a client-side component in client-go that serves two primary purposes:
1. To keep an up-to-date, in-memory cache of Kubernetes resources. This cache is populated by performing an initial List operation and then continually updated via Watch events from the Kubernetes API server.
2. To notify registered EventHandler functions whenever a resource is added, updated, or deleted. These handlers are where your controller's business logic resides.
The benefits of using Informers are profound:
- Reduced API Server Load: By maintaining a local cache, Informers significantly cut down the number of `GET` and `LIST` requests to the API server. Most controller logic can query the local cache directly.
- Eventual Consistency: Informers ensure that your controller eventually has a consistent view of the cluster state, even if watch connections drop or events are missed. They handle re-listing and re-watching automatically.
- Simplified Controller Logic: Developers can focus on `Add`, `Update`, and `Delete` event handling without worrying about the underlying complexities of API interaction, `resourceVersion` management, or connection resilience.
- Performance: Accessing objects from an in-memory cache is far faster than making network calls to the API server.
Components of an Informer
An Informer is not a monolithic entity but rather a coordinated system of several key components:
1. Reflector:
   - Role: The Reflector is the component closest to the Kubernetes API server. Its primary job is to perform the initial `List` operation for a specific resource type and then establish a `Watch` connection to receive real-time changes.
   - Mechanism: It maintains the `resourceVersion` to ensure that subsequent watch requests pick up events from the correct point in time. If the watch connection breaks or the API server reports a "resource too old" error, the Reflector is responsible for restarting the List-then-Watch cycle, re-fetching the complete state, and establishing a new watch.
   - Output: The Reflector pushes all received `ADD`, `UPDATE`, and `DELETE` events into a queue, specifically a DeltaFIFO.
2. DeltaFIFO:
   - Role: The DeltaFIFO (a First-In, First-Out queue of deltas) acts as a buffer between the Reflector and the Indexer. It's a key-value store where the keys are object keys and the values are lists of "deltas" (events) for that object.
   - Mechanism: When the Reflector pushes an event (e.g., Object Added, Object Updated), the DeltaFIFO stores this as a delta for the corresponding object. If multiple events for the same object arrive in quick succession, the DeltaFIFO intelligently coalesces them, often by just keeping the latest state for `UPDATE` events. This de-duplicates events and ensures that the Indexer and EventHandlers only process the most relevant state changes.
   - Output: The DeltaFIFO is consumed by a processing loop, which updates the Indexer and dispatches events to the registered EventHandlers.
3. Indexer:
   - Role: The Indexer is the actual in-memory cache of Kubernetes objects. It provides fast, indexed access to the objects that the Reflector has observed and the DeltaFIFO has processed.
   - Mechanism: As DeltaFIFO items are processed, the Indexer is updated with the latest version of each object. It stores these objects in a map keyed by `namespace/name` (or just `name` for cluster-scoped resources). Critically, the Indexer also supports secondary indexes, allowing you to query objects based on arbitrary fields (e.g., all pods with a specific label, or all deployments owned by a certain custom resource). This is incredibly powerful for controllers that need to quickly find related resources; a sketch of a secondary index follows this list.
   - Usage: Controllers primarily interact with the Indexer (often via a Lister interface) to `Get` objects by name or `List` objects based on index queries, without hitting the API server.
4. EventHandler:
   - Role: EventHandlers are the callbacks defined by the controller developer. They are invoked by the Informer whenever an `ADD`, `UPDATE`, or `DELETE` event is processed for a resource.
   - Mechanism: An Informer allows you to register ResourceEventHandler functions. These functions typically receive the actual Kubernetes object (e.g., `*corev1.Pod`, `*v1alpha1.MyCustomResource`) for `AddFunc`, `UpdateFunc` (old and new objects), and `DeleteFunc` (the deleted object). It's within these handlers that your controller's specific business logic for reacting to resource changes is implemented. For instance, an `AddFunc` for a Deployment might check whether its associated Service exists, while an `UpdateFunc` for a custom resource might trigger a reconciliation loop if its `spec` has changed.
Setting up an Informer for Standard Resources
Let's illustrate how to set up an Informer for a standard Kubernetes resource, like Deployment:
package main
import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)
func main() {
// 1. Configure client-go
kubeconfig := os.Getenv("KUBECONFIG")
if kubeconfig == "" {
kubeconfig = clientcmd.RecommendedHomeFile // defaults to ~/.kube/config
}
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error building clientset: %v", err)
}
// 2. Create a SharedInformerFactory
// Resync period: how often to re-list all resources, even if no events were observed.
// This helps ensure eventual consistency and recover from missed events.
// informers.WithNamespace optionally restricts the factory to a single namespace.
factory := informers.NewSharedInformerFactoryWithOptions(clientset, time.Minute*5, informers.WithNamespace("default"))
// 3. Get the Informer for Deployments
deploymentInformer := factory.Apps().V1().Deployments()
// 4. Register Event Handlers
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
deployment := obj.(*appsv1.Deployment)
fmt.Printf("Deployment Added: %s/%s\n", deployment.Namespace, deployment.Name)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldDeployment := oldObj.(*appsv1.Deployment)
newDeployment := newObj.(*appsv1.Deployment)
if oldDeployment.ResourceVersion == newDeployment.ResourceVersion {
// Periodic resync will send update events for the same object
// without any change. We can filter out these periodic resyncs.
return
}
fmt.Printf("Deployment Updated: %s/%s (Replicas: %d -> %d)\n",
newDeployment.Namespace, newDeployment.Name,
*oldDeployment.Spec.Replicas, *newDeployment.Spec.Replicas)
},
DeleteFunc: func(obj interface{}) {
deployment := obj.(*appsv1.Deployment)
fmt.Printf("Deployment Deleted: %s/%s\n", deployment.Namespace, deployment.Name)
},
})
// 5. Start the Informers
stopCh := make(chan struct{})
defer close(stopCh)
factory.Start(stopCh) // Starts all informers in the factory concurrently
factory.WaitForCacheSync(stopCh) // Waits for all caches to be synced
log.Println("Informers synced. Watching for Deployment changes...")
// 6. Keep the main Goroutine running until termination signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down...")
}
This example demonstrates the standard workflow:
- We obtain a Clientset for interaction with Kubernetes.
- We create a SharedInformerFactory. This factory is a powerful concept because it allows multiple controllers within the same application to share the same informer for a given resource type, thus optimizing API calls and resource usage.
- We retrieve a specific Informer (e.g., for Deployments) from the factory.
- We register our EventHandler functions to receive notifications.
- We start all informers in the factory and wait for their caches to synchronize with the API server. The WaitForCacheSync call is crucial; it ensures your handlers won't be called until the informer's cache has been fully populated with the initial list of resources. Without it, your controller might attempt to process events before it has a complete view of the cluster state, leading to errors.
- Finally, we keep the main function alive to continuously process events.
The Informer pattern, especially when combined with a Workqueue (which we'll discuss later), forms the foundational building block for sophisticated and reliable Kubernetes controllers.
Implementing Custom Resources and CRDs in Golang
To build a controller that watches custom resources, you first need to define those custom resources themselves. This involves creating a Custom Resource Definition (CRD) in Kubernetes and generating the corresponding Golang types and client code that your controller will use to interact with instances of that CRD. This process typically involves a structured approach to your Go project and leveraging tools like controller-gen.
Defining CRD Go Structs
The first step is to define the Go structs that represent your Custom Resource. These structs will reflect the schema of your CRD and include fields for spec (the desired state) and status (the observed state). It's a convention to place these type definitions in a pkg/apis/<group>/<version> directory within your project.
Let's imagine we want to create a custom resource called MyApplication to manage a simple application deployment.
Directory structure:
my-controller/
├── go.mod
├── go.sum
├── main.go
└── pkg/
└── apis/
└── example/
└── v1alpha1/
├── doc.go
├── register.go
├── types.go
pkg/apis/example/v1alpha1/doc.go: This file is for package documentation and often contains markers for code generation.
// +k8s:deepcopy-gen=package,register
// +groupName=example.com
// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1
- +k8s:deepcopy-gen=package,register: This marker tells the deepcopy generator to produce deepcopy methods for all types in this package and to register them with the Kubernetes API machinery.
- +groupName=example.com: Defines the API group for your CRD.
pkg/apis/example/v1alpha1/register.go: This file registers your types with the Kubernetes API scheme.
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1alpha1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
// SchemeBuilder initializes a scheme builder
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme adds the types in this group-version to the given scheme.
AddToScheme = SchemeBuilder.AddToScheme
)
// addKnownTypes adds our types to the API scheme by registering
// MyApplication and MyApplicationList
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&MyApplication{},
&MyApplicationList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}
pkg/apis/example/v1alpha1/types.go: This is where you define the Go structs for your Custom Resource, including its Spec and Status.
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MyApplication is the Schema for the myapplications API
type MyApplication struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MyApplicationSpec `json:"spec,omitempty"`
Status MyApplicationStatus `json:"status,omitempty"`
}
// MyApplicationSpec defines the desired state of MyApplication
type MyApplicationSpec struct {
Image string `json:"image"` // Container image to run
Replicas *int32 `json:"replicas"` // Number of desired replicas
Port int32 `json:"port"` // Port the application listens on
}
// MyApplicationStatus defines the observed state of MyApplication
type MyApplicationStatus struct {
AvailableReplicas int32 `json:"availableReplicas"` // Total number of available replicas
Phase string `json:"phase"` // Current phase of the application (e.g., "Pending", "Running", "Failed")
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MyApplicationList contains a list of MyApplication
type MyApplicationList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []MyApplication `json:"items"`
}
- +genclient: This marker tells client-gen to generate a client-go client for this resource type.
- +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object: Ensures deepcopy methods are generated, which are crucial for safe object manipulation in concurrent environments and for interacting with client-go's internal caching mechanisms.
- metav1.TypeMeta and metav1.ObjectMeta: Standard Kubernetes fields that every resource must embed, providing API version, kind, name, namespace, labels, annotations, etc.
- MyApplicationSpec: Defines the configurable parameters for your application.
- MyApplicationStatus: Defines the observed state, which your controller will update to reflect the actual state of the world.
Using controller-gen to Generate Client-Go Code for CRDs
Manually writing all the boilerplate code for clients, informers, and listers for your custom resources is tedious and error-prone. Fortunately, the Kubernetes ecosystem provides code generators that automate this process: controller-gen (from sigs.k8s.io/controller-tools) handles deepcopy methods and CRD manifests, while the generators in k8s.io/code-generator (client-gen, lister-gen, informer-gen) produce the typed clientset, listers, and informers.
First, install controller-gen:
go install sigs.k8s.io/controller-tools/cmd/controller-gen@latest
Then, you typically add code-generation targets to your Makefile or run the tools directly. Deepcopy methods are generated with controller-gen's object generator, run from your project root:

controller-gen object:headerFile="hack/boilerplate.go.txt" paths="./..."

Clients, listers, and informers are produced by k8s.io/code-generator, commonly invoked through its generate-groups.sh wrapper (newer code-generator releases ship an equivalent kube_codegen.sh):

bash generate-groups.sh "client,lister,informer" \
  my-controller/pkg/client \
  my-controller/pkg/apis \
  example:v1alpha1 \
  --go-header-file hack/boilerplate.go.txt
- object:headerFile="hack/boilerplate.go.txt": Runs the deepcopy generator, prepending an optional boilerplate header to generated files. It emits zz_generated.deepcopy.go files containing methods for deep copying your custom resource types; these are essential for thread-safe operations within client-go and controllers.
- paths="./...": Tells controller-gen to scan all Go packages under the current directory.
- client: Generates a pkg/client/clientset directory containing a custom Clientset (e.g., pkg/client/clientset/versioned), typed clients (e.g., pkg/client/clientset/versioned/typed/example/v1alpha1), and a fake clientset for testing.
- lister: Generates pkg/client/listers/<group>/<version> packages containing Lister interfaces for querying your custom resources from an informer's cache.
- informer: Generates pkg/client/informers/externalversions packages containing a SharedInformerFactory and specific Informer implementations for your custom resources.
After running the generators, your pkg/client directory will be populated with all the necessary boilerplate code. This generated code includes (a short usage sketch follows):
- A custom clientset (created via the generated NewForConfig in pkg/client/clientset/versioned) that you can use to interact with your MyApplication CRs directly, just like you would with kubernetes.NewForConfig(config) for built-in resources.
- Listers that provide Get and List methods to query the informer's cache.
- Informers that implement the SharedInformerFactory pattern for your custom resource group and version.
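For instance, creating a MyApplication CR with the generated clientset might look like this (a sketch; the ExampleV1alpha1 group accessor follows from the example.com group name, and the object values are illustrative):

```go
import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	v1alpha1 "my-controller/pkg/apis/example/v1alpha1"
	clientset "my-controller/pkg/client/clientset/versioned"
)

// createMyApplication creates a sample MyApplication CR in the default namespace.
func createMyApplication(cs clientset.Interface) error {
	replicas := int32(2)
	app := &v1alpha1.MyApplication{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: v1alpha1.MyApplicationSpec{
			Image:    "nginx:1.21.6",
			Replicas: &replicas,
			Port:     80,
		},
	}
	_, err := cs.ExampleV1alpha1().MyApplications("default").Create(context.TODO(), app, metav1.CreateOptions{})
	return err
}
```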
With the custom resource types defined and the client code generated, your Golang controller is now equipped to specifically interact with MyApplication objects, including the crucial ability to watch for their changes. This groundwork is essential before we can effectively set up an informer for our custom resource.
Watching Custom Resources with Informers
Now that we have our custom resource types defined and the client code generated, we can put everything together to watch for changes in our MyApplication custom resources using the Informer pattern. This involves using the generated client code and setting up a SharedInformerFactory specific to our custom API group.
The process largely mirrors watching standard resources, but with a few key differences regarding the client and informer factory instantiation.
Creating a Specific client-go Client for the Custom Resource
Instead of using kubernetes.NewForConfig(), which provides clients for built-in Kubernetes resources, we will use the generated clientset for our custom resource. This clientset is found in the pkg/client/clientset/versioned directory (e.g., my-controller/pkg/client/clientset/versioned).
package main
import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"k8s.io/client-go/kubernetes"  // For the generic k8s client, if needed
	"k8s.io/client-go/tools/cache" // For ResourceEventHandlerFuncs
	"k8s.io/client-go/tools/clientcmd"

	// Import our generated clients
	v1alpha1 "my-controller/pkg/apis/example/v1alpha1"
	clientset "my-controller/pkg/client/clientset/versioned"
	informers "my-controller/pkg/client/informers/externalversions"
)
func main() {
// 1. Configure Kubernetes client (standard & custom)
kubeconfig := os.Getenv("KUBECONFIG")
if kubeconfig == "" {
kubeconfig = clientcmd.RecommendedHomeFile // defaults to ~/.kube/config
}
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Fatalf("Error building kubeconfig: %v", err)
}
// Create the generic Kubernetes clientset (only needed if the controller
// also manages built-in resources; discarded in this minimal example)
_, err = kubernetes.NewForConfig(config)
if err != nil {
log.Fatalf("Error building generic clientset: %v", err)
}
// Create the custom clientset for our MyApplication resources
exampleClientset, err := clientset.NewForConfig(config)
if err != nil {
log.Fatalf("Error building example clientset: %v", err)
}
// 2. Create a SharedInformerFactory for our custom resource group.
// informers.NewSharedInformerFactoryWithOptions comes from the generated code
// and is specific to our API group (example.com) and version (v1alpha1).
// informers.WithNamespace optionally scopes the watch to the 'default' namespace.
exampleInformerFactory := informers.NewSharedInformerFactoryWithOptions(exampleClientset, time.Minute*5, informers.WithNamespace("default"))
// 3. Get the Informer for MyApplication resources
myAppInformer := exampleInformerFactory.Example().V1alpha1().MyApplications()
// 4. Register Event Handlers
myAppInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
myApp := obj.(*v1alpha1.MyApplication)
fmt.Printf("MyApplication Added: %s/%s, Image: %s, Replicas: %d\n",
myApp.Namespace, myApp.Name, myApp.Spec.Image, *myApp.Spec.Replicas)
// Here, you'd trigger your controller logic to create deployments, services, etc.
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldMyApp := oldObj.(*v1alpha1.MyApplication)
newMyApp := newObj.(*v1alpha1.MyApplication)
if oldMyApp.ResourceVersion == newMyApp.ResourceVersion {
return // Periodic resync without actual changes
}
fmt.Printf("MyApplication Updated: %s/%s, Image: %s -> %s, Replicas: %d -> %d\n",
newMyApp.Namespace, newMyApp.Name, oldMyApp.Spec.Image, newMyApp.Spec.Image,
*oldMyApp.Spec.Replicas, *newMyApp.Spec.Replicas)
// Here, you'd trigger your controller logic to update existing deployments, services, etc.
},
DeleteFunc: func(obj interface{}) {
myApp := obj.(*v1alpha1.MyApplication)
fmt.Printf("MyApplication Deleted: %s/%s\n", myApp.Namespace, myApp.Name)
// Here, you'd trigger your controller logic to clean up associated resources.
},
})
// 5. Start the Informers
stopCh := make(chan struct{})
defer close(stopCh)
exampleInformerFactory.Start(stopCh) // Starts all informers in this factory
exampleInformerFactory.WaitForCacheSync(stopCh) // Waits for all caches to be synced
log.Println("Custom Resource Informers synced. Watching for MyApplication changes...")
// 6. Keep the main Goroutine running until termination signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down custom resource watcher...")
}
Explanation of the Workflow
- Configuration: We first set up the rest.Config using clientcmd.BuildConfigFromFlags. This configuration is then used to create both the standard kubernetes.Clientset (if your controller needs to manage built-in resources like Deployments or Services) and, more importantly for this context, the exampleClientset generated for your custom resource group.
- SharedInformerFactory: We instantiate informers.NewSharedInformerFactoryWithOptions(). This factory constructor is generated code, specific to your custom API group (example.com). It takes your exampleClientset and a resync period as arguments. The informers.WithNamespace("default") option is a powerful way to scope your informer to watch resources only within a specific namespace, which is crucial for multi-tenancy or resource isolation.
- Informer Retrieval: From the exampleInformerFactory, we retrieve the specific Informer for MyApplication objects using exampleInformerFactory.Example().V1alpha1().MyApplications(). The chain Example().V1alpha1() corresponds to your API group and version.
- Event Handlers: We register cache.ResourceEventHandlerFuncs with myAppInformer.Informer().AddEventHandler(). These are the callback functions that will be executed when a MyApplication object is added, updated, or deleted. Inside these handlers, you'd implement the core reconciliation logic of your controller. For example, an AddFunc might create a Kubernetes Deployment based on MyApplication.Spec.Image and MyApplication.Spec.Replicas, while an UpdateFunc would check if those Spec fields have changed and update the corresponding Deployment accordingly.
- Starting and Syncing: exampleInformerFactory.Start(stopCh) kicks off all the informers managed by this factory in separate goroutines. Each informer begins its List-then-Watch cycle. exampleInformerFactory.WaitForCacheSync(stopCh) is critical; it blocks until the caches of all informers in the factory have been fully populated with the initial list of resources from the API server. This prevents your event handlers from operating before the cache has a complete picture of the cluster state, which could lead to nil pointers or incorrect assumptions about resource existence.
- Continuous Operation: The <-sigCh at the end keeps the main goroutine alive, allowing the informer goroutines to continue running and processing events until a termination signal (like Ctrl+C) is received.
By following this pattern, your Golang application gains the ability to reliably and efficiently monitor any changes to your custom resources, forming the basis for powerful and responsive Kubernetes operators. The use of generated clients and informers streamlines development and ensures adherence to best practices for Kubernetes API interaction.
Building a Robust Controller for Custom Resources
Watching custom resource changes is only one part of building a fully functional and resilient Kubernetes controller. The next crucial step is to effectively process those change events, perform reconciliation, and handle errors in a robust manner. This involves integrating several key patterns and considerations into your controller's design.
Workqueues: Decoupling Event Handling from Processing Logic
Directly executing complex reconciliation logic within an EventHandler can be problematic. Event handlers are typically called sequentially, and if one handler takes too long, it can block others, potentially missing or delaying subsequent events. Furthermore, if reconciliation fails, you need a mechanism to retry. This is where Workqueues come in.
A Workqueue (specifically k8s.io/client-go/util/workqueue) is a thread-safe queue that allows you to decouple the event handling phase from the reconciliation logic. Instead of doing the work directly in AddFunc, UpdateFunc, or DeleteFunc, you merely push the key (typically namespace/name) of the changed resource into a workqueue. A separate set of worker goroutines then concurrently pull items from this workqueue, process them, and handle retries if necessary.
Benefits of Workqueues:
- Concurrency: Multiple worker goroutines can process items from the queue in parallel, improving throughput.
- Decoupling: Event handlers remain lightweight, ensuring rapid ingestion of events from informers.
- Rate Limiting and Retries: Rate-limiting workqueue implementations (such as the one returned by workqueue.NewRateLimitingQueue) automatically handle exponential backoff and retries for failed reconciliation attempts, preventing a rapid-fire succession of retries that could overwhelm the API server or external services.
- Idempotency: When an item is pushed multiple times (e.g., an object is updated twice before the first update is processed), the workqueue coalesces the duplicate keys so the item is processed once, against its latest state.
Example Workqueue Integration:
// In main.go or controller's main loop:
// ... after setting up informer and clientset ...
// Create a rate-limiting workqueue. Naming the variable "queue" avoids
// shadowing the workqueue package; in a real controller this would be a
// field on the controller struct, shared with the worker goroutines.
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
// In Add/Update/DeleteFunc for your MyApplication informer:
AddFunc: func(obj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(obj) // Returns "namespace/name" or "name"
	if err == nil {
		queue.Add(key)
	}
},
UpdateFunc: func(oldObj, newObj interface{}) {
	key, err := cache.MetaNamespaceKeyFunc(newObj)
	if err == nil {
		queue.Add(key)
	}
},
DeleteFunc: func(obj interface{}) {
	// DeletionHandlingMetaNamespaceKeyFunc also copes with the
	// cache.DeletedFinalStateUnknown tombstones delivered on deletes.
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err == nil {
		queue.Add(key)
	}
},
// Controller's worker function (run in separate goroutines)
func runWorker() {
for processNextItem() {
}
}
func processNextItem() bool {
	obj, shutdown := queue.Get()
	if shutdown {
		return false
	}
	// We call Done here so the workqueue knows we have finished
	// processing this item. That way, the item can be passed on to
	// other workers or to the next round of processing if there is a
	// failure.
	defer queue.Done(obj)
	// We expect a key of type string "namespace/name" to come off the workqueue.
	key, ok := obj.(string)
	if !ok {
		queue.Forget(obj) // If the item is not a string, drop it.
		log.Printf("expected string in workqueue but got %#v", obj)
		return true
	}
	// Run the reconciliation logic.
	if err := reconcileMyApplication(key); err != nil {
		// Put the item back on the workqueue to handle transient errors.
		// The rate limiter will manage how quickly it's retried.
		queue.AddRateLimited(key)
		log.Printf("error reconciling %q: %v", key, err)
		return true
	}
	// If no error occurs we Forget this item so it's not retried.
	queue.Forget(obj)
	return true
}
func reconcileMyApplication(key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
// Log error, likely malformed key
return nil // Don't requeue
}
// Get the MyApplication object from the informer's cache
// Use the lister for thread-safe access to the cache
myApp, err := myAppInformer.Lister().MyApplications(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
// The MyApplication was deleted, perform cleanup if necessary
log.Printf("MyApplication %s/%s deleted, performing cleanup.", namespace, name)
return nil // Don't requeue
}
// Other errors, requeue
return fmt.Errorf("failed to get MyApplication %s/%s from informer cache: %w", namespace, name, err)
}
// --- Core Reconciliation Logic Here ---
// Example: Ensure a Deployment exists based on MyApplication.Spec
// Example: Update MyApplication.Status based on observed state
log.Printf("Reconciling MyApplication %s/%s (Image: %s, Replicas: %d)",
myApp.Namespace, myApp.Name, myApp.Spec.Image, *myApp.Spec.Replicas)
// Your logic to create/update/delete associated resources (e.g., Deployments, Services)
// You would use the generic Kubernetes clientset here
// Example: EnsureDeploymentForMyApp(myApp, clientset)
// If reconciliation is successful, update the MyApplication's status
// myApp.Status.AvailableReplicas = ...
// myApp.Status.Phase = "Running"
// exampleClientset.ExampleV1alpha1().MyApplications(myApp.Namespace).UpdateStatus(context.TODO(), myApp, metav1.UpdateOptions{})
return nil
}
// In main, start worker goroutines:
// for i := 0; i < numWorkers; i++ {
// go wait.Until(runWorker, time.Second, stopCh)
// }
Shared Informers: Efficiency for Multiple Controllers
As mentioned earlier, SharedInformerFactory is key for efficiency. If your operator consists of multiple controllers (e.g., one watching MyApplication and another watching Service), or if different parts of a single controller need access to the same resource type, using SharedInformerFactory ensures that only one List and Watch call is made per resource type to the API server. All consuming controllers share the same underlying cache, minimizing resource consumption and API server load.
Error Handling and Idempotency
- Error Handling: In reconcileMyApplication, if an error occurs that is transient (e.g., a network issue, or the API server being temporarily unavailable), return the error so that workqueue.AddRateLimited(key) is called, scheduling a retry. If the error is permanent (e.g., an invalid MyApplication.Spec), log it and return nil so it's not retried indefinitely.
- Idempotency: Controllers must be idempotent. This means applying the same desired state multiple times should always result in the same actual state, without unintended side effects. For example, if your controller creates a Deployment, it should check whether the Deployment already exists before attempting to create it again; if it does exist, it should then check whether its spec needs updating (see the sketch below). All Kubernetes API calls (create, update, delete) should be made with idempotency in mind.
Resource Versioning
client-go and Informers internally manage ResourceVersion to ensure consistency. When an Informer performs a List operation, it gets a resourceVersion for that point in time. All subsequent Watch requests are made with this resourceVersion, ensuring that no events are missed. When you retrieve objects from the Informer's cache using a Lister, those objects also carry their resourceVersion. This is crucial when performing updates. If you Get an object, modify it, and then Update it, you must use the resourceVersion from the Get operation. If another change happened to the object on the API server in between your Get and Update, the Update will fail with a conflict error (resourceVersion mismatch), indicating you need to re-fetch, re-apply your changes, and retry. This optimistic concurrency control prevents lost updates.
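client-go ships a helper for exactly this conflict-retry loop in k8s.io/client-go/util/retry. A sketch of updating a MyApplication's status under optimistic concurrency, using the generated clientset from earlier:

```go
import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/retry"

	clientset "my-controller/pkg/client/clientset/versioned"
)

// updateStatusWithRetry re-fetches the object and re-applies the status
// change whenever the API server rejects the update with a conflict
// (i.e., the resourceVersion was stale).
func updateStatusWithRetry(cs clientset.Interface, namespace, name, phase string) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Fetch the latest version on every attempt.
		app, err := cs.ExampleV1alpha1().MyApplications(namespace).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		app.Status.Phase = phase
		_, err = cs.ExampleV1alpha1().MyApplications(namespace).UpdateStatus(context.TODO(), app, metav1.UpdateOptions{})
		return err // a conflict error here triggers another attempt
	})
}
```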
Leader Election: For Highly Available Controllers
In production environments, you typically run multiple replicas of your controller for high availability. However, only one instance should be actively performing reconciliation at any given time to avoid race conditions and redundant work (the "active-passive" or "active-standby" model). client-go provides utilities for Leader Election (using the Lease API, previously Endpoints or ConfigMaps), allowing only one controller instance to become the "leader" and perform reconciliation. Other instances remain in standby, ready to take over if the leader fails. This ensures that your controller is highly available and resilient to node failures.
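A condensed sketch of wiring this up with the Lease-based lock (the lock name, namespace, and timings are illustrative):

```go
import (
	"context"
	"log"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// runWithLeaderElection blocks; only the elected leader runs runController.
func runWithLeaderElection(clientset *kubernetes.Clientset, runController func(ctx context.Context)) {
	id, _ := os.Hostname() // each replica needs a unique identity
	lock := &resourcelock.LeaseLock{
		LeaseMeta:  metav1.ObjectMeta{Name: "my-controller-lock", Namespace: "default"},
		Client:     clientset.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{Identity: id},
	}
	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:            lock,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		ReleaseOnCancel: true,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: runController, // we won the election: start reconciling
			OnStoppedLeading: func() { log.Printf("%s: lost leadership", id) },
		},
	})
}
```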
Practical Example: A Simple Golang Operator for a Custom Resource
Let's consolidate our understanding with a practical, albeit simplified, example of a Golang operator that watches a custom resource called MyService and ensures a corresponding Kubernetes Deployment and Service exist.
Define MyService Custom Resource
First, we define our MyService CRD. It will specify the container image, replica count, and the port for the HTTP service.
pkg/apis/example/v1alpha1/types.go (Modified for MyService)
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MyService is the Schema for the myservices API
type MyService struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec MyServiceSpec `json:"spec,omitempty"`
Status MyServiceStatus `json:"status,omitempty"`
}
// MyServiceSpec defines the desired state of MyService
type MyServiceSpec struct {
Image string `json:"image"` // Container image (e.g., nginx:latest)
Replicas *int32 `json:"replicas"` // Number of desired replicas
Port int32 `json:"port"` // Port the application listens on (e.g., 80)
Labels map[string]string `json:"labels,omitempty"` // Optional labels for pods
}
// MyServiceStatus defines the observed state of MyService
type MyServiceStatus struct {
AvailableReplicas int32 `json:"availableReplicas"` // Total number of available replicas
Phase string `json:"phase"` // Current phase (e.g., "Pending", "Ready", "Error")
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// MyServiceList contains a list of MyService
type MyServiceList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []MyService `json:"items"`
}
Remember to run controller-gen after making these changes to generate updated deepcopy, client, lister, and informer code.
CRD Definition (YAML)
After generating the Go types, controller-gen (or manually, if preferred) can also generate the YAML definition for the CRD, which you would apply to your Kubernetes cluster:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: myservices.example.com
spec:
  group: example.com
  names:
    kind: MyService
    listKind: MyServiceList
    plural: myservices
    singular: myservice
  scope: Namespaced
  versions:
    - name: v1alpha1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                image:
                  type: string
                  description: Container image (e.g., nginx:latest)
                replicas:
                  type: integer
                  format: int32
                  description: Number of desired replicas
                port:
                  type: integer
                  format: int32
                  description: Port the application listens on (e.g., 80)
                labels:
                  type: object
                  additionalProperties:
                    type: string
              required:
                - image
                - replicas
                - port
            status:
              type: object
              properties:
                availableReplicas:
                  type: integer
                  format: int32
                phase:
                  type: string
      subresources:
        status: {}
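Once the CRD is applied with kubectl apply, users can create MyService instances. An illustrative manifest (names and values are arbitrary):

```yaml
apiVersion: example.com/v1alpha1
kind: MyService
metadata:
  name: frontend
  namespace: default
spec:
  image: nginx:1.21.6
  replicas: 3
  port: 80
  labels:
    app: frontend
```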
Table: MyServiceSpec Fields and Purpose
Here's a quick overview of the MyServiceSpec fields:
| Field Name | Type | Description | Example Value |
|---|---|---|---|
| image | string | Docker image for the application container. | nginx:1.21.6 |
| replicas | *int32 | Desired number of Pod replicas for the application. | 3 |
| port | int32 | The port on which the application listens inside Pods. | 80 |
| labels | map[string]string | Optional labels to apply to the created Pods and Service. | {"app": "frontend"} |
Controller Implementation (main.go)
This main.go will be the heart of our simple operator. It sets up both the custom resource informer (MyService) and a standard resource informer (Deployment) for the controller to properly reconcile.
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
// Import our generated custom resource clients, informers, and listers
v1alpha1 "my-controller/pkg/apis/example/v1alpha1"
exampleclientset "my-controller/pkg/client/clientset/versioned"
exampleinformers "my-controller/pkg/client/informers/externalversions"
examplelisters "my-controller/pkg/client/listers/example/v1alpha1"
)
const (
controllerAgentName = "my-service-controller"
myServiceLabelKey = "myservice.example.com/managed-by"
)
// Controller defines our custom controller structure
type Controller struct {
kubeClientset kubernetes.Interface
exampleClientset exampleclientset.Interface
myServiceLister examplelisters.MyServiceLister
deploymentsLister appslisters.DeploymentLister
servicesLister corelisters.ServiceLister
myServicesSynced cache.InformerSynced
deploymentsSynced cache.InformerSynced
servicesSynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface
}
// NewController creates a new instance of the Controller
func NewController(
	kubeClientset kubernetes.Interface,
	exampleClientset exampleclientset.Interface,
	kubeInformerFactory informers.SharedInformerFactory,
	exampleInformerFactory exampleinformers.SharedInformerFactory) *Controller {

	// Get informers for Deployments, Services, and MyServices
	deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
	serviceInformer := kubeInformerFactory.Core().V1().Services()
	myServiceInformer := exampleInformerFactory.Example().V1alpha1().MyServices()

	c := &Controller{
		kubeClientset:     kubeClientset,
		exampleClientset:  exampleClientset,
		myServiceLister:   myServiceInformer.Lister(),
		deploymentsLister: deploymentInformer.Lister(),
		servicesLister:    serviceInformer.Lister(),
		myServicesSynced:  myServiceInformer.Informer().HasSynced,
		deploymentsSynced: deploymentInformer.Informer().HasSynced,
		servicesSynced:    serviceInformer.Informer().HasSynced,
		workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerAgentName),
	}

	log.Println("Setting up event handlers...")
	// Watch for changes to MyService custom resources
	myServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: c.enqueueMyService,
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldMyService := oldObj.(*v1alpha1.MyService)
			newMyService := newObj.(*v1alpha1.MyService)
			if oldMyService.ResourceVersion == newMyService.ResourceVersion {
				return // Periodic resync, no actual change
			}
			c.enqueueMyService(newObj)
		},
		DeleteFunc: c.enqueueMyService,
	})

	// Watch for changes to Deployments and Services that are owned by a MyService;
	// handleObject resolves the owner via the object's OwnerReference
	deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			c.handleObject(newObj)
		},
		DeleteFunc: c.handleObject,
	})
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			c.handleObject(newObj)
		},
		DeleteFunc: c.handleObject,
	})
	return c
}
// enqueueMyService takes a MyService resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* block.
func (c *Controller) enqueueMyService(obj interface{}) {
	var key string
	var err error
	// DeletionHandlingMetaNamespaceKeyFunc also handles the tombstone objects
	// (cache.DeletedFinalStateUnknown) that can be delivered for missed deletions.
	if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err != nil {
		log.Printf("Error getting key for object: %v", err)
		return
	}
	c.workqueue.Add(key)
}
// handleObject will take any resource that is a Kubernetes object and attempt to find
// its owner MyService, if one exists. Then it puts the owner MyService's key
// into the workqueue for reconciliation.
func (c *Controller) handleObject(obj interface{}) {
	var object metav1.Object
	var ok bool
	if object, ok = obj.(metav1.Object); !ok {
		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
		if !ok {
			log.Printf("Error decoding object, invalid type")
			return
		}
		object, ok = tombstone.Obj.(metav1.Object)
		if !ok {
			log.Printf("Error decoding object tombstone, invalid type")
			return
		}
		log.Printf("Recovered deleted object '%s' from tombstone", object.GetName())
	}
	log.Printf("Processing object: %s", object.GetName())
	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
		// If this object is not controlled by a MyService resource, ignore it.
		if ownerRef.Kind != "MyService" {
			return
		}
		myService, err := c.myServiceLister.MyServices(object.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			log.Printf("Ignoring orphaned object '%s/%s' of MyService '%s'", object.GetNamespace(), object.GetName(), ownerRef.Name)
			return
		}
		c.enqueueMyService(myService)
		return
	}
}
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer c.workqueue.ShutDown()

	log.Println("Starting MyService controller...")
	log.Println("Waiting for informer caches to sync...")
	if ok := cache.WaitForCacheSync(stopCh, c.myServicesSynced, c.deploymentsSynced, c.servicesSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	log.Println("Starting workers")
	for i := 0; i < threadiness; i++ {
		go c.runWorker()
	}
	log.Println("Started workers")
	<-stopCh
	log.Println("Shutting down workers")
	return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}
// processNextWorkItem will read a single item off the workqueue and
// attempt to process it, by calling the reconcile handler.
func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	defer c.workqueue.Done(obj)

	var key string
	var ok bool
	if key, ok = obj.(string); !ok {
		c.workqueue.Forget(obj)
		log.Printf("Expected string in workqueue but got %#v", obj)
		return true
	}
	if err := c.reconcileHandler(key); err != nil {
		c.workqueue.AddRateLimited(key) // Requeue with rate limit
		log.Printf("Error reconciling '%s': %v", key, err)
		return true
	}
	c.workqueue.Forget(obj) // Successfully processed, remove from queue
	return true
}
// reconcileHandler is the core reconciliation loop for a MyService resource.
func (c *Controller) reconcileHandler(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return fmt.Errorf("invalid resource key: %s", key)
	}

	// Get the MyService resource from the lister (informer cache)
	myService, err := c.myServiceLister.MyServices(namespace).Get(name)
	if err != nil {
		if errors.IsNotFound(err) {
			log.Printf("MyService '%s' in namespace '%s' deleted, cleaning up resources...", name, namespace)
			// MyService no longer exists, clean up associated Deployment and Service
			err = c.kubeClientset.AppsV1().Deployments(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
			if err != nil && !errors.IsNotFound(err) {
				return fmt.Errorf("failed to delete Deployment '%s': %w", name, err)
			}
			err = c.kubeClientset.CoreV1().Services(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
			if err != nil && !errors.IsNotFound(err) {
				return fmt.Errorf("failed to delete Service '%s': %w", name, err)
			}
			return nil // No error, MyService is gone, cleanup done/not needed
		}
		return fmt.Errorf("failed to get MyService '%s/%s' from lister: %w", namespace, name, err)
	}

	// --- Reconcile Deployment ---
	deployment, err := c.kubeClientset.AppsV1().Deployments(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		// Deployment does not exist, create it
		deployment, err = c.kubeClientset.AppsV1().Deployments(namespace).Create(context.TODO(), newDeployment(myService), metav1.CreateOptions{})
		if err != nil {
			return fmt.Errorf("failed to create Deployment for MyService '%s/%s': %w", namespace, name, err)
		}
		log.Printf("Created Deployment '%s' for MyService '%s/%s'", deployment.Name, namespace, name)
	} else if err != nil {
		return fmt.Errorf("failed to get Deployment '%s': %w", name, err)
	} else {
		// Deployment exists, ensure it's up to date
		if !reflect.DeepEqual(myService.Spec.Replicas, deployment.Spec.Replicas) ||
			myService.Spec.Image != deployment.Spec.Template.Spec.Containers[0].Image {
			updatedDeployment := newDeployment(myService)
			deployment, err = c.kubeClientset.AppsV1().Deployments(namespace).Update(context.TODO(), updatedDeployment, metav1.UpdateOptions{})
			if err != nil {
				return fmt.Errorf("failed to update Deployment '%s' for MyService '%s/%s': %w", name, namespace, name, err)
			}
			log.Printf("Updated Deployment '%s' for MyService '%s/%s'", deployment.Name, namespace, name)
		}
	}

	// --- Reconcile Service ---
	service, err := c.kubeClientset.CoreV1().Services(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		// Service does not exist, create it
		service, err = c.kubeClientset.CoreV1().Services(namespace).Create(context.TODO(), newService(myService), metav1.CreateOptions{})
		if err != nil {
			return fmt.Errorf("failed to create Service for MyService '%s/%s': %w", namespace, name, err)
		}
		log.Printf("Created Service '%s' for MyService '%s/%s'", service.Name, namespace, name)
	} else if err != nil {
		return fmt.Errorf("failed to get Service '%s': %w", name, err)
	} else {
		// Service exists, ensure it's up to date (e.g., port)
		if service.Spec.Ports[0].Port != myService.Spec.Port {
			updatedService := newService(myService)
			service, err = c.kubeClientset.CoreV1().Services(namespace).Update(context.TODO(), updatedService, metav1.UpdateOptions{})
			if err != nil {
				return fmt.Errorf("failed to update Service '%s' for MyService '%s/%s': %w", name, namespace, name, err)
			}
			log.Printf("Updated Service '%s' for MyService '%s/%s'", service.Name, namespace, name)
		}
	}

	// --- Update MyService Status ---
	// DO NOT use the MyService object directly from the lister for updates,
	// it's a shared object. Always fetch the latest from the API or deep copy.
	latestMyService, err := c.myServiceLister.MyServices(namespace).Get(name)
	if err != nil {
		// MyService might have been deleted while we were reconciling, requeue and try again later
		return fmt.Errorf("failed to get latest MyService '%s/%s' for status update: %w", namespace, name, err)
	}
	if latestMyService.Status.AvailableReplicas != deployment.Status.AvailableReplicas ||
		latestMyService.Status.Phase != "Ready" { // Simplified phase logic for example
		myServiceCopy := latestMyService.DeepCopy()
		myServiceCopy.Status.AvailableReplicas = deployment.Status.AvailableReplicas
		if deployment.Status.AvailableReplicas > 0 {
			myServiceCopy.Status.Phase = "Ready"
		} else {
			myServiceCopy.Status.Phase = "Pending"
		}
		_, err = c.exampleClientset.ExampleV1alpha1().MyServices(namespace).UpdateStatus(context.TODO(), myServiceCopy, metav1.UpdateOptions{})
		if err != nil {
			return fmt.Errorf("failed to update status for MyService '%s/%s': %w", namespace, name, err)
		}
		log.Printf("Updated status for MyService '%s/%s'", namespace, name)
	}
	return nil
}
// newDeployment creates a new Deployment for a MyService resource.
func newDeployment(myService *v1alpha1.MyService) *appsv1.Deployment {
	labels := map[string]string{
		myServiceLabelKey: myService.Name,
		"app":             myService.Name,
	}
	for k, v := range myService.Spec.Labels {
		labels[k] = v
	}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      myService.Name,
			Namespace: myService.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(myService, v1alpha1.SchemeGroupVersion.WithKind("MyService")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: myService.Spec.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "web",
							Image: myService.Spec.Image,
							Ports: []corev1.ContainerPort{
								{
									ContainerPort: myService.Spec.Port,
								},
							},
						},
					},
				},
			},
		},
	}
}
// newService creates a new Service for a MyService resource.
func newService(myService *v1alpha1.MyService) *corev1.Service {
	labels := map[string]string{
		myServiceLabelKey: myService.Name,
		"app":             myService.Name,
	}
	for k, v := range myService.Spec.Labels {
		labels[k] = v
	}
	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      myService.Name,
			Namespace: myService.Namespace,
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(myService, v1alpha1.SchemeGroupVersion.WithKind("MyService")),
			},
		},
		Spec: corev1.ServiceSpec{
			Selector: labels,
			Ports: []corev1.ServicePort{
				{
					Protocol:   corev1.ProtocolTCP,
					Port:       myService.Spec.Port,
					TargetPort: intstr.FromInt(int(myService.Spec.Port)),
				},
			},
			Type: corev1.ServiceTypeClusterIP, // Or NodePort/LoadBalancer if needed
		},
	}
}
func main() {
	var kubeconfig string
	flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
	var masterURL string
	flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
	flag.Parse()

	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		log.Fatalf("Error building kubeconfig: %v", err)
	}
	kubeClientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Fatalf("Error building kubernetes clientset: %v", err)
	}
	exampleClientset, err := exampleclientset.NewForConfig(cfg)
	if err != nil {
		log.Fatalf("Error building example clientset: %v", err)
	}

	kubeInformerFactory := informers.NewSharedInformerFactory(kubeClientset, time.Minute*5)
	exampleInformerFactory := exampleinformers.NewSharedInformerFactory(exampleClientset, time.Minute*5)

	controller := NewController(kubeClientset, exampleClientset, kubeInformerFactory, exampleInformerFactory)

	// Close stopCh on SIGINT/SIGTERM so the controller shuts down gracefully.
	// This must be wired up before Run, because Run blocks until stopCh is closed.
	stopCh := make(chan struct{})
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigCh
		close(stopCh)
	}()

	// Start all informers
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	// Run the controller with 2 worker goroutines; blocks until stopCh is closed
	if err = controller.Run(2, stopCh); err != nil {
		log.Fatalf("Error running controller: %v", err)
	}
}
This comprehensive main.go file outlines a functional controller. It demonstrates:
- Initializing both standard and custom clientsets.
- Setting up a SharedInformerFactory for both standard Kubernetes resources and our custom MyService resources.
- The Controller struct encapsulating clientsets, listers (for cache access), InformerSynced functions (for cache readiness), and the workqueue.
- AddEventHandler calls to enqueue MyService keys into the workqueue when changes occur.
- A handleObject function that re-enqueues the owner MyService if a managed Deployment or Service changes or is deleted, ensuring the controller reacts to external changes to its managed resources.
- The Run method for starting informers, waiting for cache sync, and initiating worker goroutines.
- The reconcileHandler function, which:
  - Retrieves the MyService from the cache.
  - Handles deletion cleanup.
  - Ensures a Deployment exists or is updated based on MyService.Spec.
  - Ensures a Service exists or is updated based on MyService.Spec.
  - Updates the MyService.Status to reflect the observed state.
- Helper functions newDeployment and newService to construct Kubernetes objects with proper OwnerReferences, which are crucial for Kubernetes' garbage collection to automatically clean up owned resources when the owner MyService is deleted.
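To try the controller end to end, you could apply a MyService manifest like the following (the name and values are illustrative) and watch the controller create the matching Deployment and Service:

apiVersion: example.com/v1alpha1
kind: MyService
metadata:
  name: demo-frontend
  namespace: default
spec:
  image: nginx:1.21.6
  replicas: 3
  port: 80
  labels:
    app: frontend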
This example provides a solid foundation for building more complex operators, demonstrating how to effectively watch, reconcile, and manage both custom and standard Kubernetes resources in a robust and efficient manner using Golang.
Bridging to API and Gateway: Custom Resources in a Broader Ecosystem
While custom resources and their controllers operate within the Kubernetes ecosystem, their function often extends to defining and managing services that interact with the broader API landscape. The concepts of "API" and "gateway" are intimately intertwined with how these custom resources are exposed, consumed, and managed, both internally and externally. Understanding this bridge is key to fully appreciating the value of custom resources in a modern, distributed system architecture.
Custom Resources as Internal APIs
Fundamentally, Custom Resources (CRs) define new APIs within Kubernetes itself. When you define a MyService CRD, you're creating a declarative API for developers and operators to specify how their services should run on the cluster. Instead of directly manipulating Deployment, Service, Ingress, and other native Kubernetes objects, users interact with the higher-level MyService API. This abstraction simplifies the user experience, encapsulates operational knowledge, and provides a consistent interface.
- API Standardization: CRDs allow teams to standardize how certain functionalities or infrastructure components are exposed and consumed within their Kubernetes clusters. This internal API consistency improves developer velocity and reduces operational overhead.
- Declarative Control: Users declare their desired state using the MyService custom resource, and the controller ensures that state is met, adhering to the principles of Kubernetes. This is a powerful form of API interaction.
- Abstraction Layer: CRs act as an abstraction layer, hiding the underlying complexity of multiple Kubernetes primitives. A user doesn't need to know the intricacies of Pod templates, Service selectors, or Ingress rules; they just specify their MyService, and the controller handles the rest.
Controllers as "Gateways" to External Systems
Often, the services defined or managed by custom resources are not entirely self-contained within Kubernetes. Controllers frequently act as gateways, translating the desired state expressed in a Custom Resource into actual operations on external infrastructure or services.
Consider these scenarios:
- Cloud Provisioning: A DatabaseCluster CR controller might act as a gateway to provision a database instance in AWS RDS, Google Cloud SQL, or Azure Database, translating the CR's spec into cloud provider API calls.
- External Service Configuration: An ExternalDNS CR controller might act as a gateway to update DNS records in an external DNS provider based on Ingress or Service definitions.
- AI Model Deployment: Imagine an AIModelDeployment custom resource that defines how an AI model should be deployed and served. The controller for this CR would then be responsible for provisioning the necessary compute resources (e.g., GPU-enabled Pods), configuring model serving frameworks (like KServe or BentoML), and crucially, setting up an AI Gateway to manage access, traffic, and security for that deployed model.
This is where a product like APIPark becomes highly relevant. APIPark, as an open-source AI gateway and API management platform, is specifically designed to manage, integrate, and deploy AI and REST services with ease. If your MyService custom resource, for instance, were to represent an AI inference endpoint (e.g., specifying an LLM model, its context window, and desired inference parameters), its controller could, after deploying the model, interact with APIPark. The controller could use APIPark's administrative API to:
- Register the newly deployed AI service: Allowing it to be discovered and managed by APIPark.
- Apply API management policies: Such as rate limiting, authentication, and traffic routing to the AI service.
- Standardize AI invocation: If your MyService is deploying diverse AI models, APIPark's ability to unify API formats for AI invocation ensures that applications don't need to adapt to different model APIs, simplifying integration significantly.
- Expose a managed API: The deployed AI service, controlled by your MyService custom resource, can then be exposed through APIPark as a robust, managed API, complete with full lifecycle management, detailed logging, and performance monitoring.
In this context, your Golang controller, watching MyService changes, acts as a bridge: it ensures the underlying AI service (deployment, pods) is running in Kubernetes, and then it configures APIPark as the gateway to intelligently manage and expose that AI service as a performant and secure API. This collaborative approach leverages Kubernetes' declarative control for infrastructure and APIPark's specialized capabilities for AI and API management.
API Management for Custom Resource Exposure
While CRDs define internal APIs for Kubernetes, the services they manage often need to be consumed by external applications or even other internal microservices. This is where a dedicated API management platform, like APIPark, becomes indispensable. It serves as the external gateway for all your APIs, whether they are standard REST services or sophisticated AI models.
APIPark offers features vital for any production-grade API:
- Unified API Format for AI Invocation: Crucially for AI-focused custom resources, APIPark standardizes request data formats across various AI models, meaning your application (or other services) consumes a consistent API regardless of the underlying AI model managed by your controller.
- End-to-End API Lifecycle Management: From design and publication to invocation and decommissioning, APIPark helps regulate API management processes, traffic forwarding, load balancing, and versioning.
- Security and Access Control: APIPark enables robust authentication, authorization, and subscription approval features, ensuring only authorized callers can access your APIs. This prevents unauthorized API calls and potential data breaches, which is critical for services managed by your custom resources.
- Performance and Scalability: With performance rivaling Nginx (over 20,000 TPS with modest resources), APIPark can handle large-scale traffic for services controlled by your operators, supporting cluster deployment.
- Observability: Detailed API call logging and powerful data analysis help businesses quickly trace issues and identify performance trends, ensuring system stability and data security for all APIs, including those serving custom resource-managed applications.
Therefore, a robust cloud-native solution often involves your Golang operator watching custom resource changes within Kubernetes to manage the desired state of your applications or AI models, while a powerful API Gateway like APIPark acts as the central point for managing and exposing these services to external consumers, ensuring security, performance, and seamless integration. This holistic approach combines the power of Kubernetes extensibility with enterprise-grade API management.
Advanced Topics and Best Practices
Building a production-ready Kubernetes controller involves more than just watching resources and reconciling. Several advanced topics and best practices are crucial for robustness, maintainability, and operational excellence.
Garbage Collection for Owned Resources
In our MyService example, we used metav1.NewControllerRef to set an OwnerReference on the Deployment and Service created by our controller. This is a critical best practice. Kubernetes' built-in garbage collector uses OwnerReferences to automatically delete "child" resources (like our Deployment and Service) when their "owner" resource (the MyService custom resource) is deleted. This ensures proper cleanup and prevents resource leaks without the controller having to explicitly delete every child object in its DeleteFunc. The Controller field in OwnerReference ensures that only one controller can own a particular resource, preventing conflicting reconciliations.
Finalizers for Graceful Deletion
While OwnerReferences handle cascading deletion automatically, sometimes a controller needs to perform complex cleanup operations before a custom resource is fully deleted. For instance, if MyService controlled an external database, the controller would need to de-provision that database before the MyService object itself is removed from Kubernetes. This is where Finalizers come in.
A finalizer is a string entry in a resource's metadata.finalizers list. When a resource with finalizers is deleted, Kubernetes doesn't immediately remove it. Instead, it marks the resource as "deletion-pending" (sets metadata.deletionTimestamp) and leaves it in the API. Your controller then sees this deletion-pending resource, performs its cleanup logic (e.g., calls the external database API to de-provision), removes its finalizer from the list, and only then will Kubernetes fully delete the object. This pattern ensures that all necessary external cleanup is completed before the Kubernetes object vanishes.
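To make this concrete, here is a minimal sketch of finalizer handling that could slot into the main.go above (it reuses the imports already present there). The finalizer name and the cleanupExternalResources helper are hypothetical placeholders for your own cleanup logic:

// myServiceFinalizer is a hypothetical finalizer name for illustration.
const myServiceFinalizer = "example.com/external-cleanup"

// ensureFinalizer runs early in the reconcile loop: it registers the finalizer
// while the object is live, and runs cleanup then releases it once deletion starts.
// It reports whether the object is being deleted.
func (c *Controller) ensureFinalizer(ctx context.Context, myService *v1alpha1.MyService) (deleting bool, err error) {
	if myService.ObjectMeta.DeletionTimestamp.IsZero() {
		// Object is not being deleted: make sure our finalizer is registered.
		if !containsString(myService.Finalizers, myServiceFinalizer) {
			copyObj := myService.DeepCopy()
			copyObj.Finalizers = append(copyObj.Finalizers, myServiceFinalizer)
			_, err = c.exampleClientset.ExampleV1alpha1().MyServices(myService.Namespace).Update(ctx, copyObj, metav1.UpdateOptions{})
		}
		return false, err
	}
	// Object is deletion-pending: perform external cleanup, then remove the finalizer.
	if containsString(myService.Finalizers, myServiceFinalizer) {
		if cleanupErr := c.cleanupExternalResources(myService); cleanupErr != nil { // hypothetical helper
			return true, cleanupErr // requeue; Kubernetes keeps the object until we succeed
		}
		copyObj := myService.DeepCopy()
		copyObj.Finalizers = removeString(copyObj.Finalizers, myServiceFinalizer)
		_, err = c.exampleClientset.ExampleV1alpha1().MyServices(myService.Namespace).Update(ctx, copyObj, metav1.UpdateOptions{})
	}
	return true, err
}

func containsString(list []string, s string) bool {
	for _, item := range list {
		if item == s {
			return true
		}
	}
	return false
}

func removeString(list []string, s string) []string {
	var out []string
	for _, item := range list {
		if item != s {
			out = append(out, item)
		}
	}
	return out
}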
Context Cancellation for Shutdown
In Golang, context.Context is the standard way to carry deadlines, cancellation signals, and other request-scoped values across API boundaries. For long-running processes like controllers, it's essential to use context.Context (and specifically context.WithCancel) to manage graceful shutdowns. Our example uses a stopCh channel, which is a common pattern, but using a context.Context can be more idiomatic and allow for propagation of cancellation to all downstream functions that might be blocking or performing I/O. When your main function receives a termination signal, it should call cancel() on the context, which will then signal all goroutines listening on that context to gracefully shut down.
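For instance (a sketch, assuming Go 1.16+ where os/signal provides NotifyContext), the signal wiring in our main function could be rewritten to drive shutdown through a context:

package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
)

func main() {
	// NotifyContext returns a context that is cancelled on SIGINT/SIGTERM;
	// stop() releases the signal registration.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// ... build clientsets, informer factories, and the controller as in the full example ...

	// ctx.Done() is a <-chan struct{}, so it can serve directly as the stop channel
	// expected by the informer factories and our Run method:
	//   kubeInformerFactory.Start(ctx.Done())
	//   exampleInformerFactory.Start(ctx.Done())
	//   if err := controller.Run(2, ctx.Done()); err != nil { ... }

	<-ctx.Done()
	log.Println("shutdown signal received, exiting")
}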
Metrics and Tracing for Observability
For any production system, observability is paramount. Kubernetes controllers should expose metrics (e.g., reconciliation duration, workqueue depth, error rates) and support distributed tracing to understand their behavior, identify bottlenecks, and debug issues.
- Metrics: Use Prometheus client libraries (e.g., github.com/prometheus/client_golang/prometheus) to expose metrics. client-go itself exposes useful metrics about API server requests and informer cache sizes. A small sketch follows this list.
- Tracing: Integrate with tracing systems like OpenTelemetry or Jaeger to trace the flow of reconciliation across different components or even into external services. This is invaluable for debugging complex interactions.
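As a minimal, standalone sketch of the metrics side (the metric name and port are illustrative, not a standard), a controller might register a reconcile-duration histogram and serve it over /metrics:

package main

import (
	"log"
	"net/http"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// reconcileDuration is an illustrative histogram; observe it inside your reconcile handler.
var reconcileDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
	Name: "myservice_reconcile_duration_seconds",
	Help: "Time spent reconciling a single MyService key.",
})

func init() {
	prometheus.MustRegister(reconcileDuration)
}

// instrumentedReconcile wraps a reconcile function and records its duration.
func instrumentedReconcile(key string, reconcile func(string) error) error {
	start := time.Now()
	err := reconcile(key)
	reconcileDuration.Observe(time.Since(start).Seconds())
	return err
}

// serveMetrics exposes the Prometheus registry on a dedicated port
// alongside the controller's worker goroutines.
func serveMetrics() {
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8080", nil))
}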
Testing Controllers (Unit, Integration, E2E)
Thorough testing is critical for controller reliability.
- Unit Tests: Test individual functions and reconciliation logic in isolation using mock clients or fake Kubernetes objects. client-go provides a fake clientset (k8s.io/client-go/kubernetes/fake), and code generation produces one for your custom resources as well (my-controller/pkg/client/clientset/versioned/fake) specifically for this purpose. A small unit-test sketch follows this list.
- Integration Tests: Test the interaction between your controller and a real (or simulated) Kubernetes API server. Tools like envtest (part of sigs.k8s.io/controller-runtime/pkg/envtest) allow you to spin up a lightweight, in-memory API server for testing without needing a full cluster.
- End-to-End (E2E) Tests: Deploy your controller and CRDs to a full Kubernetes cluster (or a minikube/kind instance) and verify its behavior from a user's perspective (e.g., create a MyService CR, then check if the Deployment and Service are created correctly).
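For example, a unit test for the newDeployment helper from our controller might look like this (a sketch placed in a main_test.go next to main.go; the field names follow the MyServiceSpec defined earlier):

package main

import (
	"testing"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	v1alpha1 "my-controller/pkg/apis/example/v1alpha1"
)

func TestNewDeployment(t *testing.T) {
	replicas := int32(3)
	myService := &v1alpha1.MyService{
		ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
		Spec: v1alpha1.MyServiceSpec{
			Image:    "nginx:1.21.6",
			Replicas: &replicas,
			Port:     80,
		},
	}

	deployment := newDeployment(myService)

	// The container image and replica count must come from the MyService spec.
	if got := deployment.Spec.Template.Spec.Containers[0].Image; got != "nginx:1.21.6" {
		t.Errorf("unexpected image: %s", got)
	}
	if *deployment.Spec.Replicas != 3 {
		t.Errorf("unexpected replicas: %d", *deployment.Spec.Replicas)
	}
	// An OwnerReference back to the MyService enables garbage collection.
	if len(deployment.OwnerReferences) != 1 || deployment.OwnerReferences[0].Kind != "MyService" {
		t.Errorf("missing or wrong owner reference: %+v", deployment.OwnerReferences)
	}
}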
By considering and implementing these advanced topics and best practices, you can elevate your Golang-based Kubernetes controllers from basic watchers to robust, production-grade operators capable of managing complex workloads with resilience and efficiency.
Conclusion
The ability to watch custom resource changes in Golang is the cornerstone of building intelligent, self-healing, and highly extensible cloud-native applications within the Kubernetes ecosystem. Throughout this comprehensive guide, we've journeyed from the foundational concepts of Custom Resource Definitions, which empower Kubernetes to understand new domain-specific APIs, to the intricate mechanisms of client-go Informers, the indispensable pattern for efficient and reliable resource observation.
We've explored how Informers abstract away the complexities of the low-level Kubernetes watch API, providing robust caching, automatic resynchronization, and resilient event delivery. By leveraging generated client code for custom resources, developers can seamlessly integrate their domain-specific objects into the powerful client-go framework. Furthermore, the integration of workqueues ensures that controller logic is decoupled, concurrent, and fault-tolerant, capable of handling transient errors and ensuring eventual consistency.
The example operator for MyService illustrated a practical application of these principles, demonstrating how a Golang controller can not only watch its custom resources but also reconcile them by managing dependent standard Kubernetes objects like Deployments and Services. We also thoughtfully bridged the discussion to the broader API and gateway landscape, highlighting how custom resources define internal APIs and how controllers often act as gateways to external systems. The mention of APIPark as an open-source AI gateway and API management platform served to demonstrate how services orchestrated by Kubernetes controllers can be effectively managed and exposed, ensuring security, performance, and streamlined access to both traditional REST and cutting-edge AI services.
Mastering these techniques in Golang empowers developers to build operators that transform Kubernetes into a truly specialized control plane for their unique needs, automating complex operational tasks and driving innovation in their cloud-native strategies. As the Kubernetes ecosystem continues to evolve, the operator pattern, fueled by the ability to react to custom resource changes, will remain a critical pillar for building the next generation of resilient and intelligent applications.
5 Frequently Asked Questions (FAQs)
1. What is the main difference between using the low-level Watch API and Informers in client-go for watching resources? The main difference lies in robustness and efficiency. The low-level Watch API provides a raw stream of events, requiring manual implementation of initial listing, error handling, re-connection logic, resourceVersion management, and caching. Informers, on the other hand, are a higher-level abstraction that automatically handle all these complexities. They perform the "List-then-Watch" pattern, maintain a local, in-memory cache, manage resourceVersion, handle re-connections and periodic resynchronizations, and provide a convenient callback mechanism for event handling. For production-ready controllers, Informers are the overwhelmingly preferred and recommended approach.
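To illustrate the contrast, here is what the raw watch looks like using the generated clientset from our example (a sketch; production code would add the re-list, re-watch, and resourceVersion bookkeeping that Informers give you for free):

watcher, err := exampleClientset.ExampleV1alpha1().MyServices("default").Watch(context.TODO(), metav1.ListOptions{})
if err != nil {
	log.Fatalf("watch failed: %v", err)
}
defer watcher.Stop()

// Each event carries a Type (Added/Modified/Deleted) and the object itself.
for event := range watcher.ResultChan() {
	myService, ok := event.Object.(*v1alpha1.MyService)
	if !ok {
		continue // e.g., a watch error event
	}
	log.Printf("event %s for MyService %s/%s", event.Type, myService.Namespace, myService.Name)
}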
2. Why do I need to generate client code for my Custom Resources (CRDs)? Can't I just use the generic client-go? While you can use the generic DynamicClient from client-go to interact with custom resources without generating specific code, generating client code (using tools like controller-gen) provides several significant advantages. It generates strongly-typed Go structs for your CRD, leading to compile-time type checking, improved readability, and reduced boilerplate. It also generates a dedicated Clientset, Listers, and Informers tailored to your custom API group and version, which are crucial for efficiently watching and managing your CRs with the Informer pattern and accessing objects from the local cache in a type-safe manner. This greatly simplifies controller development and reduces the risk of runtime errors.
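For comparison, watching the same resource with the generic dynamic client looks like this (a sketch; note that you work with unstructured objects instead of typed structs):

package main

import (
	"context"
	"log"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
)

// watchMyServicesDynamically watches MyService objects without any generated code.
func watchMyServicesDynamically(cfg *rest.Config) error {
	dynClient, err := dynamic.NewForConfig(cfg)
	if err != nil {
		return err
	}
	// The GroupVersionResource identifies the CRD by its API group, version, and plural name.
	gvr := schema.GroupVersionResource{Group: "example.com", Version: "v1alpha1", Resource: "myservices"}

	watcher, err := dynClient.Resource(gvr).Namespace("default").Watch(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return err
	}
	defer watcher.Stop()

	for event := range watcher.ResultChan() {
		// event.Object is an *unstructured.Unstructured: fields are accessed by
		// string path at runtime, with no compile-time type safety.
		log.Printf("dynamic event: %s", event.Type)
	}
	return nil
}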
3. What is the purpose of SharedInformerFactory and why is it important for controllers? SharedInformerFactory is a factory for creating and managing multiple Informers. Its importance stems from its efficiency. If your controller (or operator) needs to watch several different resource types (e.g., your custom MyService CRs, plus standard Deployments and Services), or if multiple internal components of your controller need to watch the same resource type, SharedInformerFactory ensures that only one List operation and one Watch stream are established per resource type with the Kubernetes API server. All Informers created by the factory then share the same underlying cache, significantly reducing API server load, memory footprint, and network traffic compared to running independent Informers for each consumer.
4. How does a controller handle cleanup when a Custom Resource is deleted? There are two primary mechanisms: * OwnerReferences (Automatic Garbage Collection): This is the simplest and preferred method for child resources directly managed by the controller. By setting the OwnerReference on a managed resource (e.g., a Deployment owned by a MyService CR) to point back to the owner CR, Kubernetes' built-in garbage collector will automatically delete the child resource when the owner CR is deleted. * Finalizers (Graceful, Pre-deletion Cleanup): For situations requiring complex cleanup before the CR is fully removed (e.g., de-provisioning an external cloud service), Finalizers are used. When a CR with finalizers is deleted, Kubernetes marks it for deletion but doesn't remove it. The controller sees this deletion-pending state, performs its custom cleanup logic, removes its finalizer from the CR, and only then does Kubernetes proceed with the actual deletion of the CR.
5. How do concepts like "API" and "Gateway" relate to watching Custom Resource Changes in Golang? Custom Resources fundamentally define new APIs within the Kubernetes system itself, allowing users to interact with domain-specific abstractions declaratively. Your Golang controller watches these custom resource APIs to understand the desired state. Often, these controllers act as "Internal Gateways," translating the desired state into actions on underlying Kubernetes resources or even external systems. For services managed by these custom resources that need external exposure or advanced management, a dedicated API Gateway like APIPark comes into play. APIPark serves as the external gateway, providing unified API management (authentication, rate limiting, traffic routing, lifecycle management) for services, including those deployed and configured by your custom resource controllers. This creates a holistic solution where Kubernetes handles internal orchestration and an API Gateway manages external interaction.
🚀 You can securely and efficiently call the OpenAI API on APIPark in just two steps:
Step 1: Deploy the APIPark AI gateway in 5 minutes.
APIPark is built in Golang, offering strong performance with low development and maintenance costs. You can deploy it with a single command:
curl -sSO https://download.apipark.com/install/quick-start.sh; bash quick-start.sh
Deployment typically completes within 5 to 10 minutes, after which you can log in to APIPark with your account.
Step 2: Call the OpenAI API.