Traveling Coderman

Node.js Microservice Architecture: Building an asynchronous batch job

In the previous post, we set up the architecture for application subcommands. Now we can implement a command send-delay-notifications. This command can be run locally with npm run command send-delay-notifications. It can also be triggered on a schedule in production.

In this post, we implement this command. We retrieve delayed todos from the database and send out a notification for each one. We put a special focus on the separation of this code from the endpoint logic. Also, we take a look at error handling and logging in such asynchronous jobs.

Structure

The functionality of the command send-delay-notifications is implemented in a directory src/delay-notifications. The command module commands/send-delay-notifications.command.ts is the entry point and delegates to the function sendDelayNotifications.

graph TD
    A(cmdSendDelayNotifications)
    B(sendDelayNotifications)
    C(getDelayedTodos)
    D(sendDelayNotification)
    E(sendNotification)
    A-->B
    B-->C
    B-->D
    D-->E
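
The entry point itself is not shown in this post. As a rough sketch, assuming the command only computes the start date and maps the job's boolean result to an exit code, it could look like the following. The `job` parameter, the type `DelayNotificationJob` and the exit-code mapping are assumptions made so the sketch runs on its own; in the project, the command would presumably import sendDelayNotifications directly.

```typescript
// Hypothetical sketch of the command entry point (not the author's code).
// The job is passed in as a parameter so the example has no project imports.
type DelayNotificationJob = (startDate: string) => Promise<boolean>;

export async function cmdSendDelayNotifications(
  job: DelayNotificationJob,
  now: Date = new Date()
): Promise<number> {
  // Date-only ISO string such as "2022-06-03" (assumed format, in UTC).
  const startDate = now.toISOString().slice(0, 10);
  const succeeded = await job(startDate);
  // Assumption: a failed job is reported via a non-zero exit code.
  return succeeded ? 0 : 1;
}
```

Returning the exit code instead of calling process.exit inside the function keeps the delegation testable.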

Retrieving delayed todos

In the top-level directory src/delay-notifications, we create a file delayed-todo.dao.ts. This data access module contains a Knex SQL query to retrieve all the delayed todos.

Note There is already a file src/controllers/todos/todo.dao used in the controllers. We neither add functions to that data access module nor make it more generic in order to reuse it. Instead, we create the new data access module delayed-todo.dao. The controllers and this job only share the database schema, not a data access module. This ensures that a future code change can only affect and break one functionality at a time.

import { db } from "../database/db";
import { DbTodo } from "./todo.type";

export function getDelayedTodos(
  now: string
): Promise<Pick<DbTodo, "id" | "workspaceId">[]> {
  return db
    .table<DbTodo>("todos")
    .where("dueDate", "<=", now)
    .select("id", "workspaceId");
}

The data access module retrieves delayed todos across all workspaces. We can retrieve them across workspaces since we didn't separate the workspaces into different database schemas. Instead, workspaces are represented with a column workspaceId.
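
To make the filter concrete, the following sketch applies the same condition to an in-memory array instead of the database. It assumes that due dates are ISO 8601 date strings, as in the seed data later in this post, so that plain string comparison matches chronological order. The names `DbTodoRow` and `selectDelayed` and the sample rows are hypothetical and exist only for illustration.

```typescript
// In-memory illustration of the query: dueDate <= now, projected to id and workspaceId.
interface DbTodoRow {
  id: string;
  workspaceId: string;
  dueDate: string; // ISO 8601 date, e.g. "2020-06-11"
}

export function selectDelayed(
  rows: DbTodoRow[],
  now: string
): Pick<DbTodoRow, "id" | "workspaceId">[] {
  return rows
    // ISO 8601 dates of equal length compare chronologically as plain strings.
    .filter((row) => row.dueDate <= now)
    // Keep only the two columns the job needs.
    .map(({ id, workspaceId }) => ({ id, workspaceId }));
}

const rows: DbTodoRow[] = [
  { id: "todo-1", workspaceId: "workspace-1", dueDate: "2025-01-05" },
  { id: "todo-2", workspaceId: "workspace-1", dueDate: "2020-06-11" },
  { id: "todo-3", workspaceId: "workspace-2", dueDate: "2020-07-14" },
];

// Delayed todos from both workspaces are returned by a single call.
const delayed = selectDelayed(rows, "2022-06-03");
console.log(delayed.map((todo) => todo.id)); // todo-2 and todo-3
```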

The returned todos only contain the fields id and workspaceId, which the return type expresses with Pick. The underlying type DbTodo is defined in a file src/delay-notifications/todo.type.

import { WithWorkspaceId } from "../workspace-id.type";

export type TodoId = string;

export interface Todo {
  id: TodoId;
  dueDate: string;
}

export type DbTodo = Todo & WithWorkspaceId;

Note The type Todo is also defined by the controllers in a file src/controllers/todos/todo.type. However, the controllers need different fields, and in future posts the todos of the functionality send-delay-notifications need a field that the controllers don't need. As with the data access module, only the database schema is shared. We don't share common types between different functionalities.
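
The module workspace-id.type is not shown in this post. A minimal sketch, assuming it declares a string alias and a one-field interface, shows how the types compose and how Pick narrows DbTodo to the two selected columns. The alias `DelayedTodoRef` and the helper `describeTodo` are hypothetical.

```typescript
// Assumed content of workspace-id.type (not shown in the post).
type WorkspaceId = string;
interface WithWorkspaceId {
  workspaceId: WorkspaceId;
}

// Types as defined in src/delay-notifications/todo.type.
type TodoId = string;
interface Todo {
  id: TodoId;
  dueDate: string;
}
type DbTodo = Todo & WithWorkspaceId;

// The shape returned by getDelayedTodos: dueDate is intentionally absent.
type DelayedTodoRef = Pick<DbTodo, "id" | "workspaceId">;

export function describeTodo(todo: DelayedTodoRef): string {
  return `Todo ${todo.id} in workspace ${todo.workspaceId}`;
}

const ref: DelayedTodoRef = { id: "todo-2", workspaceId: "workspace-1" };
console.log(describeTodo(ref)); // Todo todo-2 in workspace workspace-1
```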

Notification service

A different service, notify, exposes an API endpoint POST /notify/notifications. We implement a service src/delay-notifications/notification.service that wraps this endpoint in a typed TypeScript function exposing only the relevant options.

import axios, { AxiosRequestConfig } from "axios";
import { config } from "../configuration/config";
import { workspaceAccess } from "../http/service-access.type";
import { WorkspaceId } from "../workspace-id.type";

export function sendNotification(
  workspaceId: WorkspaceId,
  message: string
): Promise<void> {
  return axios.post(
    `${config.http.servicesUrl}/notify/notifications`,
    { message },
    {
      access: workspaceAccess(workspaceId),
    } as AxiosRequestConfig
  );
}
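
The custom access option is presumably consumed by the configured Axios instance from an earlier post. To exercise such a service without a network, one option is to inject the HTTP call. The sketch below is an alternative shape under that assumption, not the author's code; `HttpPost`, `createSendNotification`, `fakePost` and the base URL are hypothetical.

```typescript
// Hypothetical variant: the HTTP call is injected so it can be replaced by a fake.
type HttpPost = (
  url: string,
  body: unknown,
  options: { workspaceId: string }
) => Promise<unknown>;

export function createSendNotification(post: HttpPost, servicesUrl: string) {
  return async (workspaceId: string, message: string): Promise<void> => {
    // Awaiting and discarding the response keeps the declared return type void.
    await post(`${servicesUrl}/notify/notifications`, { message }, { workspaceId });
  };
}

// Usage with a fake that records calls instead of sending requests.
const calls: { url: string; body: unknown }[] = [];
const fakePost: HttpPost = async (url, body) => {
  calls.push({ url, body });
  return { status: 200 };
};

const sendNotification = createSendNotification(fakePost, "http://localhost:3001");
sendNotification("workspace-1", "Todo todo-2 is delayed").then(() => {
  console.log(calls[0]?.url); // http://localhost:3001/notify/notifications
});
```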

Sending a single delay notification

We add a module send-delay-notification.ts to send a single delay notification. In addition to what the notification service provides, it encapsulates error handling.

In comparison to endpoints, error handling and logging need to be used more carefully in batch jobs. Since endpoints are executed synchronously, a user is waiting for a response. If something goes wrong, an error can be reported to that user.

This is different in asynchronous jobs. There is no user waiting for a response and hence there is no user to report an error to. A user who is waiting synchronously for a response has context about their request. Somebody investigating a failure of an asynchronous job is missing this context. Hence, it is important to provide this context via logging and specific error handlers.

The function sendDelayNotification handles the common error of a failing API request. In that case, it logs the error, describes how it will continue and provides an identifier of the skipped entity.

import { logger } from "../logger";
import { sendNotification } from "./notification.service";
import { DbTodo } from "./todo.type";

export async function sendDelayNotification(
  todo: Pick<DbTodo, "id" | "workspaceId">
): Promise<boolean> {
  try {
    await sendNotification(todo.workspaceId, `Todo ${todo.id} is delayed`);
    return true;
  } catch (error) {
    logger.error(
      {
        workspaceId: todo.workspaceId,
        todoId: todo.id,
        error,
      },
      "Could not send delayed todo. Skipping it..."
    );
    return false;
  }
}
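
The effect of this error handler is that a failing request turns into a false return value plus a log entry that names the affected todo, rather than an exception that ends the job. The sketch below reproduces that behaviour with stand-ins: `send` and `log` are hypothetical parameters replacing the real notification service and logger, and `sendDelayNotificationWith` is a name chosen for this example only.

```typescript
interface TodoRef {
  id: string;
  workspaceId: string;
}
type Send = (workspaceId: string, message: string) => Promise<void>;
type LogError = (context: Record<string, unknown>, message: string) => void;

// Same control flow as sendDelayNotification, with dependencies passed in.
export async function sendDelayNotificationWith(
  send: Send,
  log: LogError,
  todo: TodoRef
): Promise<boolean> {
  try {
    await send(todo.workspaceId, `Todo ${todo.id} is delayed`);
    return true;
  } catch (error) {
    // The context object carries the identifiers needed to investigate later.
    log({ workspaceId: todo.workspaceId, todoId: todo.id, error }, "Skipping todo");
    return false;
  }
}

const logged: Record<string, unknown>[] = [];
const failingSend: Send = async () => {
  throw new Error("notify service unavailable");
};

sendDelayNotificationWith(
  failingSend,
  (context) => logged.push(context),
  { id: "todo-2", workspaceId: "workspace-1" }
).then((sent) => {
  console.log(sent, logged[0]?.todoId); // false todo-2
});
```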

Assembling everything together

In the function sendDelayNotifications, we retrieve the delayed todos and send a notification for each of them. If the retrieval of the delayed todos fails, we log an error and abort the job.

import { logger } from "../logger";
import { getDelayedTodos } from "./delayed-todo.dao";
import { sendDelayNotification } from "./send-delay-notification";

export async function sendDelayNotifications(
  startDate: string
): Promise<boolean> {
  logger.info({ startDate }, "Starting to send delay notifications.");
  const todos = await getDelayedTodos(startDate).catch((error) => {
    logger.error(
      { startDate, error },
      "Could not retrieve delayed todos from the database. Aborting job..."
    );
    return null;
  });
  if (!todos) {
    return false;
  }
  for (const todo of todos) {
    await sendDelayNotification(todo);
  }
  logger.info({ startDate }, "Finished to send delay notifications.");
  return true;
}
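
Two properties of this function are worth spelling out: a failed retrieval aborts the job before any notification is sent, while a failed notification does not stop the loop. The following sketch shows both with injected stand-ins. The names `runJob`, `getTodos`, `notify`, `twoTodos` and `brokenDb` are hypothetical, and logging is omitted for brevity.

```typescript
interface TodoRef {
  id: string;
  workspaceId: string;
}

// Same control flow as sendDelayNotifications, with dependencies passed in.
export async function runJob(
  getTodos: (startDate: string) => Promise<TodoRef[]>,
  notify: (todo: TodoRef) => Promise<boolean>,
  startDate: string
): Promise<boolean> {
  const todos = await getTodos(startDate).catch(() => null);
  if (!todos) {
    return false; // retrieval failed: abort the whole job
  }
  for (const todo of todos) {
    await notify(todo); // a false result is ignored, the loop continues
  }
  return true;
}

const attempted: string[] = [];
const notify = async (todo: TodoRef): Promise<boolean> => {
  attempted.push(todo.id);
  return todo.id !== "todo-2"; // simulate one failing notification
};
const twoTodos = async (): Promise<TodoRef[]> => [
  { id: "todo-2", workspaceId: "workspace-1" },
  { id: "todo-3", workspaceId: "workspace-2" },
];
const brokenDb = async (): Promise<TodoRef[]> => {
  throw new Error("connection refused");
};

async function demo(): Promise<void> {
  console.log(await runJob(twoTodos, notify, "2022-06-03")); // true
  console.log(attempted); // both todos were attempted
  console.log(await runJob(brokenDb, notify, "2022-06-03")); // false
}
demo();
```

Note that the job reports true even when individual notifications fail; only the log entries of the single-notification handler reveal the skipped todos.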

Note GitHub sponsors have access to tests for the functions sendDelayNotifications, sendDelayNotification and getDelayedTodos in the GitHub repo.

Adding delayed todos in the local database

In an earlier post, we added a Knex.js seed to populate the local database with example data.

For more information on this, check out 'Knex.js schema migrations with Postgres in Node.js'.

We adjust this seed to contain two delayed todos. The todo of Peter now has a due date of 2020-06-11 and the todo of Ousmane has a due date of 2020-07-14.

import { Knex } from "knex";
import { DbTodo } from "../../controllers/todos/todo.type";
import { WorkspaceId } from "../../workspace-id.type";

const workspaceId1: WorkspaceId = "a9393008-eab2-48e8-b820-0e03447f881c";
const workspaceId2: WorkspaceId = "83fdfa88-737d-4550-87e4-c78bf954fcf2";

const todos: DbTodo[] = [
  {
    workspaceId: workspaceId1,
    id: "142e8446-f5ba-4813-bfb5-f3192a37f1bf",
    name: "Workspace 1 - Todo 1",
    assignee: "Naomi",
    dueDate: "2025-01-05",
  },
  {
    workspaceId: workspaceId1,
    id: "bf6091cc-69f8-4a9e-93ed-0294c3a8ac2f",
    name: "Workspace 1 - Todo 2",
    assignee: "Peter",
    dueDate: "2020-06-11",
  },
  {
    workspaceId: workspaceId1,
    id: "6e5bad4c-7f1c-4ab4-9361-c6bd697b4257",
    name: "Workspace 1 - Todo 3",
    assignee: "Sheila",
    dueDate: "2032-11-27",
  },
  {
    workspaceId: workspaceId2,
    id: "1d2be9f2-4fe9-4509-b8ec-d0d9425c3685",
    name: "Workspace 2 - Todo 1",
    assignee: "Ousmane",
    dueDate: "2020-07-14",
  },
  {
    workspaceId: workspaceId2,
    id: "e868f10d-d21a-4139-9c5d-b8c73c62735a",
    name: "Workspace 2 - Todo 2",
    assignee: "Carla",
    dueDate: "2034-09-07",
  },
];

export async function seed(knex: Knex): Promise<void> {
  await knex("todos").truncate();
  await knex("todos").insert(todos);
}

Running it

With npm start, we spin up the database and the mockserver for the notify service. We can use npm run command send-delay-notifications to execute our implemented logic locally.

$ npm run command send-delay-notifications

INFO: Starting to send delay notifications.
    startDate: "2022-06-03"
DEBUG: Request
    method: "POST"
    url: "http://localhost:3001/notify/notifications"
    body: {
      "message": "Todo bf6091cc-69f8-4a9e-93ed-0294c3a8ac2f is delayed"
    }
DEBUG: Request
    method: "POST"
    url: "http://localhost:3001/auth/token"
    body: ""
INFO: Response
    status: 200
    method: "POST"
    url: "http://localhost:3001/auth/token"
DEBUG: Response Details
    status: 200
    method: "POST"
    url: "http://localhost:3001/auth/token"
    body: {
      "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwiY2xpZW50SWQiOiJ0b2RvcyIsImlhdCI6MTUxNjIzOTAyMn0.YD_HJROnQHJIROzVBagK27D862QoNvokSDFxDbA8_Ug"
    }
INFO: Response
    status: 200
    method: "POST"
    url: "http://localhost:3001/notify/notifications"
DEBUG: Response Details
    status: 200
    method: "POST"
    url: "http://localhost:3001/notify/notifications"
    body: {
      "message": "Notification created"
    }
DEBUG: Request
    method: "POST"
    url: "http://localhost:3001/notify/notifications"
    body: {
      "message": "Todo 1d2be9f2-4fe9-4509-b8ec-d0d9425c3685 is delayed"
    }
DEBUG: Request
    method: "POST"
    url: "http://localhost:3001/auth/token"
    body: ""
INFO: Response
    status: 200
    method: "POST"
    url: "http://localhost:3001/auth/token"
DEBUG: Response Details
    status: 200
    method: "POST"
    url: "http://localhost:3001/auth/token"
    body: {
      "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwiY2xpZW50SWQiOiJ0b2RvcyIsImlhdCI6MTUxNjIzOTAyMn0.YD_HJROnQHJIROzVBagK27D862QoNvokSDFxDbA8_Ug"
    }
INFO: Response
    status: 200
    method: "POST"
    url: "http://localhost:3001/notify/notifications"
DEBUG: Response Details
    status: 200
    method: "POST"
    url: "http://localhost:3001/notify/notifications"
    body: {
      "message": "Notification created"
    }
INFO: Finished to send delay notifications.
    startDate: "2022-06-03"

We see that the endpoint POST /notify/notifications is called twice. Before each of these calls, the configured Axios instance retrieves and attaches a JWT. Since the two todos are in different workspaces, two different JWTs are retrieved and the cache is not utilized.
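
The token handling lives in the configured Axios instance and is not shown in this post. A minimal sketch, assuming the cache is keyed by workspace ID, illustrates why two todos in different workspaces lead to two token requests, while a further todo in an already seen workspace would reuse the cached token. `createTokenCache`, `fetchToken` and the token format are hypothetical, and token expiry is left out.

```typescript
type FetchToken = (workspaceId: string) => Promise<string>;

// Hypothetical per-workspace token cache (expiry handling omitted).
export function createTokenCache(fetchToken: FetchToken) {
  const tokens = new Map<string, string>();
  return async (workspaceId: string): Promise<string> => {
    const cached = tokens.get(workspaceId);
    if (cached !== undefined) {
      return cached; // cache hit: no request to the auth endpoint
    }
    const token = await fetchToken(workspaceId);
    tokens.set(workspaceId, token);
    return token;
  };
}

let tokenRequests = 0;
const getToken = createTokenCache(async (workspaceId) => {
  tokenRequests += 1;
  return `token-for-${workspaceId}`;
});

async function demo(): Promise<void> {
  await getToken("workspace-1");
  await getToken("workspace-2"); // different workspace: second token request
  await getToken("workspace-1"); // same workspace: served from the cache
  console.log(tokenRequests); // 2
}
demo();
```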

Conclusion

We started to implement the asynchronous batch job send-delay-notifications. The job retrieves all delayed todos and sends a notification for each of them. In the following posts, we make the job idempotent to avoid duplicate notifications on repeated runs. We also implement paging for the delayed todos to avoid exhausting memory.

Become a GitHub sponsor to access the code of this post as a GitHub repo.