Winner Musole Masu
Data Streaming for Microservices using NATS Streaming - Part 2

Welcome to this tutorial, where we are going to finish what was started in Data Streaming for Microservices using NATS Streaming - Part 1. To recap, we will complete the application designed in the figure below:

[Figure: architecture of the dunk contest application]

It's a basketball dunk contest application made of two services that communicate with each other through NATS Streaming. Service 1, the Dunk Service, is an Express application that registers players for the contest, lets them attempt dunk shots, and publishes shot information as events to a NATS Streaming server. This was completed in Part 1; let's work on Service 2 in the next lines.
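As a reminder of what travels over the channel, here is a minimal sketch of the dunk-shot event payload. The field names PLAYER_NAME and DUNK_POINT are inferred from the aggregation used later in this post, and the example values are hypothetical:

```typescript
// Hypothetical sketch of an event published on the "Dunk-Shot" channel.
// Field names are inferred from the Statistic Service's aggregation below.
interface DunkShotEvent {
  PLAYER_NAME: string; // the registered player's name
  DUNK_POINT: number;  // points awarded for this dunk attempt
}

const example: DunkShotEvent = { PLAYER_NAME: "LeBron", DUNK_POINT: 30 };

// NATS Streaming carries raw bytes, so the event is serialized as JSON
const serialized = JSON.stringify(example);
console.log(serialized);
```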

1. Statistic Service

This is an Express application that will display statistics for the contest in real time, ranking players as shown in the table below:

Statistic table

Now open the terminal in the project directory dunk-contest/, run these commands:

$ mkdir statistic-service
$ cd statistic-service/
$ npm init -y
$ npm install ts-node-dev typescript express @types/express node-nats-streaming mongodb

Open the package.json file and replace the current scripts section with the one below:

  "scripts": {
    "listen": "ts-node-dev --rs --notify false src/listener.ts"
  },

Save the file. In the same directory, create a folder named src/ and add a TypeScript file called listener.ts.
Now put the following code in listener.ts:

import nats, { Message } from "node-nats-streaming";
import { MongoClient } from "mongodb";

const start = async () => {
  const stan = nats.connect("dunk-contest", "321", {
    url: "http://localhost:4222",
  });

  stan.on("connect", () => {
    console.log(
      "Statistic Service is connected to NATS Streaming Server \nWaiting for Events ..."
    );

    stan.on("close", () => {
      console.log("Nats connection closed!");
      process.exit();
    });

    const options = stan
      .subscriptionOptions()
      .setManualAckMode(true)
      .setDeliverAllAvailable()
      .setDurableName("Dunk-Shot-Queue-Group");

    const subscription = stan.subscribe(
      "Dunk-Shot",
      "Dunk-Shot-Queue-Group",
      options
    );

    subscription.on("message", async (msg: Message) => {
      const parsedData = JSON.parse(msg.getData().toString("utf-8"));
      console.log("EVENT RECEIVED WITH THE DATA BELOW :");
      console.table(parsedData);

      const mongoClient = await MongoClient.connect(
        // local port 27016 is forwarded to the statistic-service MongoDB pod (see Test 3)
        "mongodb://localhost:27016/statistic-service"
      );

      const db = mongoClient.db();
      const dunkCollection = db.collection("dunks");
      await dunkCollection.insertOne(parsedData);

      const dunkStatistic = await dunkCollection
        .aggregate([
          {
            $group: {
              _id: "$PLAYER_NAME",
              TOTAL_DUNK: { $count: {} },
              TOTAL_POINT: { $sum: "$DUNK_POINT" },
            },
          },
          { $sort: { TOTAL_POINT: -1 } },
        ])
        .toArray();

      console.log("\x1b[36m%s\x1b[0m", "DUNK CONTEST STATISTIC :");
      console.table(dunkStatistic);
      await mongoClient.close();

      msg.ack();
    });
  });
};

start();

In the code above:

  • We imported nats and Message from the node-nats-streaming library;
  • We imported MongoClient from the mongodb library;
  • And in the start function:
  1. We defined a connection to the NATS Streaming server that we started in Part 1 of this tutorial series;
  2. On connection, the app subscribes to the Dunk-Shot channel, the channel through which the Dunk Service publishes events. Besides the channel, we added subscription options and a queue group;
  3. On each message event, which occurs when the Dunk Service publishes dunk-shot info, the Statistic Service connects to its MongoDB server using the mongodb library, takes the message data (the dunk-shot info), and inserts it into the dunks collection;
  4. With the dunks collection we run an aggregation that groups the dunk shots by player, counting each player's dunks and summing their points, sorted by total points.
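To make step 4 concrete, here is a small plain-TypeScript sketch of what that $group / $count / $sum / $sort pipeline computes, using in-memory data instead of MongoDB. The sample dunks are made up; the result shape matches what console.table prints:

```typescript
// In-memory sketch of the Mongo aggregation: group dunks by player,
// count them, sum their points, and sort by total points descending.
type Dunk = { PLAYER_NAME: string; DUNK_POINT: number };
type Stat = { _id: string; TOTAL_DUNK: number; TOTAL_POINT: number };

function dunkStatistic(dunks: Dunk[]): Stat[] {
  const groups = new Map<string, Stat>();
  for (const d of dunks) {
    const g = groups.get(d.PLAYER_NAME) ?? { _id: d.PLAYER_NAME, TOTAL_DUNK: 0, TOTAL_POINT: 0 };
    g.TOTAL_DUNK += 1;             // TOTAL_DUNK: { $count: {} }
    g.TOTAL_POINT += d.DUNK_POINT; // TOTAL_POINT: { $sum: "$DUNK_POINT" }
    groups.set(d.PLAYER_NAME, g);
  }
  // { $sort: { TOTAL_POINT: -1 } }
  return Array.from(groups.values()).sort((a, b) => b.TOTAL_POINT - a.TOTAL_POINT);
}

const stats = dunkStatistic([
  { PLAYER_NAME: "LeBron", DUNK_POINT: 30 },
  { PLAYER_NAME: "Stephen", DUNK_POINT: 35 },
  { PLAYER_NAME: "LeBron", DUNK_POINT: 25 },
]);
console.table(stats); // LeBron first with 2 dunks and 55 points
```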

Now generate a tsconfig.json file to compile your TypeScript code into JavaScript. Open your terminal, navigate back to the statistic-service/ directory, and run the command below:

$ tsc --init

The statistic-service/ directory should look like the tree below:

.
├── package.json
├── package-lock.json
├── src
│   └── listener.ts
└── tsconfig.json

1 directory, 4 files


Save all files and let's do the following tests:

Test 1: Access to NATS Streaming server Pod

Start the local Kubernetes Cluster by running this in the terminal:

$ minikube start

Then let's get the list of all running pods in the cluster by running:

$ kubectl get pods

[Screenshot: kubectl get pods showing the NATS server pod]

Copy the NATS pod's name; you will use it in the next command.

Here we are going to make the NATS Streaming server pod running in the Kubernetes cluster accessible from our local machine. Open the terminal and forward a local port on your machine to a port on the pod by running the following:

$ kubectl port-forward <YOUR POD NAME> 4222:4222

Note: Make sure to replace <YOUR POD NAME> with your actual pod name.

[Screenshot: port-forward output for the NATS pod]

Test 2: Access to MongoDB pod dedicated to the Dunk Service

Run kubectl get pods in the terminal to grab the pod's name:

[Screenshot: kubectl get pods output]
With this pod's name, forward port 27017 on your local machine to port 27017 on the MongoDB pod inside the cluster:

$ kubectl port-forward <YOUR POD NAME> 27017:27017

Note: Make sure to replace <YOUR POD NAME> with your actual pod name.

Running mongo dunk service

Test 3: Access to MongoDB pod dedicated to the Statistic Service

Here too, forward port 27016 on your local machine to port 27017 on the MongoDB pod dedicated to the statistic service in the cluster:
[Screenshot: kubectl get pods output]

$ kubectl port-forward <YOUR POD NAME> 27016:27017

Note: Make sure to replace <YOUR POD NAME> with your actual pod name.

kubectl port for statistic

Test 4: Start the Dunk and the Statistic Services (Express applications)

Note: The three port-forward terminals opened above must still be running.

We must be in the project directory dunk-contest/, where we will open more terminals.
In the dunk-service/ directory, open a terminal and run the following command:

$ npm start

Dunk Service

And in the directory statistic-service/, open a terminal and run the following command:

$ npm run listen

run listen npm

By now, the Dunk Service and the Statistic Service should be connected to the NATS Streaming Server pod and to their respective MongoDB pods.

Test 5: Open your API client and make the following requests

Request 1: POST request to http://localhost:4001/dunk-contest/register

Make two POST requests with the header Content-Type: application/json:
Post Request 1:

{
  "name": "LeBron",
  "height": 2.06,
  "weight": 113,
  "experience": 19
}

Post Request 2:

{
  "name": "Stephen",
  "height": 1.88,
  "weight": 84,
  "experience": 13
}

Register Players

Note: We just registered two players for the contest; in the next requests, let's have them attempt dunk shots.

Request 2:

Dunk Statistic

Note: With the two POST requests, we had each of our two players attempt one dunk shot. Feel free to repeat these last two requests to see how the Statistic Service produces real-time contest statistics in its console.

We are done with the tests; our two services work as intended. The Dunk Service registers players and lets them attempt dunk shots, while the Statistic Service builds and updates the contest statistics on every dunk shot.

So far the tests were performed from outside the Kubernetes cluster, just for demonstration purposes. Next, let's make all the services work inside the cluster.

2. Dockerize Services

We have reached the point of the tutorial where we containerize our services with Docker. Docker images of our services will be uploaded to Docker Hub and later used to create pods inside the Kubernetes cluster.

2.1 Dunk Service Docker Image

In the dunk-service/ directory, create a file named Dockerfile and put the code below:

FROM node:alpine

WORKDIR /app
COPY package.json .
RUN npm install --only=prod
COPY . .

CMD ["npm", "start"]

Let's pull this apart piece by piece.
FROM
This instruction sets node:alpine as the base image of the new Docker image.
WORKDIR
Here the working directory of the running image is defined.
COPY
The COPY instruction copies the selected files into the working directory. Copying package.json before the rest of the source lets Docker cache the installed dependencies as a layer.
RUN
The RUN instruction runs a command at build time; here it installs the application's production dependencies.
CMD
This sets the command that will be executed when the container starts.

Still in the same directory, create another file named .dockerignore and paste the following:

node_modules

.dockerignore tells Docker not to copy node_modules into the image.

Note: In Part 1 of the tutorial, the prerequisites stated that you should have Docker installed on your computer, as well as a Docker Hub account.

Now, in the same dunk-service/ directory, open the terminal and run the command below:

$ docker build -t <YOUR DOCKER HUB ID>/dunk-service .

Note: Make sure to replace <YOUR DOCKER HUB ID> with your actual Docker Hub ID.

Image docker build

With the command above, we built a Docker image for the Dunk Service.
Next, push the newly built image to Docker Hub. The first step is to log in to your Docker Hub account:

$ docker login

Once logged in, push the image with the command below:

$ docker push <YOUR DOCKER HUB ID>/dunk-service

Note: Make sure to replace <YOUR DOCKER HUB ID> with your actual Docker Hub ID.

Docker push

2.2 Statistic Service Docker Image

Here, we are going to go through the same steps as in section 2.1, but in the statistic-service/ directory.

Create a file named Dockerfile in statistic-service/ directory and put the code below:

FROM node:alpine

WORKDIR /app
COPY package.json .
RUN npm install --only=prod
COPY . .

CMD ["npm", "run", "listen"]

And in the same directory, create another file named .dockerignore and put the following:

node_modules

Build the Statistic Service image with:

$ docker build -t <YOUR DOCKER HUB ID>/statistic-service .

Note: Make sure to replace <YOUR DOCKER HUB ID> with your actual Docker Hub ID.

Build image statistic
Push the Statistic Service image to Docker Hub:

$ docker push <YOUR DOCKER HUB ID>/statistic-service

Note: Make sure to replace <YOUR DOCKER HUB ID> with your actual Docker Hub ID.

[Screenshot: docker push output for the Statistic Service]

Your Docker Hub account should have these images as repositories by now:
Docker Hub Repo

3. Kubernetes Pods

In this section, we are going to deploy pods of our services images from Docker Hub in the Kubernetes Cluster.

3.1 Dunk-Service Pod

Dunk svc depl

The figure shows how we shall proceed: first we create a Deployment object, which generates and monitors a Pod running the Dunk-Service image; then we create a Service object that makes the pod generated by the Deployment accessible.

Move to the kubernetes/ directory, create a yaml file named dunk-service-deployment.yaml and put the code below:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: dunk-service-depl
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dunk-service
  template:
    metadata:
      labels:
        app: dunk-service
    spec:
      containers:
        - name: dunk-service
          image: <YOUR DOCKER HUB ID>/dunk-service

Note: Make sure to replace <YOUR DOCKER HUB ID> with your actual Docker Hub ID.

With these configurations, kubernetes will create a Deployment object to monitor a Pod running the Dunk-Service image.

Then create another YAML file named dunk-service-svc.yaml and put the following:

apiVersion: v1
kind: Service
metadata:
  name: dunk-service-svc
spec:
  selector:
    app: dunk-service
  type: ClusterIP
  ports:
    - name: dunk-service
      protocol: TCP
      port: 4001
      targetPort: 4001



This is another config file; it creates a Kubernetes object of kind Service that permits other pods in the cluster to access the Dunk-Service pod.
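For reference, a ClusterIP Service gets a cluster-internal DNS name equal to its metadata.name (optionally qualified as <name>.<namespace>.svc.cluster.local), so other pods could build a URL to the Dunk Service like this sketch. The helper function is illustrative, not part of the project code:

```typescript
// Inside the cluster, Kubernetes DNS resolves a Service by its metadata.name,
// so no pod IP is needed; the Service name and port are enough.
function clusterUrl(serviceName: string, port: number): string {
  return `http://${serviceName}:${port}`;
}

const dunkServiceUrl = clusterUrl("dunk-service-svc", 4001);
console.log(dunkServiceUrl); // http://dunk-service-svc:4001
```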

3.2 Statistic-Service Pod

[Figure: Statistic-Service deployment diagram]
Similar to section 3.1, we will first create a Deployment object, which will generate and monitor a Pod running the Statistic-Service image.

In the same directory, create a YAML file named statistic-service-deployment.yaml with the following code:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: statistic-service-depl
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statistic-service
  template:
    metadata:
      labels:
        app: statistic-service
    spec:
      containers:
        - name: statistic-service
          image: <YOUR DOCKER HUB ID>/statistic-service

Note: Make sure to replace <YOUR DOCKER HUB ID> with your actual Docker Hub ID.

Your kubernetes/ directory tree structure should look like this by now:

.
├── dunk-mongo-deployment.yaml
├── dunk-mongo-service.yaml
├── dunk-service-deployment.yaml
├── dunk-service-svc.yaml
├── nats-deployment.yaml
├── nats-service.yaml
├── statistic-service-deployment.yaml
├── stats-mongo-deployment.yaml
└── stats-mongo-service.yaml

0 directories, 9 files


Open the terminal in the same directory and run the command below to create the newly added Kubernetes objects:

kubectl apply -f .

[Screenshot: kubectl apply output]

At this stage of the tutorial, we should have five pods running in the Kubernetes cluster.

Earlier, we exposed our services outside the cluster by forwarding local ports to ports on pods running within the cluster. A better approach is to use Ingress to do the same thing.

3.3 Ingress

[Figure: Ingress routing diagram]
Ingress is a Kubernetes object that manages external access to services in a cluster.
Go ahead and create a yaml file in the kubernetes/ directory named ingress-service.yaml with the following code inside:

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ingress-service
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/use-regex: "true"
    nginx.ingress.kubernetes.io/rewrite-target: /$1
spec:
  rules:
    - http:
        paths:
          - path: /?(.*)
            pathType: Prefix
            backend:
              service:
                name: dunk-service-svc
                port:
                  number: 4001



In this file, we defined the kind of Kubernetes resource we want to create, an Ingress, and in the Ingress spec we set HTTP rules that direct all incoming requests to the Dunk-Service pod via its dedicated Kubernetes Service resource.
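To see what the path pattern /?(.*) matches, here is a small sketch applying the same expression in TypeScript. nginx uses its own regex engine (PCRE), but the capture behaves the same way for paths like these; the sample path is hypothetical:

```typescript
// The ingress path /?(.*) matches an optional leading slash and captures
// everything after it; nginx exposes the captured group as $1.
const pattern = /^\/?(.*)$/;

const match = "/dunk-contest/register".match(pattern);
console.log(match?.[1]); // "dunk-contest/register" is the captured group
```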

Great, before moving further run this command in the terminal:

$ minikube addons enable ingress

This command installs the NGINX ingress controller in the cluster, which the Ingress resource needs in order to work.
Afterwards, open the terminal in the kubernetes/ directory and run:

$ kubectl apply -f ingress-service.yaml

[Screenshot: ingress-service created]

Now that we can access pods running within our kubernetes cluster, let's see how this will work.

4. Test

For this test section, you need to install Skaffold on your machine; it facilitates continuous development for Kubernetes-native applications.
Basically, Skaffold will rebuild the Docker image of each of our services whenever we make code changes, then deploy it inside the Kubernetes cluster.
Click here to download Skaffold to your computer.

Create a new yaml file in the project directory dunk-contest/, name the file skaffold.yaml and put the code below:

apiVersion: skaffold/v2alpha4
kind: Config
deploy:
  kubectl:
    manifests:
      - ./kubernetes/*
build:
  local:
    push: false
  artifacts:
    - image: <YOUR DOCKER HUB ID>/dunk-service
      context: dunk-service
      docker:
        dockerfile: Dockerfile
      sync:
        manual:
          - src: "src/**/*.ts"
            dest: .
    - image: <YOUR DOCKER HUB ID>/statistic-service
      context: statistic-service
      docker:
        dockerfile: Dockerfile
      sync:
        manual:
          - src: "src/**/*.ts"
            dest: .


In the deploy section above, we specify how images are to be deployed in the cluster; in this case, by running kubectl apply on all the YAML files at the location given by manifests.

Then, in the build section's artifacts, we define the name of each image to be built. context is the directory containing the artifact's sources, and docker describes an artifact built from a Dockerfile located in that directory.
Finally, sync copies changed local files into the running pods instead of triggering an image rebuild; under manual we indicate the source files and their destination.

Note: Make sure to replace <YOUR DOCKER HUB ID> with your actual Docker Hub ID.

Save it, open the terminal in the project directory dunk-contest/ and run the command below:

$ skaffold dev

Note: The first attempt might fail; you may need to run the command two or three times.

[Screenshot: skaffold dev output]

Let's make some changes to our services' code. The first changes are in the Dunk-Service code: instead of connecting the service to pods that were exposed to our local machine with port-forward, we will connect it to those pods through their corresponding Kubernetes Service objects.
Open the project in the dunk-service/src/ directory, and in the index.ts file replace the old URL with the new one:

[Screenshot: updated NATS connection URL in index.ts]

In the dunk-service/src/routes directory, open registerPlayerRoutes.ts and replace the old code with the new:

[Screenshot: updated MongoDB URL in registerPlayerRoutes.ts]

Still in the dunk-service/src/routes directory, open attemptDunkRoutes.ts and replace the old code with the new:

[Screenshot: updated MongoDB URL in attemptDunkRoutes.ts]

The last changes are in the Statistic-Service code. Open the project in the statistic-service/src/ directory and replace the old code in listener.ts with the new:

[Screenshot: updated connection URLs in listener.ts]
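Since the screenshots are hard to read in text form, here is a hedged sketch of what those URL changes likely look like. The service names nats-service, dunk-mongo-service, and stats-mongo-service are assumptions based on the YAML file names from Part 1; use the metadata.name values from your own Kubernetes Service files:

```typescript
// Hypothetical in-cluster connection strings; replace each service name with
// the metadata.name defined in the corresponding Kubernetes Service YAML.
const NATS_URL = "http://nats-service:4222"; // was http://localhost:4222
const DUNK_MONGO_URL = "mongodb://dunk-mongo-service:27017/dunk-service"; // was mongodb://localhost:27017/...
const STATS_MONGO_URL = "mongodb://stats-mongo-service:27017/statistic-service"; // was mongodb://localhost:27016/...

// Note: inside the cluster every Mongo Service targets the pod's own port 27017;
// port 27016 existed only as a local port-forward during the earlier tests.
console.log(NATS_URL, DUNK_MONGO_URL, STATS_MONGO_URL);
```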

Make sure to save all the changes made so far, and if Skaffold has stopped running, run the skaffold dev command again in the project directory. Now proceed with the final test:

First, get your cluster IP with:

$ minikube ip

[Screenshot: minikube ip output]
It's going to be part of the URL for the test.

Open your API client and make these POST requests:

Body 1

{
  "name": "LeBron",
  "height": 2.06,
  "weight": 113,
  "experience": 19
}

Body 2:

{
  "name": "Stephen",
  "height": 1.88,
  "weight": 84,
  "experience": 13
}

Note: We just registered two players for the contest; in the next requests, let's have them attempt dunk shots.

Note: With the two POST requests, we had each of our two players attempt one dunk shot. Feel free to repeat these last two requests to see how the Statistic Service produces real-time contest statistics in its console.

Final Test

5. Conclusion

Hopefully, through this tutorial series you have seen and understood how to stream data between services in a Kubernetes cluster with a NATS Streaming server. The whole topic is complex, so there is a lot more to learn and apply out there.

For questions or comments, feel free to reach out here or on my Twitter; I will be more than happy to answer. Here you may find the project code.

See you soon.

Discussion (2)

Ajay

Great tutorial! Explained it in much detail.

Daniel Cortes

In my opinion, as someone who has set up NATS Streaming Server: before we ask the reader to drop in a whole bunch of code, we should remind and instruct them to test things out first. So before we drop in something like this:

subscription.on("message", async (msg: Message) => {
  const parsedData = JSON.parse(msg.getData().toString("utf-8"));
  console.log("EVENT RECEIVED WITH THE DATA BELOW :");
  console.table(parsedData);

  const mongoClient = await MongoClient.connect(
    "mongodb://localhost:27017/statistic-service"
  );

  const db = mongoClient.db();
  const dunkCollection = db.collection("dunks");
  await dunkCollection.insertOne(parsedData);

  const dunkStatistic = await dunkCollection
    .aggregate([
      {
        $group: {
          _id: "$PLAYER_NAME",
          TOTAL_DUNK: { $count: {} },
          TOTAL_POINT: { $sum: "$DUNK_POINT" },
        },
      },
      { $sort: { TOTAL_POINT: -1 } },
    ])
    .toArray();

  console.log("\x1b[36m%s\x1b[0m", "DUNK CONTEST STATISTIC :");
  console.table(dunkStatistic);
  mongoClient.close();

  msg.ack();
});

How about we first do something like this:

subscription.on('message', (msg) => {
  console.log('Message received');
});

Right away, if we are working in TypeScript (and we should be), TypeScript will consider the msg argument as any, and we don't want that. We can solve that by importing Message from the node-nats-streaming library, which you do mention in your post: an interface that describes the type of the msg argument, like so:

subscription.on('message', (msg: Message) => {
  console.log('Message received');
});

And then we want to encourage the reader to go inside the type definition file of that Message type to see what methods are available to the person wanting to learn from this blog post.

And no, the npm documentation on node-nats-streaming is not going to educate the reader on the methods available on the Message type.

For example, getSubject() returns the name of the channel that the message just came out of. But more relevant may be getSequence() and getData(), which you use in the code you are asking others to just paste without explanation.

getSequence() returns the sequence number of this event; in NATS, events are numbered 1, 2, 3 and so on.

getData() returns the actual data included inside the Message or event.

You may also want to mention that msg.getData() returns a Buffer and not a string, and that's why you are doing JSON.parse() on its stringified form. I mean, let's keep in mind the wide variety of skill levels of developers who may be reading this article, from the most novice on up.