Kubernetes with Java - Handling Events

Handling Kubernetes API events in Java

This post continues Kubernetes with Java - Asynchronous APIs, which showed how to interact with the Kubernetes API to list deployment resources on demand. In this post, we will learn how to handle Kubernetes API events in near real-time.

An example use case for this capability is sending a notification to a Slack channel whenever an app deployment starts, and again when it finishes and becomes ready to be consumed. Even though this example only deals with Kubernetes Deployment resources, the same mechanism of why and when events are emitted applies to other kinds of Kubernetes resources as well.

The deployment object

Before we get started on the Java side, let's see what a sample Deployment resource YAML looks like before it is applied.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: echo-app
  labels:
    team: cypress
spec:
  replicas: 2
  progressDeadlineSeconds: 60
  selector:
    matchLabels:
      app: echo-app
  template:
    metadata:
      labels:
        app: echo-app
    spec:
      containers:
      - name: main
        image: k8s.gcr.io/echoserver:1.10
        ports:
        - containerPort: 8080
          protocol: TCP
        readinessProbe:
          httpGet:
            path: /
            port: 8080
          initialDelaySeconds: 3
          periodSeconds: 3

The above YAML deploys an echo-server, a simple web server that echoes back information about the requests it receives. It listens on port 8080 and defines a readiness probe, which helps Kubernetes determine when a pod (and the container in it) becomes ready. Without this probe, Kubernetes would consider the pod ready as soon as its container had started, whether or not the app could serve requests yet. We will leave readiness and other kinds of probes for a later post. Along with this probe, the Deployment also sets progressDeadlineSeconds, which gives Kubernetes 60 seconds to make progress on any change to this resource. If that deadline passes, the deployment is considered a failure.

We can apply this Deployment by running:

kubectl apply -f k8s/echo.yml

… and then check on progress of deployments by running:

kubectl get deployments

… which returns something like:

NAME       READY   UP-TO-DATE   AVAILABLE   AGE
echo-app   2/2     2            2           2m

Notice the READY column telling us that 2 out of 2 pods are ready, and the UP-TO-DATE column telling us that both of them are up to date. Together, these columns indicate that the app deployment succeeded. You might have to run the above command several times before echo-app is ready, because applying a Deployment resource (or any other resource) with kubectl is not a synchronous operation. Components within Kubernetes, called controllers, go to work as soon as you apply the YAML. These controllers try to make the information in the provided YAML a reality by deploying 2 pods running the echo-server. Along the way, the system keeps updating the state of the Deployment, which is what kubectl looks at to tell us whether the Deployment is ready.

The few pieces of information we see in the kubectl output are NOT what the Kubernetes API returns when asked for the details of a Deployment resource. What you see in the output above is a summary generated by kubectl. To preview what our Java app might see, and what kubectl actually gets from the Kubernetes API, we can run the following command:

kubectl get deployment/echo-app -o yaml

The returned YAML looks like the original YAML we gave to kubectl, but on closer inspection it contains much more. The Kubernetes API has filled in many details. Some are system defaults that we omitted from our YAML. Others are system-generated IDs, dates, and the current status of the resource. See the sample output below (some of the information is omitted):

apiVersion: apps/v1
kind: Deployment
metadata:
  creationTimestamp: "2021-12-21T03:23:05Z"
  labels:
    team: cypress
  name: echo-app
  resourceVersion: "3811292"
  uid: 8f585af8-a67d-4144-937f-ee834fa21fe8
spec:
  replicas: 2
  progressDeadlineSeconds: 60
  selector:
    matchLabels:
      app: echo-app
  template:
    metadata:
      labels:
        app: echo-app
    spec:
      containers:
      - image: k8s.gcr.io/echoserver:1.10
        name: main
        ports:
        - containerPort: 8080
          protocol: TCP
        readinessProbe:
          failureThreshold: 3
          httpGet:
            path: /
            port: 8080
            scheme: HTTP
          initialDelaySeconds: 3
          periodSeconds: 3
status:
  availableReplicas: 2
  conditions:
  - lastTransitionTime: "2021-12-21T03:34:26Z"
    lastUpdateTime: "2021-12-21T03:34:26Z"
    message: Deployment has minimum availability.
    reason: MinimumReplicasAvailable
    status: "True"
    type: Available
  - lastTransitionTime: "2021-12-21T03:34:14Z"
    lastUpdateTime: "2021-12-21T03:34:26Z"
    message: ReplicaSet "echo-app-f56f666" has successfully progressed.
    reason: NewReplicaSetAvailable
    status: "True"
    type: Progressing
  observedGeneration: 1
  readyReplicas: 2
  replicas: 2
  updatedReplicas: 2

In the above output, notice the metadata/uid, metadata/resourceVersion, and status elements. The Kubernetes API assigned these additional details (among others) when we originally applied our YAML. The metadata/uid was assigned the very first time the Kubernetes API saw this new object. It decides whether an object is new by looking at the apiVersion, kind, and metadata/name attributes (together with the namespace, for namespaced resources such as Deployments). This happens regardless of what kind of object it is. The other important assignment is metadata/resourceVersion: this value is updated whenever there is ANY change in the state of the resource, whether the change was applied by us or by one of the controllers in the system. Between the time we ran the kubectl apply ... command and the time the deployment and its pods became ready, there may have been several resource versions assigned and a series of events emitted.

Most information in a Kubernetes cluster is shared via events, and components inside the cluster rely on these events to perform their work. An event is emitted when the cluster discovers a new resource, another when the resource is deleted, and many update events are emitted as the Kubernetes API assigns new versions to the resource while its state changes. Each event carries information that looks like the above YAML. For the update event, the Kubernetes API emits both the old version and the new version of the resource.
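Because every real change produces a new metadata/resourceVersion, a handler can compare the versions of the two objects it receives to tell a genuine update from a redelivered copy of the same state. The sketch below is only an illustration: ResourceVersions and hasChanged are hypothetical names, the two arguments stand in for the resourceVersion of the old and new objects, and whether identical versions are ever delivered (for example on a periodic resync) depends on how the informer is configured.

```java
import java.util.Objects;

/** Hypothetical helper, not part of the Kubernetes Java client. */
final class ResourceVersions {

    private ResourceVersions() {
    }

    /**
     * Returns true when the two versions differ, meaning the update carries a real change.
     * Resource versions are opaque strings, so they are only compared for equality
     * and never parsed or ordered.
     */
    static boolean hasChanged(String oldResourceVersion, String newResourceVersion) {
        return !Objects.equals(oldResourceVersion, newResourceVersion);
    }
}
```

In an update handler, such a check could be the first line, returning early when nothing has changed.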

Status condition transitions

Our Java app will subscribe to this event stream and determine when a deployment started and when it finished by looking only at the update events. Each update event contains two versions of the Deployment resource: the old version and the new version. We will compare these versions programmatically to determine when something interesting has happened.

For a new Deployment resource, we can determine when a deployment started and finished by watching the Progressing condition in the status/conditions array for transitions that look like the following:

[Figure: New Deployment - Success]

When something goes wrong and the deployment’s pods don’t become ready within 60 seconds (see the spec/progressDeadlineSeconds parameter), the transitions will look like the following:

[Figure: New Deployment - Failure]

ReplicaSets are mentioned so often in these events because that is how Deployments work in Kubernetes. When we apply a new version of the Deployment YAML, the deployment controller running inside Kubernetes spins up a new ReplicaSet and keeps updating the Deployment state (and emitting events) as the new ReplicaSet becomes available.

In the case where we have an existing deployment with running Pods and we apply a new version of the YAML (for example, because a new version of the app image became available), the transitions will look like the following:

[Figure: Existing Deployment - Success]

Notice the NewReplicaSetAvailable condition present in the old resource version. It was not present when we were starting with a brand new Deployment. Finally, the unhappy-path transitions for an existing Deployment will look like the following:

[Figure: Existing Deployment - Failure]
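The four cases above can be condensed into a small classification function. The following is a minimal sketch rather than the handler used later in this post: Progressing, Transition, and Transitions are hypothetical stand-ins written in plain Java (not classes from the Kubernetes Java client), and the reason and status values are the same ones the handler further down checks for.

```java
import java.util.Objects;

/** Simplified stand-in for the Progressing entry of status/conditions. */
final class Progressing {
    final String reason;
    final String status;

    Progressing(String reason, String status) {
        this.reason = reason;
        this.status = status;
    }

    boolean is(String expectedReason, String expectedStatus) {
        return Objects.equals(reason, expectedReason) && Objects.equals(status, expectedStatus);
    }
}

enum Transition { STARTED, SUCCEEDED, FAILED, NONE }

final class Transitions {

    private Transitions() {
    }

    /** Classifies one update event; either argument may be null when the condition is absent. */
    static Transition classify(Progressing oldCondition, Progressing newCondition) {
        if (newCondition == null) {
            return Transition.NONE;
        }
        boolean wasRollingOut = oldCondition != null && oldCondition.is("ReplicaSetUpdated", "True");
        if (!wasRollingOut) {
            // Covers both a new Deployment and an existing one whose old reason
            // was NewReplicaSetAvailable.
            return newCondition.is("ReplicaSetUpdated", "True") ? Transition.STARTED : Transition.NONE;
        }
        if (newCondition.is("NewReplicaSetAvailable", "True")) {
            return Transition.SUCCEEDED;
        }
        if (newCondition.is("ProgressDeadlineExceeded", "False")) {
            return Transition.FAILED;
        }
        return Transition.NONE; // still rolling out, nothing to report
    }
}
```

Keeping the decision in a pure function like this makes each transition easy to unit test without a cluster.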

You might be wondering whether, instead of watching the status/conditions array, we could simply watch the various replica-related attributes of status. The short answer is NO. We would have to track current replicas, desired replicas, and the deployment strategy being used, and that gets tricky very quickly. That is why we will stick to the status/conditions array.

Java implementation

Now that we understand what to expect from the Kubernetes API as the Deployment is applied and becomes ready, we will get to implementing it in Java. Our use case mentioned at the beginning of the post talked about sending a message to Slack when a deployment started or finished. Since we want to concentrate on our interactions with the Kubernetes API in a Java app, we will leave interacting with the Slack APIs for you to implement on your own. Sending a message to Slack is not that complicated and is well documented on the Slack dev site.
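For readers who want a starting point for the Slack side, here is a minimal sketch that assumes a Slack incoming webhook, which accepts a JSON body with a text field. SlackNotifier and its methods are hypothetical names, the webhook URL must be supplied by you, and a real app would likely use a JSON library instead of the hand-written escaping shown here.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

/** Hypothetical notifier that posts plain-text messages to a Slack incoming webhook. */
final class SlackNotifier {

    private final HttpClient http = HttpClient.newHttpClient();
    private final URI webhookUrl;

    SlackNotifier(String webhookUrl) {
        this.webhookUrl = URI.create(webhookUrl);
    }

    /** Builds the JSON body {"text":"..."} and escapes the characters JSON requires. */
    static String payload(String text) {
        StringBuilder json = new StringBuilder("{\"text\":\"");
        for (char c : text.toCharArray()) {
            switch (c) {
                case '"':  json.append("\\\""); break;
                case '\\': json.append("\\\\"); break;
                case '\n': json.append("\\n"); break;
                case '\r': json.append("\\r"); break;
                case '\t': json.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        json.append(String.format("\\u%04x", (int) c));
                    } else {
                        json.append(c);
                    }
            }
        }
        return json.append("\"}").toString();
    }

    /** Builds the POST request without sending it. */
    HttpRequest request(String text) {
        return HttpRequest.newBuilder(webhookUrl)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payload(text)))
                .build();
    }

    /** Sends the message asynchronously and completes with the HTTP status code. */
    CompletableFuture<Integer> send(String text) {
        return http.sendAsync(request(text), HttpResponse.BodyHandlers.discarding())
                .thenApply(HttpResponse::statusCode);
    }
}
```

Sending asynchronously keeps a slow Slack call from holding up whatever thread delivered the event.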

In the Kubernetes with Java - Asynchronous APIs post, we saw the SharedIndexInformer from the Kubernetes Java client being used to store local copies of deployment objects. It turns out that the same informer object can be used to handle events as they happen. The following code shows how to register an event-handler object on a newly created informer object:

import io.kubernetes.client.informer.SharedIndexInformer;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.apis.AppsV1Api;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1DeploymentList;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Slf4j
@Service
public class DeploymentEventsService {

    private final SharedIndexInformer<V1Deployment> informer;

    public DeploymentEventsService(
            ApiClient client,
            SharedInformerFactory informerFactory,
            @Value("${namespace}") String namespace) {

        informer = informerFactory.sharedIndexInformerFor(
                params -> new AppsV1Api(client).listNamespacedDeploymentCall(
                        namespace,
                        null,
                        null,
                        null,
                        null,
                        "team",
                        null,
                        params.resourceVersion,
                        null,
                        params.timeoutSeconds,
                        params.watch,
                        null),
                V1Deployment.class,
                V1DeploymentList.class);

        informer.addEventHandler(new DeploymentEventsHandler());
    }

    @PostConstruct
    public void init() {
        informer.run();
    }

    @PreDestroy
    public void destroy() {
        informer.stop();
    }
}

The code above should look familiar, as it is similar to the code shown in the Kubernetes with Java - Asynchronous APIs post. The big difference is that we additionally register an event-handler with the informer. In this example, DeploymentEventsHandler implements the ResourceEventHandler interface and is responsible for handling deployment-related events. Here is what it looks like:

import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.openapi.models.V1Deployment;
import io.kubernetes.client.openapi.models.V1DeploymentStatus;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;

@Slf4j
public class DeploymentEventsHandler implements ResourceEventHandler<V1Deployment> {

    @Override
    public void onAdd(V1Deployment obj) {
        /* don't need to pay attention to this event for our use case */
    }

    @Override
    public void onDelete(V1Deployment obj, boolean deletedFinalStateUnknown) {
        /* don't need to pay attention to this event for our use case */
    }

    @Override
    public void onUpdate(V1Deployment oldObj, V1Deployment newObj) {

        if (!hasCondition(oldObj, "ReplicaSetUpdated", "True") && hasCondition(newObj, "ReplicaSetUpdated", "True")) {
            /* Deployment is started */
            logDeploymentStatus(newObj, "Started");
        } else if (hasCondition(oldObj, "ReplicaSetUpdated", "True")) {
            if (hasCondition(newObj, "NewReplicaSetAvailable", "True")) {
                /* Deployment is finished successfully */
                logDeploymentStatus(newObj, "Finished/Success");
            } else if (hasCondition(newObj, "ProgressDeadlineExceeded", "False")) {
                /* Deployment is finished in a failure - progress-deadline exceeded */
                logDeploymentStatus(newObj, "Finished/Failed");
            }
        }
    }

    private boolean hasCondition(V1Deployment deployment, String progressingReason, String progressingStatus) {
        return Optional.ofNullable(deployment.getStatus())
                .filter(status -> status.getConditions() != null)
                .filter(status -> hasReasonAndStatus(status, progressingReason, progressingStatus))
                .isPresent();
    }

    private boolean hasReasonAndStatus(V1DeploymentStatus status, String progressingReason, String progressingStatus) {
        /* constant-first equals() avoids a NullPointerException when a condition has no reason set */
        return status.getConditions().stream()
                .anyMatch(c -> "Progressing".equals(c.getType())
                        && progressingReason.equals(c.getReason())
                        && progressingStatus.equals(c.getStatus()));
    }
    
    private void logDeploymentStatus(V1Deployment deployment, String status) {
        /* instead of logging we could put a message in a slack channel */
        log.info("***** Deployment: App={}, Team={}, Status={} *****",
                deployment.getMetadata().getName(),
                deployment.getMetadata().getLabels().get("team"),
                status);
    }
}

The event-handler methods can do almost anything they need to, since the whole V1Deployment object is available to them. For this use case, we only pay attention to update events that carry the Progressing condition transitions discussed earlier.
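One detail worth guarding against when reading from the object is missing metadata. The informer in this post only selects Deployments that carry a team label, but if the handler is reused with a different selector, the labels map may be absent. The sketch below shows one null-safe way to build the notification text. DeploymentMessages and format are hypothetical names, and the plain String and Map parameters stand in for values read from the deployment's metadata.

```java
import java.util.Map;

/** Hypothetical helper that builds the notification text without assuming labels exist. */
final class DeploymentMessages {

    private DeploymentMessages() {
    }

    /** Falls back to "unknown" when the labels map or the team label is missing. */
    static String format(String appName, Map<String, String> labels, String status) {
        String team = labels == null ? "unknown" : labels.getOrDefault("team", "unknown");
        return String.format("Deployment: App=%s, Team=%s, Status=%s", appName, team, status);
    }
}
```

The same string could be passed to the logger or to whatever notification channel you choose.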

We left the onAdd() method empty since it would ONLY have helped us detect a deployment starting for new Deployment resources. Instead, the main logic lives in the onUpdate() method, where we can more reliably detect deployments starting for both new and existing Deployment resources. It is also where we detect deployments finishing, either successfully or in failure because the deployment took longer than the progress deadline allowed. Finally, we left onDelete() empty, as that event does not provide any useful information for our use case.

Conclusion

The main thing to learn here is that we can listen to events for any Kubernetes resource by implementing an appropriate ResourceEventHandler and registering it with the right kind of informer. The way we inspected the status/conditions array for transitions was specific to detecting when a deployment started and finished for Deployment resources. The logic and data elements involved will be different, and specific to the use case, when dealing with events for other kinds of Kubernetes resources. Please reach out if you’d like to hear more about these topics.
