DEV Community

Cover image for Data Streaming for Microservices using NATS Streaming - Part 1
Winner Musole Masu
Winner Musole Masu

Posted on

Data Streaming for Microservices using NATS Streaming - Part 1

The goal behind Data streaming is to process and analyze in real-time, data that move from data sources to destinations.

It is mostly used in microservices to ensure inter-services communication. In a microservice architecture, the recommendation is to build independent services that can be altered, updated or taken down without affecting the rest of the architecture.

In this tutorial, we are going to learn how to use NATS Streaming in a Kubernetes Cluster. NATS Streaming is a data streaming system powered by NATS.

We will build a Basketball Dunk Contest App with two services, a Dunk Service that will handle players registration and dunk shot attempts for registered players. And a Statistic Service which will be displaying the Dunk Contest statistic in real-time from data accompanying events messages. NATS Streaming here will be the events transporter between our two services.

Image description

Before we dive into the code, make sure you have the following in order to follow along with this tutorial:

  • Working knowledge of Node.js / typescript, Docker, Kubernetes Objects,
  • Node.js (preferably the latest LTS version), Docker How to install docker on Ubuntu? and local Kubernetes Cluster installed via Minikube, click HERE to install minikube,
  • Docker Hub account, click HERE to sign up,

I will be coding in a Linux machine for the tutorial.

1. Project Structure

Project
Let's set up our project, we will first work on a number of kubernetes objects related to the project.
Run the following:

$ mkdir dunk-contest
$ cd dunk-contest/
$ mkdir kubernetes
Enter fullscreen mode Exit fullscreen mode

These commands create the project directory dunk-contest/ then navigate inside the directory to create another directory named kubernetes/.

In kubernetes/ directory, we are going to add new files with required configurations to build the following Kubernetes objects:

  1. Deployment objects for the NATS Streaming Server, for the Dunk Service MongoDB database and for the Statistic Service MongoDB database,
  2. Service objects for Pods running containers of NATS Streaming image, Mongo image for Dunk Service and for Statistic Service.

1.1 Deployment and Service objects

Make sure to work this part of the tutorial in the kubernetes/ directory!

1.1.1 NATS Deployment and Service

NATS DEPL SERV

  • Add a new YAML file named nats-deployment.yaml and put the configuration below:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nats-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nats
  template:
    metadata:
      labels:
        app: nats
    spec:
      containers:
        - name: nats
          image: nats-streaming:0.22.1
          args:
            [
              "-p",
              "4222",
              "-hbi",
              "5s",
              "-hbt",
              "5s",
              "-hbf",
              "2",
              "-SD",
              "-cid",
              "dunk-contest",
            ]
Enter fullscreen mode Exit fullscreen mode

This config file will create a Pod running a container of nats-streaming:0.22.1 docker image and a Deployment to monitor the Pod. Practically, this Pod will act as the project NATS Streaming Server exposing port 4222 to clients(Dunk Service and Statistic Service).

  • Add a new YAML file named nats-service.yaml and put the configuration below:
apiVersion: v1
kind: Service
metadata:
  name: nats-service
spec:
  selector:
    app: nats
  ports:
    - name: client
      protocol: TCP
      port: 4222
      targetPort: 4222
Enter fullscreen mode Exit fullscreen mode

This config file will create a kubernetes object of kind Service, that other pods inside the kubernetes cluster will use to access the NATS streaming server Pod on port 4222.

1.1.2 MongoDB Deployment and Service

Mongo depl serv

Here we are going to add 4 new config files:

  • 1 dunk-mongo-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dunk-mongo-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dunk-mongo
  template:
    metadata:
      labels:
        app: dunk-mongo
    spec:
      containers:
        - name: dunk-mongo
          image: mongo
Enter fullscreen mode Exit fullscreen mode

With these configurations, kubernetes will create a Deployment object to monitor a MongoDB Pod of mongo docker image. This database will be dedicated to the Dunk Service.

  • 2 dunk-mongo-service.yaml:
apiVersion: v1
kind: Service
metadata:
  name: dunk-mongo-service
spec:
  selector:
    app: dunk-mongo
  ports:
    - name: db
      protocol: TCP
      port: 27017
      targetPort: 27017
Enter fullscreen mode Exit fullscreen mode

Another config file that will create a kubernetes object of kind Service that will permit other pods in the cluster to access the mongo pod of the Dunk Service.

  • 3 stats-mongo-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: stats-mongo-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: stats-mongo
  template:
    metadata:
      labels:
        app: stats-mongo
    spec:
      containers:
        - name: stats-mongo
          image: mongo
Enter fullscreen mode Exit fullscreen mode

Kubernetes will use this set of configurations to create a Deployment object to manage the MongoDB Pod of mongo docker image. This mongo database will be dedicated to the Statistic Service only.

  • 4 stats-mongo-service.yaml:
apiVersion: v1
kind: Service
metadata:
  name: stats-mongo-service
spec:
  selector:
    app: stats-mongo
  ports:
    - name: db
      protocol: TCP
      port: 27017
      targetPort: 27017
Enter fullscreen mode Exit fullscreen mode

Finally we have this config file to create a Service object that will expose the MongoDB Pod of the Statistic Service to other pods in the kubernetes cluster.

Your kubernetes/ directory tree structure should look like this by now:

.
├── dunk-mongo-deployment.yaml
├── dunk-mongo-service.yaml
├── nats-deployment.yaml
├── nats-service.yaml
├── stats-mongo-deployment.yaml
└── stats-mongo-service.yaml

0 directories, 6 files
Enter fullscreen mode Exit fullscreen mode

Save all created files in the kubernetes/ directory and make sure that your Kubernetes cluster is up and running. Open the terminal and run the following:

$ minikube start
Enter fullscreen mode Exit fullscreen mode

Minikube start

Minikube quickly sets up a local Kubernetes cluster on macOS, Linux, and Windows.

Now, let's tell Kubernetes to create objects using our configuration files. Run the command below in the kubernetes/ directory:

$ kubectl apply -f . 
Enter fullscreen mode Exit fullscreen mode

kubectl apply -f .

At this moment, we must have 3 running pods, one for nats-streaming, for mongodb of the Dunk Service and one for mongodb of the Statistic Service. Verify it with this command:

$ kubectl get pods
Enter fullscreen mode Exit fullscreen mode

kubectl get pods

We reached the point in the tutorial where we are going to build our two services and connect them to the Pods created in this part of the tutorial. Let's do it in the next points.

1.2 Dunk Service

Here we are going to build an express application, listening on port 4001 for connections. It will have two API endpoints, one http://localhost:4001/dunk-contest/register to handle POST request for players registration and the other http://localhost:4001/dunk-contest/attempt/:playerName to handle POST request for players dunk shots attempts.

Navigate back to the project directory dunk-contest/ and create a new directory named dunk-service/. In the dunk-service/ directory, generate a package.json and install ts-node-dev, typescript, express, @types/express, node-nats-streaming and mongodb as dependencies:

$ cd dunk-service/
$ npm init -y
$ npm install ts-node-dev typescript express @types/express node-nats-streaming mongodb
Enter fullscreen mode Exit fullscreen mode

Open the package.json file, replace the actual script section by the one below:

 "scripts": {
    "start": "ts-node-dev src/index.ts"
  }
Enter fullscreen mode Exit fullscreen mode

Save the file. In the same directory create a directory named src/ ,in src/ add a typescript file named nats-connector.ts and paste the following:

import nats, { Stan } from "node-nats-streaming";

class NatsConnector {
  private _client?: Stan;

  get client() {
    if (!this._client) {
      throw new Error("Cannot access NATS Client before connecting!");
    }
    return this._client;
  }

  connectToNats(clusterId: string, clientId: string, url: string) {
    this._client = nats.connect(clusterId, clientId, { url });

    return new Promise<void>((resolve, reject) => {
      this.client.on("connect", () => {
        console.log(`DUNK SERVICE IS CONNECTED TO NATS STREAMING SERVER`);
        resolve();
      });
      this.client.on("error", (err) => {
        reject(err);
      });
    });
  }
}

export const natsConnector = new NatsConnector();
Enter fullscreen mode Exit fullscreen mode

Inside this file:

  • We define a variable _client of Stan type, a type imported from node-nats-streaming library,
  • We export an instance of NatsConnector class that has a method called connectToNats() - connectToNats() takes three parameters, the clusterId, the clientId and the url:
  1. clusterId: This was set early in the NATS streaming server deployment configuration file. Dunk Service, here being a client will use it to connect to the NATS server,
  2. clientId: An identifier for the Dunk Service as client to the NATS server,
  3. url: The NATS Streaming server endpoint, that the Dunk Service will use to access resources in the NATS running pod.

In connectToNats(), to _client we assign a function imported from node-nats-streaming called connect() on which we passe our three parameters as arguments.
And connectToNats() returns a promise that resolve if _client get successfully connected to NATS server and reject if otherwise.

Next, add another typescript file named event-publisher.ts and put the following:

import { Stan } from "node-nats-streaming";

export class EventPublisher {
  private client: Stan;

  constructor(client: Stan) {
    this.client = client;
  }

  publishEvent(subject: string, data: any): Promise<void> {
    return new Promise((resolve, reject) => {
      this.client.publish(subject, JSON.stringify(data), (err) => {
        if (err) {
          return reject(err);
        }
        console.log("\x1b[36m%s\x1b[0m", `EVENT ${subject} PUBLISHED!`);
        resolve();
      });
    });
  }
}
Enter fullscreen mode Exit fullscreen mode

In this one, we export the class EventPublisher that has a variable named client of type Stan just like in the NatsConnetor class. We have a method in this class called publishEvent() of two parameters:

  1. subject: This is the name of the channel which events passe through and reach clients that had subscribed to,
  2. data: the data or message accompanying published events.

And publishEvent() returns a promise that resolve when events are successfully published and reject when there is a failure.

After this, in src/ directory, create a directory named routes/; add two new typescript files:

  • 1 registerPlayerRoutes.ts a middleware and put the code below:
import { Router, Request, Response } from "express";
import { MongoClient } from "mongodb";

interface Players {
  NAME: string;
  HEIGHT: number;
  WEIGHT: number;
  EXPERIENCE: number;
}

const registerPlayerRouter = Router();
registerPlayerRouter.post(
  "/dunk-contest/register",
  async (req: Request, res: Response) => {
    const player: Players = {
      NAME: req.body.name,
      HEIGHT: req.body.height,
      WEIGHT: req.body.weight,
      EXPERIENCE: req.body.experience,
    };

    const mongoClient = await MongoClient.connect(
      "mongodb://localhost:27017/dunk-service"
    );

    const db = mongoClient.db();
    const playerCollection = db.collection("players");
    await playerCollection.insertOne(player);

    console.log("\x1b[36m%s\x1b[0m", "PLAYER REGISTERED WITH SUCCESS");
    const newPlayer = await playerCollection.findOne({
      NAME: req.body.name,
    });
    console.table(newPlayer);
    res.send({});

    mongoClient.close();
  }
);

export { registerPlayerRouter };
Enter fullscreen mode Exit fullscreen mode

In the registerPlayerRoutes.ts file above we did the following:

  • Imported Router, Request and Response from express;
  • Imported MongoClient from mongodb;
  • Implemented a POST request on registerPlayerRouter.post("/dunk-contest/register") to register players to the players collection in dunk-service MongoDB database and fetch the registered player. MongoClient is used here to connect this process to the appropriate MongoDB Pod.

  • 2 attemptDunkRoutes.ts and put the code below:

import { Router, Request, Response } from "express";
import { MongoClient } from "mongodb";
import { natsConnector } from "./../nats-connector";
import { EventPublisher } from "./../event-publisher";

const attemptDunkRouter = Router();
attemptDunkRouter.post(
  "/dunk-contest/attempt/:playerName",
  async (req: Request, res: Response) => {
    const mongoClient = await MongoClient.connect(
      "mongodb://localhost:27017/dunk-service"
    );

    const db = mongoClient.db();
    const playerCollection = db.collection("players");

    const playerFound = await playerCollection.findOne({
      NAME: req.params.playerName,
    });

    const dunkPoint: number =
      (playerFound?.HEIGHT *
        playerFound?.WEIGHT *
        playerFound?.EXPERIENCE *
        Math.random()) /
      100;

    await new EventPublisher(natsConnector.client).publishEvent("Dunk-Shot", {
      PLAYER_NAME: playerFound?.NAME,
      DUNK_POINT: dunkPoint,
    });
    res.send({});

    mongoClient.close();
  }
);

export { attemptDunkRouter };
Enter fullscreen mode Exit fullscreen mode

With attemptDunkRoutes.ts we worked on a middleware, we did the following:

  • Imported Router, Request and Response from express;
  • Imported MongoClient from mongodb;
  • Imported natsConnector, an NatsConnector instance;
  • Imported the class EventPublisher;
  • Implemented a POST request on attemptDunkRouter.post( "/dunk-contest/attempt/:playerName") to attempt a dunk shots for a player found in the players collection by player's name got with req.params.playerName;
  • MongoClient is used here to connect this process to the appropriate MongoDB pod;
  • With EventPlubilsher class, we created a new instance that passes natsconnector.client as argument and calls publishEvent function to publish an event through the Dunk-Shot channel with PLAYER_NAME and DUNK_POINT as event message;
  • DUNK_POINT is number calculated with the player's HEIGHT, WEIGHT, EXPERIENCE and a random number.

To wrap with service up, go ahead, move back to src/ directory, add a typescript file named index.ts and paste the code below:

import express from "express";
import { registerPlayerRouter } from "./routes/registerPlayerRoutes";
import { attemptDunkRouter } from "./routes/attemptDunkRoutes";
import { natsConnector } from "./nats-connector";

const app = express();
app.use(express.json());

app.use(registerPlayerRouter);
app.use(attemptDunkRouter);

const start = async () => {
  try {
    await natsConnector.connectToNats(
      "dunk-contest",
      "123",
      "http://localhost:4222"
    );

    natsConnector.client.on("close", () => {
      process.exit();
    });
  } catch (error) {
    console.error(error);
  }
  app.listen(4001, () => {
    console.log("\x1b[36m%s\x1b[0m", "DUNK SERVICE LISTENING ON 4001");
  });
};

start();

Enter fullscreen mode Exit fullscreen mode

In the index.ts file above we did the following:

  • Imported express, Request and Response from express;
  • Imported registerPlayerRouter and attemptDunkRouter, two middlewares;
  • Imported natsConnector, an instance of class NatsConnector that was created early;
  • Called the express function express() and puts new Express application inside the app variable (to start a new Express application);
  • Used the middlewares with app.use(registerPlayerRouter) and app.use(attemptDunkRouter);
  • Wrote the start function to connect the Express application to NATS Streaming server and have it listens for connection on port 4001.

Now generate a tsconfig.json file to compile your TypeScript code in JavaScript code. Open your terminal, navigate back to dunk-service/ directory and run the command below:

$ tsc --init
Enter fullscreen mode Exit fullscreen mode

Great, we are almost done with the Dunk Service, we shall come back later to fix some little things.

The dunk-service/ directory should look like the tree below:

.
├── package.json
├── package-lock.json
├── src
│   ├── event-publisher.ts
│   ├── index.ts
│   ├── nats-connector.ts
│   └── routes
│       ├── attemptDunkRoutes.ts
│       └── registerPlayerRoutes.ts
└── tsconfig.json

2 directories, 8 files
Enter fullscreen mode Exit fullscreen mode

Let's perform a simple test to check the following:

  • Dunk Service connection to its dedicated MongoDB running pod;
  • Dunk Service connection to the Nats Streaming Server;

In the steps below, do the test:
Step - 1: Access to NATS Streaming server Pod

Take your pod's name by running:

$ kubectl get pods
Enter fullscreen mode Exit fullscreen mode

get pods nats

Copy your pod's name, you will use it in the command that is coming.

Here we are going to make the NATS Streaming server pod running in the kubernetes cluster accessible in our local machine. Open the terminal, forward a local port on your machine to a port on your pod by running the following:

$ kubectl port-forward <YOUR POD NAME> 4222:4222
Enter fullscreen mode Exit fullscreen mode

Make sure to paste your pod's name where indicated in the command above.

NATS service accessible in local m

Step - 2: Access to MongoDB pod dedicated to the Dunk Service
Take your pod's name by running:

$ kubectl get pods
Enter fullscreen mode Exit fullscreen mode

get pods mongo dunk
Copy your pod's name, you will use it in the command that is coming.
Here we are going to make the MongoDB pod of Dunk Service running in the kubernetes cluster accessible in our local machine. Open another terminal and forward a local port on your machine to a port on your pod by running the following:

$ kubectl port-forward <YOUR POD NAME> 27017:27017
Enter fullscreen mode Exit fullscreen mode

mongo pod accessible to local m

Step - 3: Start the Dunk Service (Express application)

Open a third terminal in the dunk-service/ directory and run this command:

$ npm start
Enter fullscreen mode Exit fullscreen mode

npm start

By now, the Dunk Service must be connected to the NATS Streaming server pod and to its MongoDB pod.

Step - 4: Open your API Client and do these tests

Make a POST request, with HEADERS Content-Type: application/json and a BODY of :

{
  "name": "LeBron",
  "height": 2.18,
  "weight": 105,
  "experience": 5
}
Enter fullscreen mode Exit fullscreen mode

Hopefully, you will have a similar output in your terminals as below:

Dunk Service test

2. Conclusion

In this part of the tutorial, we started to build our Dunk Contest Application with the purpose of learning how to use NATS Streaming in a microservice architecture in a Kubernetes Cluster set and running in our local machines.

The completed Application should have featured two services, the Dunk Service and Statistic Service streaming data using NATS Streaming.

In the process, we started a NATS Streaming Server running in the kubernetes cluster and two MongoDB Pods, each dedicated to a specific service. We also started the Dunk Service, which successfully registered players in its MongoDB Pod and successfully published an event to the NATS Streaming Server; event consisting of a dunk shot of a registered player.

Please stay tuned for more important NATS Streaming practices that we are going to implement in part 2 of this tutorial as we will be completing our Dunk Contest Application.

If you have questions, comments, feel free to reach out here or on my Twitter; I will be more than happy to answer and Here, you may find the project code.

See you soon.

Image description

Top comments (0)