Node.js Microservice Architecture Building an idempotent cron job with at-least-once delivery
Node.js Part 20 Jun 9, 2022
In the previous post, we implemented a batch job that sends a notification for each delayed todo in the Postgres database. So far, this batch job sends another notification for each delayed todo on every execution. Instead, we want a notification for a delayed todo not to be sent again after a successful delivery. In this post, we implement such an idempotent cron job with at-least-once delivery.
At-least-once delivery 🔗
Note the difference between at-least-once, exactly-once and at-most-once delivery: a message is delivered one or more times, exactly one time, or zero or one time, respectively.
There can always be errors in network communication. Even worse, in a network outage, the sender cannot even tell whether the other side has received and processed the message. Therefore, it is technically challenging to achieve exactly-once delivery.
Instead, it is more straightforward to go for either at-least-once delivery or at-most-once delivery. For the use case of our notifications, both options are viable. We go for at-least-once delivery. This way, we can guarantee that a user receives a notification, but we cannot guarantee that there are no duplicated notifications.
Persisting if a notification has been sent 🔗
In the existing table `todos`, we create a column `delayNotificationSent` with a Knex.js schema migration. It persists a boolean variable. It is set to `false` on creation since the notification has not yet been sent.
Note: We are mixing user-provided data (assignee, due date, etc.) with internal data (delay notification sent) in the database. It is the purpose of the data access modules to act on the relevant subset of columns. One could separate the data into two tables, `todos` and `notifications`. However, although these would have a one-to-one relation, we would need joins to query over both. It is good to avoid that complexity.
```typescript
import { Knex } from "knex";

export async function up(knex: Knex): Promise<void> {
  await knex.schema.alterTable("todos", (table) => {
    table.boolean("delayNotificationSent").defaultTo(false).notNullable();
  });
}

export async function down(knex: Knex): Promise<void> {
  await knex.schema.alterTable("todos", (table) => {
    table.dropColumn("delayNotificationSent");
  });
}
```
Filtering for todos without sent notification 🔗
The function `getDelayedTodos` in the data access module `delay-notifications/delayed-todos.dao.ts` returns the list of delayed todos. Now that the new column exists, we can add a `WHERE` clause that filters for todos with the value `false` in the column `delayNotificationSent`.
```typescript
import { db } from "../database/db";
import { WorkspaceId } from "../workspace-id.type";
import { DbTodo, TodoId } from "./todo.type";

export function getDelayedTodos(
  now: string
): Promise<Pick<DbTodo, "id" | "workspaceId">[]> {
  return db
    .table<DbTodo>("todos")
    .where("dueDate", "<=", now)
    .andWhere("delayNotificationSent", false)
    .select("id", "workspaceId");
}
```
Marking a todo as processed 🔗
In the same data access module, we create a function `setDelayNotificationSent`. This function updates the column `delayNotificationSent` to `true`.
```typescript
import { db } from "../database/db";
import { WorkspaceId } from "../workspace-id.type";
import { DbTodo, TodoId } from "./todo.type";

export async function setDelayNotificationSent(
  workspaceId: WorkspaceId,
  todoId: TodoId
): Promise<void> {
  await db
    .table<DbTodo>("todos")
    .where("id", todoId)
    .andWhere("workspaceId", workspaceId)
    .update("delayNotificationSent", true);
}
```
Typing the new column 🔗
We create dedicated types `Todo` and `DbTodo` for the `send-delay-notifications` functionality. We add the column `delayNotificationSent` to these types. The column `assignee` is not relevant in the context of the job `send-delay-notifications` and is therefore left out. The column `dueDate` stays in the type because `getDelayedTodos` filters on it.
```typescript
import { WithWorkspaceId } from "../workspace-id.type";

export type TodoId = string;

export interface Todo {
  id: TodoId;
  dueDate: string;
  delayNotificationSent: boolean;
}

export type DbTodo = Todo & WithWorkspaceId;
```
Sending a notification and marking the todo 🔗
We assemble everything in the function `sendDelayNotification`. It is rather long due to error handling. Nevertheless, it only does two things.
- Send a notification for the todo with `sendNotification`.
- Mark the todo as processed with `setDelayNotificationSent`.
```typescript
import { logger } from "../logger";
import { setDelayNotificationSent } from "./delayed-todo.dao";
import { sendNotification } from "./notification.service";
import { DbTodo } from "./todo.type";

export async function sendDelayNotification(
  todo: Pick<DbTodo, "id" | "workspaceId">
): Promise<boolean> {
  try {
    // Step 1: send the notification.
    await sendNotification(todo.workspaceId, `Todo ${todo.id} is delayed`);
    try {
      // Step 2: only after a successful send, mark the todo as processed.
      await setDelayNotificationSent(todo.workspaceId, todo.id);
      return true;
    } catch (error) {
      logger.error(
        {
          workspaceId: todo.workspaceId,
          todoId: todo.id,
          error,
        },
        "Could not mark in the database that the notification for this todo has been sent. The notification will be sent again in the next run."
      );
      return false;
    }
  } catch (error) {
    logger.error(
      {
        workspaceId: todo.workspaceId,
        todoId: todo.id,
        error,
      },
      "Could not send the notification for the delayed todo. Skipping it..."
    );
    return false;
  }
}
```
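The boolean return value lets the calling job count how many notifications went out in a run. The following is a minimal sketch of such a caller, not the job from the previous post: the name `runDelayNotificationJob` and the `JobResult` shape are hypothetical, the two functions are passed in as parameters to keep the sketch self-contained, and processing the todos one after another is an assumption.

```typescript
// Hypothetical sketch: names and shapes below are illustrative assumptions.
type DelayedTodo = { id: string; workspaceId: string };

interface JobResult {
  sent: number;
  failed: number;
}

// The two dependencies mirror getDelayedTodos and sendDelayNotification
// from this post; they are injected so the sketch runs on its own.
async function runDelayNotificationJob(
  getDelayedTodos: (now: string) => Promise<DelayedTodo[]>,
  sendDelayNotification: (todo: DelayedTodo) => Promise<boolean>,
  now: string = new Date().toISOString()
): Promise<JobResult> {
  const todos = await getDelayedTodos(now);
  const result: JobResult = { sent: 0, failed: 0 };
  // Process the todos one by one. A failure for one todo does not stop
  // the others, because sendDelayNotification reports errors as `false`.
  for (const todo of todos) {
    const ok = await sendDelayNotification(todo);
    if (ok) {
      result.sent += 1;
    } else {
      result.failed += 1;
    }
  }
  return result;
}
```

Todos counted as `failed` keep the value `false` in `delayNotificationSent`, so the next run picks them up again.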
In this function, we can observe the implementation of at-least-once delivery. It is defined by the order of the calls to `sendNotification` and `setDelayNotificationSent`. If we first marked the todo and then sent the notification, we would have at-most-once delivery.
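To make the effect of the call order concrete, here is a minimal sketch of both variants side by side. The `Deps` type and the function names `sendAtLeastOnce` and `sendAtMostOnce` are hypothetical, and error handling is left out on purpose so that only the order of the two calls remains.

```typescript
// Hypothetical sketch: the dependencies are injected to stay self-contained.
type Todo = { id: string; workspaceId: string };

type Deps = {
  sendNotification: (workspaceId: string, message: string) => Promise<void>;
  setDelayNotificationSent: (workspaceId: string, todoId: string) => Promise<void>;
};

// At-least-once: send first, mark afterwards.
// If marking fails, the todo stays unmarked and the next run sends again.
async function sendAtLeastOnce(todo: Todo, deps: Deps): Promise<void> {
  await deps.sendNotification(todo.workspaceId, `Todo ${todo.id} is delayed`);
  await deps.setDelayNotificationSent(todo.workspaceId, todo.id);
}

// At-most-once: mark first, send afterwards.
// If sending fails, the todo is already marked and is never retried.
async function sendAtMostOnce(todo: Todo, deps: Deps): Promise<void> {
  await deps.setDelayNotificationSent(todo.workspaceId, todo.id);
  await deps.sendNotification(todo.workspaceId, `Todo ${todo.id} is delayed`);
}
```

In both variants, a failure of the second call leaves the system in a state that differs from the ideal one. The choice is only about which deviation is acceptable: a possible duplicate or a possible loss.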
With this implementation, a user could receive two notifications in the following scenario.
- We call the endpoint `POST /notifications` of the service `notify`.
- The service `notify` creates the notification and tries to respond with a success status `200`.
- The network fails.
- After a time of `x` seconds, Axios errors with a timeout.
- We don't mark the todo as processed.
- In a subsequent run of the job, the todo is picked up again and a new notification is sent.
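The scenario above can be replayed with in-memory stand-ins. This is a simplified simulation under stated assumptions, not the real services: `fakeSendNotification`, `runOnce` and `simulate` are hypothetical names, the lost response is modeled by throwing after the notification has been stored, and an array replaces the database table.

```typescript
// In-memory stand-ins; every name here is a hypothetical illustration.
const todos = [{ id: "todo-1", delayNotificationSent: false }];
const createdNotifications: string[] = [];
let responseLost = true; // the first response gets lost in the network

// Fake notify service: it always creates the notification, but the
// caller sees a timeout when the response is lost.
async function fakeSendNotification(message: string): Promise<void> {
  createdNotifications.push(message);
  if (responseLost) {
    responseLost = false;
    throw new Error("timeout");
  }
}

// One job run: send first, mark afterwards (at-least-once ordering).
async function runOnce(): Promise<void> {
  for (const todo of todos) {
    if (todo.delayNotificationSent) continue;
    try {
      await fakeSendNotification(`Todo ${todo.id} is delayed`);
      todo.delayNotificationSent = true;
    } catch {
      // Sending looked like a failure, so the todo stays unmarked.
    }
  }
}

async function simulate(): Promise<number> {
  await runOnce(); // notification created, but the response is lost
  await runOnce(); // todo is picked up again: second notification
  await runOnce(); // todo is marked by now, nothing is sent
  return createdNotifications.length;
}
```

Calling `simulate()` resolves to `2`: the user receives a duplicate from the second run, and the third run sends nothing because the todo is marked.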
Note: Exactly-once delivery could be achieved if the creation of notifications itself were idempotent. This would require us to pass a unique identifier for a created notification like `delayed-<todoId>`. Then, the service `notify` would not create another notification, but update the existing notification. The endpoint `POST /notify/notifications` would then rather be `PUT /notify/notifications/:id`.
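A minimal sketch of what such an idempotent creation could look like on the side of the service `notify`. This is an assumption for illustration only: the storage is a plain in-memory object, and `putNotification` and `delayNotificationId` are hypothetical names that do not exist in the actual service.

```typescript
// Hypothetical in-memory model of the storage of the notify service.
interface StoredNotification {
  id: string;
  message: string;
}

const notifications: Record<string, StoredNotification> = {};

// Models PUT /notify/notifications/:id. Creating and updating are the
// same operation, so repeating a request does not add a second entry.
function putNotification(id: string, message: string): "created" | "updated" {
  const existed = Object.prototype.hasOwnProperty.call(notifications, id);
  notifications[id] = { id, message };
  return existed ? "updated" : "created";
}

// The caller derives a stable identifier from the todo, so every retry
// for the same todo targets the same notification.
function delayNotificationId(todoId: string): string {
  return `delayed-${todoId}`;
}
```

With this, a retry after a lost response calls `putNotification` with the same identifier again and overwrites the existing entry instead of creating a duplicate.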
Conclusion 🔗
We implemented at-least-once delivery of notifications in an idempotent cron job. The job can be run as often as desired and only sends notifications for todos if they have not been successfully sent before.