How I Handle Long Processes Using Nest.JS and RabbitMQ


Who would’ve thought that the 10-second limit on a free tier would open up a new learning experience? Here comes RabbitMQ!

RabbitMQ is an open-source, lightweight message broker that acts as a queue between your API and your workers (i.e., microservices). For example, if your API can’t finish a long process within a short amount of time, you can move that work to the background and tell your users that it has been queued. It kinda acts like middleware between the two.

At a high level, our app consists of a producer and a consumer. The producer sends (publishes) messages to the broker. The consumer listens for messages from the broker and handles the tasks in the background. Usually, the consumer runs on a different machine from the producer.
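As a rough mental model of those two roles, here is a minimal in-memory sketch. The InMemoryBroker class and its method names are made up for illustration; a real broker such as RabbitMQ runs as a separate process and adds persistence, routing, and acknowledgements.

```typescript
// Hypothetical in-memory stand-in for a broker, for illustration only.
type Handler<T> = (message: T) => void;

class InMemoryBroker<T> {
  private readonly queue: T[] = [];
  private handler?: Handler<T>;

  // Producer side: enqueue a message and return right away.
  publish(message: T): void {
    this.queue.push(message);
    this.drain();
  }

  // Consumer side: register a handler; queued messages arrive in order.
  consume(handler: Handler<T>): void {
    this.handler = handler;
    this.drain();
  }

  // Deliver everything that is waiting, if a consumer is attached.
  private drain(): void {
    while (this.handler && this.queue.length > 0) {
      this.handler(this.queue.shift() as T);
    }
  }
}

const broker = new InMemoryBroker<string>();
const received: string[] = [];

broker.publish('task 1'); // stays queued: no consumer is listening yet
broker.publish('task 2');
broker.consume((message) => received.push(message));
broker.publish('task 3'); // delivered as soon as it is published

console.log(received);
```

The point is that the producer never waits for the work to finish: it only hands the message over to the broker.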

Let’s Get Our Hands Dirty

To get started, we need to have two (2) repositories. One is the producer and the other one is the consumer. I’ll do that by cloning the starter pack provided by the Nest.JS team.

$ mkdir rabbit-mq-nest-js-tutorial
$ cd rabbit-mq-nest-js-tutorial
$ git clone https://github.com/nestjs/typescript-starter producer
$ git clone https://github.com/nestjs/typescript-starter consumer
$ cd producer
$ npm i
$ cd ../consumer
$ npm i

After cloning and installing the modules, both repositories will have the same structure, like this:

Image for post

Before we start coding, let’s first create a RabbitMQ instance. This can be done locally (yes, you can install it on your machine) or in the cloud for free! For now, I prefer the cloud approach with https://www.cloudamqp.com/. Head there and sign up!

Once you have signed up or logged in, create a new instance. After creating it, you will see these values:

Image for post

The first one is the queue name and the other one is the AMQP URL that we will be using to send/receive messages from the queue. Keep those values in hand.
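If you are curious what is packed into an AMQP URL, the sketch below splits one into its parts using the standard URL class. The URL shown is a placeholder rather than a real instance, and describeAmqpUrl is a hypothetical helper name.

```typescript
// Placeholder URL for illustration only; use the one from your own dashboard.
const exampleUrl = 'amqp://my-user:my-password@broker.example.com/my-vhost';

interface AmqpUrlParts {
  protocol: string;
  user: string;
  host: string;
  vhost: string;
  hasPassword: boolean;
}

// Hypothetical helper: splits an AMQP URL into its parts.
function describeAmqpUrl(rawUrl: string): AmqpUrlParts {
  const url = new URL(rawUrl);
  return {
    protocol: url.protocol.replace(':', ''),
    user: decodeURIComponent(url.username),
    host: url.hostname,
    // The path segment after the host is the virtual host.
    vhost: decodeURIComponent(url.pathname.slice(1)),
    // Report only whether a password is present, so it never gets logged.
    hasPassword: url.password.length > 0,
  };
}

const parts = describeAmqpUrl(exampleUrl);
console.log(parts);
```

Since the URL embeds a username and password, it is worth treating it as a secret, for example by loading it from an environment variable instead of committing it.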

Implementing The Producer

Let’s now head to our producer repository. Once inside, install the Nest.JS microservices module and the packages required for RabbitMQ:

$ npm i @nestjs/microservices amqplib amqp-connection-manager

After installing, let’s start by creating a RabbitMQ module. I will name it rabbit-mq.module.ts. Inside the module, let’s add these lines. Remember the queue name and the AMQP URL? They go in the queue and urls properties, respectively.

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'rabbit-mq-module',
        transport: Transport.RMQ,
        options: {
          urls: [
            'amqp://xnfbgxgg:7EhYkk-2a_hsScQWmyyuOlZ33v7U-bjx@wasp.rmq.cloudamqp.com/xnfbgxgg',
          ],
          queue: 'rabbit-mq-nest-js',
        },
      },
    ]),
  ],
  controllers: [],
  providers: [],
  exports: [],
})
export class RabbitMQModule {}

💡 Tip: Put your constant values into an enum.

After creating the RabbitMQ module, let’s create a service file that will hold the logic for sending/publishing to the queue. I’ll name it rabbit-mq.service.ts. Inside the service, add these lines:

import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';

@Injectable()
export class RabbitMQService {
  constructor(
    @Inject('rabbit-mq-module') private readonly client: ClientProxy,
  ) {}

  public send(pattern: string, data: any) {
    return this.client.send(pattern, data).toPromise();
  }
}

💡 Tip: Put your constant values into an enum.

In the constructor, the token passed to @Inject is the name we used when registering the client in rabbit-mq.module.ts. The send method returns an observable by default, but if you prefer a promise, you can call toPromise() just like I did.

The send method is a lazy one! It will not send anything until you call .subscribe() or .toPromise(). It returns a cold observable, which is the intended behavior according to the Nest.JS creator.

See more about this:
https://github.com/nestjs/nest/issues/2718
https://docs.nestjs.com/microservices/basics#sending-messages
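To get a feel for what “lazy” means here, the sketch below hand-rolls a tiny cold source. This is not RxJS or Nest.JS code: ColdSource is a hypothetical class that only mimics the idea that nothing runs until someone subscribes.

```typescript
// Hypothetical, simplified stand-in for a cold observable.
class ColdSource<T> {
  constructor(private readonly producer: () => T) {}

  // The producer function runs only when a subscriber shows up.
  subscribe(next: (value: T) => void): void {
    next(this.producer());
  }

  // Converting to a promise subscribes under the hood.
  toPromise(): Promise<T> {
    return new Promise<T>((resolve) => this.subscribe(resolve));
  }
}

let sendCount = 0;

// Creating the source does not "send" anything yet.
const pending = new ColdSource(() => {
  sendCount += 1;
  return `sent #${sendCount}`;
});
const countBeforeSubscribe = sendCount;

// Subscribing is what triggers the work.
pending.subscribe((value) => console.log(value));
const countAfterSubscribe = sendCount;

console.log({ countBeforeSubscribe, countAfterSubscribe });
```

This is why the service above calls toPromise(): without it (or a subscribe() call), the message would never leave the producer.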

The pattern is the one that the consumer app will have to listen to.

The data is the one that the consumer app will receive when it receives the message from the queue.

If you are looking for a more detailed explanation, see the Nest.JS documentation.

Back to our project. Now that we are finished with the service, let’s add it to the RabbitMQ module as well.

import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { RabbitMQService } from './rabbit-mq.service';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'rabbit-mq-module',
        transport: Transport.RMQ,
        options: {
          urls: [
            'amqp://xnfbgxgg:7EhYkk-2a_hsScQWmyyuOlZ33v7U-bjx@wasp.rmq.cloudamqp.com/xnfbgxgg',
          ],
          queue: 'rabbit-mq-nest-js',
        },
      },
    ]),
  ],
  controllers: [],
  providers: [RabbitMQService],
  exports: [RabbitMQService],
})
export class RabbitMQModule {}

💡 Tip: Put your constant values into an enum.

We added it to both providers and exports so that other modules that import our module can use RabbitMQService, along with any other services we add in the future.

After doing that, import the RabbitMQ module into the app.module.ts.

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { RabbitMQModule } from './rabbit-mq.module';

@Module({
  imports: [RabbitMQModule],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

Once that is done, let’s move on to the last step and send/publish from our controller. Open app.controller.ts and update the getHello method.

import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';
import { RabbitMQService } from './rabbit-mq.service';

@Controller()
export class AppController {
  constructor(
    private readonly appService: AppService,
    private readonly rabbitMQService: RabbitMQService,
  ) {}

  @Get()
  async getHello() {
    // Not awaited on purpose: the response should not wait for the consumer
    this.rabbitMQService.send('rabbit-mq-producer', {
      message: this.appService.getHello(),
    });
    return 'Message sent to the queue!';
  }
}

💡 Tip: Put your constant values into an enum.
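One way to follow that tip is sketched below. The enum name, its member names, and the buildClientConfig helper are my own picks for illustration; only the string values come from this tutorial.

```typescript
// Hypothetical names; the string values are the ones used in this tutorial.
enum RabbitMQ {
  ClientToken = 'rabbit-mq-module',
  Queue = 'rabbit-mq-nest-js',
  Pattern = 'rabbit-mq-producer',
}

interface ClientConfig {
  name: string;
  urls: string[];
  queue: string;
}

// Hypothetical helper that builds the plain values a client registration
// needs. The transport field is left out to keep the sketch dependency-free.
function buildClientConfig(url: string): ClientConfig {
  return {
    name: RabbitMQ.ClientToken,
    urls: [url],
    queue: RabbitMQ.Queue,
  };
}

// Placeholder URL, not a real instance.
const clientConfig = buildClientConfig('amqp://user:pass@broker.example.com/vhost');
const pattern: string = RabbitMQ.Pattern;

console.log(clientConfig, pattern);
```

With this in place, the module, the service, and both controllers can refer to the same values, so a typo in the queue name or pattern becomes a compile-time error instead of a silent mismatch.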

We’ve now used the RabbitMQService to send/publish a message with the pattern rabbit-mq-producer. Note that we didn’t await the returned promise: calling toPromise() inside the service already triggers the send, and the request doesn’t need to wait for the result unless you have a valid reason to.
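One thing to watch with this fire-and-forget style is that a failed send turns into an unhandled promise rejection. Below is a minimal sketch of one way to guard against that. The fakeSend function stands in for RabbitMQService.send() and always fails, and fireAndForget is a hypothetical helper; neither is part of Nest.JS.

```typescript
// Hypothetical stand-in for RabbitMQService.send(); it always fails here
// so that the error path is visible.
function fakeSend(pattern: string, data: unknown): Promise<void> {
  return Promise.reject(
    new Error(`could not send ${JSON.stringify(data)} to ${pattern}`),
  );
}

const errors: string[] = [];

// Runs a task in the background, but never lets a rejection go unhandled.
function fireAndForget(
  task: Promise<unknown>,
  onError: (error: Error) => void,
): void {
  void task.catch((reason) =>
    onError(reason instanceof Error ? reason : new Error(String(reason))),
  );
}

function handleRequest(): string {
  fireAndForget(
    fakeSend('rabbit-mq-producer', { message: 'Hello World!' }),
    (error) => errors.push(error.message),
  );
  // The response is returned without waiting for the send to settle.
  return 'Message sent to the queue!';
}

const response = handleRequest();
const errorsSeenSynchronously = errors.length;

console.log(response);
```

In a real app, the error callback would probably log the failure or report it to a monitoring tool instead of pushing it into an array.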

Later on, the consumer app will have to listen for that pattern. As the second parameter, we are sending an object with a message property to the consumer app. We should expect that message later on!

Let’s try to start our producer app and that should run as normal! 🚀

Implementing The Consumer

Now that our producer app is ready, we can move on to the consumer app. To start, install the same required packages that we installed for the producer:

$ npm i @nestjs/microservices amqplib amqp-connection-manager

After installing the required packages, our next goal is to change how our app is created. Currently, it is created as an HTTP server. We will change it to be created as a microservice.

To do that, let’s go to our main.ts and change it to look like this:

import { NestFactory } from '@nestjs/core';
import { Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice(AppModule, {
    transport: Transport.RMQ,
    options: {
      urls: [
        'amqp://xnfbgxgg:7EhYkk-2a_hsScQWmyyuOlZ33v7U-bjx@wasp.rmq.cloudamqp.com/xnfbgxgg',
      ],
      queue: 'rabbit-mq-nest-js',
      // false = manual acknowledgement; true = automatic acknowledgement
      noAck: false,
      // Receive messages one at a time
      prefetchCount: 1,
    },
  });
  // On Nest.JS v8 and later, listenAsync() was removed; use app.listen()
  await app.listenAsync();
}
bootstrap();

💡 Tip: Put your constant values into an enum.

If you are looking for more options, see the Nest.JS documentation.
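To see how noAck: false and prefetchCount: 1 work together, here is a small simulation. FakeChannel is a hypothetical class, not the amqplib channel: it only models the rule that the broker stops delivering once prefetchCount messages are waiting for an acknowledgement.

```typescript
// Hypothetical simulation of prefetch with manual acknowledgements.
class FakeChannel {
  private readonly ready: string[] = [];
  private unacked = 0;
  readonly deliveries: string[] = [];

  constructor(private readonly prefetchCount: number) {}

  // Broker side: a new message arrives in the queue.
  enqueue(message: string): void {
    this.ready.push(message);
    this.deliver();
  }

  // Consumer side: acknowledging frees a slot for the next message.
  ack(): void {
    if (this.unacked > 0) {
      this.unacked -= 1;
    }
    this.deliver();
  }

  // Deliver only while fewer than prefetchCount messages are unacknowledged.
  private deliver(): void {
    while (this.unacked < this.prefetchCount && this.ready.length > 0) {
      this.unacked += 1;
      this.deliveries.push(this.ready.shift() as string);
    }
  }
}

const fakeChannel = new FakeChannel(1);
['a', 'b', 'c'].forEach((message) => fakeChannel.enqueue(message));

// Only the first message is delivered until it is acknowledged.
const deliveredBeforeAck = [...fakeChannel.deliveries];

fakeChannel.ack();
const deliveredAfterAck = [...fakeChannel.deliveries];

console.log(deliveredBeforeAck, deliveredAfterAck);
```

This is the behavior we rely on for long processes: the consumer gets the next message only after it has finished and acknowledged the current one.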

Now that we’ve changed how our app is created, we can move on to the controller and modify it to receive messages from the queue! Head over to app.controller.ts:

import { Controller } from '@nestjs/common';
import {
  MessagePattern,
  RmqContext,
  Ctx,
  Payload,
} from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @MessagePattern('rabbit-mq-producer')
  public async execute(@Payload() data: any, @Ctx() context: RmqContext) {
    const channel = context.getChannelRef();
    const originalMessage = context.getMessage();
    console.log('data', data);
    // Manually acknowledge the message, since noAck is false in main.ts
    channel.ack(originalMessage);
  }
}

💡 Tip: Put your constant values into an enum.

With those modifications, we are telling the controller to listen for the pattern rabbit-mq-producer, log the received data, and then acknowledge the message. The data we receive should look like this simple object:

{
message: "Hello World!"
}

Let’s try that and see if we can receive that one. Start both applications!

Image for post

Once both apps are running, let’s trigger our producer to send/publish to the queue! Open http://localhost:3000/ in the browser. It should immediately respond with “Message sent to the queue!”.

Then, if we look at the consumer app’s terminal, we will see that it logs our message.

Image for post

Hooray! We can now receive it from our consumer app! 🎉

Bringing It All Together

Now that we have implemented both our producer and consumer apps, we can test long processes on the consumer app. Let’s say that processing each message takes about 5 seconds. To simulate that, we can set up a “delay” or “wait” before continuing.

To do that, let’s go to our consumer app and create a sample delay using promises. Go to the AppService and create a sample method, which I will call mySuperLongProcessOfUser, that acts like a “delay” or “wait”.

import { Injectable } from '@nestjs/common';

@Injectable()
export class AppService {
  getHello(): string {
    return 'Hello World!';
  }

  // Simulates a long-running task by resolving after 5 seconds
  mySuperLongProcessOfUser(data: any) {
    return new Promise<void>(resolve => {
      setTimeout(() => {
        console.log(`done processing ${JSON.stringify(data)}`);
        resolve();
      }, 5000);
    });
  }
}

Then on the app.controller.ts , add the following:

// consumer app
import { Controller } from '@nestjs/common';
import {
  MessagePattern,
  RmqContext,
  Ctx,
  Payload,
} from '@nestjs/microservices';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @MessagePattern('rabbit-mq-producer')
  public async execute(@Payload() data: any, @Ctx() context: RmqContext) {
    const channel = context.getChannelRef();
    const originalMessage = context.getMessage();
    console.log('data', data);
    // Acknowledge only after the long process has finished
    await this.appService.mySuperLongProcessOfUser(data);
    channel.ack(originalMessage);
  }
}

This will now take 5 seconds to handle each message. Before we see it in action, we also need to modify our producer app to send multiple messages. To do that, let’s go back to our producer app, open app.controller.ts, and add the following:

// producer app
import { Controller, Get } from '@nestjs/common';
import { AppService } from './app.service';
import { RabbitMQService } from './rabbit-mq.service';

@Controller()
export class AppController {
  constructor(
    private readonly appService: AppService,
    private readonly rabbitMQService: RabbitMQService,
  ) {}

  @Get()
  async getHello() {
    const pendingOperations = Array.from(new Array(5)).map((_, index) =>
      this.rabbitMQService.send('rabbit-mq-producer', {
        message: this.appService.getHello() + index,
      }),
    );
    // Not awaited on purpose: the response should not wait for the consumer
    Promise.all(pendingOperations);
    return 'Message sent to the queue!';
  }
}

💡 Info: This is equivalent to hitting your endpoint 5 times.

This will create 5 operations that will be sent to the queue! So our consumer app will receive 5 messages, and each one will take 5 seconds to process.

Our expected behavior is that the producer app will not hold the request for long, so no request is blocked. The consumer app will then process the messages one at a time, taking 5 seconds each with our custom “delay” or “wait” function. Let’s see that!
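As a back-of-the-envelope check, the total time on the consumer side can be estimated with a small helper. This is a rough sketch with a hypothetical function name. It assumes a single consumer, that messages are acknowledged only after processing, and (for prefetch values above 1) that all in-flight messages are processed concurrently.

```typescript
// Hypothetical helper: rough estimate of how long the consumer needs.
function estimateTotalSeconds(
  messageCount: number,
  secondsPerMessage: number,
  prefetchCount = 1,
): number {
  // Messages are worked through in groups of at most prefetchCount.
  const batches = Math.ceil(messageCount / prefetchCount);
  return batches * secondsPerMessage;
}

// Our setup: 5 messages, 5 seconds each, one at a time.
const sequentialEstimate = estimateTotalSeconds(5, 5);

// For comparison: the same load if 5 messages could be in flight at once.
const concurrentEstimate = estimateTotalSeconds(5, 5, 5);

console.log({ sequentialEstimate, concurrentEstimate });
```

So with our settings, the consumer should need roughly 25 seconds to drain the queue, while the producer responds to the HTTP request almost immediately.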

Image for post

The processing was fast-forwarded to create a short GIF, but you can observe that there’s a delay between the processing of each item.

In a real-world application, the processing will probably take more than 5 seconds. One example is a CI/CD server that clones, installs, tests, and deploys an application. That’s probably one of the best examples of a workload that can benefit from this architecture style.

In case you need it, all the source code that we’ve been through can be found on this repository. https://github.com/jmaicaaan/tutorial-nestjs-rabbitmq

Congratulations! We have now handled long processing using Nest.JS and RabbitMQ 🎉

