DEV Community

Cover image for Streaming data into Kafka S01/E01 - Loading CSV file
Florian Hussonnois
Florian Hussonnois

Posted on • Updated on

Streaming data into Kafka S01/E01 - Loading CSV file

Ingesting data files in Apache Kafka is a very common task. Among all the various file formats that we can find, CSV is probably the most popular one to move data between different systems. This is due to its simplicity and to the fact that it can be used to export or import data from one (small) database to another.

A CSV file is nothing more than a text file (with a .csv extension). Each line of the file represents a data record and each record consists of one or more fields, separated by a comma (or another separator).

Here is a chunk of example :

40;War;02:38;1983;U2;Rock
Acrobat;Achtung Baby;04:30;1991;U2;Rock
Sunday Bloody Sunday;War;04:39;1983;U2;Rock
With or Without You;The Joshua Tree;04:55;1987; U2;Rock
Enter fullscreen mode Exit fullscreen mode

In Addition, a CSV file can contain a header line to indicate the name of each field.

title;album;duration;release;artist;type
Enter fullscreen mode Exit fullscreen mode

In this article, we will see how to integrate such a file in Apache Kafka. Of course, we are not going to reinvent the wheel. Many tools already exist to do this and are available in open-source.

We will use the Kafka Connect framework, which is part of the Apache Kafka project. Kafka Connect has been designed to move data in and out of Kafka using connectors.

Kafka Connect File Pulse connector

The Kafka Connect File Pulse connector makes it easy to parse, transform, and stream data file into Kafka. It supports several formats of files, but we will focus on CSV.

For a broad overview of FilePulse, I suggest you read this article :

For more information, you can check-out the documentation here.

How to use the connector

The easiest and fastest way to get started with Connect the FilePulse connector is to use the Docker image available on Docker Hub.

$ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
Enter fullscreen mode Exit fullscreen mode

You can download the docker-compose.yml file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.

$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
$ docker-compose up -d
Enter fullscreen mode Exit fullscreen mode

Once all Docker containers are started, you can check that the connector is installed on the Kafka Connect worker accessible on http://localhost:8083.

$ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse

"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
Enter fullscreen mode Exit fullscreen mode

You can also install the connector either from GitHub Releases Page or from Confluent Hub.

Ingesting data

Let's create a first connector instance :

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'
Enter fullscreen mode Exit fullscreen mode

You can check the connector is properly started by executing:

$ curl -s localhost:8083/connectors/source-csv-filepulse-00/status|jq  '.connector.state'

"RUNNING"
Enter fullscreen mode Exit fullscreen mode

Right now our connector is not processing any data at all. Let's copy a CSV file into the input directory /tmp/kafka-connect/examples/ :

// Download CSV file
$ export GITHUB_REPO_MASTER=https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/
$ curl -sSL $GITHUB_REPO_MASTER/examples/quickstart-musics-dataset.csv -o musics-dataset.csv

// Copy CSV file to docker-container
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples
$ docker cp musics-dataset.csv connect://tmp/kafka-connect/examples/musics-dataset-00.csv
Enter fullscreen mode Exit fullscreen mode

Let's check if we have some data into the output topic musics-filepulse-csv-00 :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t musics-filepulse-csv-00 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

{
  "message": {
    "string": "Zoo Station;Achtung Baby;04:36;1991;U2;Rock"
  },
  "headers": {
    "array": [
      {
        "string": "title;album;duration;release;artist;type"
      }
    ]
  }
}
Enter fullscreen mode Exit fullscreen mode

Note: In the example above, we are using kafkacat to consume messages. The option -o-1 is used to only consume the latest message.

As we can see, our topic contains one Avro message for each line of the CSV file. The reason for this is because we have configured our connector to use the RowFileInputReader (see: task.reader.class configuration)

Each record contains two fields:

  • message: This field is of type string and represents a single line of the input file.

  • headers: This field is of type array and contains the first header line of the input file. This field is automatically added by the RowFileInputReader because we have configured it with skip.headers=1.

Headers

The FilePulse connector will add headers to each record containing metadata about the source file from which the data was extracted. This can be useful for debugging but also for data lineage.

$ docker run --tty  --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t musics-filepulse-csv-00 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .headers

[
  "connect.file.name",
  "musics-dataset-00.csv",
  "connect.file.path",
  "/tmp/kafka-connect/examples",
  "connect.file.hash",
  "1466679696",
  "connect.file.size",
  "6588",
  "connect.file.lastModified",
  "1597337097000",
  "connect.hostname",
  "57a2fb6213f9"
]
Enter fullscreen mode Exit fullscreen mode

Parsing data

The File Pulse connector allows us to define complex pipelines to parse, transform, and enrich data through the use of processing Filters.

You can use the built-in DelimitedRowFilter to parse each line. Also, because the first line of the CSV file is a header you can set the property extractColumnName to name the record's fields based on the headers field.

Let's create a new connector with this new configuration :

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-01/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-01",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "filters":"ParseLine",
        "filters.ParseLine.extractColumnName": "headers",
        "filters.ParseLine.trimColumn": "true",
        "filters.ParseLine.separator": ";",
        "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter"
    }'
Enter fullscreen mode Exit fullscreen mode

Then, let's consume the topic musics-filepulse-csv-01 :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t musics-filepulse-csv-01 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault

{
  "title": {
    "string": "Zoo Station"
  },
  "album": {
    "string": "Achtung Baby"
  },
  "duration": {
    "string": "04:36"
  },
  "release": {
    "string": "1991"
  },
  "artist": {
    "string": "U2"
  },
  "type": {
    "string": "Rock"
  }
}
Enter fullscreen mode Exit fullscreen mode

Filtering data

Sometimes you may want to keep only the lines in a file that have a field with a particular value. For this article, let's imagine we need to only keep the song from the band AC/DC.

For doing this we will extend our filter chain to use the DropFilter and create a new connector (so that our file can be reprocessed):

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-02/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "filters":"ParseLine,KeepACDC",
        "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
        "filters.ParseLine.extractColumnName": "headers",
        "filters.ParseLine.trimColumn": "true",
        "filters.ParseLine.separator": ";",
        "filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
        "filters.KeepACDC.if":"{{ equals($value.artist, '\''AC/DC'\'') }}",
        "filters.KeepACDC.invert":"true"
    }'
Enter fullscreen mode Exit fullscreen mode

The propery if is set with a Simple Connect Expression Language (SCEL) which is a basic expression language provided by the Connect FilePulse connector to access and manipulate records fields.

Finally, you can check that you get the expected result by executing:

$ docker run --tty  --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t musics-filepulse-csv-02 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault.artist

{
  "string": "AC/DC"
}

Enter fullscreen mode Exit fullscreen mode

Changing the field types

You have probably noticed that at no time have we defined the type of our fields. By default, the connector assumes that all fields are of type string and maybe you are happy with that. But in most cases, you will want to convert a few fields. For example, we can convert the field year to be of type "Integer".

For doing this, you can use the AppendFilter with the Simple connect Expression.

Lets' create our final configuration :

$ curl \
    -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-03/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.csv$",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader",
        "offset.strategy":"name",
        "skip.headers": "1",
        "topic":"musics-filepulse-csv-03",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "filters":"ParseLine,KeepACDC,ReleaseToInt",
        "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.DelimitedRowFilter",
        "filters.ParseLine.extractColumnName": "headers",
        "filters.ParseLine.trimColumn": "true",
        "filters.ParseLine.separator": ";",
        "filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
        "filters.KeepACDC.if":"{{ equals($value.artist, '\''AC/DC'\'') }}",
        "filters.KeepACDC.invert":"true",
        "filters.ReleaseToInt.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
        "filters.ReleaseToInt.field": "$value.release",
        "filters.ReleaseToInt.value": "{{ converts($value.release, '\''INTEGER'\'') }}",
        "filters.ReleaseToInt.overwrite": "true"
    }'
Enter fullscreen mode Exit fullscreen mode

Finally, you can check converted field :

$ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
        -b localhost:9092 \
        -t musics-filepulse-csv-03 \
        -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault.release

{
  "release": {
    "int": 1980
  }
}
Enter fullscreen mode Exit fullscreen mode

The AppendFilter is a very handy filter that allows us to quickly modify a record. For example, we could also use it to set the key of each record to be equal to the album name by adding the following configuration:

"filters.SetKey.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetKey.field": "$key",
"filters.SetKey.value": "{{ uppercase($value.album)}}"
Enter fullscreen mode Exit fullscreen mode

Note: For this article, we have used here the filter chain mechanism provided by the Connect File Pulse connector. But it is also possible to use the Kafka Connect Single Message Transformations (SMT) to perform the same task using the org.apache.kafka.connect.transforms.Cast.

Conclusion

We have seen in this article that it is fairly easy to load a CSV file into Kafka without writing a single line of code using Kafka Connect. Also, the Connect File Pulse connector is a powerful solution that allows you to easily manipulate your data before sending it into Apache Kafka. Please do not hesitate to share this article. If you like this project then add a ⭐ to the GitHub repository to support us. Thank you.

Discussion (2)

Collapse
gvdonovan profile image
Greg Donovan

Nice article Florian! I'm currently working on a project whereby customers send us csv files and then we send a subset of these files in various formats to our vendors. How can I combine the file metadata (i.e. file name, last modified date) with the file in the csv so that I can insert each row into a db. Assume I have a file name foo.csv with 3 records:
first_name,last_name
Biff,Tannen
Florian,Hussonnois

I'm looking to insert the following into a db:
foo.csv,,Biff,Tannen
foo.csv,,Florian,Hussonnois

in the form of
[
{file_name: "foo.csv", file_date: , first_name: "Biff", last_name: "Tannen"},
{file_name: "foo.csv", file_date: , first_name: "Florian", last_name: "Hussonnois"}
]

How might I configure the connector to do this?

Collapse
to_bullmarket profile image
Mariano Green Marubozu πŸ“ˆ

What configuration would you tell a source connector to process an Apache log file line by line as new lines are added to the log file, with the fields separated by spaces?

I do not know if it is possible with this connector, I do not see any example with the class io.streamthoughts.kafka.connect.filepulse.reader.RowFileInputReader