DEV Community

John Piedrahita
John Piedrahita

Posted on

Microservices using Kafka and clean architecture

Here is the high-level architecture of this simple asynchronous processing example with 2 microservices.

Image 1

Microservice 1 - is a REST microservice which receives data from a /POST HTTP call to it. After receiving the request, it retrieves the data from the HTTP request and saves it to Kafka. After saving, it responds to the caller with the same data sent via /POST

Microservice 2 - is a microservice which subscribes to a topic in Kafka, where Microservice 1 saves the data. Once a message is consumed by the microservice, it then saves the data to MongoDB.

First, you must configure your local environment for the Kafka server.

You must create a directory where the two projects are managed.

mkdir tsclean-kafka-mongo
Enter fullscreen mode Exit fullscreen mode

After this, we generate the two projects for the microservices. We will continue using the NPM package @tsclean/scaffold

npm i -g @tsclean/scaffold
Enter fullscreen mode Exit fullscreen mode

Microservice Rest

We created the first project that will manage the Rest microservice.

scaffold create:project --name=api
Enter fullscreen mode Exit fullscreen mode

We start by creating the entity User.

scaffold create:entity --name=user
Enter fullscreen mode Exit fullscreen mode
export type UserModel = {
    id: string | number;
    name: string;
    email: string;
}

export type AddUserParams = Omit<UserModel, 'id'>
Enter fullscreen mode Exit fullscreen mode

Then we create the interface that will communicate with the Kafka Adapter.

scaffold create:interface --name=add-user
Enter fullscreen mode Exit fullscreen mode
import {AddUserParams, UserModel} from "@/domain/models/user";

export const ADD_USER_REPOSITORY = "ADD_USER_REPOSITORY";

export interface IAddUserRepository {
    addUser:(data: AddUserParams) => Promise<UserModel>
}
Enter fullscreen mode Exit fullscreen mode

We create the service where the business logic of the microservice is to go.

scaffold create:service --name=add-user
Enter fullscreen mode Exit fullscreen mode
import {AddUserParams, UserModel} from "@/domain/models/user";

export const ADD_USER_SERVICE = "ADD_USER_SERVICE";

export interface IAddUserService {
    addUser:(data: AddUserParams) => Promise<UserModel>
}
Enter fullscreen mode Exit fullscreen mode
import {Adapter, Service} from "@tsclean/core";
import {IAddUserService} from "@/domain/use-cases/add-user-service";
import {AddUserParams, UserModel} from "@/domain/models/user";
import {ADD_USER_REPOSITORY, IAddUserRepository} from "@/domain/models/contracts/add-user-repository";

@Service()
export class AddUserServiceImpl implements IAddUserService {
    constructor(
        @Adapter(ADD_USER_REPOSITORY)
        private readonly addUserRepository: IAddUserRepository
    ) {
    }

    async addUser(data: AddUserParams): Promise<UserModel> {
        return await this.addUserRepository.addUser(data);
    }
}
Enter fullscreen mode Exit fullscreen mode

Now we create the Kafka adapter to communicate with the domain layer where the business logic is. We do this by means of one of the SOLID principles, Dependency Inversion.

scaffold create:adapter --name=kafka
Enter fullscreen mode Exit fullscreen mode
import dotenv from "dotenv";
import {Kafka} from "kafkajs"

import {AddUserParams, UserModel} from "@/domain/models/user";
import {IAddUserRepository} from "@/domain/models/contracts/add-user-repository";

dotenv.config({path: ".env"})

export class KafkaAdapter implements IAddUserRepository {

    public kafka = new Kafka({
        clientId: process.env.KAFKA_CLIENT_ID, brokers: ["localhost:9092"]}
    )
    public producer = this.kafka.producer()

    async addUser(data: AddUserParams): Promise<UserModel | any> {
        const topic = process.env.KAFKA_TOPIC;
        await this.producer.connect()

        let i = 0

        try {
            await this.producer.send({
                topic,
                messages: [
                    {
                        key: String(i),
                        value: JSON.stringify(data),
                    },
                ],
            })
            i++
        } catch (err) {
            console.error("could not write message " + err)
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

We make the configurations in the file @/infrastructure/driven-adapters/providers/index.ts.

import {ADD_USER_SERVICE} from "@/domain/use-cases/add-user-service";
import {AddUserServiceImpl} from "@/domain/use-cases/impl/add-user-service-impl";
import {KafkaAdapter} from "@/infrastructure/driven-adapters/adapters/kafka-adapter";
import {ADD_USER_REPOSITORY} from "@/domain/models/contracts/add-user-repository";

export const adapters = [
    {
        provide: ADD_USER_REPOSITORY,
        useClass: KafkaAdapter
    }
]

export const services = [
    {
        provide: ADD_USER_SERVICE,
        useClass: AddUserServiceImpl
    }
]
Enter fullscreen mode Exit fullscreen mode

We, create the controller with the microservice entry point.

scaffold create:controller --name=add-user
Enter fullscreen mode Exit fullscreen mode
import {Mapping, Post, Body, Adapter} from "@tsclean/core";
import {AddUserParams, UserModel} from "@/domain/models/user";
import {ADD_USER_SERVICE, IAddUserService} from "@/domain/use-cases/add-user-service";

@Mapping('api/v1/add-user')
export class AddUserController {

    constructor(
        @Adapter(ADD_USER_SERVICE)
        private readonly addUserService: IAddUserService
    ) {
    }

    @Post()
    async addUserController(@Body() data: AddUserParams): Promise<UserModel | any> {
        await this.addUserService.addUser(data);
        return { message: "User created successfully" }
    }
}
Enter fullscreen mode Exit fullscreen mode

Finally, we include the configuration in the main container of the application.

import {Container} from "@tsclean/core";
import {controllers} from "@/infrastructure/entry-points/api";
import {adapters, services} from "@/infrastructure/driven-adapters/providers";

@Container({
    providers: [...services, ...adapters],
    controllers: [...controllers]
})

export class AppContainer {}
Enter fullscreen mode Exit fullscreen mode

Microservice Kafka Mongo

The important thing in this code is the consumption from Kafka, so we have this decoupled microservice and in case of a change, do it only in this component.

We created the second project that will manage the Rest microservice.

scaffold create:project --name=kafka-mongo
Enter fullscreen mode Exit fullscreen mode

We create the entity user to normalize the data to be persisted in Mongo

scaffold create:entity --name=user
Enter fullscreen mode Exit fullscreen mode
export type UserModel = {
    id: string | number;
    name: string;
    email: string;
}

export type AddUserParams = Omit<UserModel, 'id'>
Enter fullscreen mode Exit fullscreen mode

Now we create the adapters for the consumption of the Rest microservice.

scaffold create:adapter-orm --name=user --orm=mongoose
Enter fullscreen mode Exit fullscreen mode
import {model, Schema} from "mongoose";
import {UserModel} from '@/domain/models/user';

const schema = new Schema<UserModel>({
        id: {
            type: String
        },
        name: {
            type: String
        },
        email: {
            type: String
        }
    },
    {
        strict: false
    }
);

export const UserModelSchema = model<UserModel>('users', schema);
Enter fullscreen mode Exit fullscreen mode

This adapter will consume all messages arriving from the other microservice and store them in the database.

scaffold create:adapter --name=kafka
Enter fullscreen mode Exit fullscreen mode
import {Kafka} from "kafkajs"
import dotenv from "dotenv";
import {UserModelSchema} from "@/infrastructure/driven-adapters/adapters/orm/mongoose/models/user";

dotenv.config({path: ".env"})

export class KafkaAdapter {

    public kafka = new Kafka({clientId: process.env.KAFKA_CLIENT_ID, brokers: ["localhost:9092"]})
    public consumer = this.kafka.consumer({ groupId: process.env.KAFKA_CLIENT_ID })

    async consume(): Promise<any> {
        const topic = process.env.KAFKA_TOPIC;

        await this.consumer.connect()
        await this.consumer.subscribe({topic})

        await this.consumer.run({
            eachMessage: async ({ message }) => {
                await UserModelSchema.create(JSON.parse(message.value.toString()))
                console.log(`User created successfully: ${message.value}`)
            },
        })
    }
}

export const Consumer = new KafkaAdapter();
Enter fullscreen mode Exit fullscreen mode

Finally, we import the adapter in the file that starts the application.

import 'module-alias/register'

import helmet from 'helmet';
import { connect } from 'mongoose';
import { StartProjectInit } from "@tsclean/core";

import { AppContainer } from "@/application/app";
import {MONGODB_URI} from "@/application/config/environment";
import {Consumer} from "@/infrastructure/driven-adapters/adapters/kafka-adapter";

async function run(): Promise<void> {
  await connect(MONGODB_URI);
  console.log('DB Mongo connected')
  const app = await StartProjectInit.create(AppContainer);
  app.use(helmet());
  await Consumer.consume();
  // This line is not necessary for the consumer
  // await app.listen(PORT, () => console.log('Running on port: ' + PORT))
}

run().catch();
Enter fullscreen mode Exit fullscreen mode

Application flow

We run the api microservice

npm run watch
Enter fullscreen mode Exit fullscreen mode

Image 2

Here is the log you will see in microservice rest. Once you see this, it means data has been received from Postman and saved to Kafka

Image 3

Since we are not running microservice kafka-mongo yet, the data saved by microservice rest will only be in Kafka. Let's consume and save to MongoDB by running microservice kafka-mongo.

npm run watch
Enter fullscreen mode Exit fullscreen mode

Image 4

Now you will see that microservice kafka-mongo consumes the data and stores it in mongoDB.

Image 5

Check if the data is stored in mongoDB. If it's there, we're good!

Image 6

If you liked this blog post, follow me on Twitter @JohnpiedrahitaH and on
LinkedIn john-piedrahita-30593a179

Discussion (0)