DEV Community

Rajesh Kumar

KafkaJS & NestJS with TypeScript: Simplified Example

This example shows a simplified way to integrate Kafka with NestJS using KafkaJS.

GitHub location

https://github.com/rajeshkumarbehura/ts-nestjs-kafka

How to integrate

The Kafka module is created as a global module in this project. To integrate it into your own project, copy the app/common/kafka module and import KafkaModule into your app module.

    KafkaModule.register({
      clientId: 'test-app-client',
      brokers: ['localhost:9092'],
      groupId: 'test-app-group',
    }),

In my project, it is imported at the top level, in the imports array of app.module.ts.
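To make the idea of a global module with a register() call more concrete, here is a minimal sketch in plain TypeScript with no NestJS imports. `KafkaModuleSketch`, `ModuleDescriptor`, and the `'KAFKA_CONFIG'` token are hypothetical names chosen for illustration. The descriptor only imitates the shape of a NestJS dynamic module (`module`, `global`, `providers`, `exports`) and is not the code from the repository.

```typescript
// Assumed config shape, mirroring the three fields passed to register() above.
interface KafkaConfig {
  clientId: string;
  brokers: string[];
  groupId: string;
}

// Simplified stand-in for the descriptor object a NestJS dynamic module returns.
interface ModuleDescriptor {
  global: boolean;
  module: Function;
  providers: { provide: string; useValue: unknown }[];
  exports: string[];
}

class KafkaModuleSketch {
  // Hypothetical register(): validates the config and exposes it as a provider.
  static register(config: KafkaConfig, isGlobal: boolean = true): ModuleDescriptor {
    if (config.brokers.length === 0) {
      throw new Error('at least one broker is required');
    }
    return {
      global: isGlobal,
      module: KafkaModuleSketch,
      providers: [{ provide: 'KAFKA_CONFIG', useValue: config }],
      exports: ['KAFKA_CONFIG'],
    };
  }
}

const descriptor = KafkaModuleSketch.register({
  clientId: 'test-app-client',
  brokers: ['localhost:9092'],
  groupId: 'test-app-group',
});
console.log(descriptor.global); // true
```

Passing `false` as the second argument models a non-global module, which each feature module would then have to import explicitly.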

Create A Kafka Payload

Before sending a message, create a custom payload for it.

    const payload: KafkaPayload = {
      messageId: '' + new Date().valueOf(), // timestamp used as a simple id; a UUID also works
      body: message,
      messageType: 'Say.Hello',  
      topicName: 'hello.topic', 
    };
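If payloads are built in several places, a small helper can keep the fields consistent. The sketch below is only an illustration: the `KafkaPayload` interface is inferred from the four fields used above, and `buildPayload` is a hypothetical helper that is not part of the repository.

```typescript
// Assumed shape of KafkaPayload, inferred from the four fields used above.
interface KafkaPayload<T = unknown> {
  messageId: string;
  body: T;
  messageType: string;
  topicName: string;
}

// Hypothetical helper: builds a payload and lets the caller supply the id.
// The default id is a timestamp, as in the snippet above; it is not a real UUID.
function buildPayload<T>(
  topicName: string,
  messageType: string,
  body: T,
  messageId: string = String(Date.now()),
): KafkaPayload<T> {
  return { messageId, body, messageType, topicName };
}

const payload = buildPayload('hello.topic', 'Say.Hello', { text: 'Hello Kafka' });
console.log(payload.topicName, payload.messageType); // hello.topic Say.Hello
```

A caller that needs stronger uniqueness can pass its own id, for example one produced by a UUID library, as the fourth argument.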

Send a message to a Kafka topic

Inject KafkaService into your service or controller and call sendMessage.

const value = await this.kafkaService.sendMessage('hello.topic', payload);
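The post does not show what sendMessage does internally. As a rough sketch of what such a wrapper might look like, the code below serializes the payload to JSON and passes it to a producer whose `send` method takes a `{ topic, messages }` record, which is the shape the kafkajs producer uses. `ProducerLike`, `fakeProducer`, and this `sendMessage` function are assumptions made for illustration, and the fake producer only records messages in memory so that the example runs without a broker.

```typescript
// Minimal stand-in for the part of a kafkajs producer that this sketch needs.
interface ProducerLike {
  send(record: {
    topic: string;
    messages: { key?: string; value: string }[];
  }): Promise<unknown>;
}

// Assumed payload shape, inferred from the fields used in the post.
interface KafkaPayload {
  messageId: string;
  body: unknown;
  messageType: string;
  topicName: string;
}

// Hypothetical wrapper: serializes the payload and hands it to the producer.
async function sendMessage(
  producer: ProducerLike,
  topic: string,
  payload: KafkaPayload,
): Promise<unknown> {
  return producer.send({
    topic,
    messages: [{ key: payload.messageId, value: JSON.stringify(payload) }],
  });
}

// In-memory fake producer, so the sketch runs without a Kafka broker.
const sent: { topic: string; value: string }[] = [];
const fakeProducer: ProducerLike = {
  async send(record) {
    for (const message of record.messages) {
      sent.push({ topic: record.topic, value: message.value });
    }
    return [{ topicName: record.topic }];
  },
};
```

With a real kafkajs client, the object returned by `kafka.producer()` (after `connect()`) should be usable in place of `fakeProducer`, since its `send` method accepts a record of this shape.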

Consumer Implementation

Extend the AbstractKafkaConsumer class and implement the registerTopic method.
Inside registerTopic, you only need to add the topic names.
There is no need to inject any service, as it is implemented globally.

@Injectable()
export class ConsumerService extends AbstractKafkaConsumer {
    protected registerTopic() {
        this.addTopic('hello.topic');  
        this.addTopic('hello.fixed.topic');
    }
}

Subscribe to Topic when GroupId is not fixed

Add the decorator to the handler method and pass the topic name as its parameter.

    @SubscribeTo('hello.topic')
    helloSubscriber(payload: KafkaPayload) {
        console.log('Print message after receiving', payload);
    }
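The general idea behind a decorator such as @SubscribeTo is a registry that maps topic names to handler functions, which the consumer then looks up for each incoming message. The following sketch shows that idea with plain functions instead of decorators. `subscribe`, `dispatch`, and the `handlers` map are hypothetical names, and the repository may implement this differently.

```typescript
type Handler = (payload: unknown) => void;

// Topic name -> handlers registered for that topic.
const handlers = new Map<string, Handler[]>();

// Hypothetical registration function; a decorator could call something like this.
function subscribe(topic: string, handler: Handler): void {
  const list = handlers.get(topic) ?? [];
  list.push(handler);
  handlers.set(topic, list);
}

// Called once per incoming message; returns how many handlers ran.
function dispatch(topic: string, payload: unknown): number {
  const list = handlers.get(topic) ?? [];
  for (const handler of list) {
    handler(payload);
  }
  return list.length;
}

subscribe('hello.topic', (payload) => {
  console.log('Print message after receiving', payload);
});
dispatch('hello.topic', { body: 'hi' });
```

A message for a topic with no registered handler is simply ignored here, and `dispatch` returns 0 in that case.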

Subscribe to Topic when GroupId is fixed

Use this when multiple containers or app instances run side by side during horizontal scaling and only one of them is required to listen to the topic.

    @SubscribeToFixedGroup('hello.fixed.topic')
    helloSubscriber(payload: KafkaPayload) {
        console.log('Print message after receiving', payload);
    }
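Kafka delivers each message on a topic to one consumer within each consumer group. Instances that share a group id therefore split the messages between them, while instances with different group ids each receive every message. The post does not show how the two decorators choose their group ids, so the sketch below is only an assumption about one way to do it. `resolveGroupId` and the instance suffix are hypothetical.

```typescript
// Hypothetical helper: derives the consumer group id for one app instance.
// fixed = true  -> all instances share one group, so each message goes to one instance.
// fixed = false -> each instance gets its own group, so every instance sees every message.
function resolveGroupId(baseGroupId: string, fixed: boolean, instanceSuffix: string): string {
  return fixed ? baseGroupId : `${baseGroupId}-${instanceSuffix}`;
}

console.log(resolveGroupId('test-app-group', true, 'a1'));  // test-app-group
console.log(resolveGroupId('test-app-group', false, 'a1')); // test-app-group-a1
```

In a real deployment the suffix would need to differ per instance, for example a random value generated at startup or the container hostname.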

Top comments (9)

GK Nguyen

Thanks for your tutorial.
But I have a question: I tried to inject ticketService in the listener, but it comes back as undefined.
How can I fix this?

dev-to-uploads.s3.amazonaws.com/up...

Javier Laurenzano

Did you find a workaround for this? The same thing is happening to me.
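The thread does not record a fix for this, and the post does not show how the decorators invoke handler methods. One possible cause, offered here only as an assumption, is that the handler is called as a detached function reference, so `this` no longer points to the class instance that holds the injected service. The sketch below reproduces that general JavaScript behavior. `TicketService` is named after the comment, and `Listener` is hypothetical.

```typescript
class TicketService {
  describe(id: string): string {
    return `ticket ${id}`;
  }
}

class Listener {
  private readonly ticketService: TicketService;

  constructor(ticketService: TicketService) {
    this.ticketService = ticketService;
  }

  handle(id: string): string {
    return this.ticketService.describe(id);
  }
}

const listener = new Listener(new TicketService());

// Detached reference: `this` is undefined when it is called, so the call throws.
const detached = listener.handle;
// Bound reference: `this` stays attached to the listener instance.
const bound = listener.handle.bind(listener);

console.log(bound('42')); // ticket 42
try {
  detached('42');
} catch (err) {
  console.log('detached call failed:', err instanceof TypeError); // detached call failed: true
}
```

If this is the cause, invoking the handler on the instance that NestJS created, or binding it to that instance, would keep injected services available.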

Marc d'Entremont • Edited

Thanks so much for putting this example together. There are not many examples of NestJS with Kafka.
When I run I get a warning "Response without match". As I'm very new to Kafka I'm not too sure what the message is attempting to express. Any thoughts?
[Screenshot: "Response without match" warning]

After seeing these messages and finding closed issues on KafkaJS, I upgraded a number of packages. The project continued to work fine, but the messages remained. The following upgrade seems to work the same:

diff --git a/package.json b/package.json
index 65f64a9..317ca0c 100644
--- a/package.json
+++ b/package.json
@@ -20,31 +20,31 @@
     "test:e2e": "jest --config ./test/jest-e2e.json"
   },
   "dependencies": {
-    "@nestjs/common": "^6.7.2",
-    "@nestjs/core": "^6.7.2",
-    "@nestjs/platform-express": "^6.7.2",
+    "@nestjs/common": "^7.4.3",
+    "@nestjs/core": "^7.4.3",
+    "@nestjs/platform-express": "^7.4.3",
     "kafkajs": "^1.12.0",
     "reflect-metadata": "^0.1.13",
-    "rimraf": "^3.0.0",
-    "rxjs": "^6.5.3"
+    "rimraf": "^3.0.2",
+    "rxjs": "^6.6.2"
   },
   "devDependencies": {
-    "@nestjs/cli": "^6.9.0",
-    "@nestjs/schematics": "^6.7.0",
-    "@nestjs/testing": "^6.7.1",
-    "@types/express": "^4.17.1",
-    "@types/jest": "^24.0.18",
-    "@types/node": "^12.7.5",
-    "@types/supertest": "^2.0.8",
-    "jest": "^24.9.0",
-    "prettier": "^1.18.2",
+    "@nestjs/cli": "^7.5.1",
+    "@nestjs/schematics": "^7.1.1",
+    "@nestjs/testing": "^7.4.3",
+    "@types/express": "^4.17.7",
+    "@types/jest": "^26.0.10",
+    "@types/node": "^14.6.1",
+    "@types/supertest": "^2.0.10",
+    "jest": "^26.4.2",
+    "prettier": "^2.1.1",
     "supertest": "^4.0.2",
-    "ts-jest": "^24.1.0",
-    "ts-loader": "^6.1.1",
-    "ts-node": "^8.4.1",
+    "ts-jest": "^26.3.0",
+    "ts-loader": "^8.0.3",
+    "ts-node": "^9.0.0",
     "tsconfig-paths": "^3.9.0",
-    "tslint": "^5.20.0",
-    "typescript": "^3.6.3"
+    "tslint": "^6.1.3",
+    "typescript": "^4.0.2"
   },
   "jest": {
     "moduleFileExtensions": [
Rajesh Kumar

I don't see any "Response without match" messages in my log. Remove the Kafka Docker images and containers and try again.

Max

Thanks for your tutorial.
Could you describe port 9092 in more detail?
And is this a Kafka server or a client?
I checked the repo, but it is confusing because it differs from the official NestJS documentation for Kafka.

Rajesh Kumar

In this example, I am running the Kafka server in a Docker container on my local machine (github.com/rajeshkumarbehura/ts-ne...).
The application communicates with the Kafka server over port 9092.
I have updated the readme file to make it easier to understand, and I added a new Kafdrop UI docker-compose file to monitor Kafka.
The NestJS Kafka documentation takes a different approach, which I found very confusing and which made Kafka integration harder to follow.

Javier Laurenzano

Hi! Where do I get the AbstractKafkaConsumer class? It's not in the GitHub repo or in the NestJS Kafka libraries.

Kha Nguyen

Thanks for the guide!
While applying your code, I tried to make KafkaModule non-global, but it did not work.
Does KafkaModule have to be global? If so, why?
Thanks again!!

Rajesh Kumar

KafkaModule is made global, like a database connection, so that you do not need to import it in every module. If you do not want the Kafka module to be global, you need to modify the KafkaModule class and create it as a normal module class, as in standard NestJS.