This code example tries to simplify Kafka integration with NestJS.
GitHub location
https://github.com/rajeshkumarbehura/ts-nestjs-kafka
How to integrate
The Kafka module is created as a global module in this project. To integrate it into your project, copy the app/common/kafka module and import KafkaModule into your app module.
KafkaModule.register({
  clientId: 'test-app-client',
  brokers: ['localhost:9092'],
  groupId: 'test-app-group',
})
In my project, it is imported at the top level in app.module.ts.
Create A Kafka Payload
Before sending a message, create a custom payload for it:
const payload: KafkaPayload = {
  messageId: '' + new Date().valueOf(), // uuid
  body: message,
  messageType: 'Say.Hello',
  topicName: 'hello.topic',
};
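The `// uuid` comment suggests the timestamp is only a stand-in for a proper unique ID. Below is a minimal sketch of a payload builder under that reading. The `KafkaPayload` shape is inferred from the fields used above, and `nextMessageId` and `buildPayload` are hypothetical helpers, not part of the repository.

```typescript
// Shape inferred from the example above; the repository's own definition may differ.
interface KafkaPayload {
  messageId: string;
  body: unknown;
  messageType: string;
  topicName: string;
}

// Hypothetical helper: timestamp plus a random suffix, so two messages
// created in the same millisecond are unlikely to share an ID.
function nextMessageId(): string {
  const suffix = Math.random().toString(36).slice(2, 10);
  return `${Date.now()}-${suffix}`;
}

// Hypothetical helper that fills in the ID so callers only supply the content.
function buildPayload(topicName: string, messageType: string, body: unknown): KafkaPayload {
  return { messageId: nextMessageId(), body, messageType, topicName };
}

const samplePayload = buildPayload('hello.topic', 'Say.Hello', { text: 'Hello' });
console.log(samplePayload.topicName, samplePayload.messageType);
```

A real UUID library would give stronger uniqueness guarantees; the suffix here only reduces the chance of collisions.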
Send message to Kafka topic
Inject KafkaService into your service or controller and call sendMessage.
const value = await this.kafkaService.sendMessage('hello.topic', payload);
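What sendMessage does internally is not shown here. As a rough sketch, assuming the service serializes the payload to JSON and hands it to a KafkaJS-style producer, the record could be built as below. `serializePayload`, `toProducerRecord`, and `ProducerRecordSketch` are hypothetical names; only the `{ topic, messages: [{ key, value }] }` shape mirrors what a KafkaJS producer accepts.

```typescript
// Shape inferred from the payload example; the repository's definition may differ.
interface KafkaPayload {
  messageId: string;
  body: unknown;
  messageType: string;
  topicName: string;
}

// Minimal record shape accepted by a KafkaJS-style producer.send() call.
interface ProducerRecordSketch {
  topic: string;
  messages: { key: string; value: string }[];
}

// Hypothetical helper: Kafka message values are bytes or strings, so encode as JSON.
function serializePayload(payload: KafkaPayload): string {
  return JSON.stringify(payload);
}

// Hypothetical helper: one message per payload, keyed by messageId (an assumption).
function toProducerRecord(topic: string, payload: KafkaPayload): ProducerRecordSketch {
  return {
    topic,
    messages: [{ key: payload.messageId, value: serializePayload(payload) }],
  };
}

const sampleRecord = toProducerRecord('hello.topic', {
  messageId: '1',
  body: 'Hello',
  messageType: 'Say.Hello',
  topicName: 'hello.topic',
});
console.log(sampleRecord.topic, sampleRecord.messages.length);
```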
Consumer Implementation
Extend the AbstractKafkaConsumer class and implement the registerTopic method.
Inside registerTopic, you only need to add topic names.
No need to inject any service, as it's implemented globally.
@Injectable()
export class ConsumerService extends AbstractKafkaConsumer {
  protected registerTopic() {
    this.addTopic('hello.topic');
    this.addTopic('hello.fixed.topic');
  }
}
Subscribe to Topic when GroupId is not fixed
Add the decorator to the method and pass the topic name as its parameter.
@SubscribeTo('hello.topic')
helloSubscriber(payload: KafkaPayload) {
  console.log('Print message after receiving', payload);
}
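Conceptually, a decorator such as @SubscribeTo only needs to record which method handles which topic, so the consumer can route incoming messages to it. The sketch below shows that idea with plain functions instead of decorator syntax. `subscribeTo`, `dispatch`, and `handlers` are hypothetical names, and this is not the repository's implementation.

```typescript
// Shape inferred from the payload example; the repository's definition may differ.
interface KafkaPayload {
  messageId: string;
  body: unknown;
  messageType: string;
  topicName: string;
}

type Handler = (payload: KafkaPayload) => void;

// Hypothetical registry: one handler per topic name.
const handlers = new Map<string, Handler>();

function subscribeTo(topic: string, handler: Handler): void {
  handlers.set(topic, handler);
}

// Routes a raw JSON message to its handler; returns false when none is registered.
function dispatch(topic: string, rawValue: string): boolean {
  const handler = handlers.get(topic);
  if (!handler) return false;
  handler(JSON.parse(rawValue) as KafkaPayload);
  return true;
}

const received: KafkaPayload[] = [];
subscribeTo('hello.topic', (payload) => {
  received.push(payload);
});

const handled = dispatch(
  'hello.topic',
  JSON.stringify({ messageId: '1', body: 'Hello', messageType: 'Say.Hello', topicName: 'hello.topic' }),
);
console.log(handled, received.length);
```

Note that a registry storing bare method references loses the `this` of the class instance unless the handler is bound to it, which is worth keeping in mind when a handler relies on injected services.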
Subscribe to Topic when GroupId is fixed
Use this when multiple containers or app instances are running under horizontal scaling and only one container/application should listen to the topic.
@SubscribeToFixedGroup('hello.fixed.topic')
helloSubscriber(payload: KafkaPayload) {
  console.log('Print message after receiving', payload);
}
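In Kafka, consumers that share a group ID divide a topic's partitions among themselves, so each message is handled by only one of them, while consumers in different groups each receive every message. One way the two decorators could differ is therefore in how the group ID is chosen. The sketch below assumes (this is not confirmed by the text above) that the non-fixed variant derives a per-instance group ID by appending a random suffix; `resolveGroupId` is a hypothetical helper.

```typescript
// Hypothetical helper. With fixed = true every instance joins the same consumer
// group; otherwise each instance gets its own group via a random numeric suffix.
function resolveGroupId(baseGroupId: string, fixed: boolean): string {
  if (fixed) return baseGroupId;
  const suffix = Math.floor(Math.random() * 1000000);
  return `${baseGroupId}-${suffix}`;
}

const fixedGroupId = resolveGroupId('test-app-group', true);
const perInstanceGroupId = resolveGroupId('test-app-group', false);
console.log(fixedGroupId, perInstanceGroupId);
```

With the fixed ID, scaling out to more instances spreads messages across them; with per-instance IDs, every instance sees every message.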
Top comments (9)
Thanks for your tutorial.
But I have a question: I tried to inject ticketService in the Listener, but what I get is undefined.
How can I fix this?
Did you find a workaround for this? The same thing is happening to me.
Thanks so much for putting this example together. There are not many examples of NestJS with Kafka.
When I run it, I get a warning "Response without match". As I'm very new to Kafka, I'm not too sure what the message is trying to say. Any thoughts?
After seeing these messages and finding closed issues on Kafka.js, I upgraded a number of packages. The project continued to work fine, but the messages remained. The following upgrade seems to work the same
I don't see any "Response without match" messages in my log. Remove the Kafka Docker images and containers, then try again.
Thanks for your tutorial.
Could you describe port 9092 in more detail?
Is this a Kafka server or a client?
I checked the repo, but it is confusing because it's different from the official NestJS documentation for Kafka.
In this example, I am running the Kafka server in a Docker container on my local machine (github.com/rajeshkumarbehura/ts-ne...).
The application communicates with the Kafka server over port 9092.
I have updated the README to make it easier to understand, and added a new Kafdrop UI docker-compose to monitor Kafka.
The NestJS Kafka documentation takes a different approach, which I found very confusing and which makes Kafka integration harder to follow.
Hi! Where do I get the AbstractKafkaConsumer class? It's not in the GitHub repo or in the NestJS Kafka libraries.
Thanks for the guide!
While applying your code, I tried to make the KafkaModule non-global, but it did not work.
Does KafkaModule have to be global? If so, why?
Thanks again!!
Making KafkaModule global works like a database connection: you do not need to import KafkaModule in every module. If you do not want the Kafka module to be global, you need to modify "class KafkaModule" and create it as a normal module class as per NestJS.