Scope
In this article, we will cover the basics of centralized logging in a microservices-based architecture using Fluentd as a logging agent. We will also talk a little bit about how to generate customized logs in a microservice created with Nestjs via an external logger such as Winston.
Introduction
Logs are an essential part of any software solution: they help diagnose problems and, more generally, let us know what is happening inside an application.
Let's talk about logging on Kubernetes. At the node level, everything a containerized application writes to stdout and stderr is captured by the container engine, which redirects the streams according to its configured logging driver. The kubelet service running on that node then makes these logs available through the Kubernetes API.
By default, logs emitted by apps containerized with Docker end up as files in the /var/log/containers directory on the host machine.
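As an illustration (the file name is taken from the example we will deploy later; the container hash will differ on your machine), listing that directory on a node shows one log file per container, named after the pod, namespace, container and container ID:
$ ls /var/log/containers/
ms-test-67c97b479c-rpzrz_default_ms-test-26a3b0fed870efec99633c2c12b3eaa6daf70c8bfbebf7a21c256868d79270ec.log
...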
These logs are then accessible via the command:
kubectl logs <pod_name>
Logs from a previous run of the container can also be retrieved using:
kubectl logs <pod_name> --previous
However, this command has some clear limitations: these logs are lost, for example, whenever a pod is deleted or a node crashes.
As such, logs should have a separate storage and lifecycle, independent of nodes, pods, or containers. This concept is called cluster-level logging.
Cluster-level logging also allows us to have an overall understanding of how the whole cluster is working together.
Cluster-level logging architectures
While Kubernetes does not provide a native solution for cluster-level logging, its documentation proposes three different approaches:
- Use a node-level logging agent that runs on every node.
- Include a dedicated sidecar container for logging in an application pod.
- Push logs directly to a backend from within an application.
In this article, we will focus on the first option. Feel free to check the referenced documentation if you would like to know more about the other alternatives.
This approach works by deploying a node-level logging agent that watches different log sources, such as log files, and forwards their contents to a dedicated backend.
As a replica of the logging agent has to run on each node of the cluster, the recommended approach is to use a DaemonSet:
A DaemonSet ensures that all (or some) Nodes run a copy of a Pod. As nodes are added to the cluster, Pods are added to them. As nodes are removed from the cluster, those Pods are garbage collected. Deleting a DaemonSet will clean up the Pods it created.
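To make the idea concrete, here is a minimal sketch of what such a logging-agent DaemonSet could look like. The image tag and mount paths are illustrative assumptions; later in this article the Bitnami helm chart will generate a production-ready version of this resource for us:
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: logging-agent
spec:
  selector:
    matchLabels:
      app: logging-agent
  template:
    metadata:
      labels:
        app: logging-agent
    spec:
      containers:
        - name: agent
          image: fluent/fluentd:v1.11-1   # illustrative; any node-level logging agent works
          volumeMounts:
            - name: varlog
              mountPath: /var/log
              readOnly: true
      volumes:
        - name: varlog
          hostPath:
            path: /var/log   # the container log files live here on the host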
Generating logs
Before setting up Fluentd as our logging agent, we need at least one microservice generating logs in our cluster with a well-defined format. For this purpose, a simple Nestjs microservice is available. The containerized process connects to a NATS server and emits a healthcheck message through this connection, and to its standard output, every 5 seconds.
You can clone it and edit it as you like, or just apply the manifest in the k8s folder to your cluster with:
kubectl apply -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/k8s/manifest.yaml
The important part of this Nestjs microservice is the way the logs are formatted in a production environment. To learn how to set up an external logger in a Nestjs application, I suggest this great section of the Nestjs documentation.
The key files regarding logging configuration in this microservice are:
src/factory/winstonConfig.ts
import { format, transports, addColors, LoggerOptions } from "winston";
import { MESSAGE } from 'triple-beam';
import * as dotenv from 'dotenv';

dotenv.config();

export class LoggerConfig {
  private readonly options: LoggerOptions;
  private static instance: LoggerConfig;

  public static getInstance(): LoggerConfig {
    if (!LoggerConfig.instance) {
      LoggerConfig.instance = new LoggerConfig();
    }
    return LoggerConfig.instance;
  }

  private defaultFormat() {
    return format.combine(
      format.timestamp(),
      format.metadata({
        fillExcept: ['message', 'level', 'timestamp', 'label']
      })
    );
  }

  private stagingFormat() {
    return format.combine(
      this.defaultFormat(),
      format.colorize(),
      format.printf((info) => {
        const reqId = info.metadata.reqId;
        delete info.metadata.reqId;
        return (`${info.level} | ${info.timestamp} ${reqId ? `| ${reqId}` : ''} | ${info.message} | ${JSON.stringify(info.metadata)}`);
      })
    );
  }

  private productionFormat() {
    return format.combine(
      this.defaultFormat(),
      format((info) => {
        const reqId = info.metadata.reqId;
        delete info.metadata.reqId;
        info.reqId = reqId;
        if (typeof info.message === 'object' && info.message !== null) {
          info.message = JSON.stringify(info.message);
        }
        const { level, timestamp, label } = info;
        const message = {
          level,
          timestamp,
          label,
          reqId,
          metadata: info.metadata,
          message: info.message
        };
        delete info.metadata;
        delete info.message;
        delete info.level;
        delete info.timestamp;
        delete info.label;
        info[MESSAGE] = JSON.stringify(message);
        return info;
      })()
    );
  }

  constructor() {
    this.options = {
      format: process.env.NODE_ENV === 'production' ? this.productionFormat() : this.stagingFormat(),
      transports: [
        new transports.Console({
          level: 'silly', // TODO: Get value from configfile
        })
      ],
      exitOnError: false // do not exit on handled exceptions
    };
  }

  public console(): LoggerOptions {
    return this.options;
  }
}

addColors({
  error: 'bold green redBG',
  warn: 'italic black yellowBG',
  info: 'cyan',
  http: 'grey',
  verbose: 'magenta',
  debug: 'green',
  silly: 'bold gray magentaBG'
});
The productionFormat function reshapes the logs so that, at later stages, each entry shows up as a single JSON object with useful data such as the log level and metadata exposed as key/value pairs of the message. This way of formatting the logs will pay off when we collect the data with Fluentd; we will talk about it in more detail in a moment.
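To see the difference, the same log call rendered by stagingFormat (ignoring the ANSI color codes added by format.colorize()) and by productionFormat would look roughly like this; the values are illustrative:
stagingFormat (human-friendly, pipe-separated):
info | 2020-11-03T20:54:35.106Z | TEST MESSAGE | {}
productionFormat (one JSON object per line, ready to be parsed by Fluentd):
{"level":"info","timestamp":"2020-11-03T20:54:35.106Z","metadata":{},"message":"TEST MESSAGE"}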
src/main.ts
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { NATSConfigService } from './config/NATSConfigService';
import { WinstonModule } from 'nest-winston';
import { LoggerConfig } from './factory/winstonConfig';
import { InjectMetadataInterceptor } from './interceptors/InjectMetadataInterceptor';

async function bootstrap() {
  const logger: LoggerConfig = LoggerConfig.getInstance();
  const winstonLogger = WinstonModule.createLogger(logger.console());
  const context = await NestFactory.createApplicationContext(AppModule.register(), {
    logger: winstonLogger
  });
  const natsConfigService: NATSConfigService = context.get(NATSConfigService);
  context.close();
  const app = await NestFactory.createMicroservice(AppModule.register(), {
    ...natsConfigService.getNATSConfig,
    logger: winstonLogger
  });
  const globalInterceptors = [
    new InjectMetadataInterceptor()
  ];
  app.useGlobalInterceptors(...globalInterceptors);
  app.listen(() => winstonLogger.log('Microservice ms-test running'));
}
bootstrap();
This is just a standard bootstrap for a pure Nestjs microservice with NATS as its messaging system and a custom logger.
src/app.module.ts
import { ConfigModule } from '@nestjs/config';
import { Module, DynamicModule } from '@nestjs/common';
import { NATSConfigService } from './config/NATSConfigService';
import { object as JoiObject, string as JoiString, number as JoiNumber } from '@hapi/joi';
import { WinstonModule } from 'nest-winston';
import { LoggerConfig } from './factory/winstonConfig';
import { MessageModule } from './message/message.module';

const logger: LoggerConfig = LoggerConfig.getInstance();

@Module({})
export class AppModule {
  public static register(): DynamicModule {
    const imports = [
      WinstonModule.forRoot(logger.console()),
      ConfigModule.forRoot({
        isGlobal: true,
        validationSchema: JoiObject({
          NODE_ENV: JoiString()
            .valid('development', 'production', 'test')
            .default('development'),
          PORT: JoiNumber().port().default(3030),
          NATS_URL: JoiString().required().default('nats://localhost:4222'),
          NATS_USER: JoiString(),
          NATS_PASSWORD: JoiString(),
        })
      }),
      MessageModule
    ];
    const controllers = [];
    const providers = [NATSConfigService];
    return {
      module: AppModule,
      imports,
      controllers,
      providers,
    };
  }
}
The app module definition, where the Logger, Config and Message modules are imported into the app.
src/message/message.service.ts
import { Injectable, Inject, OnModuleInit } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { MicroserviceMessage } from '../interface/MicroserviceMessage';
import { WINSTON_MODULE_PROVIDER } from "nest-winston";
import { Logger } from "winston";
import { ConfigService } from '@nestjs/config';

@Injectable()
export class MessageService implements OnModuleInit {
  constructor(
    @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger,
    @Inject('MESSAGE_CLIENT') private client: ClientProxy,
    private configService: ConfigService
  ) {}

  sendMessage<Toutput = any, Tinput = any>(pattern: Record<string, any> | string, message: MicroserviceMessage<Tinput>): Observable<Toutput> {
    return this.client.send<Toutput, MicroserviceMessage<Tinput>>(pattern, message);
  }

  emitMessage<Toutput = any, Tinput = any>(pattern: Record<string, any> | string, message: MicroserviceMessage<Tinput>): Observable<Toutput> {
    return this.client.emit<Toutput, MicroserviceMessage<Tinput>>(pattern, message);
  }

  async onModuleInit(): Promise<void> {
    try {
      await this.client.connect();
    } catch (err) {
      this.logger.error(err, 'Connection to client has failed, start recovery');
      process.exit(1);
    }
    setInterval(async () => {
      try {
        await this.client.emit('ms-test', 'healthcheck').toPromise();
        this.logger.info('TEST MESSAGE');
      } catch (err) {
        this.logger.error(err, 'Healthcheck has failed, start recovery');
        process.exit(2);
      }
    }, +this.configService.get<string>('HEALTHCHECK_INTERVAL'));
  }
}
The message service in which the healthcheck message is emitted and logged to stdout. The MessageModule's intended use is to send messages to other services.
Let's see what shows up at stdout:
$ kubectl logs ms-test-64d79dc754-jmshs
> ms-test@0.0.1 start:prod /app
> node dist/main
{"level":"info","timestamp":"2020-11-03T20:54:27.193Z","metadata":{"context":"NestFactory"},"message":"Starting Nest application..."}
{"level":"info","timestamp":"2020-11-03T20:54:28.002Z","metadata":{"context":"InstanceLoader"},"message":"ConfigHostModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.088Z","metadata":{"context":"InstanceLoader"},"message":"WinstonModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.090Z","metadata":{"context":"InstanceLoader"},"message":"ConfigModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.992Z","metadata":{"context":"InstanceLoader"},"message":"MetricsModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.994Z","metadata":{"context":"InstanceLoader"},"message":"MessageModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.995Z","metadata":{"context":"InstanceLoader"},"message":"AppModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:29.291Z","metadata":{"context":"NestMicroservice"},"message":"Nest microservice successfully started"}
{"level":"info","timestamp":"2020-11-03T20:54:29.595Z","metadata":{"context":"RoutesResolver"},"message":"AppController {}:"}
{"level":"info","timestamp":"2020-11-03T20:54:29.698Z","metadata":{"context":"RouterExplorer"},"message":"Mapped {, POST} route"}
{"level":"info","timestamp":"2020-11-03T20:54:29.789Z","metadata":{"context":"RouterExplorer"},"message":"Mapped {, GET} route"}
{"level":"info","timestamp":"2020-11-03T20:54:30.193Z","metadata":{"context":"NestApplication"},"message":"Nest application successfully started"}
{"level":"info","timestamp":"2020-11-03T20:54:30.293Z","metadata":{},"message":"Hybrid ms-test test running on port 3003"}
{"level":"info","timestamp":"2020-11-03T20:54:35.106Z","metadata":{},"message":"TEST MESSAGE"}
{"level":"info","timestamp":"2020-11-03T20:54:40.103Z","metadata":{},"message":"TEST MESSAGE"}
Great! Now onward towards Fluentd.
Fluentd
Fluentd is a popular open-source data collector with an outstanding number of plugins for several purposes:
- Output (AWS S3, ElasticSearch)
- Input (Apache Kafka, HTTP, TCP)
- Big Data (webhdfs)
- Filter (anonymizer, kubernetes)
- Notifications (Slack, Twilio)
Among many others.
Before deploying it to our cluster, let's discuss how it works and how to configure it. Refer to the Fluentd docs for more details.
Fluentd manages events, which originate at sources. These events have the following three components:
- tag: Specifies the origin where an event comes from. It is used for message routing.
- time: Specifies the time when an event happens with nanosecond resolution.
- record: Specifies the actual log as a JSON object.
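As an illustration, a single event flowing through Fluentd could be pictured like this (the values are shortened versions of the output we will see later in this article):
tag:    kubernetes.var.log.containers.ms-test-67c97b479c-rpzrz_default_ms-test-26a3b0f...0ec.log
time:   2020-11-04 22:45:17.870457759 +0000
record: {"log":"{\"level\":\"info\",...}\n","stream":"stdout","time":"2020-11-04T22:45:17.01498099Z"}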
Events follow a step-by-step cycle in which they are processed in order, from top to bottom. Along the way, they may pass through filters, which play two roles, illustrated right after this list:
- Behave like a rule to pass or reject an event
- Transform the event (parse it, add metadata as fields)
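For instance, a hypothetical filter that rejects every event whose record contains a level field equal to debug, while letting everything else through, could look like this (a sketch using the built-in grep filter plugin; the app.** tag is an assumption):
<filter app.**>
  @type grep
  <exclude>
    # look at the "level" field of the record and drop the event if it matches
    key level
    pattern /debug/
  </exclude>
</filter>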
After passing these filters, the events arrive at a match directive, which assigns them to a certain output.
As mentioned before, tags are used for message routing. Filter and match directives always take a match pattern that defines which events they process. For example:
<match tag.to.match>
  @type null
</match>

<match **>
  @type stdout
</match>
The first match directive picks up events whose tag equals tag.to.match and discards them by sending them to a null output.
The second directive uses the ** wildcard, which matches zero or more tag parts (for example, the pattern a.** matches a, a.b and a.b.c), and sends the events to the standard output.
The configuration file allows the user to control the input and output behavior of Fluentd by 1) selecting input and output plugins; and, 2) specifying the plugin parameters. The file is required by Fluentd to operate properly.
The configuration file consists of the following directives:
- source directives determine the input sources
- match directives determine the output destinations
- filter directives determine the event processing pipelines
- system directives set system-wide configuration
- label directives group the output and filter for internal routing
- @include directives include other files
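Putting these directives together, a minimal configuration file follows the shape below. This is only a sketch to show the overall structure (the record_transformer filter and the cluster field are illustrative); the real forwarder and aggregator configurations are shown in full later on:
# source: where the events come from
<source>
  @type tail
  path /var/log/containers/*.log
  pos_file /tmp/containers.log.pos
  tag kubernetes.*
  <parse>
    @type json
  </parse>
</source>

# filter: how the events are processed on their way through
<filter kubernetes.**>
  @type record_transformer
  <record>
    cluster demo
  </record>
</filter>

# match: where the events end up
<match kubernetes.**>
  @type stdout
</match>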
We will use what is called a high availability configuration for Fluentd, in order to achieve resilient cluster-level logging. It may seem like overkill for such a simple example, but I think it is useful to have a configuration as close as possible to one that could be used in a high-traffic production environment.
Following this network topology, we will have two types of pods:
- Forwarders: Receive log events from the containers running on the node and send them to the aggregators
- Aggregators: Receive log events from the forwarders, buffer them and periodically upload them to the cloud. In our case, they will output logs to the standard output.
To add Fluentd to the cluster, we will use helm charts due to their simplicity. In our case, Bitnami's fluentd helm chart will do the trick.
If you don't have helm installed, please follow these instructions.
Once helm is successfully set up on your machine, we can use it to install Fluentd with:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install fluentd bitnami/fluentd -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/values.yml
The first command adds the Bitnami repository to helm, while the second one uses this values definition to deploy a DaemonSet of forwarders and 2 aggregators, together with the necessary networking as a series of services. It also states that the forwarders look for their configuration in a ConfigMap named fluentd-forwarder-cm, while the aggregators will use one called fluentd-aggregator-cm.
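Although we pass the values file by URL, the relevant part of it presumably looks something like the snippet below (the key names are taken from the Bitnami chart of that time, so double-check them against the linked values.yml and the chart's own values.yaml):
forwarder:
  enabled: true
  configMap: fluentd-forwarder-cm   # Fluentd config the forwarders will load
aggregator:
  enabled: true
  replicaCount: 2
  configMap: fluentd-aggregator-cm  # Fluentd config the aggregators will load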
Once deployed, our cluster will look like this:
kubectl get all
NAME READY STATUS RESTARTS AGE
pod/fluentd-0 0/1 ContainerCreating 0 95m
pod/fluentd-hwwcb 0/1 ContainerCreating 0 95m
pod/ms-test-67c97b479c-rpzrz 1/1 Running 0 5h54m
pod/nats-deployment-65687968fc-4rdxd 1/1 Running 0 5h54m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/fluentd-aggregator ClusterIP 10.107.174.54 <none> 9880/TCP,24224/TCP 95m
service/fluentd-forwarder ClusterIP 10.108.14.24 <none> 9880/TCP 95m
service/fluentd-headless ClusterIP None <none> 9880/TCP,24224/TCP 95m
service/fluentd-metrics ClusterIP 10.107.92.253 <none> 24231/TCP 95m
service/kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 28d
service/ms-test ClusterIP 10.107.57.48 <none> 3000/TCP 5h54m
service/nats-service ClusterIP 10.105.130.222 <none> 4222/TCP 5h54m
NAME DESIRED CURRENT READY UP-TO-DATE AVAILABLE NODE SELECTOR AGE
daemonset.apps/fluentd 1 1 0 1 0 <none> 95m
NAME READY UP-TO-DATE AVAILABLE AGE
deployment.apps/ms-test 1/1 1 1 5h54m
deployment.apps/nats-deployment 1/1 1 1 5h54m
NAME DESIRED CURRENT READY AGE
replicaset.apps/ms-test-67c97b479c 1 1 1 5h54m
replicaset.apps/nats-deployment-65687968fc 1 1 1 5h54m
NAME READY AGE
statefulset.apps/fluentd 0/2 95m
Our fluentd pods are not able to start yet because the ConfigMaps that they should use have not been created.
Let's change that:
kubectl apply -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/configmap.fluentd-forwarder.yml
kubectl apply -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/configmap.fluentd-aggregator.yml
The first command creates a ConfigMap for the forwarder with the following Fluentd configuration:
fluentd/configmap.fluentd-forwarder.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-forwarder-cm
  labels:
    app.kubernetes.io/component: forwarder
data:
  fluentd.conf: |
    # Ignore fluentd own events
    <match fluent.**>
      @type null
    </match>

    # HTTP input for the liveness and readiness probes
    <source>
      @type http
      port 9880
    </source>

    # Throw the healthcheck to the standard output instead of forwarding it
    <match fluentd.healthcheck>
      @type stdout
    </match>

    # Get the logs from the containers running in the node
    <source>
      @type tail
      path /var/log/containers/*.log
      # exclude Fluentd logs
      exclude_path /var/log/containers/*fluentd*.log
      pos_file /opt/bitnami/fluentd/logs/buffers/fluentd-docker.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_key timestamp
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    # Parse log field from our microservices to extract data
    <filter kubernetes.**ms-test**>
      @type parser
      key_name log
      reserve_data true
      remove_key_name_field true
      <parse>
        @type json
      </parse>
    </filter>

    # Enrich with kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>

    # Forward all logs to the aggregators
    <match **>
      @type forward
      <server>
        host fluentd-aggregator
        port 24224
      </server>
      <buffer>
        @type file
        path /opt/bitnami/fluentd/logs/buffers/logs.buffer
        flush_thread_count 2
        flush_interval 5s
      </buffer>
    </match>
The first match directive is used to ignore Fluentd's own log events. The first source and the second match directive configure the healthcheck probes.
The source with @type tail is where the magic begins to happen. Let's dissect this directive:
- path generates a log event every time a new line is added to any of the log files matching this path pattern
- exclude_path allows us to ignore certain log files
- pos_file records the last read position in each file
- read_from_head tells Fluentd to read log files from the head instead of from the tail
- tag adds the tag kubernetes.*, where the asterisk is a placeholder that expands to the actual file path, replacing '/' with '.'
- parse states the format of the log. As we configured our microservice to emit logs as JSON with Winston's help, we must set the parse @type field to json
If there were no filters after this directive, the event coming out of it would be:
2020-11-04 22:45:17.870457759 +0000 kubernetes.var.log.containers.ms-test-67c97b479c-rpzrz_default_ms-test-26a3b0fed870efec99633c2c12b3eaa6daf70c8bfbebf7a21c256868d79270ec.log: {"log":"{\"level\":\"info\",\"timestamp\":\"2020-11-04T22:45:17.014Z\",\"metadata\":{},\"message\":\"TEST MESSAGE\"}\n","stream":"stdout","time":"2020-11-04T22:45:17.01498099Z"}
This means that all data from the original log ends up under the log key, which is not very useful if we want to filter based on any of the inner log JSON attributes.
That's the problem which the following filter directive of @type parser solves. Let's see what its attributes mean:
- key_name: the field of the Fluentd JSON event whose value we want to extract attributes from
- reserve_data: if true, keep not only the parsed inner JSON attributes but also everything from the original outer JSON
- remove_key_name_field: remove the parsed log key
The resulting log looks like:
2020-11-04 22:52:07.867717854 +0000 kubernetes.var.log.containers.ms-test-67c97b479c-rpzrz_default_ms-test-26a3b0fed870efec99633c2c12b3eaa6daf70c8bfbebf7a21c256868d79270ec.log: {"stream":"stdout","time":"2020-11-04T22:52:07.337748319Z","level":"info","timestamp":"2020-11-04T22:52:07.337Z","metadata":{},"message":"TEST MESSAGE"}
Wouldn't it also be nice to have information about the Kubernetes resource that emitted this log? This is exactly what the @type kubernetes_metadata filter directive does:
2020-11-04 23:06:58.709794753 +0000 kubernetes.var.log.containers.ms-test-67c97b479c-rpzrz_default_ms-test-26a3b0fed870efec99633c2c12b3eaa6daf70c8bfbebf7a21c256868d79270ec.log: {"stream":"stdout","time":"2020-11-04T23:06:58.071944709Z","level":"info","timestamp":"2020-11-04T23:06:58.071Z","metadata":{},"message":"TEST MESSAGE","docker":{"container_id":"26a3b0fed870efec99633c2c12b3eaa6daf70c8bfbebf7a21c256868d79270ec"},"kubernetes":{"container_name":"ms-test","namespace_name":"default","pod_name":"ms-test-67c97b479c-rpzrz","container_image":"marcosantarcangelo/ms-test:v1","container_image_id":"docker-pullable://marcosantarcangelo/ms-test@sha256:201d150436254ad534b31ed4e3807ba47dd432ad593f45a70d2151379c2d6ab4","pod_id":"0b51630f-3b00-40fc-8177-6dfb4cbeedec","host":"minikube","labels":{"component":"ms-test","pod-template-hash":"67c97b479c"},"master_url":"https://10.96.0.1:443/api","namespace_id":"18918f3a-d513-48b0-9651-105fbbea102f"}}
Finally, the match ** directive forwards the received events to the aggregator every flush_interval:
When a log forwarder receives events from applications, the events are first written into a disk buffer (specified by buffer_path). After every flush_interval, the buffered data is forwarded to aggregators.
This process is inherently robust against data loss. If a log forwarder's fluentd process dies then on its restart the buffered data is properly transferred to its aggregator. If the network between forwarders and aggregators breaks, the data transfer is automatically retried.
Now that our event has been sent to the aggregators, let's talk about their configuration.
fluentd/configmap.fluentd-aggregator.yml
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-aggregator-cm
  labels:
    app.kubernetes.io/component: aggregator
data:
  fluentd.conf: |
    # Ignore fluentd own events
    <match fluent.**>
      @type null
    </match>

    # TCP input to receive logs from the forwarders
    <source>
      @type forward
      bind 0.0.0.0
      port 24224
    </source>

    # HTTP input for the liveness and readiness probes
    <source>
      @type http
      bind 0.0.0.0
      port 9880
    </source>

    # Discard the healthcheck events instead of forwarding them
    <match fluentd.healthcheck>
      @type null
    </match>

    <match kubernetes.var.log.containers.**fluentd**.log>
      @type null
    </match>

    <match kubernetes.var.log.containers.**kube-system**.log>
      @type null
    </match>

    <match kubernetes.var.log.containers.**ms-test**.log>
      @type stdout
    </match>
This configuration is simpler than the one for the forwarder. First, we ignore Fluentd's own events with the first match directive.
Then, two sources are defined: one for the healthcheck probes and another to receive events from the forwarders.
Finally, we discard healthchecks, fluentd logs and kube-system logs by sending them to @type null match directives, while the events coming from our ms-test microservice are sent to the standard output.
The logs printed by the aggregator look like:
2020-11-05 12:40:26.124012096 +0000 kubernetes.var.log.containers.ms-test-67c97b479c-6s2tf_default_ms-test-16e155539c376aaad0ca4b552e97762b847a78c4d8ab9d04fb641b7f8d3626ab.log: {"stream":"stdout","time":"2020-11-05T12:40:25.559275494Z","level":"info","timestamp":"2020-11-05T12:40:25.558Z","metadata":{},"message":"TEST MESSAGE","docker":{"container_id":"16e155539c376aaad0ca4b552e97762b847a78c4d8ab9d04fb641b7f8d3626ab"},"kubernetes":{"container_name":"ms-test","namespace_name":"default","pod_name":"ms-test-67c97b479c-6s2tf","container_image":"marcosantarcangelo/ms-test:v1","container_image_id":"docker-pullable://marcosantarcangelo/ms-test@sha256:201d150436254ad534b31ed4e3807ba47dd432ad593f45a70d2151379c2d6ab4","pod_id":"f58c5e53-4ab6-4b59-8133-6cddcf5b31e6","host":"minikube","labels":{"component":"ms-test","pod-template-hash":"67c97b479c"},"master_url":"https://10.96.0.1:443/api","namespace_id":"18918f3a-d513-48b0-9651-105fbbea102f"}}
A final step we could take is to send the events to a log backend instead of printing them to the standard output. As an example, we could substitute the final match directive in the aggregator config with:
<match kubernetes.var.log.containers.**ms-test**.log>
  @type elasticsearch
  @log_level debug
  include_tag_key true
  tag_key @log_name
  host "#{ENV['ELASTICSEARCH_HOST']}"
  port "#{ENV['ELASTICSEARCH_PORT']}"
  user "#{ENV['ELASTICSEARCH_USER']}"
  password "#{ENV['ELASTICSEARCH_PASS']}"
  scheme https
  logstash_format true
  <buffer>
    @type file
    path /opt/bitnami/fluentd/logs/buffers/logs.buffer
    flush_thread_count 2
    flush_interval 5s
  </buffer>
</match>
This output plugin buffers the received events and sends them every 5 seconds to the ElasticSearch server defined by host, port, user and password. Setting logstash_format true configures Fluentd to use the conventional index name format logstash-%Y.%m.%d.
Cleanup
kubectl delete -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/k8s/manifest.yaml
kubectl delete -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/configmap.fluentd-forwarder.yml
kubectl delete -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/configmap.fluentd-aggregator.yml
helm delete fluentd
Conclusion
We have taken a tour of the world of logging at the cluster level. Logs written by a pod running a Docker image of a Nestjs microservice, with a production-ready JSON format, were picked up by a Fluentd forwarder. Thanks to the parser filter, we extracted information from the log record and added it to the Fluentd event. We were also able to add Kubernetes metadata to this event with the help of another filter, in this case @type kubernetes_metadata. These messages were buffered and sent to Fluentd aggregators, which in turn printed them to their standard output. We have also proposed a way to send them to a popular log backend such as ElasticSearch.
Bibliography
Kubernetes logging
NestJS logger technique
Fluentd docs
Fluentd Kubernetes metadata filter