DEV Community

Marco Santarcangelo Zazzetta for Rebellion Pay

Posted on

Cluster-level logging in Kubernetes with Fluentd and Nest microservices

Scope

On this article, we are going to cover the basics of centralized logging on a microservices' based architecture using Fluentd as a logging agent. We will also talk a little bit about how to generate customized logs on a microservice created with Nestjs via an external logger such as Winston.

Introduction

Logs are an essential part of any software solution in order to diagnose problems and on a general note, to know what is happening inside of an application.
Let's talk about logging on Kubernetes. On a node level, everything that is written by a containerized application to stdout and stderr is picked up by the kubelet service running on that node. These messages are then sent to the container engine, which handles them according to the logging driver configured in Kubernetes.

By default, logs emitted by apps containerized with Docker end up in the /var/log/containers directory on the host machine.
These logs are then accesible via the command:

kubectl logs <pod_name>
Enter fullscreen mode Exit fullscreen mode

Even previous execution logs can be accessible using:

kubectl logs <pod_name> --previous
Enter fullscreen mode Exit fullscreen mode

However, this command has some clear limitations, as these logs are lost for example whenever a pod is deleted or a node crashes.
As such, logs should have a separate storage and lifecycle independent of nodes, pods, or containers. This concept is called cluster-level-logging.

diagram

Cluster-level logging also allows us to have an overall understanding of how the whole cluster is working together.

Cluster-level logging architectures

While Kubernetes does not provide a native solution for cluster-level logging, their documentation proposes three different approaches:

  1. Use a node-level logging agent that runs on every node.
  2. Include a dedicated sidecar container for logging in an application pod.
  3. Push logs directly to a backend from within an application.

On this article, we will focus on the first option. Feel free to check the referred documentation if you would like to know more about the other alternatives.

Logging agents work by deploying a node level logging agent which checks on different logs sources such as log files and forwards them to a dedicated backend.

As it is necessary for a replica of the logging agent to run on each node of the cluster, the recommended approach is to use a DaemonSet:

A DaemonSet ensures that all (or some) Nodes run a copy of a Pod. As nodes are added to the cluster, Pods are added to them. As nodes are removed from the cluster, those Pods are garbage collected. Deleting a DaemonSet will clean up the Pods it created.

Generating logs

Before setting up Fluentd as our logging agent, we need to have at least one microservice generating logs in our cluster with a certain format. For this purpose, a simple Nestjs microservice is available at your disposal. The process containerized tries to connect to a NATS server and emits a healthcheck message through this connection and to its standard output every 5 seconds.

You can clone it and edit it as you like, or just apply the manifest on the k8s folder into your cluster with:

kubectl apply -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/k8s/manifest.yaml
Enter fullscreen mode Exit fullscreen mode

The important part of this Nestjs microservice is the way in which the logs are formatted in a production environment. In order to learn how logs are set up to use an external logger in a Nestjs application, I suggest this great article from Nestjs documentation.
The key files regarding logging configuration in this microservice are:

src/factory/winstonConfig.ts

import { format, transports, addColors, LoggerOptions } from "winston";
import { MESSAGE } from 'triple-beam';
import * as dotenv from 'dotenv';

dotenv.config();
export class LoggerConfig {
  private readonly options: LoggerOptions;

  private static instance: LoggerConfig;

  public static getInstance(): LoggerConfig {
    if (!LoggerConfig.instance) {
      LoggerConfig.instance = new LoggerConfig();
    }

    return LoggerConfig.instance;
  }

  private defaultFormat() {
    return format.combine(
      format.timestamp(),
      format.metadata({
        fillExcept: ['message', 'level', 'timestamp', 'label']
      })
    );
  }

  private stagingFormat() {
    return format.combine(
      this.defaultFormat(),
      format.colorize(),
      format.printf((info) => {
        const reqId = info.metadata.reqId;
        delete info.metadata.reqId;

        return (`${info.level} | ${info.timestamp} ${reqId ? `| ${reqId}` : ''} | ${info.message} | ${JSON.stringify(info.metadata)}`);
      })
    );
  }

  private productionFormat() {
    return format.combine(
      this.defaultFormat(),
      format((info) => {
        const reqId = info.metadata.reqId;
        delete info.metadata.reqId;

        info.reqId = reqId;

        if (typeof info.message === 'object' && info.message !== null) {
          info.message = JSON.stringify(info.message);
        }

        const { level, timestamp, label } = info;

        const message = {
          level,
          timestamp,
          label,
          reqId,
          metadata: info.metadata,
          message: info.message
        };

        delete info.metadata;
        delete info.message;
        delete info.level;
        delete info.timestamp;
        delete info.label;

        info[MESSAGE] = JSON.stringify(message);

        return info;
      })()
    );
  }

  constructor() {
    this.options = {
      format: process.env.NODE_ENV === 'production' ? this.productionFormat() : this.stagingFormat(),
      transports: [
        new transports.Console({
          level: 'silly', // TODO: Get value from configfile 
        })
      ],
      exitOnError: false // do not exit on handled exceptions
    };
  }

  public console(): LoggerOptions {
    return this.options;
  }
}

addColors({
  error: 'bold green redBG',
  warn: 'italic black yellowBG',
  info: 'cyan',
  http: 'grey',
  verbose: 'magenta',
  debug: 'green',
  silly: 'bold gray magentaBG'
});
Enter fullscreen mode Exit fullscreen mode

The productionFormat function reshapes the logs so they show up at later stages as a series of JSONs with some useful data such as its log level or metadata as part of the message's key/value pairs. This way of formatting the logs will be great at the moment of collecting the data with Fluentd. We will talk about it in more detail in a moment.

src/main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { NATSConfigService } from './config/NATSConfigService';
import { WinstonModule } from 'nest-winston';
import { LoggerConfig } from './factory/winstonConfig';
import { InjectMetadataInterceptor } from './interceptors/InjectMetadataInterceptor';

async function bootstrap() {
  const logger: LoggerConfig = LoggerConfig.getInstance();
  const winstonLogger = WinstonModule.createLogger(logger.console());

  const context = await NestFactory.createApplicationContext(AppModule.register(), {
    logger: winstonLogger
  });

  const natsConfigService : NATSConfigService = context.get(NATSConfigService);

  context.close();

  const app = await NestFactory.createMicroservice(AppModule.register(), {
    ...natsConfigService.getNATSConfig,
    logger: winstonLogger
  });

  const globalInterceptors = [
    new InjectMetadataInterceptor()
  ];

  app.useGlobalInterceptors(...globalInterceptors);

  app.listen(() => winstonLogger.log('Microservice ms-test running'));

}
bootstrap();
Enter fullscreen mode Exit fullscreen mode

This is just a standard bootstrap for a pure Nestjs microservice with NATS as its messaging system and a custom logger.

src/app.module.ts

import { ConfigModule } from '@nestjs/config';
import { Module, DynamicModule } from '@nestjs/common';
import { NATSConfigService } from './config/NATSConfigService';
import { object as JoiObject, string as JoiString, number as JoiNumber } from '@hapi/joi';
import { WinstonModule } from 'nest-winston';
import { LoggerConfig } from './factory/winstonConfig';
import { MessageModule } from './message/message.module';
const logger: LoggerConfig = LoggerConfig.getInstance();    

@Module({})
export class AppModule {
  public static register(): DynamicModule {
    const imports = [
      WinstonModule.forRoot(logger.console()),
      ConfigModule.forRoot({
        isGlobal: true,
        validationSchema: JoiObject({
          NODE_ENV: JoiString()
            .valid('development', 'production', 'test')
            .default('development'),
          PORT: JoiNumber().port().default(3030),
          NATS_URL: JoiString().required().default('nats://localhost:4222'),
          NATS_USER: JoiString(),
          NATS_PASSWORD: JoiString(),
        })
      }),
      MessageModule
    ];

    const controllers = [];

    const providers = [NATSConfigService];

    return {
      module: AppModule,
      imports,
      controllers,
      providers,
    };
  }
}
Enter fullscreen mode Exit fullscreen mode

The app module definition where Logger, Config and Message modules are imported to the app.

src/message/message.service.ts

import { Injectable, Inject, OnModuleInit } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { MicroserviceMessage } from '../interface/MicroserviceMessage';
import { WINSTON_MODULE_PROVIDER } from "nest-winston";
import { Logger } from "winston";
import { ConfigService } from '@nestjs/config';

@Injectable()
export class MessageService implements OnModuleInit {
  constructor(
    @Inject(WINSTON_MODULE_PROVIDER) private readonly logger: Logger,
    @Inject('MESSAGE_CLIENT') private client: ClientProxy,
    private configService: ConfigService
    ){};

  sendMessage<Toutput = any, Tinput = any>(pattern: Record<string, any> | string, message: MicroserviceMessage<Tinput>): Observable<Toutput> {
    return this.client.send<Toutput, MicroserviceMessage<Tinput>>(pattern, message);
  }

  emitMessage<Toutput = any, Tinput = any>(pattern: Record<string, any> | string, message: MicroserviceMessage<Tinput>): Observable<Toutput> {
    return this.client.emit<Toutput, MicroserviceMessage<Tinput>>(pattern, message);
  }

  async onModuleInit(): Promise<void> {
    try {
      await this.client.connect();
    } catch (err) {
      this.logger.error(err, 'Connection to client has failed, start recovery');
      process.exit(1);
    }
    setInterval(async () => {
      try {
        await this.client.emit('ms-test"', 'healthcheck').toPromise();
        this.logger.info('TEST MESSAGE');
      } catch (err) {
        this.logger.error(err, 'Healthcheck has failed, start recovery');
        process.exit(2);
      }
    }, +this.configService.get<string>('HEALTHCHECK_INTERVAL'));
  }
}
Enter fullscreen mode Exit fullscreen mode

The message service in which the healtcheck message is emitted and logged to stdout. The MessageModule intended use is to send messages to other services.

Let's see what shows up at stdout:

$ kubectl logs ms-test-64d79dc754-jmshs

> ms-test@0.0.1 start:prod /app
> node dist/main

{"level":"info","timestamp":"2020-11-03T20:54:27.193Z","metadata":{"context":"NestFactory"},"message":"Starting Nest application..."}
{"level":"info","timestamp":"2020-11-03T20:54:28.002Z","metadata":{"context":"InstanceLoader"},"message":"ConfigHostModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.088Z","metadata":{"context":"InstanceLoader"},"message":"WinstonModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.090Z","metadata":{"context":"InstanceLoader"},"message":"ConfigModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.992Z","metadata":{"context":"InstanceLoader"},"message":"MetricsModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.994Z","metadata":{"context":"InstanceLoader"},"message":"MessageModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:28.995Z","metadata":{"context":"InstanceLoader"},"message":"AppModule dependencies initialized"}
{"level":"info","timestamp":"2020-11-03T20:54:29.291Z","metadata":{"context":"NestMicroservice"},"message":"Nest microservice successfully started"}
{"level":"info","timestamp":"2020-11-03T20:54:29.595Z","metadata":{"context":"RoutesResolver"},"message":"AppController {}:"}
{"level":"info","timestamp":"2020-11-03T20:54:29.698Z","metadata":{"context":"RouterExplorer"},"message":"Mapped {, POST} route"}
{"level":"info","timestamp":"2020-11-03T20:54:29.789Z","metadata":{"context":"RouterExplorer"},"message":"Mapped {, GET} route"}
{"level":"info","timestamp":"2020-11-03T20:54:30.193Z","metadata":{"context":"NestApplication"},"message":"Nest application successfully started"}
{"level":"info","timestamp":"2020-11-03T20:54:30.293Z","metadata":{},"message":"Hybrid ms-test test running on port 3003"}
{"level":"info","timestamp":"2020-11-03T20:54:35.106Z","metadata":{},"message":"TEST MESSAGE"}
{"level":"info","timestamp":"2020-11-03T20:54:40.103Z","metadata":{},"message":"TEST MESSAGE"}

Enter fullscreen mode Exit fullscreen mode

Great! Now onward towards Fluentd.

Fluentd

Fluentd is a popular opensource data collector with an outstanding amount of plugin options for several purposes :

  • Output (AWS S3, ElasticSearch)
  • Input (Apache Kafka, HTTP, TCP)
  • Big Data (webhdfs)
  • Filter (anonymizer, kubernetes)
  • Notifications (Slack, twilio)

Among many others.
Before deploying it to our cluster, let's discuss about how it works and how to configure it. Refer to the Fluentd docs for more details.
Fluentd manages events which are originated at sources. This events have the following three components

  • tag: Specifies the origin where an event comes from. It is used for message routing.
  • time: Specifies the time when an event happens with nanosecond resolution.
  • record: Specifies the actual log as a JSON object.

Events follow a step-by-step cycle where they are processed in order, from top-to-bottom. In this process, they may pass through filters, which play two roles:

  1. Behave like a rule to pass or reject an event
  2. Transform the event (parse, add metadata as fields)

After passing these filters, the events should arrive to a match directive which assigns them to a certain output.
As mentioned before, tags are used for message routing. Filter and match directives must always have a match pattern which defines which events are processed by them. For example:

<match tag.to.match>
  @type null
</match>

<match **>
  @type stdout
</match>
Enter fullscreen mode Exit fullscreen mode

The first match directive picks up events with tag equals to tag.to.match and discards them by sending these events to a null output.
The second directive uses a ** wildcard which matches zero or more tag parts.
For example, the pattern a.** matches a, a.b and a.b.c
And it sends the events to the standard output.

The configuration file allows the user to control the input and output behavior of Fluentd by 1) selecting input and output plugins; and, 2) specifying the plugin parameters. The file is required by Fluentd to operate properly.
The configuration file consists of the following directives:

  • source directives determine the input sources
  • match directives determine the output destinations
  • filter directives determine the event processing pipelines
  • system directives set system-wide configuration
  • label directives group the output and filter for internal routing
  • @include directives include other files

We will use what is called a high availability configuration for Fluentd, in order to achieve a resilient cluster level logging. It may seem as overkill for such a simple example, but I think it would be useful to have a configuration as close as possible to one which could be used in a high-traffic production environment.
Following this network topology, we will have two types of pod:

  1. Forwarders: Receive log events from the containers running on the node and send them to the aggregators
  2. Aggregators: Receive log events from the forwarders, buffer them and periodically upload them to the cloud. In our case, they will output logs to the standard output.

In order to add it to the cluster, we will be using helm charts due to their simplicity. In our case, bitnamis's fluentd helm chart will do the trick.
If you don't have helm installed in your cluster, please follow these instructions.
Once helm is successfully set up in your machine, we can use it to install fluentd with:

helm repo add bitnami https://charts.bitnami.com/bitnami
helm install fluentd bitnami/fluentd -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/values.yml
Enter fullscreen mode Exit fullscreen mode

The first command adds the bitnami repository to helm, while the second one uses this values definition to deploy a DaemonSet of Forwarders and 2 aggregators with the necessary networking as a series of services. It also states that the forwarders look for their configuration on a ConfigMap named fluentd-forwarder-cm while the aggregators will use one called fluentd-aggregator-cm. Once deployed, our cluster will look like this:

kubectl get all
NAME                                   READY   STATUS              RESTARTS   AGE
pod/fluentd-0                          0/1     ContainerCreating   0          95m
pod/fluentd-hwwcb                      0/1     ContainerCreating   0          95m
pod/ms-test-67c97b479c-rpzrz           1/1     Running             0          5h54m
pod/nats-deployment-65687968fc-4rdxd   1/1     Running             0          5h54m

NAME                         TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)              AGE
service/fluentd-aggregator   ClusterIP   10.107.174.54    <none>        9880/TCP,24224/TCP   95m
service/fluentd-forwarder    ClusterIP   10.108.14.24     <none>        9880/TCP             95m
service/fluentd-headless     ClusterIP   None             <none>        9880/TCP,24224/TCP   95m
service/fluentd-metrics      ClusterIP   10.107.92.253    <none>        24231/TCP            95m
service/kubernetes           ClusterIP   10.96.0.1        <none>        443/TCP              28d
service/ms-test              ClusterIP   10.107.57.48     <none>        3000/TCP             5h54m
service/nats-service         ClusterIP   10.105.130.222   <none>        4222/TCP             5h54m

NAME                     DESIRED   CURRENT   READY   UP-TO-DATE   AVAILABLE   NODE SELECTOR   AGE
daemonset.apps/fluentd   1         1         0       1            0           <none>          95m

NAME                              READY   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/ms-test           1/1     1            1           5h54m
deployment.apps/nats-deployment   1/1     1            1           5h54m

NAME                                         DESIRED   CURRENT   READY   AGE
replicaset.apps/ms-test-67c97b479c           1         1         1       5h54m
replicaset.apps/nats-deployment-65687968fc   1         1         1       5h54m

NAME                       READY   AGE
statefulset.apps/fluentd   0/2     95m

Enter fullscreen mode Exit fullscreen mode

Our fluentd pods are not able to start yet because the ConfigMaps that they should use have not been created.
Let's change that:

kubectl apply -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/configmap.fluentd-forwarder.yml
kubectl apply -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/configmap.fluentd-aggregator.yml
Enter fullscreen mode Exit fullscreen mode

The first command is creating a ConfigMap for the forwarder with the following Fluentd configuration:

fluentd/configmap.fluentd-forwarder.yml

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-forwarder-cm
  labels:
    app.kubernetes.io/component: forwarder
data:
  fluentd.conf: |
    # Ignore fluentd own events
    <match fluent.**>
      @type null
    </match>

    # HTTP input for the liveness and readiness probes
    <source>
      @type http
      port 9880
    </source>

    # Throw the healthcheck to the standard output instead of forwarding it
    <match fluentd.healthcheck>
      @type stdout
    </match>

    # Get the logs from the containers running in the node
    <source>
      @type tail
      path /var/log/containers/*.log
      # exclude Fluentd logs
      exclude_path /var/log/containers/*fluentd*.log
      pos_file /opt/bitnami/fluentd/logs/buffers/fluentd-docker.pos
      tag kubernetes.*
      read_from_head true
      <parse>
        @type json
        time_key timestamp
        time_format %Y-%m-%dT%H:%M:%S.%NZ
      </parse>
    </source>

    # Parse log field from our microservices to extract data
    <filter kubernetes.**ms-test**>
      @type parser
      key_name log
      reserve_data true
      remove_key_name_field true
      <parse>
        @type json
      </parse>
    </filter>

    # enrich with kubernetes metadata
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>

    # Forward all logs to the aggregators
    <match **>
      @type forward
      <server>
        host fluentd-aggregator
        port 24224
      </server>

      <buffer>
        @type file
        path /opt/bitnami/fluentd/logs/buffers/logs.buffer
        flush_thread_count 2
        flush_interval 5s
      </buffer>
    </match>

Enter fullscreen mode Exit fullscreen mode

The first match directive is used to ignore fluentd own log events. The first source and second match directives configure the healthcheck probes.

On the source with @type tail is where the magic begins to happen. Let's dissect this directive:

  • path generates log events every time a new line is added to any of the log files which matches the regular expression
  • exclude_path allows us to ignore certain log files
  • pos_file records the last read position on this file
  • read_from_head tells fluentd to read logfiles from the head instead that from the tail
  • tag adds the tag kubernetes. The asterisk * is used as a placeholder that expands to the actual file path, replacing '/' with '.'
  • parse states the format of the log. As we configured our microservice to emit logs as JSON with winston's help, we must specify the parse @type field to json.

The event that comes out of this directive if there wasn't any filters after it would be:

2020-11-04 22:45:17.870457759 +0000 kubernetes.var.log.containers.ms-test-67c97b479c-
rpzrz_default_ms-test 26a3b0fed870efec99633c2c12b3eaa6daf70c8bfbebf7a21c256868d7927
0ec.log: {"log":"{\"level\":\"info\",\"timestamp\":\"2020-11 04T22:45:17.014Z\",\"metadata\":{},\"message\":\"TEST 
MESSAGE\"}\n","stream":"stdout","time":"2020-11-04T22:45:17.01498099Z"}
Enter fullscreen mode Exit fullscreen mode

This means that all data on the original log would go under the log key, which is not very useful if we want to filter based on any of the inner log JSON attributes.

That's the problem which the following filter directive of @type parser solves. Let's see what all the attributes mean:

  • key_name which field of the fluentd JSON event we want to extract attributes of
  • reserve_data if it's true, keep not only the parsed inner JSON attributes but also everything from the original outer JSON
  • remove_key_name_field remove the parsed log key

The resulting log looks like:

2020-11-04 22:52:07.867717854 +0000 kubernetes.var.log.containers.ms-test-67c97b479c-
rpzrz_default_ms-test-26a3b0fed870efec99633c2c12b3eaa6daf70c8bfbebf7a21c256868d7927
0ec.log: {"stream":"stdout","time":"2020-11-04T22:52:07.337748319Z","level":"info","timestamp":"2020-11-
04T22:52:07.337Z","metadata":{},"message":"TEST MESSAGE"}
Enter fullscreen mode Exit fullscreen mode

Wouldn't it also be nice to have information about the kubernetes resource that emitted this log? This is exactly what the filter directive of @type kubernetes_metadata does:

2020-11-04 23:06:58.709794753 +0000 kubernetes.var.log.containers.ms-test-67c97b479c-
rpzrz_default_ms-test-26a3b0fed870efec99633c2c12b3eaa6daf70c8bfbebf7a21c256868d7927
0ec.log: {"stream":"stdout","time":"2020-11-04T23:06:58.071944709Z","level":"info","timestamp":"2020-11-
04T23:06:58.071Z","metadata":{},"message":"TEST MESSAGE","docker":
{"container_id":"26a3b0fed870efec99633c2c12b3eaa6daf70c8bfbebf7a21c256868d79270ec"},"kubernetes":{"container_name":"ms-
test","namespace_name":"default","pod_name":"ms-test-67c97b479c-rpzrz","container_image":"marcosantarcangelo/ms-
test:v1","container_image_id":"docker-pullable://marcosantarcangelo/ms-
test@sha256:201d150436254ad534b31ed4e3807ba47dd432ad593f45a70d2151379c2d6ab4","pod_id":"0b51630f-3b00-40fc-8177-
6dfb4cbeedec","host":"minikube","labels":{"component":"ms-test","pod-template-
hash":"67c97b479c"},"master_url":"https://10.96.0.1:443/api","namespace_id":"18918f3a-d513-48b0-9651-105fbbea102f"}}
Enter fullscreen mode Exit fullscreen mode

Finally the match ** directive forwards the received events to the aggregator every flush_interval:

When a log forwarder receives events from applications, the events are first written into a disk buffer (specified by buffer_path). After every flush_interval, the buffered data is forwarded to aggregators.
This process is inherently robust against data loss. If a log forwarder's fluentd process dies then on its restart the buffered data is properly transferred to its aggregator. If the network between forwarders and aggregators breaks, the data transfer is automatically retried.

Now that our event has been sent to the aggregators, let's talk about their configuration

fluentd/configmap.fluentd-aggregator.yml

apiVersion: v1
kind: ConfigMap
metadata:
  name: fluentd-aggregator-cm
  labels:
    app.kubernetes.io/component: aggregator
data:
  fluentd.conf: |
    # Ignore fluentd own events
    <match fluent.**>
      @type null
    </match>

    # TCP input to receive logs from the forwarders
    <source>
      @type forward
      bind 0.0.0.0
      port 24224
    </source>

    # HTTP input for the liveness and readiness probes
    <source>
      @type http
      bind 0.0.0.0
      port 9880
    </source>

    # Throw the healthcheck to the standard output instead of forwarding it
    <match fluentd.healthcheck>
      @type null
    </match>

    <match kubernetes.var.log.containers.**fluentd**.log>
      @type null
    </match>

    <match kubernetes.var.log.containers.**kube-system**.log>
      @type null
    </match>

    <match kubernetes.var.log.containers.**ms-test**.log>
      @type stdout
    </match>
Enter fullscreen mode Exit fullscreen mode

This configuration is easier that the one from the forwarder. First, we ignore Fluentd own events with the first match directive.
Then, two sources are defined, one for healtcheck probes and another one to receive events from the forwarders.
Finally, we ignore healthchecks, fluentd logs and kube-system logs by sending them to a @type null match directive and, on the other hand, we send the events coming from our ms-test microservice to the standard output.
Finally the logs from the aggregator look like:

2020-11-05 12:40:26.124012096 +0000 kubernetes.var.log.containers.ms-test-67c97b479c-
6s2tf_default_ms-test-16e155539c376aaad0ca4b552e97762b847a78c4d8ab9d04fb641b7f8d362
6ab.log: {"stream":"stdout","time":"2020-11-05T12:40:25.559275494Z","level":"info","timestamp":"2020-11-
05T12:40:25.558Z","metadata":{},"message":"TEST MESSAGE","docker":
{"container_id":"16e155539c376aaad0ca4b552e97762b847a78c4d8ab9d04fb641b7f8d3626ab"},"kubernetes":{"container_name":"ms-
test","namespace_name":"default","pod_name":"ms-test-67c97b479c-6s2tf","container_image":"marcosantarcangelo/ms-
test:v1","container_image_id":"docker-pullable://marcosantarcangelo/ms-
test@sha256:201d150436254ad534b31ed4e3807ba47dd432ad593f45a70d2151379c2d6ab4","pod_id":"f58c5e53-4ab6-4b59-8133-
6cddcf5b31e6","host":"minikube","labels":{"component":"ms-test","pod-template-
hash":"67c97b479c"},"master_url":"https://10.96.0.1:443/api","namespace_id":"18918f3a-d513-48b0-9651-105fbbea102f"}}
Enter fullscreen mode Exit fullscreen mode

A final step that we could take is, instead of outputting the events to the standard output, sending them to a log backend. As an example, we could substitute the final match directive from the aggregator config with:

    <match kubernetes.var.log.containers.**ms-test**.log>
      @type elasticsearch
      @log_level debug
      include_tag_key true
      tag_key @log_name
      host "#{ENV['ELASTICSEARCH_HOST']}"
      port "#{ENV['ELASTICSEARCH_PORT']}"
      user "#{ENV['ELASTICSEARCH_USER']}"
      password "#{ENV['ELASTICSEARCH_PASS']}"
      scheme https
      logstash_format true

      <buffer>
        @type file
        path /opt/bitnami/fluentd/logs/buffers/logs.buffer
        flush_thread_count 2
        flush_interval 5s
      </buffer>
    </match>
Enter fullscreen mode Exit fullscreen mode

This output plugin would buffer the received events and send them to the ElasticSearch server defined with host, port, user and password every 5 seconds. logstash_format true configures Fluentd to use the conventional index name format logstash-%Y.%m.%d

Cleanup

kubectl delete -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/k8s/manifest.yaml
kubectl delete -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/configmap.fluentd-forwarder.yml
kubectl delete -f https://raw.githubusercontent.com/marcosantar93/ms-test/main/fluentd/configmap.fluentd-aggregator.yml
helm delete fluentd
Enter fullscreen mode Exit fullscreen mode

Conclusion

We have taken a tour over the world of logging at a cluster level. Logs written by a pod running a Docker image of a Nestjs microservice with a production ready JSON format were picked up by a Fluentd forwarder. Thanks to the parser filter, we extracted information from the log record and added it to the Fluentd event. We were also able to add kubernetes metadata to this Event with the help of another filter, in this case of @type kubernetes_metadata. These messages were buffered and sent to Fluentd aggregators, which in turn printed them to their standard output. We have also proposed a way to send them to a popular log backend such as ElasticSearch.

Bibliography

Kubernetes logging
NestJS logger technique
Fluentd docs
Fluentd Kubernetes metadata filter

Top comments (0)