Welcome to this tutorial. Here we are going to finish what was started in Data Streaming for Microservices using Nats Streaming - Part 1. To recap, we will complete the application designed in the figure below:
It's a basketball dunk contest application made of two services that communicate with each other using NATS Streaming. Service 1, the Dunk Service, is an Express application that registers players for the contest, lets players attempt dunk shots, and publishes shot information as events on a NATS Streaming Server. This was completed in Part 1, so let's work on Service 2 next.
1. Statistic Service
This is an Express application that displays statistics of the contest app in real time. It ranks players as shown in the table below:
Now open the terminal in the project directory dunk-contest/ and run these commands:
$ mkdir statistic-service
$ cd statistic-service/
$ npm init -y
$ npm install ts-node-dev typescript express @types/express node-nats-streaming mongodb
Open the package.json file and replace the existing scripts section with the one below:
"scripts": {
  "listen": "ts-node-dev --rs --notify false src/listener.ts"
},
Save the file, and in the same directory create a folder named src/ where you add a TypeScript file called listener.ts.
Now in listener.ts put the following code:
import nats, { Message } from "node-nats-streaming";
import { MongoClient } from "mongodb";

const start = async () => {
  // Connect to the NATS Streaming server (cluster ID, client ID, options)
  const stan = nats.connect("dunk-contest", "321", {
    url: "http://localhost:4222",
  });

  stan.on("connect", () => {
    console.log(
      "Statistic Service is connected to NATS Streaming Server \nWaiting for Events ..."
    );

    stan.on("close", () => {
      console.log("Nats connection closed!");
      process.exit();
    });

    // Manual acknowledgement, replay of all available events, durable subscription
    const options = stan
      .subscriptionOptions()
      .setManualAckMode(true)
      .setDeliverAllAvailable()
      .setDurableName("Dunk-Shot-Queue-Group");

    // Subscribe to the Dunk-Shot channel as a member of a queue group
    const subscription = stan.subscribe(
      "Dunk-Shot",
      "Dunk-Shot-Queue-Group",
      options
    );

    subscription.on("message", async (msg: Message) => {
      const parsedData = JSON.parse(msg.getData().toString("utf-8"));
      console.log("EVENT RECEIVED WITH THE DATA BELOW :");
      console.table(parsedData);

      // Port 27016 is the local port forwarded to the Statistic Service's
      // MongoDB pod in Test 3 below (27017 is used by the Dunk Service's MongoDB)
      const mongoClient = await MongoClient.connect(
        "mongodb://localhost:27016/statistic-service"
      );
      const db = mongoClient.db();
      const dunkCollection = db.collection("dunks");
      await dunkCollection.insertOne(parsedData);

      // Group shots by player, count them, sum their points, and rank by points
      const dunkStatistic = await dunkCollection
        .aggregate([
          {
            $group: {
              _id: "$PLAYER_NAME",
              TOTAL_DUNK: { $count: {} },
              TOTAL_POINT: { $sum: "$DUNK_POINT" },
            },
          },
          { $sort: { TOTAL_POINT: -1 } },
        ])
        .toArray();

      console.log("\x1b[36m%s\x1b[0m", "DUNK CONTEST STATISTIC :");
      console.table(dunkStatistic);

      await mongoClient.close();
      // Acknowledge the event only after it has been stored and processed
      msg.ack();
    });
  });
};

start();
In the code above:
- We imported nats and Message from the node-nats-streaming library.
- We imported MongoClient from the mongodb library.
- In the start function:
  - We defined a connection to the NATS Streaming Server that we started in Part 1 of this tutorial series.
  - On connection, the app subscribes to the Dunk-Shot channel, the channel through which the Dunk Service publishes events. Besides the channel, we passed subscription options and a queue group.
  - On each message event, which occurs when the Dunk Service publishes dunk-shot info, the Statistic Service connects to its MongoDB server using the mongodb library, takes the message data (dunk-shot info) and inserts it into the dunks collection.
  - On the dunks collection, we ran an aggregation that groups the shots by player, computes each player's total number of dunk shots and total dunk points, and sorts the players by total points in descending order.
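To make the grouping step more concrete, here is a minimal in-memory sketch of what the aggregation computes, written in plain TypeScript. It is only an illustration and not how the Statistic Service works (the service lets MongoDB do the grouping). The DunkShot and PlayerStatistic interfaces, the rankPlayers function and the sample point values are assumptions made for this example; only the field names come from the pipeline above.

```typescript
// Assumed shape of a stored dunk-shot event (field names taken from the pipeline).
interface DunkShot {
  PLAYER_NAME: string;
  DUNK_POINT: number;
}

// One row of the ranking, mirroring the output of the $group stage.
interface PlayerStatistic {
  _id: string; // player name, like _id: "$PLAYER_NAME"
  TOTAL_DUNK: number; // number of shots, like { $count: {} }
  TOTAL_POINT: number; // sum of points, like { $sum: "$DUNK_POINT" }
}

// Hypothetical helper: groups shots by player, then sorts by total points (descending).
function rankPlayers(shots: DunkShot[]): PlayerStatistic[] {
  const byPlayer = new Map<string, PlayerStatistic>();
  for (const shot of shots) {
    const current = byPlayer.get(shot.PLAYER_NAME) ?? {
      _id: shot.PLAYER_NAME,
      TOTAL_DUNK: 0,
      TOTAL_POINT: 0,
    };
    current.TOTAL_DUNK += 1;
    current.TOTAL_POINT += shot.DUNK_POINT;
    byPlayer.set(shot.PLAYER_NAME, current);
  }
  // Same ordering as { $sort: { TOTAL_POINT: -1 } }
  return Array.from(byPlayer.values()).sort(
    (a, b) => b.TOTAL_POINT - a.TOTAL_POINT
  );
}

// Made-up sample data, for illustration only.
const sampleRanking = rankPlayers([
  { PLAYER_NAME: "LeBron", DUNK_POINT: 8 },
  { PLAYER_NAME: "Stephen", DUNK_POINT: 9 },
  { PLAYER_NAME: "LeBron", DUNK_POINT: 7 },
]);
console.table(sampleRanking);
```

With this sample data, LeBron comes first with 2 dunks and 15 points, followed by Stephen with 1 dunk and 9 points.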
Now generate a tsconfig.json file so that your TypeScript code can be compiled to JavaScript. Open your terminal, navigate back to the statistic-service/ directory and run the command below (npx runs the TypeScript compiler installed locally in the project):
$ npx tsc --init
The statistic-service/ directory should look like the tree below:
.
├── package.json
├── package-lock.json
├── src
│ └── listener.ts
└── tsconfig.json
1 directory, 4 files
Save all files and let's do the following tests:
Test 1: Access to NATS Streaming server Pod
Start the local Kubernetes Cluster by running this in the terminal:
$ minikube start
Then let's get the list of all running pods in the cluster by running:
$ kubectl get pods
Copy the name of the NATS Streaming server pod; you will use it in the next command.
Here we make the NATS Streaming server pod running in the Kubernetes cluster accessible from our local machine. Open the terminal and forward a local port on your machine to a port on the pod by running the following:
$ kubectl port-forward <YOUR POD NAME> 4222:4222
Note: Make sure to replace <YOUR POD NAME> with your pod's real name.
Test 2: Access to MongoDB pod dedicated to the Dunk Service
Run kubectl get pods in the terminal to get the name of the Dunk Service's MongoDB pod.
With this pod's name, forward port 27017 on your local machine to port 27017 on the MongoDB pod inside the cluster:
$ kubectl port-forward <YOUR POD NAME> 27017:27017
Note: Make sure to replace <YOUR POD NAME> with your pod's real name.
Test 3: Access to MongoDB pod dedicated to the Statistic Service
Here too, forward port 27016 on your local machine to port 27017 on the MongoDB pod dedicated to the Statistic Service in the cluster:
$ kubectl port-forward <YOUR POD NAME> 27016:27017
Note: Make sure to replace <YOUR POD NAME> with your pod's real name.
Test 4: Start the Dunk and the Statistic Services (Express applications)
Note: The three terminals we opened above must all still be running.
From the project directory dunk-contest/, open two more terminals.
In the directory dunk-service/, open a terminal and run the following command:
$ npm start
And in the directory statistic-service/, open a terminal and run the following command:
$ npm run listen
By now, the Dunk Service and the Statistic Service should be connected to the NATS Streaming Server pod and to their respective MongoDB pods.
Test 5: Open your API Client and do the following requests
Request 1: Post request on http://localhost:4001/dunk-contest/register
Make 2 POST requests, with HEADERS Content-Type: application/json:
Post Request 1:
{
"name": "LeBron",
"height": 2.06,
"weight": 113,
"experience": 19
}
Post Request 2:
{
"name": "Stephen",
"height": 1.88,
"weight": 84,
"experience": 13
}
Note: Here we registered two players for the contest. In the next requests, let's have them attempt dunk shots.
Request 2:
Post request on http://localhost:4001/dunk-contest/attempt/LeBron
Post request on http://localhost:4001/dunk-contest/attempt/Stephen
Note: With these two POST requests, each of our two players attempted one dunk shot. Feel free to repeat these last two requests many times to see how the Statistic Service produces the real-time contest statistics in its console.
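If you prefer a script to an API client, the same sequence of requests can be described in code. The sketch below only builds the list of requests (registrations first, then one attempt per player) so that it can be inspected without any service running. The Player and PlannedRequest interfaces and the planContestRequests function are hypothetical names introduced for this example; the URLs and bodies are the ones used in the requests above.

```typescript
// Player payload, as sent to the register endpoint above.
interface Player {
  name: string;
  height: number;
  weight: number;
  experience: number;
}

// Hypothetical description of one HTTP request to send.
interface PlannedRequest {
  method: "POST";
  url: string;
  headers: Record<string, string>;
  body?: string;
}

// Builds the registration requests followed by one dunk attempt per player.
function planContestRequests(
  baseUrl: string,
  players: Player[]
): PlannedRequest[] {
  const registrations: PlannedRequest[] = players.map((player) => ({
    method: "POST",
    url: `${baseUrl}/dunk-contest/register`,
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(player),
  }));
  const attempts: PlannedRequest[] = players.map((player) => ({
    method: "POST",
    url: `${baseUrl}/dunk-contest/attempt/${encodeURIComponent(player.name)}`,
    headers: {},
  }));
  return registrations.concat(attempts);
}

const plannedRequests = planContestRequests("http://localhost:4001", [
  { name: "LeBron", height: 2.06, weight: 113, experience: 19 },
  { name: "Stephen", height: 1.88, weight: 84, experience: 13 },
]);
for (const request of plannedRequests) {
  console.log(request.method, request.url, request.body ?? "");
}
```

Each planned request can then be sent with any HTTP client, for example the fetch function that is built into Node.js 18 and later.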
We are done with the tests, and our two services are working as intended. The Dunk Service registers players and has them attempt dunk shots, while the Statistic Service builds and updates the contest statistics at every dunk shot.
So far, the tests were performed outside the Kubernetes cluster, just for demonstration purposes. Next, let's make all the services work inside the cluster.
2. Dockerize Services
We have reached the point of the tutorial where we containerize our services with Docker. The Docker images of our services will be uploaded to Docker Hub and later used to create pods inside the Kubernetes cluster.
2.1 Dunk Service Docker Image
In the dunk-service/ directory, create a file named Dockerfile and put the code below:
FROM node:alpine
WORKDIR /app
COPY package.json .
RUN npm install --only=prod
COPY . .
CMD ["npm", "start"]
Let's pull this apart piece by piece.
FROM
This instruction sets node:alpine as the base image of the new Docker image.
WORKDIR
This defines the working directory inside the image.
COPY
The COPY instruction copies the selected files into the working directory.
RUN
The RUN instruction executes a command at build time; here it installs the packages the application requires.
CMD
This sets the command that is executed when a container starts from the image.
Still in the same directory, create another file named .dockerignore and paste the following:
node_modules
The .dockerignore file tells Docker not to copy node_modules into the working directory.
Note: The prerequisites in Part 1 of the tutorial stated that you should have Docker installed on your computer and have a Docker Hub account.
Now, in the same dunk-service/ directory, open the terminal and run the command below:
$ docker build -t <YOUR DOCKER HUB ID>/dunk-service .
Note: Make sure to replace <YOUR DOCKER HUB ID> with your real Docker Hub ID.
With the command above, we built a docker image for the Dunk Service.
Next, push the image recently built to Docker Hub. The first step to pushing the image is to log in to the Docker Hub account:
$ docker login
Once logged in, push the image with the command below:
$ docker push <YOUR DOCKER HUB ID>/dunk-service
Note: Make sure to replace <YOUR DOCKER HUB ID> with your real Docker Hub ID.
2.2 Statistic Service Docker Image
Here, we go through the same steps as in section 2.1 of this tutorial, but in the statistic-service/ directory.
Create a file named Dockerfile in the statistic-service/ directory and put the code below:
FROM node:alpine
WORKDIR /app
COPY package.json .
RUN npm install --only=prod
COPY . .
CMD ["npm", "run", "listen"]
And in the same directory, create another file named .dockerignore and put the following:
node_modules
Build the Statistic Service image with:
$ docker build -t <YOUR DOCKER HUB ID>/statistic-service .
Note: Make sure to replace <YOUR DOCKER HUB ID> with your real Docker Hub ID.
Push the Statistic Service image to Docker Hub:
$ docker push <YOUR DOCKER HUB ID>/statistic-service
Note: Make sure to replace <YOUR DOCKER HUB ID> with your real Docker Hub ID.
By now, your Docker Hub account should list dunk-service and statistic-service as repositories.
3. Kubernetes Pods
In this section, we are going to deploy pods of our services images from Docker Hub in the Kubernetes Cluster.
3.1 Dunk-Service Pod
The figure shows how we will proceed. We first create a Deployment object, which generates and monitors a Pod of the Dunk-Service image. Then we create a Service object that makes the pod generated by the Deployment accessible.
Move to the kubernetes/ directory, create a YAML file named dunk-service-deployment.yaml and put the code below:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: dunk-service-depl
spec:
  replicas: 1
  selector:
    matchLabels:
      app: dunk-service
  template:
    metadata:
      labels:
        app: dunk-service
    spec:
      containers:
        - name: dunk-service
          image: <YOUR DOCKER HUB ID>/dunk-service
Note: Make sure to replace <YOUR DOCKER HUB ID> with your real Docker Hub ID.
With these configurations, kubernetes will create a Deployment object to monitor a Pod running the Dunk-Service image.
Then create another YAML file named dunk-service-svc.yaml and put the following:
apiVersion: v1
kind: Service
metadata:
  name: dunk-service-svc
spec:
  selector:
    app: dunk-service
  type: ClusterIP
  ports:
    - name: dunk-service
      protocol: TCP
      port: 4001
      targetPort: 4001
This config file creates a Kubernetes object of kind Service that lets other pods in the cluster access the Dunk-Service pod.
3.2 Statistic-Service Pod
As in 3.1, we first create a Deployment object, which generates and monitors a Pod of the Statistic-Service image.
In the same directory, create a YAML file named statistic-service-deployment.yaml with the following code:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: statistic-service-depl
spec:
  replicas: 1
  selector:
    matchLabels:
      app: statistic-service
  template:
    metadata:
      labels:
        app: statistic-service
    spec:
      containers:
        - name: statistic-service
          image: <YOUR DOCKER HUB ID>/statistic-service
Note: Make sure to replace <YOUR DOCKER HUB ID> with your real Docker Hub ID.
Your kubernetes/ directory tree should look like this by now:
.
├── dunk-mongo-deployment.yaml
├── dunk-mongo-service.yaml
├── dunk-service-deployment.yaml
├── dunk-service-svc.yaml
├── nats-deployment.yaml
├── nats-service.yaml
├── statistic-service-deployment.yaml
├── stats-mongo-deployment.yaml
└── stats-mongo-service.yaml
0 directories, 9 files
Open the terminal in the same directory and run the command below to create the newly added Kubernetes objects:
$ kubectl apply -f .
At this stage of the tutorial, we should have five pods running in the Kubernetes cluster.
Earlier, we exposed our services outside the cluster by forwarding ports on our local machine to ports of pods running within the cluster. A better option is to use Ingress to do the same thing.
3.3 Ingress
Ingress is a Kubernetes object which manages external accesses to services in a cluster.
Go ahead and create a YAML file in the kubernetes/ directory named ingress-service.yaml with the following code inside:
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: ingress-service
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/use-regex: "true"
    nginx.ingress.kubernetes.io/rewrite-target: /?(.*)
spec:
  rules:
    - http:
        paths:
          - path: /?(.*)
            pathType: Prefix
            backend:
              service:
                name: dunk-service-svc
                port:
                  number: 4001
In this file, we set the kind of Kubernetes resource we want to create, Ingress, and in the Ingress spec we defined HTTP(S) rules which ensure that all incoming requests are directed to the Dunk-Service pod via its dedicated Kubernetes Service resource.
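To see why a single rule with the path /?(.*) catches every request, the pattern can be tried out with a regular expression in TypeScript. This is only an approximation: the NGINX ingress controller uses its own regex engine and matching rules, and the ingressPathPattern constant and captureIngressPath function are names made up for this sketch.

```typescript
// Approximation of the Ingress path pattern "/?(.*)", anchored to the whole path.
const ingressPathPattern = new RegExp("^/?(.*)$");

// Returns the part captured by (.*), or null when the path does not match.
function captureIngressPath(requestPath: string): string | null {
  const match = ingressPathPattern.exec(requestPath);
  return match ? (match[1] ?? null) : null;
}

// Every path of the Dunk Service, and even "/", matches the pattern.
const samplePaths = [
  "/dunk-contest/register",
  "/dunk-contest/attempt/LeBron",
  "/",
];
for (const samplePath of samplePaths) {
  console.log(samplePath, "->", JSON.stringify(captureIngressPath(samplePath)));
}
```

Since the optional leading slash and the catch-all group accept any path, no request falls outside the rule, which is why all traffic ends up at dunk-service-svc.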
Great, before moving further run this command in the terminal:
$ minikube addons enable ingress
This command installs an ingress controller in the cluster, which is what makes Ingress resources work.
Afterwards, open the terminal in the kubernetes/ directory and run:
$ kubectl apply -f ingress-service.yaml
Now that we can access pods running within our kubernetes cluster, let's see how this will work.
4. Test
For this test section, you need to install Skaffold on your machine. It is required because it facilitates continuous development for Kubernetes-native applications.
Basically, Skaffold rebuilds the Docker image of a service whenever its code changes and redeploys it inside the Kubernetes cluster. With the push: false setting used below, images are built locally and are not pushed to Docker Hub.
Follow the official Skaffold installation instructions to install it on your computer.
Create a new YAML file in the project directory dunk-contest/, name the file skaffold.yaml and put the code below:
apiVersion: skaffold/v2alpha4
kind: Config
deploy:
  kubectl:
    manifests:
      - ./kubernetes/*
build:
  local:
    push: false
  artifacts:
    - image: <YOUR DOCKER HUB ID>/dunk-service
      context: dunk-service
      docker:
        dockerfile: Dockerfile
      sync:
        manual:
          - src: "src/**/*.ts"
            dest: .
    - image: <YOUR DOCKER HUB ID>/statistic-service
      context: statistic-service
      docker:
        dockerfile: Dockerfile
      sync:
        manual:
          - src: "src/**/*.ts"
            dest: .
In the deploy section above, we tell Skaffold how images are deployed in the cluster. In this case, it runs kubectl apply on all YAML files in the location specified by manifests.
In the build section, each entry under artifacts defines the name of an image to build. context is the directory containing the artifact's sources, and docker describes an artifact built from a Dockerfile located in that workspace.
Finally, with sync, local files are copied to pods instead of triggering an image build when changes occur. Under manual, we specify the source files and their destination.
Note: Make sure to replace <YOUR DOCKER HUB ID> with your Docker Hub ID.
Save it, open the terminal in the project directory dunk-contest/ and run the command below:
$ skaffold dev
Note: The first attempt might fail; if it does, run the command again, two or three times if necessary.
Let's make some changes to our services' code. The first changes are in the Dunk-Service code: instead of connecting the service to pods exposed on our local machine with port-forward, we connect it to those pods through their Kubernetes Service objects.
In the dunk-service/src/ directory, open the index.ts file and replace the old URL with the new one:
In the dunk-service/src/routes directory, open registerPlayerRoutes.ts and replace the old code with the new:
Still in the dunk-service/src/routes directory, open attemptDunkRoutes.ts and replace the old code with the new:
The last changes are in the Statistic-Service code. In the statistic-service/src/ directory, open listener.ts and replace the old code with the new:
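As a hedged illustration of the kind of change involved: inside the cluster, a pod reaches another pod through the name of its Kubernetes Service instead of localhost. The sketch below shows one possible way to build the connection URLs for the Statistic Service. The Service names nats-service and stats-mongo-service, the environment variable names NATS_HOST and MONGO_HOST, and the resolveServiceUrls function are all assumptions made for this example; use the metadata.name values from your own nats-service.yaml and stats-mongo-service.yaml files from Part 1.

```typescript
interface ServiceUrls {
  natsUrl: string;
  mongoUrl: string;
}

// Hypothetical Kubernetes Service names; replace them with the metadata.name
// values defined in your own manifests.
const ASSUMED_NATS_HOST = "nats-service";
const ASSUMED_MONGO_HOST = "stats-mongo-service";

// Builds the URLs from optional overrides, falling back to the in-cluster names.
// NATS_HOST and MONGO_HOST are made-up variable names for this sketch.
function resolveServiceUrls(
  env: Record<string, string | undefined>
): ServiceUrls {
  const natsHost = env["NATS_HOST"] ?? ASSUMED_NATS_HOST;
  const mongoHost = env["MONGO_HOST"] ?? ASSUMED_MONGO_HOST;
  return {
    natsUrl: `http://${natsHost}:4222`,
    mongoUrl: `mongodb://${mongoHost}:27017/statistic-service`,
  };
}

// Inside the cluster, no override is needed:
const inClusterUrls = resolveServiceUrls({});
console.log(inClusterUrls.natsUrl, inClusterUrls.mongoUrl);
```

In listener.ts, the two resulting strings would take the place of the localhost URLs passed to nats.connect and MongoClient.connect, and passing process.env to the function lets you point the service back at localhost when running it outside the cluster.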
Make sure to save all the changes made so far and, if Skaffold has stopped running, run the skaffold dev command again in the project directory. Now proceed with the final test:
Get your cluster IP with:
$ minikube ip
It is going to be part of the URL for the test. The requests below use 192.168.49.2; replace it with the IP address returned on your machine.
Open your API client and make these POST requests:
- 2 POST requests on http://192.168.49.2/dunk-contest/register with HEADERS Content-Type: application/json, one request per body:
Body 1:
{
"name": "LeBron",
"height": 2.06,
"weight": 113,
"experience": 19
}
Body 2:
{
"name": "Stephen",
"height": 1.88,
"weight": 84,
"experience": 13
}
Note: Here we registered two players for the contest. In the next requests, let's have them attempt dunk shots.
POST request on http://192.168.49.2/dunk-contest/attempt/LeBron
POST request on http://192.168.49.2/dunk-contest/attempt/Stephen
Note: With these two POST requests, each of our two players attempted one dunk shot. Feel free to repeat these last two requests many times to see how the Statistic Service produces the real-time contest statistics in its console.
5. Conclusion
Hopefully, through this tutorial series, you have seen and understood how to stream data between services in a Kubernetes cluster with a NATS Streaming server. The whole topic is complex, so there is a lot more to learn and to apply out there.
If you have questions or comments, feel free to reach out here or on my Twitter; I will be more than happy to answer. The project code is available here.
See you soon.
Top comments (2)
Great tutorial! Explained it in much detail.
In my opinion, as someone that has set up NATS Streaming Server, before we request the reader drop in a whole bunch of code, we should remind them and instruct them to test things out first, so before we drop in something like this:
subscription.on("message", async (msg: Message) => {
  const parsedData = JSON.parse(msg.getData().toString("utf-8"));
  console.log("EVENT RECEIVED WITH THE DATA BELOW :");
  console.table(parsedData);
});
How about we first do something like this:
subscription.on('message', (msg) => {
  console.log('Message received');
});
Right away, if we are working in TypeScript and we should be, TypeScript will consider msg argument as any and we don't want that, so we can solve that by adding Message from the node-nats-streaming library, which you do mention in your post, which is an interface that will describe the type of the msg argument, so like so:
subscription.on('message', (msg: Message) => {
  console.log('Message received');
});
And then we want to encourage the reader to go inside the type definition file of that Message type to see what methods are available to the person wanting to learn from this blog post.
And no, the npm documentation on node-nats-streaming is not going to educate the reader on the methods available to them on the type of Message.
For example, getSubject() returns the name of the channel that the message just came out of. But more relevant may be getSequence() and getData(), which you use in the code you are asking others to just paste without explanation.
getSequence() is going to return the number of this event; in NATS all events start out as number 1, 2, 3 and so on.
getData() returns the actual data included inside the Message or event.
You may also want to mention that with this here: msg.getData(), you are going to get back a Buffer and not a string, and that's why you are doing JSON.parse(). I mean, let's keep in mind the wide variety of skill levels of developers that may be reading this article, from the most novice on up.