
Francesco Tisiot

Originally published at aiven.io

Using Kafka Connect JDBC Source: a PostgreSQL example


If we go back a few years, the typical data pipeline was an app creating events and pushing them to a backend database. Data was then propagated to downstream applications via dedicated ETL flows at regular intervals, usually daily.
These days, Apache Kafka has become the default data platform. Apps write events to Kafka, which then distributes them in near-real-time to downstream sinks like databases or cloud storage. Kafka Connect, a framework to stream data into and out of Apache Kafka, represents a further optimisation that makes the ingestion and propagation of events just a matter of configuration file settings.

What if we're facing an old app-to-database design? How can we bring it to 2021 and include Kafka in the game? Instead of batch-exporting from the database at night, we can add Kafka to the existing system. Kafka Connect lets us integrate with an existing system and make use of more up-to-date tools, without disrupting the original solution.

One way to do this is to use the Kafka Connect JDBC Connector. This post will walk you through an example of sourcing data from an existing table in PostgreSQL and populating a Kafka topic with only the changed rows. This is a great approach for many use cases. But when no additional query load on the source system is allowed, you could also make use of change data capture solutions based on tools like Debezium. As we'll see later, Aiven provides Kafka Connect as a managed service for both options, so you can start your connectors without the hassle of managing a dedicated cluster.

Architecture Overview

This blog post provides an example of the Kafka Connect JDBC Source based on a PostgreSQL database. A more detailed explanation of the connector is provided in our help article.

In our example, we first create a PostgreSQL database to act as backend data storage for our imaginary application. Then we create a Kafka cluster with Kafka Connect and show how any new or modified row in PostgreSQL appears in a Kafka topic.

Creating the PostgreSQL Source system

We'll create the whole setup using the Aiven Command Line Interface. Follow the instructions in the help article to install and log in. All you need is Python 3.5+ and an Internet connection.
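
For reference, a minimal install-and-login sketch (assuming pip is available; the help article has the full instructions, and the email placeholder is yours to fill in):

pip install aiven-client
avn user login <YOUR_EMAIL>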

Once logged in to the Aiven client, we can create a PostgreSQL database with the following avn command in our terminal:

avn service create pg-football    \
    -t pg                         \
    --cloud google-europe-west3   \
    -p business-4

This command creates a PostgreSQL database (flag -t pg) named pg-football in the google-europe-west3 region. The selected plan, business-4, determines the amount of resources available and the associated billing.

The create command returns immediately: Aiven has received the request and started creating the instance. We can wait for the database to be ready with the following command:

avn service wait pg-football

The wait command can be executed against any Aiven instance, and returns only when the service is in the RUNNING state.

Time to Scout Football Players

Now let's create our playground: we are a football scouting agency, checking players all over the world, and our app pushes the relevant data to a PostgreSQL table. Let's log in to PostgreSQL from the terminal:

avn service cli pg-football

Our agency doesn't do a great job at scouting: all we are able to capture is the player's name, nationality and an is_retired flag showing the activity status.
We create a simple football_players table containing the above information together with two control columns:

  • created_at keeping the record's creation time
  • modified_at for the row's last modification time

These two columns will later be used by the Kafka Connect connector to select the recently changed rows.
Now it's time to create the table from the PostgreSQL client:

CREATE TABLE football_players (
  name VARCHAR(50) PRIMARY KEY,
  nationality VARCHAR(255) NOT NULL,
  is_retired BOOLEAN DEFAULT false,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  modified_at TIMESTAMP
);

The created_at field will work as expected immediately, thanks to the DEFAULT NOW() definition.
The modified_at column, on the other hand, requires a bit more tuning to be usable. We'll need to create a trigger that sets the current timestamp whenever a row is updated. The following SQL can be executed from the PostgreSQL client:

CREATE OR REPLACE FUNCTION change_modified_at()
  RETURNS TRIGGER
  LANGUAGE PLPGSQL
  AS
$$
BEGIN
    NEW.modified_at := NOW();
    RETURN NEW;
END;
$$
;
CREATE TRIGGER modified_at_updates
  BEFORE UPDATE
  ON football_players
  FOR EACH ROW
  EXECUTE PROCEDURE change_modified_at();

The first statement creates the change_modified_at function that will later be used by the modified_at_updates trigger.
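
If you want to double-check that the trigger was registered, an optional query against the pg_trigger catalog can be run from the same PostgreSQL client (a quick sketch):

-- List the non-internal triggers defined on football_players
SELECT tgname
FROM pg_trigger
WHERE tgrelid = 'football_players'::regclass
  AND NOT tgisinternal;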

Football Scouting App at Work

We can now simulate our football scouting app behaviour by manually inserting three rows in the football_players table from the PostgreSQL client with

insert into football_players (name, nationality, is_retired) values ('Andrea Pirlo','Italian', true);
insert into football_players (name, nationality, is_retired) values ('Cristiano Ronaldo','Portuguese', false);
insert into football_players (name, nationality, is_retired) values ('Megan Rapinoe','American', true);

We can verify that the created_at column is successfully populated in PostgreSQL with

select * from football_players;

Which will output

       name        | nationality | is_retired |         created_at         | modified_at
-------------------+-------------+------------+----------------------------+-------------
 Andrea Pirlo      | Italian     | t          | 2021-03-11 10:35:52.04076  |
 Cristiano Ronaldo | Portuguese  | f          | 2021-03-11 10:35:52.060104 |
 Megan Rapinoe     | American    | t          | 2021-03-11 10:35:52.673554 |
(3 rows)

Perfect, the app is working when inserting new rows. If only we could have an update to an existing row...


Well, this was somewhat expected: Juventus FC went out of the Champions League and needed new energy in the midfield. We can update the relevant row with

update football_players set is_retired=false where name='Andrea Pirlo';

We can check that modified_at is working correctly by issuing the same select * from football_players; statement in the PostgreSQL client and verifying the following output

       name        | nationality | is_retired |         created_at         |        modified_at         
-------------------+-------------+------------+----------------------------+----------------------------
 Cristiano Ronaldo | Portuguese  | f          | 2021-03-11 10:35:52.060104 |
 Megan Rapinoe     | American    | t          | 2021-03-11 10:35:52.673554 |
 Andrea Pirlo      | Italian     | f          | 2021-03-11 10:35:52.04076  | 2021-03-11 10:39:49.198286
(3 rows)

OK, we recreated the original setup: our football scouting app is correctly storing data in the football_players table. In the old days, extracting that data was left to an ETL flow running overnight and pushing it to the downstream applications. Now, as per our original aim, we want to include Apache Kafka in the game, so... let's do it!

Creating a Kafka environment

As stated initially, our goal is to base our data pipeline on Apache Kafka without having to change the existing setup. We don't have a Kafka environment available right now, but we can easily create one using Aiven's CLI from the terminal with the following avn command

avn service create kafka-football         \
  -t kafka                                \
  --cloud google-europe-west3             \
  -p business-4                           \
  -c kafka.auto_create_topics_enable=true \
  -c kafka_connect=true

The command creates an Apache Kafka instance (-t kafka) in google-europe-west3 with the business-4 plan.
Additionally, it enables topic auto-creation (-c kafka.auto_create_topics_enable=true) so our applications can create topics on the fly without forcing us to create them beforehand.
Finally, it enables Kafka Connect (-c kafka_connect=true) on the same Kafka instance. We can use the avn service wait command mentioned above to pause until the Kafka cluster is in the RUNNING state.
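
For example, to pause until the new Kafka service is ready:

avn service wait kafka-football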

Note that for Kafka instances on the startup plans, you'll need to create a standalone Kafka Connect service instead. For production systems, we recommend a standalone Kafka Connect service anyway, following the separation of concerns principle.
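
If you go down that route, creating a dedicated Kafka Connect service follows the same pattern as the commands above; a minimal sketch (the service name kafka-connect-football is just an example, and the plan and cloud simply mirror the earlier commands, so adjust them to what's available for Kafka Connect):

avn service create kafka-connect-football \
  -t kafka_connect                        \
  --cloud google-europe-west3             \
  -p business-4

The new service then needs to be integrated with the Kafka service; check the Aiven help articles for the details.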

Connecting the dots

The basic building blocks are ready: our source system, represented by the pg-football PostgreSQL database with the football_players table, and the kafka-football Apache Kafka instance are up and running. It's now time to connect the two: creating a new event in Kafka every time a new or modified row appears in PostgreSQL. That can be achieved by creating a Kafka Connect JDBC source connector.

Create a JSON configuration file

Start by creating a JSON configuration file like the following:

{
    "name": "pg-timestamp-source",
    "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?sslmode=require",
    "connection.user": "<PG_USER>",
    "connection.password": "<PG_PASSWORD>",
    "table.whitelist": "football_players",
    "mode": "timestamp",
    "timestamp.column.name":"modified_at,created_at",
    "poll.interval.ms": "2000",
    "topic.prefix": "pg_source_"
}

Where the important parameters are:

  • name: the name of the Kafka Connect connector, in our case pg-timestamp-source
  • connection.url: the connection URL pointing to the PostgreSQL database, in the form jdbc:postgresql://<HOSTNAME>:<PORT>/<DATABASE>?<ADDITIONAL_PARAMETERS>; we can build it from the dbname, host and port fields returned by the following avn command
avn service get pg-football --format '{service_uri_params}'
  • connection.user and connection.password: PostgreSQL credentials; the default avnadmin credentials are available as the user and password fields in the output of the avn command above
  • table.whitelist: list of tables to source from PostgreSQL, in our case football_players
  • mode: Kafka Connect JDBC mode. Three modes are available: bulk, incrementing, timestamp. For this post we'll use the timestamp one. For a more detailed description of modes, please refer to the help article
  • timestamp.column.name: list of timestamp column names. The value for this setting should be modified_at,created_at, since modified_at will contain the most recent update timestamp and, when it is null, we can rely on the created_at column (see the conceptual sketch after this list)
  • poll.interval.ms: time between database polls
  • topic.prefix: prefix for the topic name; the full topic name will be the concatenation of topic.prefix and the PostgreSQL table name.
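
As a mental model of how timestamp mode detects changes, the connector periodically polls the source table with a query roughly along these lines (a conceptual sketch, not the exact statement the connector issues; <LAST_SEEN_TIMESTAMP> stands for the highest timestamp processed so far):

-- Select rows changed since the last poll, ordered so the connector can advance its offset
SELECT *
FROM football_players
WHERE COALESCE(modified_at, created_at) > '<LAST_SEEN_TIMESTAMP>'
ORDER BY COALESCE(modified_at, created_at) ASC;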

Start the JDBC connector

After storing the above JSON in a file named kafka_jdbc_config.json, we can now start the Kafka Connect JDBC connector in our terminal with the following command:

avn service connector create kafka-football @kafka_jdbc_config.json

We can verify the status of the Kafka Connect connector with the following avn command:

avn service connector status kafka-football pg-timestamp-source

Note that the last parameter pg-timestamp-source in the avn command above refers to the Kafka Connect connector name defined in the name setting of the kafka_jdbc_config.json configuration file. If all settings are correct, the above command will show our healthy connector in the RUNNING state:

{
    "status": {
        "state": "RUNNING",
        "tasks": [
            {
                "id": 0,
                "state": "RUNNING",
                "trace": ""
            }
        ]
    }
}

Check the data in Kafka with Kafkacat

The data should now have landed in Apache Kafka. How can we check it?
We can use Kafkacat, a handy command line utility.

Once Kafkacat is installed (see our help article for detailed instructions), we'll need to set up the connection to our Kafka environment.

Aiven by default enables SSL certificate-based authentication. The certificates are available from the Aiven console for manual download, but with the Aiven CLI you can avoid the clicking by running the following avn command in your terminal:

mkdir -p kafkacerts
avn service user-creds-download kafka-football \
    -d kafkacerts \
    --username avnadmin

These commands create a kafkacerts folder (if it doesn't exist already) and download into it the ca.pem, service.cert and service.key SSL certificates required to connect.

The last missing piece of information that Kafkacat needs is where to find our Kafka instance in terms of hostname and port. This information can be displayed in our terminal with the following avn command

avn service get kafka-football --format '{service_uri}'

Once we've collected the required info, we can create a kafkacat.config file with the following entries

bootstrap.servers=<KAFKA_SERVICE_URI>
security.protocol=ssl
ssl.key.location=kafkacerts/service.key
ssl.certificate.location=kafkacerts/service.cert
ssl.ca.location=kafkacerts/ca.pem

Remember to substitute the <KAFKA_SERVICE_URI> with the output of the avn service get command mentioned above.
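
If you prefer not to edit the file by hand, a small optional sketch that builds it in one go (assuming a POSIX shell and the avn command shown above):

# Build kafkacat.config with the Kafka service URI filled in automatically
{
  echo "bootstrap.servers=$(avn service get kafka-football --format '{service_uri}')"
  echo "security.protocol=ssl"
  echo "ssl.key.location=kafkacerts/service.key"
  echo "ssl.certificate.location=kafkacerts/service.cert"
  echo "ssl.ca.location=kafkacerts/ca.pem"
} > kafkacat.config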

Now we are ready to read the topic from Kafka by pasting the following command in our terminal:

kafkacat -F kafkacat.config -C -t pg_source_football_players

Note that we point Kafkacat at our configuration file (flag -F) and run it in consumer mode (flag -C), reading from the topic pg_source_football_players, which is the concatenation of the topic.prefix setting in Kafka Connect and the name of our football_players PostgreSQL table.

As expected, since the connector is working, Kafkacat will output the three messages present in the Kafka topic, matching the three rows in the football_players PostgreSQL table

{"name":"Cristiano Ronaldo","nationality":"Portuguese","is_retired":false,"created_at":1615458952060,"modified_at":null}
{"name":"Megan Rapinoe","nationality":"American","is_retired":true,"created_at":1615458952673,"modified_at":null}
{"name":"Andrea Pirlo","nationality":"Italian","is_retired":false,"created_at":1615458952040,"modified_at":1615459189198}
% Reached end of topic pg_source_football_players [0] at offset 3

Updating the listings

Now, let's see if our football scouts around the world can fetch some news for us


Wow, we found a new talent named Enzo Gorlami, and Cristiano Ronaldo officially retired today from professional football (please be aware this post does not reflect football reality). Let's push these two updates to PostgreSQL:

insert into football_players (name, nationality, is_retired) values ('Enzo Gorlami','Italian', false);
update football_players set is_retired=true where name='Cristiano Ronaldo';

We can verify that the data is correctly stored in the database:

defaultdb=> select * from football_players;
       name        | nationality | is_retired |         created_at         |        modified_at         
-------------------+-------------+------------+----------------------------+----------------------------
 Megan Rapinoe     | American    | t          | 2021-03-11 10:35:52.673554 |
 Andrea Pirlo      | Italian     | f          | 2021-03-11 10:35:52.04076  | 2021-03-11 10:39:49.198286
 Enzo Gorlami      | Italian     | f          | 2021-03-11 11:09:49.411885 |
 Cristiano Ronaldo | Portuguese  | t          | 2021-03-11 10:35:52.060104 | 2021-03-11 11:11:36.790781
(4 rows)

And in Kafkacat we receive the following two updates:

{"name":"Enzo Gorlami","nationality":"Italian","is_retired":false,"created_at":1615460989411,"modified_at":null}
% Reached end of topic pg_source_football_players [0] at offset 4
{"name":"Cristiano Ronaldo","nationality":"Portuguese","is_retired":true,"created_at":1615458952060,"modified_at":1615461096790}
% Reached end of topic pg_source_football_players [0] at offset 5

Wrapping up

This blog post showed how to easily integrate PostgreSQL and Kafka with a fully managed, config-file-driven Kafka Connect JDBC connector. We used a timestamp-based approach to retrieve the rows changed since the previous poll and push them to a Kafka topic; bear in mind that this approach adds some query load to the source database.

An alternative method is represented by change data capture solutions like Debezium which, in the case of PostgreSQL, read changes directly from the WAL, avoiding any additional query load on the source database. A guide on how to set up CDC for Aiven PostgreSQL is provided in this help article.
