DEV Community

Anthony Ikeda
Anthony Ikeda

Posted on

Functional Kafka with Spring Cloud - Part 1

I've put this article together to create a working demo of Spring Cloud Kafka which I've been unable to find so far.

Prerequisites:

  • Java 11+ (I'm using Java 18)
  • Linux based OS (I'm on macOS)
  • Httpie

Setup:

  • Spring Cloud 2021.0.1
  • Confluent Schema Registry 7.1.0
  • Apache Kafka 2.13_3.1.0
  • Apache ZooKeeper 3.7.0

This article will first start with setting up a web API publishing events to Kafka as a string with a functional kafka consumer using Spring Cloud Stream.

Set up the environment

Download Apache ZooKeeper from here:

https://zookeeper.apache.org/releases.html#download
Enter fullscreen mode Exit fullscreen mode

Decompress it and move it to a working folder (we will use $ZOOKEEPER_HOME)

Download Kafka from here:

https://kafka.apache.org/downloads
Enter fullscreen mode Exit fullscreen mode

Again, decompress the archive and move it to a working folder, this time we will refer to the working folder as $KAFKA_HOME

Download and decompress the Schema Registry using the following:

$ wget https://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
$ tar -xf confluent-community-7.0.1.tar.gz
$ cd confluent-7.0.1
Enter fullscreen mode Exit fullscreen mode

We will refer to this applications location at $CONFLUENT_HOME

Configure the environment

First let's set up ZooKeeper!

ZooKeeper Config

Create the config file by copying the example config:

ZOOKEEPER_HOME $> cp conf/zoo_sample.cfg conf/zoo.cfg
Enter fullscreen mode Exit fullscreen mode

The only value you may want to edit is the dataDir directory to a location that you are comfortable storing the metadata to:

...
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=./data
Enter fullscreen mode Exit fullscreen mode

Save the file and we will move on to configuring Kafka.

Kafka Config

The file we are interested here is:

$KAFKA_HOME/config/server.properties
Enter fullscreen mode Exit fullscreen mode

There are 2 values we most likely will change:

  • log.dirs=./logs
  • zookeeper.connect=localhost:2181

This ensures our logs are in an accessible location as well as allows Kafka to connect to ZooKeeper to persist data.

Configure Schema Registry

Just one file needs to be checked here: $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties

All the default should be fine:

listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.topic=_schemas
debug=false
Enter fullscreen mode Exit fullscreen mode

Starting ZooKeeper and Kafka

We won't be using the Schema Registry to begin with since we are just working with String values in Kafka so let's get them started:

$ZOOKEEPER_HOME > bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED

$KAFKA_HOME > bin/kafka-server-start.sh config/server.properties
...
[2022-04-25 11:19:17,742] INFO Kafka version: 3.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-25 11:19:17,743] INFO Kafka commitId: 37edeed0777bacb3 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-25 11:19:17,743] INFO Kafka startTimeMs: 1650910757737 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-25 11:19:17,745] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Enter fullscreen mode Exit fullscreen mode

Okay, let's get programming!!!

Employee API

This will be a simple API that will havea single POST endpoint to create a kafka message.

Just head to https://start.spring.io and create a basic web application:

Spring Initializer

Click download and expand the demo.zip file to a workspace

To get started we are going to need some dependencies to publish to Kafka.

To your pom.xml file add the following dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.4</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

Next we will create the controller to serve our endpoint.

package com.example.demo;

import com.example.demo.model.Employee;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import java.net.URI;
import java.util.UUID;

@RequestMapping("/employee")
@RestController
public class EmployeeController {

    @Autowired
    protected KafkaTemplate<String, String>  kafka;

    public ResponseEntity<Void> createEmployee(@RequestParam("firstname") String firstname,
                                               @RequestParam("lastname") String lastname) {

        String id = UUID.randomUUID().toString();
        kafka.send("employee", id, String.format("%s, %s", lastname, firstname));

        return ResponseEntity.created(URI.create(String.format("/employee/%s", id))).build();
    }

}
Enter fullscreen mode Exit fullscreen mode

Let's quickly walk through the code...

First, we inject the KafkaTemplate<String, String> template since we aren't doing anything fancy, this is going to just send the message key as a java.lang.String and the body of the message will also be of type java.lang.String, nothing fancy.

Our @PostMapping is going to simply be the endpoint with 2 query parameters:

  • firstname
  • lastname

So the URL will have the format:
http://localhost:8050/employee?firstname=Paula&lastname=Abdul

And when we call the API we will use Httpie with the following syntax:

http POST http://localhost:8070/employee firstname==Paula lastname==Abdul
Enter fullscreen mode Exit fullscreen mode

Next we are just generating a random UUID as the message id and send this along with a concatenated string of the lastname and firstname to the employee topic:

String id = UUID.randomUUID().toString();
kafka.send("employee", id, String.format("%s, %s", lastname, firstname));
Enter fullscreen mode Exit fullscreen mode

Before we start the application, lets tweak the configuration. In the src/main/resources/application.yml file, let's make it the same as:

spring:
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers:
      - localhost:9092

server:
  port: 8070
Enter fullscreen mode Exit fullscreen mode

Now we can run the application and hit the endpoint:

./mvnw spring-boot:run
...
: Tomcat initialized with port(s): 8070 (http)
: Starting service [Tomcat]
: Starting Servlet engine: [Apache Tomcat/9.0.54]
: Initializing Spring embedded WebApplicationContext
: Root WebApplicationContext: initialization completed in 754 ms
: Tomcat started on port(s): 8070 (http) with context path ''
: Started DemoApplication in 1.522 seconds (JVM running for 1.841)
Enter fullscreen mode Exit fullscreen mode

Call the endpoint:

$ http POST http://localhost:8070/employee firstname==Andrew lastname==Marshall
HTTP/1.1 201 
Connection: keep-alive
Content-Length: 0
Date: Wed, 27 Apr 2022 17:30:39 GMT
Keep-Alive: timeout=60
Location: /employee/830da346-38b9-4d5b-a051-a302c395333e
Enter fullscreen mode Exit fullscreen mode

You can track the message in Kafka using the kafka-console-consumer.sh command:

$ $KAFKA_HOME/bin/kafka-console-consumer  --topic employee --bootstrap-server localhost:9092 --from-beginning
Marshall, Andrew
Enter fullscreen mode Exit fullscreen mode

Next article we will setup a functional consumer using Spring Cloud Stream...

Discussion (0)