DEV Community

Cover image for Streaming data into Kafka S01/E04 — Parsing log files using Grok Expressions
Florian Hussonnois
Florian Hussonnois

Posted on • Updated on

Streaming data into Kafka S01/E04 — Parsing log files using Grok Expressions

This is the fourth and last article of the series "Streaming data into Kafka" series. In the first three articles, we saw how it's fairly easy to use Kafka Connect to load records from CSV, XML and JSON files into Apache Kafka without writing a single line of code. For doing this, we have used the Kafka Connect FilePulse connector which packs with a lot of nice features to parse and transform data.

In this last article, we are going to see how to parse unstructured logs files from an NGINX web server into structured data fields.

Kafka Connect File Pulse connector

If you have already read the previous articles go directly to the next section (i.e Ingesting Data).

The Kafka Connect FilePulse connector is a powerful source connector that makes it easy to parse, transform, and load data from the local file system into Apache Kafka. It offers built-in support for various file formats (e.g: CSV, XML, JSON, LOG4J, AVRO).

For a broad overview of FilePulse, I suggest you read this article:

For more information, you can check-out the documentation here.

How to use the connector

The easiest and fastest way to get started with the Kafka Connect FilePulse connector is to use the Docker image available on Docker Hub.

$ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
Enter fullscreen mode Exit fullscreen mode

You can download the docker-compose.yml file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.

$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
$ docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible on http://localhost:8083.

$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse

"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
Enter fullscreen mode Exit fullscreen mode

Note: You can also install the connector either from GitHub Releases Page or from Confluent Hub.

Starting NGINX

First, let's start an NGINX instance to serve a single HTML page, using Docker.

  • Let's create some directories for the sake of this demo.
$ mkdir -p demo/content demo/logs
Enter fullscreen mode Exit fullscreen mode
  • ...and a simple HTML page called ìndex.html:
$ cat <<EOF > demo/content/index.html
<!DOCTYPE html>
<html>
  <head>
    <title>Hi!</title>
  </head>
  <body>
    <h1>Hello World - Kafka Connect FilePulse</h1>
    <strong>You can add a Star to this repository to support us! Thank You<a href="https://github.com/streamthoughts/kafka-connect-file-pulse">GitHub</a></strong>
  </body>
</html>
EOF
Enter fullscreen mode Exit fullscreen mode
  • Then, start the NGINX web server, by running:
$ docker run --name nginx \
-p 8080:80 \
-v `pwd`/demo/content:/usr/share/nginx/html:ro -d nginx
Enter fullscreen mode Exit fullscreen mode
  • Check that the server is running properly:
$ curl -X GET http://localhost:8080
Enter fullscreen mode Exit fullscreen mode
  • Finally, and to simplify things for the rest of the article, we will redirect stderr and stdout of the container to the ./demo/logs/nginx.log file.
$ docker logs -f nginx > ./demo/logs/nginx.log 2>&1 & 
Enter fullscreen mode Exit fullscreen mode

Ingesting Data

First, let's stop the container running Kafka Connect that was started using docker-compose.

$ docker stop connect && docker rm connect
Enter fullscreen mode Exit fullscreen mode

Then, start a new one with a mounted volume for accessing the nginx.log file.

  • Create a file to define the environment variables that must be set for Kafka Connect container.
$ cat <<EOF > connect-file-pulse-env.list
CONNECT_BOOTSTRAP_SERVERS=localhost:9092
CONNECT_REST_ADVERTISED_HOST_NAME=connect
CONNECT_REST_PORT=8083
CONNECT_GROUP_ID=compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC=docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
CONNECT_OFFSET_FLUSH_INTERVAL_MS=10000
CONNECT_OFFSET_STORAGE_TOPIC=docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
CONNECT_STATUS_STORAGE_TOPIC=docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
CONNECT_KEY_CONVERTER=org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://localhost:8081
CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
CONNECT_ZOOKEEPER_CONNECT=localhost:2181
CONNECT_PLUGIN_PATH=/usr/share/java,/usr/share/confluent-hub-components/
EOF
Enter fullscreen mode Exit fullscreen mode
  • Start the container running the Kafka Connect FilePulse connector:
$ docker run -it \
 --network=host \
 --name=connect \
 --env-file connect-file-pulse-env.list \
 -v `pwd`/demo/logs:/tmp/connect-data \
 streamthoughts/kafka-connect-file-pulse:latest
Enter fullscreen mode Exit fullscreen mode

Then, create a new connector with the below configuration:

$ curl \
    -sX PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-01/config \
    -d '{
     "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
     "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
     "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
     "fs.scan.directory.path": "/tmp/connect-data",
     "fs.scan.interval.ms": "10000",
     "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
     "file.filter.regex.pattern":".*\\.log$",
     "internal.kafka.reporter.bootstrap.servers": "localhost:9092",
     "internal.kafka.reporter.topic": "connect-file-pulse-status",
     "offset.strategy": "name",
     "read.max.wait.ms": "900000",
     "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
     "topic": "connect-file-pulse-nginx-raw-logs",
     "tasks.max": 1
}' | jq
Enter fullscreen mode Exit fullscreen mode

Finally, you can consume the Topic named connect-file-pulse-nginx-raw-logs to verify that the connector has detected the .log file:

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t connect-file-pulse-nginx-raw-logs \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
Enter fullscreen mode Exit fullscreen mode

(output)

{
  "message": {
    "string": "172.17.0.1 - - [05/Jan/2021:10:56:52 +0000] \"GET / HTTP/1.1\" 200 306 \"-\" \"curl/7.58.0\" \"-\""
  }
}
Enter fullscreen mode Exit fullscreen mode
  • Eventually, you can generate more access logs by running:
$ for i in $(seq 0 100); do curl -sX GET http://localhost:8080 >/dev/null; sleep 1 ; done &
Enter fullscreen mode Exit fullscreen mode

Note: In the example above, we have used kafkacat to consume the topics. The option -o-1 is used to only consume the latest message

Ok, let's take a moment to describe the configuration we've just used.

First, the connector will periodically scan the input directory /tmp/connect-data that we set through the property fs.scan.directory.path and will lookup for files matching the pattern .*\\.log$.

Then, each file will be uniquely identified and tracked depending on the value of the offset.strategy property. Here, the configuration specifies that a file must be identified by its name. Alternatively, we could, for example, have chosen to use the inode of the file as an identifier. Connect FilePulse supports multiple identifiers that can be combined (e.g: name+hash).

In addition, the connector is configured to use the RowFileInputReader (see: task.reader.class) that allows creating one Kafka record per line.

One of the characteristics of the RowFileInputReader is that will not immediately complete the processing after hitting the end of the file but will wait until reaching a timeout for more bytes to be written to the file. This behavior is configured through the read.max.wait.ms property. Here we are waiting 15 minutes before finishing the file processing.

Parsing Data using Grok Expressions

So far, we have been able to read the NGINX logs continuously. Each time a new line is added to the file a new record is sent to Kafka containing a single text field called message. But, it would be preferable to be able to parse each line to extract useful data and to produce structured messages into Kafka.

The Elastic/ELK Stack and in particular the Logstash solution has popularised the use of Grok expressions to parse and transform unstructured data into meaningful fields. Grok sits on top of Regular Expression (regex) to match relevant data using text patterns.

Connect FilePulse brings the power of Grok Expression directly to Kafka Connect with the GrokFilter that under the hood uses the Joni library the Java port of Oniguruma regexp library. It also provides a lot of predefined and reusable grok patterns. See the complete list of patterns.

Let's define a custom grok pattern to match lines of the NGINX access log file.

$ cat <<EOF > nginx
NGINX_ACCESS %{IPORHOST:remote_addr} - %{USERNAME:remote_user} \[%{HTTPDATE:time_local}\] \"%{DATA:request}\" %{INT:status} %{NUMBER:bytes_sent} \"%{DATA:http_referer}\" \"%{DATA:http_user_agent}\"
EOF
Enter fullscreen mode Exit fullscreen mode

Note: The syntax for a grok pattern is %{SYNTAX:SEMANTIC} or %{SYNTAX:SEMANTIC:TYPE}.

Then, we have to make the pattern available to the connector by copying the nginx file, previously created, into the container :

$ docker exec -it connect mkdir -p /tmp/grok-patterns
$ docker cp nginx connect://tmp/grok-patterns/nginx
Enter fullscreen mode Exit fullscreen mode

After that, we can create a new connector with the following configuration :

$ curl \
    -sX PUT -H "Accept:application/json" \
    -H "Content-Type:application/json" http://localhost:8083/connectors/source-log-filepulse-02/config \
    -d '{
     "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
     "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
     "fs.scanner.class": "io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker",
     "fs.scan.directory.path": "/tmp/connect-data",
     "fs.scan.interval.ms": "10000",
     "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
     "file.filter.regex.pattern":".*\\.log$",
     "internal.kafka.reporter.bootstrap.servers": "localhost:9092",
     "internal.kafka.reporter.topic": "connect-file-pulse-status",
     "offset.strategy": "name",
     "read.max.wait.ms": "120000",
     "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
     "topic": "connect-file-pulse-nginx-parsed-logs",
     "tasks.max": 1,
     "filters": "ParseLog",
     "filters.ParseLog.type": "io.streamthoughts.kafka.connect.filepulse.filter.GrokFilter",
     "filters.ParseLog.match": "%{NGINX_ACCESS}",
     "filters.ParseLog.overwrite": "message",
     "filters.ParseLog.source": "message",
     "filters.ParseLog.ignoreFailure": "true",
     "filters.ParseLog.patternsDir": "/tmp/grok-patterns"
}' | jq
Enter fullscreen mode Exit fullscreen mode

Finally, let's consume the output Topic connect-file-pulse-nginx-parsed-logs to observe the extracted fields.

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t connect-file-pulse-nginx-parsed-logs \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
Enter fullscreen mode Exit fullscreen mode

(output)

{
  "message": {
    "string": "172.17.0.1 - - [05/Jan/2021:13:14:54 +0000] \"GET / HTTP/1.1\" 200 306 \"-\" \"curl/7.58.0\" \"-\""
  },
  "remote_addr": {
    "string": "172.17.0.1"
  },
  "remote_user": {
    "string": "-"
  },
  "time_local": {
    "string": "05/Jan/2021:13:14:54 +0000"
  },
  "request": {
    "string": "GET / HTTP/1.1"
  },
  "status": {
    "string": "200"
  },
  "bytes_sent": {
    "string": "306"
  },
  "http_referer": {
    "string": "-"
  },
  "http_user_agent": {
    "string": "curl/7.58.0"
  }
}
Enter fullscreen mode Exit fullscreen mode

As previously, you can Eventually generate more access logs by running:

$ for i in $(seq 0 100); do curl -sX GET http://localhost:8080 >/dev/null; sleep 1 ; done
Enter fullscreen mode Exit fullscreen mode

Et voilà, It's as simple as that!

Using Grok Expression with Kafka Connect SMT

In the previous example, we use the processing filter chain feature provided by Connect FilePulse.

But, Kafka Connect already ships with a mechanism called Single Message Transforms (SMTs) that was added in Apache Kafka 0.10 (KIP-66). SMTs can be used to modify the data of each record that flow through Kafka Connect pipeline.

Good news ! We have externalized the work done with the GrokFilter to a dedicated SMT called: Kafka Connect Grok Transformation.

Conclusion

We have seen in this article that it is fairly easy to continuously read and parse log files. The Connect File Pulse connector ships with the GrokFilter to parse unstructured data using Grok expressions as you would have done with Logstash.

More generally, Connect File Pulse connector is a powerful solution that allows you to easily manipulate your data before sending it into Apache Kafka. Please do not hesitate to share this article. If you like this project then add a ⭐ to the GitHub repository to support us. Thank you.

Top comments (2)

Collapse
 
nybblehub profile image
Nybble

Hi Florian,

Really cool feature ! I have one question, is it possible to add a k/v during the parsing process or this need to be done after, separately ?

Thanks !

Collapse
 
fhussonnois profile image
Florian Hussonnois

Hi,

If your need is to add a simple K/V field this can be done using the AppendFilter provided by FilePulse. FilePulse allows you to chain multiple processing filters similar to SMTs.

I hope I've answered to your question