Back to main page Traveling Coderman

Node.js Microservice Architecture Building an idempotent cron job with at-least-once delivery

<100 views

In the previous post, we implemented a batch job to send a notification for each delayed todo of the Postgres database. So far, on each execution, this batch job sends another notification for each delayed todo. Instead, we want to achieve that the notification for a delayed todo is not send again after a successful delivery. In this post, we implement such an idempotent cron job with at-least-once delivery.

At-least-once delivery 🔗

Note the difference between at-least-once, exactly-once and at-most-once delivery.

There can always be errors in network communication. Even worse, in a network outage, it can't even be identified if the other side has received and processed the message or not. Therefore, it is technically challenging to achieve exactly-once delivery.

Instead, it is more straightforward to either go for at-least-once delivery or at-most-once delivery. For the use case of our notifications, both options are viable. We go for at-least-once delivery. This way, we can guarantee a user to receive a notification, but we can not guarantee that there are no duplicated notifications.

Persisting if a notification has been sent 🔗

In the existing table todos, we create a column delayNotificationSent with a Knex.js schema migration. It persists a boolean variable. It is set to false on creation since the notification has not yet been sent.

Note We are mixing user-provided data (assignee, due date, etc.) with internal data (delay notification sent) in the database. It is the purpose of the data access modules to act on the relevant subset of columns. One could separate into two tables todos and notifications. However, although these would have a one-to-one relation, we would need joins to query over both. It is good to avoid that complexity.

import { Knex } from "knex";

export async function up(knex: Knex): Promise<void> {
await knex.schema.alterTable("todos", (table) => {
table.boolean("delayNotificationSent").defaultTo(false).notNullable();
});
}

export async function down(knex: Knex): Promise<void> {
await knex.schema.alterTable("todos", (table) => {
table.dropColumn("delayNotificationSent");
});
}

Filtering for todos without sent notification 🔗

The function getDelayedTodos in the data access module delay-notifications/delayed-todos.dao.ts returns the list of delayed todos. With the new column being created, we can add a WHERE clause filtering for todos with the value false in the column delayNotificationSent.

import { db } from "../database/db";
import { WorkspaceId } from "../workspace-id.type";
import { DbTodo, TodoId } from "./todo.type";

export function getDelayedTodos(
now: string
): Promise<Pick<DbTodo, "id" | "workspaceId">[]> {
return db
.table<DbTodo>("todos")
.where("dueDate", "<=", now)
.andWhere("delayNotificationSent", false)
.select("id", "workspaceId");
}

Marking a todo as processed 🔗

In the same data access module, we create a function setDelayNotificationSent. This function updates the column delayNotificationSent to true.

import { db } from "../database/db";
import { WorkspaceId } from "../workspace-id.type";
import { DbTodo, TodoId } from "./todo.type";

export async function setDelayNotificationSent(
workspaceId: WorkspaceId,
todoId: TodoId
): Promise<void> {
await db
.table<DbTodo>("todos")
.where("id", todoId)
.andWhere("workspaceId", workspaceId)
.update("delayNotificationSent", true);
}

Typing the new column 🔗

We create dedicated types Todo and DbTodo for the send-delay-notifications functionality. We add the column delayNotificationSent to be part of these todos. The columns dueDate and assignee are not relevant in the context of the job send-delay-notifications. They are therefore left out.

import { WithWorkspaceId } from "../workspace-id.type";

export type TodoId = string;

export interface Todo {
id: TodoId;
dueDate: string;
delayNotificationSent: boolean;
}

export type DbTodo = Todo & WithWorkspaceId;

Sending a notification and marking the todo 🔗

We assemble everything in the function sendDelayNotification. It is rather long due to error handling. Nevertheless, it only does two things.

  1. Send a notification for the todo with sendNotification.
  2. Mark the todo as processed with setDelayNotificationSent.
import { logger } from "../logger";
import { setDelayNotificationSent } from "./delayed-todo.dao";
import { sendNotification } from "./notification.service";
import { DbTodo } from "./todo.type";

export async function sendDelayNotification(
todo: Pick<DbTodo, "id" | "workspaceId">
): Promise<boolean> {
try {
await sendNotification(todo.workspaceId, `Todo ${todo.id} is delayed`);
try {
await setDelayNotificationSent(todo.workspaceId, todo.id);
return true;
} catch (error) {
logger.error(
{
workspaceId: todo.workspaceId,
todoId: todo.id,
error,
},
"Could not mark in the database that the notification for this todo has been sent. The notification will be send another time in the next run."
);
return false;
}
} catch (error) {
logger.error(
{
workspaceId: todo.workspaceId,
todoId: todo.id,
error,
},
"Could not send delayed todo. Skipping it..."
);
return false;
}
}

In this function, we can observe the implementation of at-least-once delivery. It is defined by the order of the calls to sendNotification and setDelayNotificationSent. If we first marked the todo and then sent the notification, we would have at-most-once delivery.

With this implementation, a user could receive two notifications in the following scenario.

  1. We call the endpoint POST /notifications of the service notify.
  2. The service notify creates the notification and tries to respond with a success 200.
  3. The network fails.
  4. After a time of x seconds, Axios errors with a timeout.
  5. We don't mark the todo as processed.
  6. In a subsequent run of the job, the todo is picked up again and a new notification is sent.

Note Exactly-once delivery could be achieved if the creation of notifications itself would be idempotent. This would require us to pass a unique identifier for a created notification like delayed-<todoId>. Then, the service notify would not create another notification, but update the existing notification. The endpoint POST /notify/notifications would then rather be PUT /notify/notifications/:id.

Conclusion 🔗

We implemented at-least-once delivery of notifications in an idempotent cron job. The job can be run as often as desired and only sends notifications for todos if they have not been successfully sent before.

Become a GitHub sponsor to access the code of this post as a GitHub repo.