Here is the high-level architecture of this simple asynchronous processing example with two microservices.
Microservice 1 is a REST microservice that receives data through an HTTP POST request. It extracts the data from the request body, publishes it to a Kafka topic, and then responds to the caller with a confirmation message.
Microservice 2 subscribes to the Kafka topic that Microservice 1 publishes to. Each message it consumes is saved to MongoDB.
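The flow can be sketched without any infrastructure. The following is only an in-memory illustration under stated assumptions, not the real implementation: `InMemoryTopic` is a hypothetical stand-in for the Kafka topic, and a plain array stands in for the MongoDB collection.

```typescript
// Hypothetical in-memory stand-ins; none of these names come from kafkajs or mongoose.
type AddUserParams = { name: string; email: string };

class InMemoryTopic {
    private readonly pending: string[] = [];
    private handler?: (value: string) => void;

    // Producer side: queue the serialized message, then deliver it if a consumer exists.
    publish(value: string): void {
        this.pending.push(value);
        this.drain();
    }

    // Consumer side: register the handler and receive anything published earlier.
    subscribe(handler: (value: string) => void): void {
        this.handler = handler;
        this.drain();
    }

    private drain(): void {
        if (!this.handler) return; // no consumer yet, messages stay queued
        while (this.pending.length > 0) {
            this.handler(this.pending.shift() as string);
        }
    }
}

const topic = new InMemoryTopic();
const usersCollection: AddUserParams[] = [];

// Microservice 1: receives the POST body and publishes it to the topic.
function handlePost(body: AddUserParams): { message: string } {
    topic.publish(JSON.stringify(body));
    return { message: "User created successfully" };
}

// Microservice 2: consumes messages from the topic and persists them.
function startConsumer(): void {
    topic.subscribe((value) => {
        usersCollection.push(JSON.parse(value) as AddUserParams);
    });
}

const postResponse = handlePost({ name: "Ada", email: "ada@example.com" });
const storedBeforeConsumer = usersCollection.length; // consumer not started yet
startConsumer();
const storedAfterConsumer = usersCollection.length;
```

The point of the sketch is the decoupling: the producer side returns immediately, and the data only reaches the store once the consumer side is running.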
First, you must configure your local environment for the Kafka server.
Next, create a directory to hold the two projects.
mkdir tsclean-kafka-mongo
After this, we generate the two projects for the microservices. We will continue using the npm package @tsclean/scaffold.
npm i -g @tsclean/scaffold
Microservice Rest
We create the first project, which will contain the REST microservice.
scaffold create:project --name=api
We start by creating the entity User.
scaffold create:entity --name=user
export type UserModel = {
id: string | number;
name: string;
email: string;
}
export type AddUserParams = Omit<UserModel, 'id'>
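As a small illustration of what `Omit<UserModel, 'id'>` buys us: the incoming payload cannot carry an `id`, and one is attached only later. The `withId` helper below is hypothetical and exists only for this example.

```typescript
type UserModel = {
    id: string | number;
    name: string;
    email: string;
};
type AddUserParams = Omit<UserModel, "id">;

// Hypothetical helper: attach an id once the storage layer has generated one.
function withId(params: AddUserParams, id: UserModel["id"]): UserModel {
    return { id, ...params };
}

const params: AddUserParams = { name: "Ada", email: "ada@example.com" };
const user = withId(params, 1);

// The following would not compile, because 'id' is not part of AddUserParams:
// const invalid: AddUserParams = { id: 1, name: "Ada", email: "ada@example.com" };
```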
Then we create the interface (the contract) that the Kafka adapter will implement.
scaffold create:interface --name=add-user --path=models
import {AddUserParams, UserModel} from "@/domain/models/user";
export const ADD_USER_REPOSITORY = "ADD_USER_REPOSITORY";
export interface IAddUserRepository {
addUser:(data: AddUserParams) => Promise<UserModel>
}
We create the service that holds the business logic of the microservice.
scaffold create:service --name=add-user
import {AddUserParams, UserModel} from "@/domain/models/user";
export const ADD_USER_SERVICE = "ADD_USER_SERVICE";
export interface IAddUserService {
addUser:(data: AddUserParams) => Promise<UserModel>
}
import {Adapter, Service} from "@tsclean/core";
import {IAddUserService} from "@/domain/use-cases/add-user-service";
import {AddUserParams, UserModel} from "@/domain/models/user";
import {ADD_USER_REPOSITORY, IAddUserRepository} from "@/domain/models/contracts/add-user-repository";
@Service()
export class AddUserServiceImpl implements IAddUserService {
constructor(
@Adapter(ADD_USER_REPOSITORY)
private readonly addUserRepository: IAddUserRepository
) {
}
async addUser(data: AddUserParams): Promise<UserModel> {
return await this.addUserRepository.addUser(data);
}
}
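Because the service depends only on the `IAddUserRepository` contract, any class that implements it can be plugged in. A minimal sketch, assuming a hypothetical `InMemoryUserRepository` (handy in unit tests) and leaving out the `@Service` and `@Adapter` decorators so that it runs without the framework:

```typescript
type UserModel = { id: string | number; name: string; email: string };
type AddUserParams = Omit<UserModel, "id">;

interface IAddUserRepository {
    addUser: (data: AddUserParams) => Promise<UserModel>;
}

// Hypothetical adapter for tests: keeps users in an array instead of sending them anywhere.
class InMemoryUserRepository implements IAddUserRepository {
    public readonly saved: UserModel[] = [];

    async addUser(data: AddUserParams): Promise<UserModel> {
        const user: UserModel = { id: this.saved.length + 1, ...data };
        this.saved.push(user);
        return user;
    }
}

// Same shape as AddUserServiceImpl, with the dependency passed in by hand.
class AddUserService {
    private readonly addUserRepository: IAddUserRepository;

    constructor(addUserRepository: IAddUserRepository) {
        this.addUserRepository = addUserRepository;
    }

    async addUser(data: AddUserParams): Promise<UserModel> {
        return this.addUserRepository.addUser(data);
    }
}

const repository = new InMemoryUserRepository();
const service = new AddUserService(repository);
const created = service.addUser({ name: "Ada", email: "ada@example.com" });
```

The service code does not change when the repository implementation does, which is the property the tutorial relies on.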
Now we create the Kafka adapter. The domain layer, where the business logic lives, talks to it only through the IAddUserRepository interface. This applies Dependency Inversion, one of the SOLID principles.
scaffold create:adapter --name=kafka
import dotenv from "dotenv";
import {Kafka} from "kafkajs"
import {AddUserParams, UserModel} from "@/domain/models/user";
import {IAddUserRepository} from "@/domain/models/contracts/add-user-repository";
dotenv.config({path: ".env"})
export class KafkaAdapter implements IAddUserRepository {
    public kafka = new Kafka({
        clientId: process.env.KAFKA_CLIENT_ID,
        brokers: ["localhost:9092"]
    })
    public producer = this.kafka.producer()
    // Message key counter; kept on the instance so it advances between calls
    private messageCount = 0
    async addUser(data: AddUserParams): Promise<UserModel | any> {
        const topic = process.env.KAFKA_TOPIC;
        await this.producer.connect()
        try {
            await this.producer.send({
                topic,
                messages: [
                    {
                        key: String(this.messageCount),
                        value: JSON.stringify(data),
                    },
                ],
            })
            this.messageCount++
        } catch (err) {
            console.error("could not write message " + err)
        }
    }
}
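The adapter reads `KAFKA_CLIENT_ID` and `KAFKA_TOPIC` from the environment and passes them on without checking them. A hedged sketch of validating them up front follows. `readKafkaConfig` is a hypothetical helper, and `KAFKA_BROKERS` is an assumed extra variable that the adapter above does not use.

```typescript
type KafkaConfig = { clientId: string; topic: string; brokers: string[] };

// Hypothetical helper: fail fast when a required variable is missing.
// KAFKA_BROKERS is an assumption; the adapter in this post hardcodes localhost:9092.
function readKafkaConfig(env: Record<string, string | undefined>): KafkaConfig {
    const clientId = env.KAFKA_CLIENT_ID;
    const topic = env.KAFKA_TOPIC;
    if (!clientId || !topic) {
        throw new Error("KAFKA_CLIENT_ID and KAFKA_TOPIC must be set");
    }
    // Accept a comma-separated broker list and fall back to the local default.
    const brokers = (env.KAFKA_BROKERS ?? "localhost:9092")
        .split(",")
        .map((broker) => broker.trim())
        .filter((broker) => broker.length > 0);
    return { clientId, topic, brokers };
}

// In the service this would be called as readKafkaConfig(process.env).
const config = readKafkaConfig({ KAFKA_CLIENT_ID: "api", KAFKA_TOPIC: "users" });
```

Taking the environment as a parameter instead of reading `process.env` directly keeps the helper easy to test.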
We register the adapter and the service as providers in the file @/infrastructure/driven-adapters/providers/index.ts.
import {ADD_USER_SERVICE} from "@/domain/use-cases/add-user-service";
import {AddUserServiceImpl} from "@/domain/use-cases/impl/add-user-service-impl";
import {KafkaAdapter} from "@/infrastructure/driven-adapters/adapters/kafka-adapter";
import {ADD_USER_REPOSITORY} from "@/domain/models/contracts/add-user-repository";
export const adapters = [
{
provide: ADD_USER_REPOSITORY,
useClass: KafkaAdapter
}
]
export const services = [
{
provide: ADD_USER_SERVICE,
useClass: AddUserServiceImpl
}
]
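Each `provide`/`useClass` pair maps a string token to a concrete class. How @tsclean/core resolves these internally is not shown in this post, so the following is only a simplified illustration of the idea. The `resolve` helper and both stub classes are hypothetical.

```typescript
// Simplified illustration of token-based providers; not the @tsclean/core implementation.
type Provider = { provide: string; useClass: new () => object };

class KafkaAdapterStub {
    readonly kind = "kafka";
}

class InMemoryAdapterStub {
    readonly kind = "memory";
}

const ADD_USER_REPOSITORY = "ADD_USER_REPOSITORY";

// Hypothetical helper: find the class registered for a token and instantiate it.
function resolve(providers: Provider[], token: string): object {
    const provider = providers.find((p) => p.provide === token);
    if (!provider) {
        throw new Error(`No provider registered for token ${token}`);
    }
    return new provider.useClass();
}

const adapters: Provider[] = [{ provide: ADD_USER_REPOSITORY, useClass: KafkaAdapterStub }];
const resolved = resolve(adapters, ADD_USER_REPOSITORY);

// Swapping the implementation only requires changing useClass for the same token.
const testAdapters: Provider[] = [{ provide: ADD_USER_REPOSITORY, useClass: InMemoryAdapterStub }];
const swapped = resolve(testAdapters, ADD_USER_REPOSITORY);
```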
We create the controller, which is the entry point of the microservice.
scaffold create:controller --name=add-user
import {Mapping, Post, Body, Adapter} from "@tsclean/core";
import {AddUserParams, UserModel} from "@/domain/models/user";
import {ADD_USER_SERVICE, IAddUserService} from "@/domain/use-cases/add-user-service";
@Mapping('api/v1/add-user')
export class AddUserController {
constructor(
@Adapter(ADD_USER_SERVICE)
private readonly addUserService: IAddUserService
) {
}
@Post()
async addUserController(@Body() data: AddUserParams): Promise<UserModel | any> {
await this.addUserService.addUser(data);
return { message: "User created successfully" }
}
}
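The endpoint can be exercised from any HTTP client. The sketch below only prepares the request. `buildAddUserRequest` is a hypothetical helper, and the host and port are assumptions, since this post does not state where the api microservice listens.

```typescript
type AddUserParams = { name: string; email: string };

// Hypothetical helper that prepares a POST request for the add-user route.
function buildAddUserRequest(baseUrl: string, user: AddUserParams) {
    return {
        url: `${baseUrl}/api/v1/add-user`,
        init: {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify(user),
        },
    };
}

// Assumption: localhost:9000 is a placeholder; use the port from your own configuration.
const addUserRequest = buildAddUserRequest("http://localhost:9000", {
    name: "Ada",
    email: "ada@example.com",
});

// With the service running, the request could be sent with:
// const res = await fetch(addUserRequest.url, addUserRequest.init);
```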
Finally, we include the configuration in the main container of the application.
import {Container} from "@tsclean/core";
import {controllers} from "@/infrastructure/entry-points/api";
import {adapters, services} from "@/infrastructure/driven-adapters/providers";
@Container({
providers: [...services, ...adapters],
controllers: [...controllers]
})
export class AppContainer {}
Microservice Kafka Mongo
The important part of this microservice is the consumption from Kafka. Because it is decoupled from the REST microservice, a change to how messages are consumed or stored only touches this component.
We create the second project, which will contain the Kafka Mongo microservice.
scaffold create:project --name=kafka-mongo
We create the entity User to normalize the data to be persisted in MongoDB.
scaffold create:entity --name=user
export type UserModel = {
id: string | number;
name: string;
email: string;
}
export type AddUserParams = Omit<UserModel, 'id'>
Now we create the adapters. The first one is the Mongoose adapter, which defines how users are persisted in MongoDB.
scaffold create:adapter-orm --name=user --orm=mongoose
import {model, Schema} from "mongoose";
import {UserModel} from '@/domain/models/user';
const schema = new Schema<UserModel>({
id: {
type: String
},
name: {
type: String
},
email: {
type: String
}
},
{
strict: false
}
);
export const UserModelSchema = model<UserModel>('users', schema);
This adapter will consume all messages arriving from the other microservice and store them in the database.
scaffold create:adapter --name=kafka
import {Kafka} from "kafkajs"
import dotenv from "dotenv";
import {UserModelSchema} from "@/infrastructure/driven-adapters/adapters/orm/mongoose/models/user";
dotenv.config({path: ".env"})
export class KafkaAdapter {
    public kafka = new Kafka({clientId: process.env.KAFKA_CLIENT_ID, brokers: ["localhost:9092"]})
    public consumer = this.kafka.consumer({ groupId: process.env.KAFKA_CLIENT_ID })
    async consume(): Promise<any> {
        const topic = process.env.KAFKA_TOPIC;
        await this.consumer.connect()
        await this.consumer.subscribe({topic})
        await this.consumer.run({
            eachMessage: async ({ message }) => {
                // message.value can be null (for example a tombstone message); skip those
                if (!message.value) return
                const payload = message.value.toString()
                await UserModelSchema.create(JSON.parse(payload))
                console.log(`User created successfully: ${payload}`)
            },
        })
    }
}
export const Consumer = new KafkaAdapter();
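The handler trusts that every message contains valid JSON with the expected fields. A more defensive variant could validate the payload before it reaches the database. The sketch below assumes a hypothetical `parseUserMessage` helper. `MessageValue` is a structural stand-in for the `Buffer | null` that kafkajs passes as `message.value`, so the example runs without the library.

```typescript
type AddUserParams = { name: string; email: string };

// Structural stand-in for the Buffer (or null) that kafkajs provides as message.value.
type MessageValue = { toString(): string } | null;

// Hypothetical helper: returns the parsed user, or null if the message should be skipped.
function parseUserMessage(value: MessageValue): AddUserParams | null {
    if (value === null) return null; // nothing to parse
    try {
        const parsed: unknown = JSON.parse(value.toString());
        if (typeof parsed !== "object" || parsed === null) return null;
        const candidate = parsed as Record<string, unknown>;
        if (typeof candidate.name !== "string" || typeof candidate.email !== "string") {
            return null; // required fields missing or of the wrong type
        }
        return { name: candidate.name, email: candidate.email };
    } catch {
        return null; // malformed JSON
    }
}

const validUser = parseUserMessage('{"name":"Ada","email":"ada@example.com"}');
const malformed = parseUserMessage("not json");
const emptyValue = parseUserMessage(null);
const wrongShape = parseUserMessage('{"name":42}');
```

Inside `eachMessage`, a `null` result would mean logging and skipping the message instead of letting the handler throw.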
Finally, we import the adapter in the file that starts the application.
import 'module-alias/register'
import helmet from 'helmet';
import { connect } from 'mongoose';
import { StartProjectInit } from "@tsclean/core";
import { AppContainer } from "@/application/app";
import {MONGODB_URI} from "@/application/config/environment";
import {Consumer} from "@/infrastructure/driven-adapters/adapters/kafka-adapter";
async function run(): Promise<void> {
await connect(MONGODB_URI);
console.log('DB Mongo connected')
const app = await StartProjectInit.create(AppContainer);
app.use(helmet());
await Consumer.consume();
// This line is not necessary for the consumer
// await app.listen(PORT, () => console.log('Running on port: ' + PORT))
}
// Log startup failures instead of leaving the rejection unhandled
run().catch((err) => console.error(err));
Application flow
We run the api microservice:
npm run watch
Send a POST request to the api microservice from Postman. Once the request shows up in the microservice's log, the data has been received and saved to Kafka.
Since we are not running the kafka-mongo microservice yet, the data saved by the api microservice is only in Kafka. Let's consume it and save it to MongoDB by running the kafka-mongo microservice.
npm run watch
Now you will see that the kafka-mongo microservice consumes the data and stores it in MongoDB.
Check that the data is stored in MongoDB. If it's there, we're good!
If you liked this blog post, follow me on Twitter @JohnpiedrahitaH and on LinkedIn john-piedrahita-30593a179