loading...

Kafkajs & NestJS with Typescript Simplified Example

rajeshkumarbehura profile image Rajesh Kumar Updated on ・1 min read

This code example tried to simplify kafka integration with nestjs.

Github location

https://github.com/rajeshkumarbehura/ts-nestjs-kafka

How to integrate-

Kafka module is created as global module in this project. To integrate your project, copy app/common/kakfa module and inject KafkaModule as app module.

    KafkaModule.register({
      clientId: 'test-app-client',
      brokers: ['localhost:9092'],
      groupId: 'test-app-group',
    }

In my project, it is injected to top level in app.module.ts

Create A Kafka Payload

Before sending message, create a custom payload to send message

    const payload: KafkaPayload = {
      messageId: '' + new Date().valueOf(), // uuid
      body: message,
      messageType: 'Say.Hello',  
      topicName: 'hello.topic', 
    };

Send message to KafkaTopic

Inject KafkaService to your service or controller and call sendMessage.

const value = await this.kafkaService.sendMessage('hello.topic', payload);

Consumer Implementation

Extend AbstractKafkaConsumer class and implement registerTopic method.
Inside registerTopic, you only need to add topic names.
No need to inject any service, as its implemented globally.

@Injectable()
export class ConsumerService extends AbstractKafkaConsumer {
    protected registerTopic() {
        this.addTopic('hello.topic');  
        this.addTopic('hello.fixed.topic');
    }
}

Subscribe to Topic when GroupId is not fixed

Add annotation to the method and define topic name as its parameter

 @SubscribeTo('hello.topic')
 helloSubscriber(payload: KafkaPayload) {
        console.log('Print message after receiving', payload);
 }

Subscribe to Topic when GroupId is fixed

When multiple containers or apps are running during horizontal scaling and your only one container/application required to listen to topic.

 @SubscribeToFixedGroup('hello.fixed.topic')
 helloSubscriber(payload: KafkaPayload) {
        console.log('Print message after receiving', payload);
 }

Discussion

markdown guide
 

Thanks so much for putting this example together. There are not many examples of Nestjs with Kafka.
When I run I get a warning "Response without match". As I'm very new to Kafka I'm not too sure what the message is attempting to express. Any thoughts?
Response without match warning

After the seeing these messages finding closed issues on Kafka.js I did upgrade a number of packages. The project continued to work fine, but the messages remained. The follow upgrade seems to work the same

diff --git a/package.json b/package.json
index 65f64a9..317ca0c 100644
--- a/package.json
+++ b/package.json
@@ -20,31 +20,31 @@
     "test:e2e": "jest --config ./test/jest-e2e.json"
   },
   "dependencies": {
-    "@nestjs/common": "^6.7.2",
-    "@nestjs/core": "^6.7.2",
-    "@nestjs/platform-express": "^6.7.2",
+    "@nestjs/common": "^7.4.3",
+    "@nestjs/core": "^7.4.3",
+    "@nestjs/platform-express": "^7.4.3",
     "kafkajs": "^1.12.0",
     "reflect-metadata": "^0.1.13",
-    "rimraf": "^3.0.0",
-    "rxjs": "^6.5.3"
+    "rimraf": "^3.0.2",
+    "rxjs": "^6.6.2"
   },
   "devDependencies": {
-    "@nestjs/cli": "^6.9.0",
-    "@nestjs/schematics": "^6.7.0",
-    "@nestjs/testing": "^6.7.1",
-    "@types/express": "^4.17.1",
-    "@types/jest": "^24.0.18",
-    "@types/node": "^12.7.5",
-    "@types/supertest": "^2.0.8",
-    "jest": "^24.9.0",
-    "prettier": "^1.18.2",
+    "@nestjs/cli": "^7.5.1",
+    "@nestjs/schematics": "^7.1.1",
+    "@nestjs/testing": "^7.4.3",
+    "@types/express": "^4.17.7",
+    "@types/jest": "^26.0.10",
+    "@types/node": "^14.6.1",
+    "@types/supertest": "^2.0.10",
+    "jest": "^26.4.2",
+    "prettier": "^2.1.1",
     "supertest": "^4.0.2",
-    "ts-jest": "^24.1.0",
-    "ts-loader": "^6.1.1",
-    "ts-node": "^8.4.1",
+    "ts-jest": "^26.3.0",
+    "ts-loader": "^8.0.3",
+    "ts-node": "^9.0.0",
     "tsconfig-paths": "^3.9.0",
-    "tslint": "^5.20.0",
-    "typescript": "^3.6.3"
+    "tslint": "^6.1.3",
+    "typescript": "^4.0.2"
   },
   "jest": {
     "moduleFileExtensions": [
 

I dont see such kind of messages in my log, "Response without match". Remove kafka docker images, containers and try again.

 

Thanks for your tutorial.
Here, could you describe in detail the port 9092?
And so is this a Kafka server or client?
I checked the repo, but confusing because it's different than nestjs official documentation for Kafka.

 

Here in this example, I am running kafka server in docker container in local machine. (github.com/rajeshkumarbehura/ts-ne...) .
Application to kafka server communication happens using 9092 port.
I have updated readme file to make easy understanding. I added new Kafdrop UI docker-compose to monitor kafka.
Nestjs kafka documentation has different approach which was very confusing and make kafka integration more confuse.

 

Thanks for the guide!
On the process of applying your code, I tried to make the KafkaModule not global but it did not work.
Is it a must for making KafkaModule global? Then why?
Thanks again!!

 

Making KafkaModule as global like database connection, in every module you do not need to inject KafkaModule. If you do not want kafka module as global, then "class KafkaModule",. you need modify. Create as normal Module class as per nestjs