# Building a Cron Scheduler with RabbitMQ in Go

## Introduction

This project is a cron scheduler service that schedules cron jobs created through a dashboard interface. At each scheduled time it publishes the job to a consumer service through a RabbitMQ job queue. The consumer acknowledges successful job executions, retries failed jobs up to 3 times, and updates the job status in a PostgreSQL database so it can be tracked from the dashboard.

## Some Theory

1. `Cron jobs`: These are jobs/tasks that are scheduled to run at specific times. Using `cron expressions`, they can be configured to run once at a given time, repeat every day at a specific time, repeat every x hours, etc.
    
2. [`RabbitMQ`](https://www.rabbitmq.com/): It is an open-source `message broker` that allows two services to communicate asynchronously. It provides queues through which separate services can exchange messages, so the sender does not have to wait for the receiver. Message queues like this are a common way for services in a distributed system to communicate. In this project we will be using AMQP (Advanced Message Queuing Protocol), which RabbitMQ supports.
    
3. [`AMQP`](https://www.rabbitmq.com/tutorials/amqp-concepts#amqp-model): The Advanced Message Queuing Protocol is an open standard protocol in which `messages` (data) are published to `exchanges`, which route them to `queues`. Queues are connected to exchanges using `bindings`, which tell an exchange which queues a message should be sent to. Another service can then consume the messages from the queue.
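
To make the exchange, binding and queue vocabulary a bit more concrete, here is a small in-memory sketch. It is only a toy model and not how RabbitMQ is implemented; the `broker` type and its methods are made up for illustration. The one real detail it mirrors is that the default exchange (`""`) routes a message straight to the queue whose name equals the routing key, which is the behaviour this project relies on later.

```go
package main

import "fmt"

// broker is a toy, in-memory stand-in for the AMQP routing model.
// It only mirrors the vocabulary (exchange, binding, queue).
type broker struct {
	queues   map[string][]string            // queue name -> pending messages
	bindings map[string]map[string][]string // exchange -> routing key -> queue names
}

func newBroker() *broker {
	return &broker{
		queues:   map[string][]string{},
		bindings: map[string]map[string][]string{},
	}
}

// declareQueue creates the queue if it does not exist yet (idempotent).
func (b *broker) declareQueue(name string) {
	if _, ok := b.queues[name]; !ok {
		b.queues[name] = nil
	}
}

// bind tells an exchange to route messages with the given key to a queue.
func (b *broker) bind(exchange, key, queue string) {
	if b.bindings[exchange] == nil {
		b.bindings[exchange] = map[string][]string{}
	}
	b.bindings[exchange][key] = append(b.bindings[exchange][key], queue)
}

// publish routes a message and returns how many queues received it.
// The default exchange "" routes directly to the queue named by the key.
func (b *broker) publish(exchange, key, msg string) int {
	if exchange == "" {
		if _, ok := b.queues[key]; !ok {
			return 0 // no such queue: the message is not routed anywhere
		}
		b.queues[key] = append(b.queues[key], msg)
		return 1
	}
	targets := b.bindings[exchange][key]
	for _, q := range targets {
		b.queues[q] = append(b.queues[q], msg)
	}
	return len(targets)
}

func main() {
	b := newBroker()
	b.declareQueue("cron_events")
	b.bind("jobs", "daily", "cron_events")

	fmt.Println(b.publish("jobs", "daily", "via a named exchange"))
	fmt.Println(b.publish("", "cron_events", "via the default exchange"))
	fmt.Println(len(b.queues["cron_events"]))
}
```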
    

## Project Architecture

![](https://cdn.hashnode.com/res/hashnode/image/upload/v1751095740803/e45f43c2-09df-405c-8eff-d0796f8608b6.jpeg align="center")

This project will have two main components:

1. `scheduler`  
    This will host our dashboard and schedule the cron jobs. At each scheduled time, the job will be published to the queue.
    
2. `consumer`  
    This will consume messages from the queue and process them based on the job type. On success, it will record the result in the PostgreSQL DB. On failure, it will retry up to a maximum of 3 times. If the job still fails after 3 retries, it will be recorded as `permanently_failed` in the DB. The consumer will also `ACK` (acknowledge) or `NACK` (negatively acknowledge and put back on the queue) each message depending on the result.
    

Find this project here:

%[https://github.com/Aniketyadav44/cronflow] 

> Note:
> 
> The dashboard in this project has pages that display basic stats, list all of the scheduled jobs and list job run entries with their status (running/completed/failed). These pages are not explained in this blog.
> 
> The main purpose of this blog is to explain only the core logic: scheduling cron jobs, publishing to and consuming from RabbitMQ, processing each job based on its type (ping, email, slack, webhook) with acknowledgement & retries, and recording everything in the PostgreSQL database.
> 
> To understand the project structure used here, please refer to this blog: [Effective Project Structure for Backend Projects in Go](https://blog.anikety.com/go-backend-project-structure) written by me!

### Scheduling cron job & Publishing to RabbitMQ

The following code is from the scheduler service’s `apiService.go` file, which is located at [`/scheduler/internal/services/apiService.go`](https://github.com/Aniketyadav44/cronflow/blob/main/scheduler/internal/services/apiService.go):

```go
type ApiService struct {
	db        *sql.DB
	cron      *cron.Cron
	rabbitmq  *amqp091.Connection
	mqChannel *amqp091.Channel
}

func (s *ApiService) CreateNewJob(job *models.Job) error {
	// first putting this job in db
	query := `INSERT INTO jobs(cron_expr, type, payload)
			  VALUES ($1, $2, $3) RETURNING id;
	`
	payloadJSON, _ := json.Marshal(job.Payload)
	var jobId int
	if err := s.db.QueryRow(query, job.CronExpr, job.Type, payloadJSON).Scan(&jobId); err != nil {
		return err
	}
	job.Id = jobId

	// scheduling a cron job
	id, err := s.cron.AddFunc(job.CronExpr, func() {
		log.Println("running cron job: publishing to rabbitmq")
		q, err := s.mqChannel.QueueDeclare("cron_events", false, false, false, false, nil)
		if err != nil {
			log.Println("failed creating a queue for rabbitmq: ", err.Error())
			return
		}

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		jsonBody, err := json.Marshal(map[string]any{
			"job":  job,
			"time": time.Now().Format("2006-01-02T15:04:05.000000-07:00"),
		})
		if err != nil {
			log.Println("failed creating payload for rabbitmq: ", err.Error())
			return
		}

		if err := s.mqChannel.PublishWithContext(ctx, "", q.Name, false, false, amqp091.Publishing{
			ContentType: "application/json",
			Body:        jsonBody,
		}); err != nil {
			log.Println("failed publishing to rabbitmq: ", err.Error())
			return
		}
		log.Println("event published to rabbitmq!")

	})
	if err != nil {
		// on cron scheduling error, deleting the job created in db
		delQuery := `DELETE FROM jobs WHERE id = $1`
		s.db.Exec(delQuery, jobId)
		return err
	}

	updateQuery := `UPDATE jobs SET cron_id = $1 WHERE id = $2`
	s.db.Exec(updateQuery, id, jobId)
	job.CronId = int(id)
	return nil
}
```

This is a service function that we have created on our `ApiService`. It is called from the handler of the `/api/create` API.  
We first insert the job into the `jobs` table:

```go
query := `INSERT INTO jobs(cron_expr, type, payload)
		  VALUES ($1, $2, $3) RETURNING id;
`
payloadJSON, _ := json.Marshal(job.Payload)
var jobId int
if err := s.db.QueryRow(query, job.CronExpr, job.Type, payloadJSON).Scan(&jobId); err != nil {
	return err
}
job.Id = jobId
```

After inserting, we set the `Id` field of the `job` variable to the ID returned by the database.

Then we schedule the cron job using the [`robfig/cron`](https://github.com/robfig/cron) package. The cron instance from this package is stored in our `ApiService`.

The cron job is scheduled using the [`AddFunc()`](https://pkg.go.dev/github.com/robfig/cron/v3@v3.0.1#Cron.AddFunc) function from the package, which takes two parameters: the cron expression and the function to run on that schedule.

The cron expression we use is:

```plaintext
x y * * *  
```

Here `x` is the minute and `y` is the hour at which the job repeats every day. This cron expression is stored in our `job` model, and we access it as `job.CronExpr`.
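
As a quick illustration of that format, the sketch below builds and validates a daily expression from an hour and a minute using only the standard library. The helper names `dailyCronExpr` and `parseDailyCronExpr` are hypothetical and are not part of the project or of `robfig/cron`. They simply show where the minute and the hour sit in the five fields.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// dailyCronExpr builds a five-field "minute hour * * *" expression.
func dailyCronExpr(hour, minute int) (string, error) {
	if hour < 0 || hour > 23 {
		return "", fmt.Errorf("hour %d out of range 0-23", hour)
	}
	if minute < 0 || minute > 59 {
		return "", fmt.Errorf("minute %d out of range 0-59", minute)
	}
	return fmt.Sprintf("%d %d * * *", minute, hour), nil
}

// parseDailyCronExpr reverses dailyCronExpr and rejects any expression
// that is not in the plain daily form.
func parseDailyCronExpr(expr string) (hour, minute int, err error) {
	fields := strings.Fields(expr)
	if len(fields) != 5 || fields[2] != "*" || fields[3] != "*" || fields[4] != "*" {
		return 0, 0, fmt.Errorf("not a daily expression: %q", expr)
	}
	minute, err = strconv.Atoi(fields[0])
	if err != nil || minute < 0 || minute > 59 {
		return 0, 0, fmt.Errorf("invalid minute field %q", fields[0])
	}
	hour, err = strconv.Atoi(fields[1])
	if err != nil || hour < 0 || hour > 23 {
		return 0, 0, fmt.Errorf("invalid hour field %q", fields[1])
	}
	return hour, minute, nil
}

func main() {
	expr, err := dailyCronExpr(9, 30)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(expr) // prints "30 9 * * *"

	hour, minute, err := parseDailyCronExpr(expr)
	fmt.Println(hour, minute, err)
}
```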

```go
// scheduling a cron job
id, err := s.cron.AddFunc(job.CronExpr, func() {
	log.Println("running cron job: publishing to rabbitmq")
	q, err := s.mqChannel.QueueDeclare("cron_events", false, false, false, false, nil)
	if err != nil {
		log.Println("failed creating a queue for rabbitmq: ", err.Error())
		return
	}
	// ... the rest of the function body is covered in the next snippets
```

Inside the function that is invoked at the scheduled time, we first declare a queue using the [`QueueDeclare()`](https://pkg.go.dev/github.com/rabbitmq/amqp091-go@v1.10.0#Channel.QueueDeclare) function from the [`rabbitmq/amqp091-go`](https://github.com/rabbitmq/amqp091-go) package.  
The RabbitMQ channel is stored in our `ApiService` as `mqChannel`, which is of type `*amqp091.Channel`.

We pass the name of the queue, which in this case is `cron_events`, and check for any errors in declaring it. The remaining boolean arguments (`durable`, `autoDelete`, `exclusive`, `noWait`) are all `false`, so the queue is not durable and will not survive a broker restart.

This creates the queue (if it does not already exist) from which the consumer will consume messages. Then we prepare the payload to send in the message over the queue.

```go
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

jsonBody, err := json.Marshal(map[string]any{
	"job":  job,
	"time": time.Now().Format("2006-01-02T15:04:05.000000-07:00"),
})
if err != nil {
	log.Println("failed creating payload for rabbitmq: ", err.Error())
	return
}
```

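Here, we first create a context with a 5-second timeout to pass to the publish call, so that publishing is bounded in time instead of being allowed to block indefinitely. How strictly this deadline is enforced depends on the `amqp091-go` version, so it is worth checking the documentation of the version in use.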

Then we create a JSON body with a `job` key, which holds our job model’s JSON data, and a `time` key, which holds the time at which this run of the cron job was triggered.
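
Since the `time` value travels as a string, the consumer can only get a `time.Time` back by parsing it with the same layout. The sketch below shows that round trip with the standard library. The layout string is the one used in the snippet above, while the helper names (`formatScheduleTime`, `parseScheduleTime`, `roundTrip`) are made up for this example.

```go
package main

import (
	"fmt"
	"time"
)

// scheduleLayout is the layout string used by the publisher above.
const scheduleLayout = "2006-01-02T15:04:05.000000-07:00"

// formatScheduleTime renders t the same way the publisher does.
func formatScheduleTime(t time.Time) string {
	return t.Format(scheduleLayout)
}

// parseScheduleTime is a hypothetical consumer-side helper that turns
// the string back into a time.Time.
func parseScheduleTime(s string) (time.Time, error) {
	return time.Parse(scheduleLayout, s)
}

// roundTrip parses a schedule string and formats it again. A value that
// was produced with scheduleLayout should come back unchanged.
func roundTrip(s string) (string, error) {
	t, err := parseScheduleTime(s)
	if err != nil {
		return "", err
	}
	return formatScheduleTime(t), nil
}

func main() {
	s := formatScheduleTime(time.Date(2025, time.June, 28, 9, 30, 0, 0, time.UTC))
	fmt.Println(s) // prints "2025-06-28T09:30:00.000000+00:00"

	back, err := roundTrip(s)
	fmt.Println(back == s, err)
}
```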

Then we check for any errors in creating the payload body and continue to publish the message:

```go
if err := s.mqChannel.PublishWithContext(ctx, "", q.Name, false, false, amqp091.Publishing{
	ContentType: "application/json",
	Body:        jsonBody,
}); err != nil {
	log.Println("failed publishing to rabbitmq: ", err.Error())
	return
}
```

We publish the message to the queue using the [`PublishWithContext()`](https://pkg.go.dev/github.com/rabbitmq/amqp091-go@v1.10.0#Channel.PublishWithContext) function on `mqChannel`. We pass the timeout context, keep the default `""` exchange and provide the queue’s name as the routing key.

We also pass our payload using `amqp091.Publishing{}`, in which we set the `ContentType` and the `Body`.

We then check for any errors in publishing the message to the queue.
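
To see what a timeout context does in general, here is a small standard-library sketch. `slowPublish` is a hypothetical stand-in for any call that accepts a context, and it says nothing about how `amqp091-go` treats the context internally. It only shows that a context-aware call returns `context.DeadlineExceeded` once the deadline passes.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowPublish is a hypothetical stand-in for a broker call. It finishes
// after latency unless ctx is cancelled or its deadline passes first.
func slowPublish(ctx context.Context, latency time.Duration) error {
	select {
	case <-time.After(latency):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// publishWithin runs slowPublish under a timeout, mirroring the
// WithTimeout + defer cancel() pattern used in the scheduler.
func publishWithin(timeout, latency time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return slowPublish(ctx, latency)
}

func main() {
	// Fast call: completes well inside the deadline.
	fmt.Println(publishWithin(200*time.Millisecond, time.Millisecond))

	// Slow call: the deadline fires first.
	err := publishWithin(10*time.Millisecond, 2*time.Second)
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
}
```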

That completes the function passed to `AddFunc()`. After the call to `AddFunc()` returns, we check for any errors in scheduling the cron job:

```go
if err != nil {
	// on cron scheduling error, deleting the job created in db
	delQuery := `DELETE FROM jobs WHERE id = $1`
	s.db.Exec(delQuery, jobId)
	return err
}
```

If there was an error, we delete the DB entry we created for this job earlier.

```go
updateQuery := `UPDATE jobs SET cron_id = $1 WHERE id = $2`
s.db.Exec(updateQuery, id, jobId)
job.CronId = int(id)
return nil
```

If there were no errors, we finally update the `cron_id` of our job’s entry in the DB.
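
Note that the snippets above ignore the errors returned by `s.db.Exec()`. One possible way to structure the same "insert, schedule, roll back on failure" flow with every error checked is sketched below. The `jobStore` and `scheduler` interfaces and their in-memory fakes are hypothetical; they stand in for the database and the cron instance so that the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// jobStore and scheduler are hypothetical interfaces standing in for
// the database and the cron instance. They are not part of the project.
type jobStore interface {
	Insert(expr string) (int, error)
	Delete(id int) error
	SetCronID(id, cronID int) error
}

type scheduler interface {
	Add(expr string, fn func()) (int, error)
	Remove(cronID int)
}

// createJob inserts the job, schedules it, and undoes earlier steps
// when a later step fails.
func createJob(store jobStore, sched scheduler, expr string, run func()) (int, error) {
	id, err := store.Insert(expr)
	if err != nil {
		return 0, fmt.Errorf("insert job: %w", err)
	}
	cronID, err := sched.Add(expr, run)
	if err != nil {
		// Roll back the row; report both problems if the rollback fails too.
		if delErr := store.Delete(id); delErr != nil {
			return 0, fmt.Errorf("schedule job: %v (rollback failed: %v)", err, delErr)
		}
		return 0, fmt.Errorf("schedule job: %w", err)
	}
	if err := store.SetCronID(id, cronID); err != nil {
		sched.Remove(cronID) // do not leave an untracked cron entry running
		return 0, fmt.Errorf("save cron id: %w", err)
	}
	return id, nil
}

// memStore is an in-memory fake: job id -> cron id (0 until set).
type memStore struct {
	nextID int
	rows   map[int]int
}

func (m *memStore) Insert(string) (int, error) {
	m.nextID++
	m.rows[m.nextID] = 0
	return m.nextID, nil
}
func (m *memStore) Delete(id int) error            { delete(m.rows, id); return nil }
func (m *memStore) SetCronID(id, cronID int) error { m.rows[id] = cronID; return nil }

// fakeScheduler fails every Add call when fail is true.
type fakeScheduler struct {
	fail bool
	next int
}

func (f *fakeScheduler) Add(string, func()) (int, error) {
	if f.fail {
		return 0, errors.New("invalid cron expression")
	}
	f.next++
	return f.next, nil
}
func (f *fakeScheduler) Remove(int) {}

func main() {
	store := &memStore{rows: map[int]int{}}
	_, err := createJob(store, &fakeScheduler{fail: true}, "bad expr", func() {})
	fmt.Println(err, len(store.rows)) // the failed job leaves no row behind

	id, err := createJob(store, &fakeScheduler{}, "30 9 * * *", func() {})
	fmt.Println(id, store.rows[id], err)
}
```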

### Listening on the RabbitMQ’s Job Queue & Consuming Jobs

The following code is from the consumer service’s `rabbitmqService.go` file, which is located at [`/consumer/internal/services/rabbitmqService.go`](https://github.com/Aniketyadav44/cronflow/blob/main/consumer/internal/services/rabbitmqService.go).

We first create an `RMQService` struct and a `NewRMQService()` constructor.

```go
type RMQService struct {
	dbService *DBService
	conn      *amqp091.Connection
	channel   *amqp091.Channel
}

func NewRMQService(db *sql.DB, conn *amqp091.Connection, channel *amqp091.Channel) *RMQService {
	return &RMQService{
		dbService: NewDBService(db),
		conn:      conn,
		channel:   channel,
	}
}
```

This struct holds `dbService` (a service wrapping the DB connection), the RabbitMQ connection `conn` and the RabbitMQ channel `channel`.

In the `main` function of `main.go`, we create a new instance of the RabbitMQ service:

```go
rabbitmqService := services.NewRMQService(cfg.Db, cfg.RabbitMQ, cfg.MQChannel)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rabbitmqService.Start(ctx)
```

After creating the RabbitMQ service instance, we create a `WithCancel` context and defer its `cancel()` function in `main`, so that the consumer loop is signalled to stop when `main` returns. Then we call the [`Start()`](https://github.com/Aniketyadav44/cronflow/blob/d19f24d5fbfbfb6c4f988722d609c8ba4ecd89ac/consumer/internal/services/rabbitmqService.go#L31) function, which blocks until the context is cancelled.

```go
func (s *RMQService) Start(ctx context.Context) {
	q, err := s.channel.QueueDeclare("cron_events", false, false, false, false, nil)
	if err != nil {
		log.Println("error in creating rabbitmq queue: ", err.Error())
		return
	}

	msgs, err := s.channel.Consume(q.Name, "", false, false, false, false, nil)
	if err != nil {
		log.Println("error in creating a consume channel for rabbitmq: ", err.Error())
		return
	}

	go func() {
		for {
			select {
			case <-ctx.Done():
				log.Println("Stopping rabbitmq...")
				return
			case msg, ok := <-msgs:
				if !ok {
					log.Println("RabbitMQ message channel is closed.")
					return
				}
				processMessage(&msg, s.dbService)
			}
		}
	}()
	log.Println("RabbitMQ Consumer Running: Waiting for messages...")

	<-ctx.Done()
}
```

In the `Start()` function, we first declare the queue using [`QueueDeclare()`](https://pkg.go.dev/github.com/rabbitmq/amqp091-go@v1.10.0#Channel.QueueDeclare) with the same name and arguments as in the scheduler, and then call the [`Consume()`](https://pkg.go.dev/github.com/rabbitmq/amqp091-go@v1.10.0#Channel.Consume) function.

```go
msgs, err := s.channel.Consume(q.Name, "", false, false, false, false, nil)
if err != nil {
	log.Println("error in creating a consume channel for rabbitmq: ", err.Error())
	return
}
```

Here we pass the queue’s name and set `autoAck` (the third argument) to `false`, so that we can manually `ACK` and `NACK` the job messages and retry failed ones.

```go
go func() {
	for {
		select {
		case <-ctx.Done():
			log.Println("Stopping rabbitmq...")
			return
		case msg, ok := <-msgs:
			if !ok {
				log.Println("RabbitMQ message channel is closed.")
				return
			}
			processMessage(&msg, s.dbService)
		}
	}
}()
log.Println("RabbitMQ Consumer Running: Waiting for messages...")

<-ctx.Done()
```

Then we start a goroutine that runs an infinite `for` loop. The loop contains a `select` statement that waits on both the context’s `Done()` channel and RabbitMQ’s consume channel.

When the application shuts down, the context’s `cancel()` function is called, which closes the channel returned by `ctx.Done()`. The `<-ctx.Done()` case then fires and the goroutine returns, so the consumer stops processing messages.

When a message is published to the queue by the `scheduler` service, it is delivered on the `msgs` channel. The `<-msgs` case receives it and stores it in `msg`. If `ok` is `false`, the channel has been closed and the goroutine returns.

Then we call the [`processMessage()`](https://github.com/Aniketyadav44/cronflow/blob/d19f24d5fbfbfb6c4f988722d609c8ba4ecd89ac/consumer/internal/services/rabbitmqService.go#L67) function to parse the job’s payload and process the job.
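
The same loop shape can be tried out without a broker. The sketch below replaces the RabbitMQ delivery channel with a plain `chan string`, and the names `consume` and `drain` are made up for this example. In `main` it also uses `signal.NotifyContext` from the standard library, which is one possible way (not what the project does) to get a context that is cancelled on Ctrl+C or SIGTERM.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// consume handles messages until ctx is cancelled or msgs is closed,
// and returns how many messages were handled.
func consume(ctx context.Context, msgs <-chan string, handle func(string)) int {
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return handled
		case msg, ok := <-msgs:
			if !ok {
				return handled // channel closed: nothing more will arrive
			}
			handle(msg)
			handled++
		}
	}
}

// drain feeds items through consume and returns them in handled order.
func drain(items []string) []string {
	msgs := make(chan string, len(items))
	for _, m := range items {
		msgs <- m
	}
	close(msgs)

	var got []string
	consume(context.Background(), msgs, func(m string) { got = append(got, m) })
	return got
}

func main() {
	// This context is cancelled when the process receives an interrupt
	// or a termination signal.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	msgs := make(chan string, 3)
	for _, m := range []string{"job-1", "job-2", "job-3"} {
		msgs <- m
	}
	close(msgs) // lets the demo finish on its own

	n := consume(ctx, msgs, func(m string) { fmt.Println("processing", m) })
	fmt.Println("handled", n)
}
```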

### Processing the Job

```go
func processMessage(msg *amqp091.Delivery, dbService *DBService) {
	log.Println("Received message on RabbitMQ channel: ", string(msg.Body))

	// parsing message body which has keys "job"[Job json] and "time"[Schedule time string]
	var body map[string]any
	if err := json.Unmarshal(msg.Body, &body); err != nil {
		log.Println("error in extracting message payload: ", err.Error())
		msg.Ack(false)
		return
	}

	// parsing job json from the message body json
	jobBody, ok := body["job"].(map[string]any)
	if !ok {
		log.Println("error in extracting job json: ", body["job"])
		msg.Ack(false)
		return
	}
	// parsing scheduled time from the message body json
	sTime, ok := body["time"].(string)
	if !ok {
		log.Println("error in extracting scheduled time: ", body["time"])
		msg.Ack(false)
		return
	}

	// converting the job json to bytes, to convert it to models.Job
	jobBodyByte, err := json.Marshal(jobBody)
	if err != nil {
		log.Println("error in parsing job json: ", err.Error())
		msg.Ack(false)
		return
	}
	var job *models.Job
	if err := json.Unmarshal(jobBodyByte, &job); err != nil {
		log.Println("error in extracting job: ", err.Error())
		msg.Ack(false)
		return
	}

	// get retries of any existing job entry for this job id, scheduled time which was not failed
	var jobEntry *models.JobEntry
	j, err := dbService.getExistingJobEntry(job, sTime)
	if err != nil {
		log.Println("error in getting a job entry: ", err.Error())
		msg.Ack(false)
		return
	}
	if j != nil {
		// if a job entry already exists, update that in jobEntry variable
		jobEntry = j
	} else {
		createdEntry, err := dbService.createNewJobEntry(job, sTime)
		if err != nil {
			log.Println("error creating new entry in db: ", err.Error())
			msg.Ack(false)
			return
		}
		jobEntry = createdEntry
	}

	// checking if the retry count reached max retries
	if jobEntry.Retries >= MaxJobRetries {
		dbService.markJobAsPermanentlyFailed(jobEntry)
		msg.Ack(false)
		return
	}

	switch job.Type {
	case "ping":
		if err := processPingJob(dbService, job, jobEntry, sTime); err != nil {
			handleJobError(dbService, err, msg, jobEntry)
			return
		}
	case "email":
		if err := processEmailJob(dbService, job, jobEntry, sTime); err != nil {
			handleJobError(dbService, err, msg, jobEntry)
			return
		}
	case "slack":
		if err := processSlackJob(dbService, job, jobEntry, sTime); err != nil {
			handleJobError(dbService, err, msg, jobEntry)
			return
		}
	case "webhook":
		if err := processWebhookJob(dbService, job, jobEntry, sTime); err != nil {
			handleJobError(dbService, err, msg, jobEntry)
			return
		}
	default:
		handleJobError(dbService, fmt.Errorf("invalid event type: %s", job.Type), msg, jobEntry)
		// return here so the Nack'd message is not also Ack'd below
		return
	}

	msg.Ack(false)
}
```

In this function, we first decode the JSON body from the message’s payload. Then we extract the job’s JSON stored at the key `job` and the scheduled time stored at the key `time`.

Then we re-encode the job’s JSON to bytes and decode those bytes into a `Job` value stored in `job *models.Job`.

If there is an error at any of the parsing steps, we acknowledge the message using `Ack(false)`. We pass `multiple` as `false` in `Ack()` to avoid acknowledging any other prior deliveries.

We do this because a message that cannot be parsed can never be processed, so retrying it would not help. Acknowledging it removes it from the queue.
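
As an alternative to decoding into `map[string]any` and then re-encoding the `job` part, the body could also be decoded in a single step into a typed struct. The sketch below shows the idea. The `job` struct here is hypothetical: its field names and JSON tags are assumptions and may not match `models.Job` in the project.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// job mirrors only the fields mentioned in this post. The JSON tags are
// assumed and may differ from the project's models.Job.
type job struct {
	ID       int             `json:"id"`
	CronExpr string          `json:"cron_expr"`
	Type     string          `json:"type"`
	Payload  json.RawMessage `json:"payload"` // left undecoded for the job handler
}

// envelope matches the published body: {"job": {...}, "time": "..."}.
type envelope struct {
	Job  *job   `json:"job"`
	Time string `json:"time"`
}

// decodeEnvelope parses a message body in one step and reports which
// part is missing or malformed.
func decodeEnvelope(body []byte) (*envelope, error) {
	var env envelope
	if err := json.Unmarshal(body, &env); err != nil {
		return nil, fmt.Errorf("decode message body: %w", err)
	}
	if env.Job == nil {
		return nil, errors.New(`message body has no "job" object`)
	}
	if env.Time == "" {
		return nil, errors.New(`message body has no "time" string`)
	}
	return &env, nil
}

func main() {
	body := []byte(`{"job":{"id":7,"cron_expr":"30 9 * * *","type":"ping","payload":{"url":"https://example.com"}},"time":"2025-06-28T09:30:00.000000+05:30"}`)
	env, err := decodeEnvelope(body)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(env.Job.ID, env.Job.Type, env.Time)
}
```

With this shape, a single error check covers all of the parsing steps, and the caller can still `Ack` the message when decoding fails.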

```go
// get retries of any existing job entry for this job id, scheduled time which was not failed
var jobEntry *models.JobEntry
j, err := dbService.getExistingJobEntry(job, sTime)
if err != nil {
	log.Println("error in getting a job entry: ", err.Error())
	msg.Ack(false)
	return
}
if j != nil {
	// if a job entry already exists, update that in jobEntry variable
	jobEntry = j
} else {
	createdEntry, err := dbService.createNewJobEntry(job, sTime)
	if err != nil {
		log.Println("error creating new entry in db: ", err.Error())
		msg.Ack(false)
		return
	}
	jobEntry = createdEntry
}
```

Then we call the [`getExistingJobEntry()`](https://github.com/Aniketyadav44/cronflow/blob/d19f24d5fbfbfb6c4f988722d609c8ba4ecd89ac/consumer/internal/services/dbService.go#L21) function from `dbService` to check whether there was a previous execution attempt for this job at that specific scheduled time. If there was one, this delivery is a retry, and we assign the existing entry to the `jobEntry` variable.

If there was no previous execution, we create a new job entry with `running` status for that scheduled time in the database using the [`createNewJobEntry()`](https://github.com/Aniketyadav44/cronflow/blob/d19f24d5fbfbfb6c4f988722d609c8ba4ecd89ac/consumer/internal/services/dbService.go#L39) function and put its value in the `jobEntry` variable.

```go
// checking if the retry count reached max retries
if jobEntry.Retries >= MaxJobRetries {
	dbService.markJobAsPermanentlyFailed(jobEntry)
	msg.Ack(false)
	return
}
```

Then, in case this delivery is a retry, we check whether the retry count has reached `MaxJobRetries`, i.e. 3.

If it has, we update that job entry to `permanently_failed` status in the database for that scheduled time using the [`markJobAsPermanentlyFailed()`](https://github.com/Aniketyadav44/cronflow/blob/d19f24d5fbfbfb6c4f988722d609c8ba4ecd89ac/consumer/internal/services/dbService.go#L69) function, acknowledge the message to remove it from the queue, and return from the function.

```go
switch job.Type {
case "ping":
	if err := processPingJob(dbService, job, jobEntry, sTime); err != nil {
		handleJobError(dbService, err, msg, jobEntry)
		return
	}
case "email":
	if err := processEmailJob(dbService, job, jobEntry, sTime); err != nil {
		handleJobError(dbService, err, msg, jobEntry)
		return
	}
case "slack":
	if err := processSlackJob(dbService, job, jobEntry, sTime); err != nil {
		handleJobError(dbService, err, msg, jobEntry)
		return
	}
case "webhook":
	if err := processWebhookJob(dbService, job, jobEntry, sTime); err != nil {
		handleJobError(dbService, err, msg, jobEntry)
		return
	}
default:
	handleJobError(dbService, fmt.Errorf("invalid event type: %s", job.Type), msg, jobEntry)
	// return here so the Nack'd message is not also Ack'd after the switch
	return
}
```

Then we switch on `job.Type` and call the matching job processing function for that type. An unknown type is treated as an error.

If a job processing function returns an error, we call the [`handleJobError()`](https://github.com/Aniketyadav44/cronflow/blob/d19f24d5fbfbfb6c4f988722d609c8ba4ecd89ac/consumer/internal/services/rabbitmqService.go#L166) function and return.
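
Because all four cases have the same shape, the dispatch could also be written as a lookup table. The sketch below shows that alternative with placeholder handlers. The `handler` type, the `handlers` map and the handler bodies are all made up for illustration and do not reflect what the project’s `processPingJob()` and friends actually do.

```go
package main

import (
	"errors"
	"fmt"
)

// handler is a hypothetical signature for a job processing function.
type handler func(payload string) error

// requirePayload is a placeholder check shared by the fake handlers.
func requirePayload(kind string) handler {
	return func(payload string) error {
		if payload == "" {
			return errors.New(kind + " job has an empty payload")
		}
		return nil // a real handler would do the actual work here
	}
}

// handlers maps each job type to its processing function.
var handlers = map[string]handler{
	"ping":    requirePayload("ping"),
	"email":   requirePayload("email"),
	"slack":   requirePayload("slack"),
	"webhook": requirePayload("webhook"),
}

// dispatch looks up the handler for jobType and runs it. An unknown
// type is reported as an error, like the default case of the switch.
func dispatch(jobType, payload string) error {
	h, ok := handlers[jobType]
	if !ok {
		return fmt.Errorf("invalid event type: %s", jobType)
	}
	return h(payload)
}

func main() {
	fmt.Println(dispatch("ping", "https://example.com"))
	fmt.Println(dispatch("fax", "hello"))
}
```

A table like this keeps the error handling in one place: the caller checks the error from `dispatch()` once and decides whether to `Ack` or `Nack`.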

```go
func handleJobError(dbService *DBService, err error, msg *amqp091.Delivery, jobEntry *models.JobEntry) {
	log.Println("error in processing job: ", err.Error(), ", retries: ", jobEntry.Retries)
	time.Sleep(2 * time.Second)
	dbService.markJobAsFailed(err, jobEntry.Retries+1, jobEntry)
	msg.Nack(false, true)
}
```

In the `handleJobError()` function, we first sleep for 2 seconds so that the retry does not happen immediately.

Then we mark the job run entry as `failed` in the database and increment its retry count using the [`markJobAsFailed()`](https://github.com/Aniketyadav44/cronflow/blob/d19f24d5fbfbfb6c4f988722d609c8ba4ecd89ac/consumer/internal/services/dbService.go#L75) function. The job entry’s status will move to `permanently_failed` once it has failed all 3 retries.

Finally, we negatively acknowledge the message with `Nack()` and pass `requeue` as `true` to put this message back into the queue.
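
To trace how the retry count and the status move across redeliveries, here is a small in-memory simulation. It assumes that a new job entry starts with `Retries` equal to 0, which is not shown in this post. The `entry` type and the `deliver` and `runToCompletion` functions are made up; a return value of `true` from `deliver` plays the role of `Nack(false, true)` and `false` plays the role of `Ack(false)`.

```go
package main

import (
	"errors"
	"fmt"
)

const maxJobRetries = 3

// errJobFailed is a placeholder error for a failing job.
var errJobFailed = errors.New("job failed")

// entry is a simplified stand-in for a job run entry.
type entry struct {
	Retries int
	Status  string // "running", "failed", "completed" or "permanently_failed"
}

// deliver models one delivery of the message. It returns true when the
// message is requeued (the Nack path) and false when it is acknowledged.
func deliver(e *entry, run func() error) (requeue bool) {
	if e.Retries >= maxJobRetries {
		e.Status = "permanently_failed"
		return false
	}
	if err := run(); err != nil {
		e.Retries++
		e.Status = "failed"
		return true
	}
	e.Status = "completed"
	return false
}

// runToCompletion redelivers the message until it is acknowledged and
// returns the total number of deliveries.
func runToCompletion(e *entry, run func() error) int {
	deliveries := 1
	for deliver(e, run) {
		deliveries++
	}
	return deliveries
}

func main() {
	e := &entry{Status: "running"}
	n := runToCompletion(e, func() error { return errJobFailed })
	fmt.Println(n, e.Retries, e.Status) // prints "4 3 permanently_failed"
}
```

Under these assumptions, a job that always fails is executed three times, and the fourth delivery only records the permanent failure and removes the message from the queue.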

When a job is processed successfully, the job processing function uses the [`markJobAsCompleted()`](https://github.com/Aniketyadav44/cronflow/blob/d19f24d5fbfbfb6c4f988722d609c8ba4ecd89ac/consumer/internal/services/dbService.go#L63) function from `dbService` to update the job entry’s status to `completed`.

```go
msg.Ack(false)
```

Finally, at the end of the `processMessage()` function, we `Ack()` the message.  
This completes the processing of a job after it has been consumed!

## Conclusion

In this blog, we learned how to schedule cron jobs and publish them to a RabbitMQ queue, and how these messages can be consumed from the queue.

We also learned how to acknowledge messages after consuming them and how to retry on job failures.

This blog covered parts of the full project I created, which also provides a dashboard for creating jobs, listing all jobs and viewing job run statuses as `running`/`completed`/`failed` (for retries)/`permanently_failed`.

Make sure to give it a go:

%[https://github.com/Aniketyadav44/cronflow/tree/main] 

---

If you find this article helpful, don't forget to hit the ❤️ button.

Check out my website [**here**](https://anikety.com/) and feel free to connect.

Happy Coding! 👨‍💻
