Introduction
I will describe how to use a plain Kafka consumer in Play Framework (the Kafka producer is out of scope for this post). Be aware that you can easily find libraries whose APIs may suit your needs better; for example, Alpakka provides a reactive, stream-aware API. But before you start with more advanced solutions, it is always worth getting to know how the plain component works.
Start by adding the Apache Kafka client dependency to your build.sbt file:
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "3.0.0"
Eager Singleton
First, I will create a SampleKafkaConsumer class according to the Kafka documentation and annotate it with the javax.inject.Singleton annotation. SampleKafkaConsumer is as simple as possible: it polls the Kafka server exposed at localhost:6003 (with the default configuration it is probably localhost:9092), waiting up to 3 seconds for messages on each poll. A Kafka message consists of two parts: a key and a value. In this example I assume both are simple Strings (but they can have a more complex structure, for example JSON). SampleKafkaConsumer subscribes to the sample-topic topic and is assigned the sample-group-id consumer group id.
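import java.time.Duration
import java.util.Properties
import javax.inject.Singleton
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import play.api.Logging
import scala.jdk.CollectionConverters._ // assumes Scala 2.13; on 2.12 use scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

@Singleton
class SampleKafkaConsumer extends Logging {
  logger.info("Starting SampleKafkaConsumer")

  private val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:6003")
  properties.put("group.id", s"sample-group-id")
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](properties)
  kafkaConsumer.subscribe(Set("sample-topic").asJava)

  // Polls forever inside the constructor; each poll waits up to 3 seconds.
  Try {
    while (true) {
      kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
        .foreach(r => {
          logger.info(s"SampleKafkaConsumer receive record $r")
        })
    }
  } match {
    case Success(_) => logger.info(s"SampleKafkaConsumer succeed.")
    case Failure(e) => logger.error(s"SampleKafkaConsumer fail.", e)
  }
}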
I want to start the Kafka consumer when the application starts. This kind of requirement can be easily satisfied with an eager singleton. To do this, I will define a Play module that binds SampleKafkaConsumer as an eager singleton:
package kafka

import com.google.inject.AbstractModule
import play.api.Logging

class KafkaModule extends AbstractModule with Logging {
  override def configure(): Unit = {
    logger.info("Starting KafkaModule")
    bind(classOf[SampleKafkaConsumer]).asEagerSingleton()
  }
}
To start KafkaModule, it must be enabled in the Play configuration (by default the application.conf file):
play.modules.enabled += "kafka.KafkaModule"
If you start the application now, you will notice that it does not start :) Everything looks fine in the logs:
--- (Running the application, auto-reloading is enabled) ---
[info] p.c.s.AkkaHttpServer - Listening for HTTP on /0:0:0:0:0:0:0:0:9000
Server started, use Alt+D to stop
But after opening the application URL in a browser (in my case http://localhost:9000), the page keeps loading forever. This is because the SampleKafkaConsumer constructor contains a while(true) loop in which the consumer polls Kafka for messages. As a result, the SampleKafkaConsumer constructor never finishes and the application can't start. To resolve this problem, I will run the while(true) loop asynchronously.
Polling in dedicated thread-pool
To allow the application to start, I will add a dedicated thread for the KafkaConsumer logic (the while(true) loop). The KafkaConsumer loop uses a single thread anyway, so it is a good idea to create a dedicated ExecutionContext for this purpose. In the following listing I don't mark the ExecutionContext as implicit, so that you can follow how it is used. I also replaced the Try code block with a Future code block.
@Singleton
class SampleKafkaConsumer extends Logging {
  logger.info("Starting SampleKafkaConsumer")

  val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

  private val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:6003")
  properties.put("group.id", s"sample-group-id")
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](properties)
  kafkaConsumer.subscribe(Set("sample-topic").asJava)

  Future {
    while (true) {
      kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
        .foreach(r => {
          logger.info(s"SampleKafkaConsumer receive record $r")
        })
    }
  }(executionContext).andThen {
    case Success(_) => logger.info(s"SampleKafkaConsumer succeed.")
    case Failure(e) => logger.error(s"SampleKafkaConsumer fail.", e)
  }(executionContext)
}
Now it works great! ...almost. The application starts, but I need to do one more thing: KafkaConsumer is never closed, even when the application stops!
Stopping KafkaConsumer
KafkaConsumer is not thread-safe, so I need to close it in the same thread in which the while(true) loop runs. To do that, I must define a loop exit condition. I will follow the Kafka documentation and add an AtomicBoolean field that records whether the consumer should be stopped (the field is initially false). Once the while(!stopConsumer.get()) loop exits, KafkaConsumer is closed.
@Singleton
class SampleKafkaConsumer extends Logging {
  logger.info("SampleKafkaConsumer starts")

  private val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
  private val stopConsumer: AtomicBoolean = new AtomicBoolean(false)

  private val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:6003")
  properties.put("group.id", s"sample-group-id")
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](properties)
  kafkaConsumer.subscribe(Set("sample-topic").asJava)

  // Poll on the dedicated thread; once the loop exits, close the consumer once and log the outcome.
  Future {
    while (!stopConsumer.get()) {
      kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
        .foreach(r => {
          logger.info(s"SampleKafkaConsumer receives record: $r")
        })
    }
    logger.info(s"SampleKafkaConsumer quits 'while(true)' loop.")
  }(executionContext)
    .andThen(_ => kafkaConsumer.close())(executionContext)
    .andThen {
      case Success(_) =>
        logger.info(s"SampleKafkaConsumer succeed.")
      case Failure(e) =>
        logger.error(s"SampleKafkaConsumer fails.", e)
    }(executionContext)
}
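The thread-confinement argument above can be checked in isolation, without Kafka. The following is a minimal sketch that uses only the Scala standard library. ConfinedLoop, ConfinedLoopDemo and their members are hypothetical names that do not come from the post, and Thread.sleep stands in for kafkaConsumer.poll. Assuming the loop and the andThen callback are both given the same single-thread ExecutionContext, it records the name of the thread that runs each of them.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for SampleKafkaConsumer; no Kafka involved.
class ConfinedLoop {
  private val executor = Executors.newSingleThreadExecutor()
  private val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
  private val stop = new AtomicBoolean(false)

  val loopThread = new AtomicReference[String]("")
  val cleanupThread = new AtomicReference[String]("")

  // The loop and its cleanup callback are both scheduled on the same executor.
  val finished: Future[Unit] = Future {
    loopThread.set(Thread.currentThread().getName)
    while (!stop.get()) Thread.sleep(10) // stands in for kafkaConsumer.poll(...)
  }(ec).andThen { case _ =>
    // This is where kafkaConsumer.close() would be called.
    cleanupThread.set(Thread.currentThread().getName)
  }(ec)

  def requestStop(): Unit = stop.set(true)

  // Lets the JVM exit; call only after `finished` has completed.
  def shutdownExecutor(): Unit = executor.shutdown()
}

object ConfinedLoopDemo {
  // Returns (name of the loop thread, name of the cleanup thread).
  def run(): (String, String) = {
    val loop = new ConfinedLoop
    loop.requestStop()
    Await.result(loop.finished, 5.seconds)
    loop.shutdownExecutor()
    (loop.loopThread.get, loop.cleanupThread.get)
  }
}
```

Because the executor owns exactly one thread, both names returned by ConfinedLoopDemo.run() are the same, which is the property the listing above relies on when it calls kafkaConsumer.close() from andThen.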
All I need to do now is change the stopConsumer flag from false to true when the application stops. For such requirements Play Framework provides the CoordinatedShutdown component, which originally comes from Akka (Play Framework is built on Akka). CoordinatedShutdown lets you define tasks to be executed during the shutdown process. It is more flexible than the older ApplicationLifecycle component. ApplicationLifecycle is not deprecated (you can use it instead); it just lets you add tasks to only one phase of the shutdown process, the phase named service-stop. I need to add the stop task to exactly this phase, but I chose to use CoordinatedShutdown anyway. The task will change the stopConsumer flag to true.
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "SampleKafkaConsumer-stop") { () =>
  logger.info("Shutdown-task[SampleKafkaConsumer-stop] starts.")
  stopConsumer.set(true)
  Future{ Done }(executionContext).andThen {
    case Success(_) => logger.info("Shutdown-task[SampleKafkaConsumer-stop] succeed.")
    case Failure(e) => logger.error("Shutdown-task[SampleKafkaConsumer-stop] fails.", e)
  }(executionContext)
}
Note that the added task returns Future{ Done } using the executionContext defined earlier. This gives confidence that the Future will complete after the while loop quits and KafkaConsumer is closed (executionContext contains a single thread, so it must quit the loop before it can do anything else). Please also be aware that every phase of the shutdown process has a timeout after which the next phase begins, even if the current phase has not finished. The default timeout of the service-stop phase is 5 seconds.
Final version of SampleKafkaConsumer:
@Singleton
class SampleKafkaConsumer @Inject()(coordinatedShutdown: CoordinatedShutdown) extends Logging {
  logger.info("SampleKafkaConsumer starts")

  private val executionContext: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())
  private val stopConsumer: AtomicBoolean = new AtomicBoolean(false)

  private val properties = new Properties()
  properties.put("bootstrap.servers", "localhost:6003")
  properties.put("group.id", s"sample-group-id")
  properties.put("key.deserializer", classOf[StringDeserializer])
  properties.put("value.deserializer", classOf[StringDeserializer])

  val kafkaConsumer = new KafkaConsumer[String, String](properties)
  kafkaConsumer.subscribe(Set("sample-topic").asJava)

  Future {
    while (!stopConsumer.get()) {
      kafkaConsumer.poll(Duration.ofSeconds(3)).asScala
        .foreach(r => {
          logger.info(s"SampleKafkaConsumer receives record: $r")
        })
    }
    logger.info(s"SampleKafkaConsumer quits 'while(true)' loop.")
  }(executionContext)
    .andThen(_ => kafkaConsumer.close())(executionContext)
    .andThen {
      case Success(_) =>
        logger.info(s"SampleKafkaConsumer succeed.")
      case Failure(e) =>
        logger.error(s"SampleKafkaConsumer fails.", e)
    }(executionContext)

  coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceStop, "SampleKafkaConsumer-stop") { () =>
    logger.info("Shutdown-task[SampleKafkaConsumer-stop] starts.")
    stopConsumer.set(true)
    Future{ Done }(executionContext).andThen {
      case Success(_) => logger.info("Shutdown-task[SampleKafkaConsumer-stop] succeed.")
      case Failure(e) => logger.error("Shutdown-task[SampleKafkaConsumer-stop] fails.", e)
    }(executionContext)
  }
}
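The shutdown task above depends on the order in which work is queued on the single-thread executor. A variation that may be easier to reason about is to keep a reference to the Future that runs the loop and closes the consumer, and to return a Future derived from it in the shutdown task. The sketch below illustrates the idea with the standard library only; it is not the code from this post. StoppableLoop, StoppableLoopDemo, record, events and the local Done object (a stand-in for akka.Done) are hypothetical, and Thread.sleep replaces kafkaConsumer.poll.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Local stand-in for akka.Done, so the sketch needs no Akka dependency.
case object Done

// Hypothetical stand-in for SampleKafkaConsumer; no Kafka involved.
class StoppableLoop {
  private val executor = Executors.newSingleThreadExecutor()
  private val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
  private val stop = new AtomicBoolean(false)

  // Thread-safe event log used to observe the ordering.
  private var recorded = Vector.empty[String]
  private def record(event: String): Unit = synchronized { recorded = recorded :+ event }
  def events: Vector[String] = synchronized { recorded }

  // Completes only after the loop has exited and the "close" callback has run.
  private val loopAndClose: Future[Unit] = Future {
    while (!stop.get()) Thread.sleep(10) // stands in for kafkaConsumer.poll(...)
    record("loop-quit")
  }(ec).andThen { case _ =>
    record("closed") // stands in for kafkaConsumer.close()
  }(ec)

  // What a shutdown task could return: it finishes after loopAndClose does.
  def stopTask(): Future[Done.type] = {
    stop.set(true)
    loopAndClose.map { _ => record("task-done"); Done }(ec)
  }

  // Lets the JVM exit; call only after stopTask() has completed.
  def shutdownExecutor(): Unit = executor.shutdown()
}

object StoppableLoopDemo {
  def run(): Vector[String] = {
    val loop = new StoppableLoop
    Await.result(loop.stopTask(), 5.seconds)
    loop.shutdownExecutor()
    loop.events
  }
}
```

Since the returned Future is derived from loopAndClose, "task-done" can only be recorded after "closed", regardless of when the stop request arrives. Either way, the task still has to finish within the timeout of the shutdown phase it is registered in.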
Some logs
When the application starts, you will find this log entry:
[info] k.SampleKafkaConsumer play-dev-mode-akka.actor.default-dispatcher-7 - SampleKafkaConsumer starts
Now I will start the command-line Kafka producer and send some messages to the sample-topic topic:
$ bin/kafka-console-producer.sh --topic sample-topic --bootstrap-server localhost:6003
>Hello Readers!
>Thank you!
SampleKafkaConsumer logs all messages:
[info] k.SampleKafkaConsumer pool-10-thread-1 - SampleKafkaConsumer receives record: ConsumerRecord(topic = sample-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1640519882833, serialized key size = -1, serialized value size = 15, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Hello Readers!)
[info] k.SampleKafkaConsumer pool-10-thread-1 - SampleKafkaConsumer receives record: ConsumerRecord(topic = sample-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1640519898470, serialized key size = -1, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Thank you!)
Now I will stop the application (PID=26054) with SIGTERM:
kill 26054
SampleKafkaConsumer stops gracefully:
[info] p.c.s.AkkaHttpServer play-dev-mode-shutdown-hook-1 - Stopping Akka HTTP server...
[info] p.c.s.AkkaHttpServer play-dev-mode-akka.actor.internal-dispatcher-2 - Terminating server binding for /0:0:0:0:0:0:0:0:9000
[info] s.ApplicationTimer application-akka.actor.internal-dispatcher-5 - ApplicationTimer demo: Stopping application at 2021-12-26T11:58:35.988474Z after 129s.
[info] k.SampleKafkaConsumer application-akka.actor.internal-dispatcher-5 - Shutdown-task[SampleKafkaConsumer-stop] starts.
[info] k.SampleKafkaConsumer pool-10-thread-1 - SampleKafkaConsumer quits 'while(true)' loop.
[info] o.a.k.c.c.i.ConsumerCoordinator pool-10-thread-1 - [Consumer clientId=consumer-sample-group-id-1, groupId=sample-group-id] Revoke previously assigned partitions sample-topic-0
[info] o.a.k.c.c.i.ConsumerCoordinator pool-10-thread-1 - [Consumer clientId=consumer-sample-group-id-1, groupId=sample-group-id] Member consumer-sample-group-id-1-52faa9ac-a6c3-42e5-99aa-33c992070bea sending LeaveGroup request to coordinator localhost:6003 (id: 2147483646 rack: null) due to the consumer is being closed
[info] o.a.k.c.c.i.ConsumerCoordinator pool-10-thread-1 - [Consumer clientId=consumer-sample-group-id-1, groupId=sample-group-id] Resetting generation due to: consumer pro-actively leaving the group
[info] o.a.k.c.c.i.ConsumerCoordinator pool-10-thread-1 - [Consumer clientId=consumer-sample-group-id-1, groupId=sample-group-id] Request joining group due to: consumer pro-actively leaving the group
[info] o.a.k.c.m.Metrics pool-10-thread-1 - Metrics scheduler closed
[info] o.a.k.c.m.Metrics pool-10-thread-1 - Closing reporter org.apache.kafka.common.metrics.JmxReporter
[info] o.a.k.c.m.Metrics pool-10-thread-1 - Metrics reporters closed
[info] o.a.k.c.u.AppInfoParser pool-10-thread-1 - App info kafka.consumer for consumer-sample-group-id-1 unregistered
[info] k.SampleKafkaConsumer pool-10-thread-1 - SampleKafkaConsumer succeed.
[info] k.SampleKafkaConsumer pool-10-thread-1 - Shutdown-task[SampleKafkaConsumer-stop] succeed.
[info] p.c.s.AkkaHttpServer play-dev-mode-akka.actor.internal-dispatcher-15 - Running provided shutdown stop hooks
Summary
In this post I described how to use a plain KafkaConsumer in Play Framework. It is always worth knowing how lower-level components work, but in the end I strongly encourage you to consider using libraries like Alpakka. For example, in my last project I decided to use akka-projection, which is built on Alpakka. It composes nicely with Akka and gives you some extra value, such as easy configuration of a restart strategy in case of failure.
You will find all code examples from this post on my GitHub. If you want to follow the code changes section by section, check the git log. Every commit corresponds to a single post section.
Originally published at https://stepniewski.tech