Nucleus Logo

Nucleus

Manually Triggering a Kubernetes Cronjob in Golang

Nick Zelei

|

|

8min

single cube

Intro

Every Engineer at some point in their career has mostly likely had to work with cron or some sort of cron-based system in order to schedule jobs to perform tasks. Kubernetes has a Cronjob feature that allows one to replicate the old school Unix behavior in the cloud-native world.

It’s pretty straightforward to create a Cronjob in K8s, but something that we found is that the cronjob doesn't automatically run the first time after creating it. In our case, we wanted to create the cronjob and then immediately trigger and start on schedule. This can be problematic if a job runs infrequently, or if you simply to test the thing. Otherwise, you have to make up a schedule so that it runs right after you create it which gets hacky.

So we wanted to show how we were able to programmatically and manually (those two words sound weird together?) trigger the first cronjob and then watch it get picked up on it's normal schedule. This blog will take us through how a cronjob is created via a declaration and applied with kubectl, and then manually create an instance of the job. We’ll then go into how we can replicate this behavior in Golang, which can be applied in a more automated fashion!

To see all of the code snippets in one place, view the repo here .

Basic Setup

Let's create a basic cronjob that we can apply directly using the kubernetes CLI kubectl.

apiVersion: batch/v1
kind: CronJob
metadata:
  name: hello-world
  namespace: default
spec:
  schedule: '0 */10 * * *'
  successfulJobsHistoryLimit: 3
  suspend: false
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: hello-world
              image: alpine
              imagePullPolicy: IfNotPresent
              command:
                - /bin/sh
                - -c
                - |-
                  echo "Hello World"
          restartPolicy: Never

How to manually run the cronjob

This job will be created and will now run every 10 hours. The cronjob schedule syntax follows {min} -> {hour} -> {day} -> {month} -> {day of the week}, from left to right. So on the day variable we select /10 to indicate that the job should run every 10 hours. To invoke the job manually, you can use kubectL to create a Job directly.

This is the command:

$ kubectl create job --from=cronjob/<cronjob name> <job name>

Give the job a name and invoke the command.

$ kubectl create job --from=cronjob/hello-world hello-world-01

The job can be found by running:

$ kubectl get jobs

With the output looking like this:

NAME             COMPLETIONS   DURATION   AGE
hello-world-01   1/1           5s         5s

You can then describe the job by:

$ kubectl describe job hello-world-01

The output should look like this:

Name:             hello-world-01
Namespace:        default
Selector:         controller-uid=7cd23eb4-8579-4352-b64d-ab8d378f496a
Labels:           controller-uid=7cd23eb4-8579-4352-b64d-ab8d378f496a
                  job-name=hello-world-01
Annotations:      cronjob.kubernetes.io/instantiate: manual
Parallelism:      1
Completions:      1
Completion Mode:  NonIndexed
Start Time:       Sat, 22 Oct 2022 13:22:02 -0700
Completed At:     Sat, 22 Oct 2022 13:22:07 -0700
Duration:         5s
Pods Statuses:    0 Active / 1 Succeeded / 0 Failed
Pod Template:
  Labels:  controller-uid=7cd23eb4-8579-4352-b64d-ab8d378f496a
           job-name=hello-world-01
  Containers:
   hello-world:
    Image:      alpine
    Port:       <none>
    Host Port:  <none>
    Command:
      /bin/sh
      -c
      echo "Hello World"
    Environment:  <none>
    Mounts:       <none>
  Volumes:        <none>
Events:
  Type    Reason            Age   From            Message
  ----    ------            ----  ----            -------
  Normal  SuccessfulCreate  12s   job-controller  Created pod: hello-world-01-8l7pl
  Normal  Completed         7s    job-controller  Job completed

The pod name can be seen in the Event log, and from there, we can use that to see the log output from the job.

$ kubectl logs hello-world-01-8l7pl

Outputting:

Hello World

Cleanup

$ kubectl delete cronjob hello-world

Translating to Golang

Main

Let’s start with the main driver file

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/google/uuid"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

func main() {
	kubeclient, err := getKubeClient()
	if err != nil {
		panic(err)
	}

	namespace := "default"
	cronjobName := "hello-world"
	ctx := context.Background()

	cj, err := assertCronjob(ctx, kubeclient, namespace, cronjobName)
	if err != nil {
		panic(err)
	}

	err = manuallyTriggerCronjobAndWait(ctx, kubeclient, namespace, cj)
	if err != nil {
		panic(err)
	}
}

The basic flow of the program goes in three steps:

First, you initialize a kubernetes client. Then, you assert the cronjob exists. This just means we want to create the cronjob, or just retrieve it if it already has been created. Lastly, you manually trigger an instance of the cronjob, and wait for it to complete before we exit

Kubernetes Client Creation

Let’s dive into getting ourselves a kubernetes client.

func getKubeClient() (kubernetes.Interface, error) {
  homeDir, err := os.UserHomeDir()
	if err != nil {
		return nil, err
	}

	kubeConfigPath := fmt.Sprintf("%s/.kube/config", homeDir)
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
	if err != nil {
		return nil, err
	}

	kubeclient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	return kubeclient, nil
}

This code pulls the kube config from the default location, which is typically ~/.kube/config - loads it, and initializes a new kubernetes client.

Asserting the Cronjob

func assertCronjob(
	ctx context.Context,
	kubeclient kubernetes.Interface,
	namespace string,
	cronjobName string,
) (*batchv1.CronJob, error) {
	cronjobapi := kubeclient.BatchV1().CronJobs(namespace)

	cj, err := cronjobapi.Create(ctx, &batchv1.CronJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:      cronjobName,
			Namespace: namespace,
		},
		Spec: batchv1.CronJobSpec{
			Schedule:                   "0 */10 * * *",
			SuccessfulJobsHistoryLimit: Int32(3),
			Suspend:                    Bool(false),
			JobTemplate: batchv1.JobTemplateSpec{
				Spec: batchv1.JobSpec{
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							RestartPolicy: corev1.RestartPolicyNever,
							Containers: []corev1.Container{
								{
									Name:            "hello-world",
									Image:           "alpine",
									ImagePullPolicy: corev1.PullIfNotPresent,
									Command: []string{
										"/bin/sh",
										"-c",
										`echo "Hello World"`,
									},
								},
							},
						},
					},
				},
			},
		},
	}, metav1.CreateOptions{})
	if err != nil && !errors.IsAlreadyExists(err) {
		return nil, err
	}
	if err != nil && errors.IsAlreadyExists(err) {
		cj, err := cronjobapi.Get(ctx, cronjobName, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		return cj, nil
	}
	return cj, nil
}

func Int32(val int32) *int32 {
	return &val
}

func Bool(val bool) *bool {
	return &val
}

This code initializes the cronjob api client by namespace. It then creates the cronjob, or retrieves it if it already exists. The cronjob spec should look identical to the YAML variant defined at the beginning of this post.

Manually Triggering the Job

This is the fun part, and ultimately what this entire article is written around. How do we actually replicate the kubectl create job --from=cronjob/<cronjob name> <job name> ??

We want to create the job, and wait for it to complete. Technically the wait is optional, but I thought it was good measure to add that piece in for brevity.

func manuallyTriggerCronjobAndWait(
	ctx context.Context,
	kubeclient kubernetes.Interface,
	namespace string,
	cronjob *batchv1.CronJob,
) error {
	job, err := manuallyTriggerCronjob(
		ctx,
		kubeclient,
		namespace,
		cronjob,
	)
	if err != nil {
		return err
	}
	return waitForJobCompletion(ctx, kubeclient, job)
}

The basic driver function is pretty simple and look like what was just described above. First, you manually trigger the cronjob then you wait for job completion.

func manuallyTriggerCronjob(
	ctx context.Context,
	kubeclient kubernetes.Interface,
	namespace string,
	cronjob *batchv1.CronJob,
) (*batchv1.Job, error) {
	jobspec := cronjob.Spec.JobTemplate.Spec

	jobapi := kubeclient.BatchV1().Jobs(namespace)
	job, err := jobapi.Create(ctx, &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: namespace,
			Name:      fmt.Sprintf("%s-%s", cronjob.Name, uuid.New().String()),
			OwnerReferences: []metav1.OwnerReference{
				{
					APIVersion: "batch/v1", // this value is not populated by k8s client cronjob.APIVersion
					Kind:       "CronJob",  // this value is not populated by k8s client cronjob.Kind
					Name:       cronjob.Name,
					UID:        cronjob.UID,
				},
			},
		},
		Spec: jobspec,
	}, metav1.CreateOptions{})
	if err != nil {
		return nil, err
	}
	return job, nil
}

To manually trigger the job, we want to initialize the job api, which is found under the BatchV1 section of the kubeclient. The code here ends up being pretty minimal. We want to create a new job, and the Spec of the job is the same interface of the CronJob’s Spec.JobTemplate.Spec - so that can be passed directly into the job’s Spec property.

An additional nice-to-have is to set the OwnerReference to the cronjob that we are initializing it from. This way we have metadata about where the job actually came from. This is useful if we want to delete the cron job, as this will cause any jobs sourced from it to also be cleaned up. The final step is to wait for the job to complete, successfully or not, before exiting.

func waitForJobCompletion(
	ctx context.Context,
	kubeclient kubernetes.Interface,
	job *batchv1.Job,
) error {
	jobapi := kubeclient.BatchV1().Jobs(job.Namespace)

	watcher, err := jobapi.Watch(ctx, metav1.SingleObject(job.ObjectMeta))
	if err != nil {
		return err
	}

	defer watcher.Stop()

	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("request timeout reached")
		case <-time.After(3 * time.Minute):
			return fmt.Errorf("timeout reached waiting for job completion")
		case event := <-watcher.ResultChan():
			updatedJob, ok := event.Object.(*batchv1.Job)
			if !ok {
				return fmt.Errorf("couldn't cast event watcher object to job; object was %T", event.Object)
			}
			for _, condition := range updatedJob.Status.Conditions {
				if condition.Type == batchv1.JobComplete {
					return nil
				} else if condition.Type == batchv1.JobFailed {
					fmt.Println("job failed to complete successfully", condition.Message)
					return fmt.Errorf("job failed to complete successfully")
				}
			}
		}
	}
}

To do this, we take advantage of k8’s Watch interface. This allows us to watch an object to keep track of any status updates that we care about.

In this case, we are iterating over the result channel and keeping an eye on the status conditions. We exit when we find a condition that has the type of JobComplete or JobFailed. Pretty slick!

$ go run main.go
$ kubectl get jobs
NAME                                               COMPLETIONS   DURATION   AGE
hello-world-c825c853-3a86-46e8-a77e-fc32ea0837a6   1/1           3s         9s
$ kubectl get pods
NAME                                                     READY   STATUS      RESTARTS   AGE
hello-world-c825c853-3a86-46e8-a77e-fc32ea0837a6-lgcqf   0/1     Completed   0          18s
$ kubectl logs hello-world-c825c853-3a86-46e8-a77e-fc32ea0837a6-lgcqf
Hello World

Cleanup

$ kubectl delete cronjob hello-world

Closing

Being able to programmatically create and trigger cronjobs can be useful in different scenarios depending on what you are doing. In this case, it was useful for us at Nucleus because we use a cronjob to refresh a token that expires every 12 hours. For us, this is ultimately used to configure a Docker login Secret that a service account uses to read and write from the ECR docker registry. The cronjob runs every 10 hours to ensure that the access token stays up to date. However, we needed to run the job manually after initial creation to populate the token. Otherwise, the service account wouldn’t be able to do much until the cronjob finally got around to triggering.

Table of Contents

  • Intro
  • Basic Setup
  • How to manually run the cronjob
  • Cleanup
  • Translating to Golang
  • Closing

Latest Articles

blog_image

Product

3 types of Zero-Downtime Deployments in Kubernetes

A guide to the 3 types of zero-downtime deployments in Kubernetes

|

|

5min

Subscribe to new blogs from Nucleus.