DEV Community

Igor Dubinin
Igor Dubinin

Posted on • Updated on

Microservices for beginners. Message service. Nest js. Mongodb. Kafka.

Whole series:

Microservices for beginners

Microservices for beginners. Front-end service. Vue js. Socket.io.

Microservices for beginners. Api Gateway service. Nest js. Kafka.

Microservices for beginners. User service. Nest js. Mongodb. Kafka.

Microservices for beginners. Message service. Nest js. Mongodb. Kafka.

Microservices for beginners. Spam service. Python. Scikit-learn. Kafka.

Microservices for beginners. Toxic service. Python. Tensorflow. Kafka.

Microservices for beginners. Common code. Typescript.

Message service provides functionality for messages in application, this service contains all messages and chat rooms. I use Nest.js for backend, Mongo database and Kafka as message broker.

Full code - link

Whole scheme:

Containers

Short description:

  • User opens front-end application in web browser and joins chat room, front-end emits an event to the api gateway by socket.io.
  • Api gateway gets chat data from the message service by http request and emits this to the front-end.
  • For the messaging, front end service communicates with api gateway by socket.io.
  • Api gateway implements publish-subscribe pattern to emit raw message events for listeners, through kafka message broker.
  • Message service receives raw message events, saves messages and emits events with saved messages.
  • Api gateway receives saved messages and emits this to the front-end application.
  • Also message service subscribes to analysis events from spam and toxic services, and saves analises for the messages.

Scheme of message service:

Code

main.ts - initialization of service. I use swagger as documentation and REST client for testing of http requests.

import { AppModule } from './app.module';
import { NestFactory } from '@nestjs/core';
import { ConfigService } from '@nestjs/config';
import * as basicAuth from 'express-basic-auth';
import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const configService: ConfigService = app.get(ConfigService);

  app.use(
    ['/swagger'],
    basicAuth({
      challenge: true,
      users: {
        [configService.get<string>('DOC_USER')]:
          configService.get<string>('DOC_PASS'),
      },
    }),
  );

  const swaggerConfig = new DocumentBuilder()
    .setTitle('Message Service')
    .setDescription('Message Service API description')
    .setVersion('0.0.1')
    .build();

  const document = SwaggerModule.createDocument(app, swaggerConfig);

  SwaggerModule.setup('swagger', app, document, {
    swaggerOptions: {
      persistAuthorization: true,
    },
  });

  await app.listen(configService.get<string>('APP_PORT'));
}

bootstrap();
Enter fullscreen mode Exit fullscreen mode

app.module.ts - describes modules in service:

  • ConfigModule - provides configuration functionality
  • MongooseModule - provides Mongoose ODM functionality
  • MessagesRepoModule - provides communication with database through Mongoose ODM
  • MessagesModule - implements application and domain logics of service
import { Module } from '@nestjs/common';
import { MongooseModule } from '@nestjs/mongoose';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { AppService } from './app.service';
import { AppController } from './app.controller';
import { MessagesModule } from './modules/messages/messages.module';
import { MessagesRepoModule } from './modules/messages-repo/messages.repo.module';

@Module({
  imports: [
    ConfigModule.forRoot({
      envFilePath: [
        __dirname + '/../config/.env.prod',
        __dirname + '/../config/.env.dev',
      ],
      isGlobal: true,
    }),
    MongooseModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: async (configService: ConfigService) => {
        return { uri: configService.get<string>('MONGO_URI') };
      },
    }),
    MessagesRepoModule,
    MessagesModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})

export class AppModule {}
Enter fullscreen mode Exit fullscreen mode

Messages module

messages.module.ts - describes messages module

import { Module } from '@nestjs/common';
import { MessagesService } from './messages.service';
import { MessagesController } from './messages.controller';
import { KafkaController } from './kafka.controller';
import { MessagesRepoModule } from 'src/modules/messages-repo/messages.repo.module';

@Module({
  providers: [MessagesService],
  controllers: [MessagesController, KafkaController],
  imports: [MessagesRepoModule],
  exports: [MessagesService],
})
export class MessagesModule {}

Enter fullscreen mode Exit fullscreen mode

messages.controller.ts - provides http requests to receive data about chat rooms

import { Controller, Get, UsePipes, HttpStatus, Query } from '@nestjs/common';
import { ApiOperation, ApiResponse, ApiTags } from '@nestjs/swagger';
import { PrivateRoomQueryDto, RoomDataDto } from 'dto-common';
import { JoiValidationPipe } from 'src/pipes/joi.validation.pipe';
import { MessagesService } from './messages.service';
import { privateRoomQueryJoi } from './messages.joi';

@Controller('messages')
export class MessagesController {
  constructor(private messagesService: MessagesService) {}

  @Get('/get-private-room')
  @ApiOperation({ summary: 'Get user by id' })
  @ApiTags('Users')
  @ApiResponse({
    status: HttpStatus.OK,
    description: 'Success',
    type: RoomDataDto,
  })
  @UsePipes(new JoiValidationPipe(privateRoomQueryJoi))
  getPrivateRoom(@Query() params: PrivateRoomQueryDto): Promise<RoomDataDto> {
    return this.messagesService.getPrivateRoom(params);
  }
}
Enter fullscreen mode Exit fullscreen mode

messages.joi.ts - provides validation for messages.controller api

import * as Joi from 'joi';

export const privateRoomQueryJoi = Joi.object({
  userIds: Joi.array().length(2).items(Joi.string().length(24)),
});
Enter fullscreen mode Exit fullscreen mode

kafka.controller.ts - provides publish-subscribe functionality for communication with other services

import { Kafka, Producer, Consumer, KafkaMessage } from 'kafkajs';
import { ConfigService } from '@nestjs/config';
import {
  Controller,
  OnModuleDestroy,
  OnModuleInit,
  Logger,
} from '@nestjs/common';
import { MessagesService } from './messages.service';
import { MessageWebDto, messageAnalysisDto } from 'dto-common';

@Controller()
export class KafkaController implements OnModuleInit, OnModuleDestroy {
  constructor(
    private configService: ConfigService,
    private messagesService: MessagesService,
  ) {}

  private readonly logger = new Logger(KafkaController.name);

  private readonly kafka: Kafka = new Kafka({
    clientId: 'messages',
    brokers: [this.configService.get<string>('KAFKA_URI')],
  });

  private readonly producer: Producer = this.kafka.producer();
  private readonly consumer: Consumer = this.kafka.consumer({
    groupId: this.configService.get<string>('KAFKA_RAW_MESSAGE_GROUP'),
  });
  private readonly analysisConsumer: Consumer = this.kafka.consumer({
    groupId: this.configService.get<string>('KAFKA_ANALYSIS_MESSAGE_GROUP'),
  });

  async onModuleInit() {
    try {
      await this.producer.connect();

      await this.consumer.connect();
      await this.consumer.subscribe({
        topic: this.configService.get<string>('KAFKA_RAW_MESSAGE_TOPIC'),
        fromBeginning: true,
      });
      await this.analysisConsumer.subscribe({
        topic: this.configService.get<string>('KAFKA_ANALYSIS_MESSAGE_TOPIC'),
        fromBeginning: true,
      });

      await this.consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          this.receiveMessage(message);
        },
      });
      await this.analysisConsumer.run({
        eachMessage: async ({ topic, partition, message }) => {
          this.receiveAnalysis(message);
        },
      });
    } catch (error) {
      this.logger.error(error);
    }
  }

  async onModuleDestroy() {
    try {
      await this.producer.disconnect();
      await this.consumer.disconnect();
      await this.analysisConsumer.disconnect();
    } catch (error) {
      this.logger.error(error);
    }
  }

  async receiveMessage(params: KafkaMessage) {
    try {
      const messageValue = JSON.parse(params.value.toString());
      const { uuid, message, room_id, user_id, created_at } = messageValue;
      const readyMessage: MessageWebDto =
        await this.messagesService.receiveMessage({
          uuid,
          message,
          room_id,
          user_id,
          created_at,
        });

      await this.producer.send({
        topic: this.configService.get<string>('KAFKA_READY_MESSAGE_TOPIC'),
        messages: [
          {
            key: room_id,
            value: JSON.stringify(readyMessage),
          },
        ],
      });
    } catch (error) {
      this.logger.error(error);
    }
  }

  async receiveAnalysis(params: KafkaMessage) {
    try {
      const messageValue: { id: string; analysis: messageAnalysisDto } =
        JSON.parse(params.value.toString());
      const { id, analysis } = messageValue;
      await this.messagesService.receiveAnalysis({ id, analysis });
    } catch (error) {
      this.logger.error(error);
    }
  }
}
Enter fullscreen mode Exit fullscreen mode

messages.service.ts - provides application and domain logic

import { Injectable, ForbiddenException } from '@nestjs/common';
import {
  RoomWebDto,
  PrivateRoomQueryDto,
  MessageWebDto,
  RoomDataDto,
  messageAnalysisDto,
} from 'dto-common';
import { MessagesRepoService } from 'src/modules/messages-repo/messages.repo.service';

@Injectable()
export class MessagesService {
  constructor(private messagesRepoService: MessagesRepoService) {}

  async getPrivateRoom(
    param: PrivateRoomQueryDto,
  ): Promise<RoomDataDto | undefined> {
    let privateRoom = await this.messagesRepoService.getPrivateRoom(param);

    if (!privateRoom)
      privateRoom = await this.messagesRepoService.createPrivateRoom(param);

    const messages = await this.messagesRepoService.getRoomMessages({
      roomId: privateRoom.id,
    });

    return {
      room: privateRoom,
      messages: messages?.reverse(),
    };
  }

  async receiveMessage(
    param: MessageWebDto,
  ): Promise<MessageWebDto | undefined> {
    const { id, user_id } = param;
    const messageRoom: RoomWebDto = await this.messagesRepoService.getUserRoom({
      id,
      user_id,
    });

    if (!messageRoom) {
      throw new ForbiddenException(`Room ${id} forbidden for user ${user_id}`);
    }

    const newMessage = await this.messagesRepoService.saveMessage(param);

    return newMessage;
  }

  async receiveAnalysis(param: { id: string; analysis: messageAnalysisDto }) {
    return this.messagesRepoService.saveAnalysis(param);
  }
}
Enter fullscreen mode Exit fullscreen mode

Messages repo module

messages.repo.module.ts - describes module

import { Module } from '@nestjs/common';
import { MongooseModule } from '@nestjs/mongoose';
import { MessagesRepoService } from './messages.repo.service';
import { Message, MessageSchema } from './messages.schema';
import { Room, RoomSchema } from './rooms.schema';

@Module({
  providers: [MessagesRepoService],
  exports: [MessagesRepoService],
  imports: [
    MongooseModule.forFeature([
      { name: Message.name, schema: MessageSchema },
      { name: Room.name, schema: RoomSchema },
    ]),
  ],
})
export class MessagesRepoModule {}
Enter fullscreen mode Exit fullscreen mode

messages.schema.ts - contains Mongoose schema for messages collection in Mongo database.

import { HydratedDocument } from 'mongoose';
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose';
import { messageAnalysisDto } from 'dto-common';

export type MessageDocument = HydratedDocument<Message>;

@Schema()
export class Message {
  @Prop({ type: String })
  uuid: string;

  @Prop({ type: String })
  message: string;

  @Prop({ type: String })
  room_id: string;

  @Prop({ type: String })
  user_id: string;

  @Prop({ type: Date })
  created_at: Date;

  @Prop({ type: Object })
  analysis?: messageAnalysisDto;
}

export const MessageSchema = SchemaFactory.createForClass(Message);
Enter fullscreen mode Exit fullscreen mode

rooms.schema.ts - contains Mongoose schema for chat rooms collection in Mongo database.

import { HydratedDocument } from 'mongoose';
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose';

export type RoomDocument = HydratedDocument<Room>;

@Schema()
export class Room {
  @Prop({ type: String })
  type: string;

  @Prop({ type: Array })
  user_ids: string[];

  @Prop({ type: Date })
  created_at: Date;
}

export const RoomSchema = SchemaFactory.createForClass(Room);
Enter fullscreen mode Exit fullscreen mode

messages.repo.service.ts - contains database requests for messages module

import { Model } from 'mongoose';
import { Injectable } from '@nestjs/common';
import { InjectModel } from '@nestjs/mongoose';
import { Room, RoomDocument } from './rooms.schema';
import { Message, MessageDocument } from './messages.schema';
import {
  RoomWebDto,
  MessageWebDto,
  PrivateRoomQueryDto,
  UserRoomQueryDto,
  messageAnalysisDto,
  messageDto,
} from 'dto-common';

@Injectable()
export class MessagesRepoService {
  constructor(
    @InjectModel(Room.name) private roomModel: Model<RoomDocument>,
    @InjectModel(Message.name) private messageModel: Model<MessageDocument>,
  ) {}

  getWebRoomDto(roomDoc: RoomDocument): RoomWebDto {
    const { _id, user_ids, type, created_at } = roomDoc;
    return {
      id: _id.toString(),
      user_ids: user_ids.map((id) => id.toString()),
      type,
      created_at,
    };
  }

  getWebMessageDto(messageDoc: MessageDocument): MessageWebDto {
    const { _id, uuid, message, room_id, user_id, created_at } = messageDoc;
    return {
      id: _id.toString(),
      uuid,
      message,
      room_id: room_id.toString(),
      user_id: user_id.toString(),
      created_at,
    };
  }

  async getPrivateRoom(
    param: PrivateRoomQueryDto,
  ): Promise<RoomWebDto | undefined> {
    const room: RoomDocument = await this.roomModel.findOne({
      user_ids: { $all: param.userIds },
      type: 'private',
    });

    return room ? this.getWebRoomDto(room) : room;
  }

  async createPrivateRoom(
    param: PrivateRoomQueryDto,
  ): Promise<RoomWebDto | undefined> {
    const room: RoomDocument = await this.roomModel.create({
      type: 'private',
      created_at: new Date(),
      user_ids: param.userIds,
    });
    await room.save();

    return this.getWebRoomDto(room);
  }

  async getUserRoom(param: UserRoomQueryDto): Promise<RoomWebDto | undefined> {
    const { id, user_id } = param;
    const room: RoomDocument = await this.roomModel.findOne({
      id,
      user_ids: user_id,
    });

    return room ? this.getWebRoomDto(room) : room;
  }

  async saveMessage(param: MessageWebDto): Promise<MessageWebDto | undefined> {
    const { uuid, message, room_id, user_id, created_at } = param;
    const newMessage: MessageDocument = new this.messageModel({
      uuid,
      message,
      room_id,
      user_id,
      created_at,
    });
    await newMessage.save();

    return this.getWebMessageDto(newMessage);
  }

  async getRoomMessages(param: {
    roomId: string;
  }): Promise<MessageWebDto[] | undefined> {
    const messages: MessageWebDto[] = await this.messageModel
      .find({ room_id: param.roomId })
      .sort({ _id: -1 })
      .limit(100);

    return messages?.length
      ? messages.map((message: MessageDocument) =>
          this.getWebMessageDto(message),
        )
      : [];
  }

  async saveAnalysis(param: { id: string; analysis: messageAnalysisDto }) {
    const { id, analysis } = param;

    const message = await this.messageModel.findById(id);
    if (!message) {
      return;
    }
    message.analysis = {
      ...(message.analysis || {}),
      ...analysis,
    };
    await message.save();
  }
}
Enter fullscreen mode Exit fullscreen mode

Additionally:

How to install Mongo from docker:

Download docker container, launch with required properties, and open console:

docker pull mongo
docker run -d --name micro-mongo -p 27017:27017 -e MONGO_INITDB_ROOT_USERNAME=micro_user -e MONGO_INITDB_ROOT_PASSWORD=micro_pwd mongo
docker exec -it micro-mongo mongosh
Enter fullscreen mode Exit fullscreen mode

Settings for database and text index:

use admin
db.auth('micro_user', 'micro_pwd')

use micro_messages
db.createUser(
  {
    user: 'micro_user',
    pwd: 'micro_pwd',
    roles: [
       { db: 'micro_messages', role: 'dbAdmin', },
       { db: 'micro_messages', role: 'readWrite', }
    ]
  }
);
db.users.createIndex({ login: 'text' });
Enter fullscreen mode Exit fullscreen mode

How to install Kafka from docker:

Download docker-compose from github - https://github.com/tchiotludo/akhq.
Launch docker-compose:

docker-compose pull
docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

Settings:

KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
Enter fullscreen mode Exit fullscreen mode

Top comments (0)