Kafka Streams is an amazing abstraction for working with Kafka and has excellent JVM support. However, if a stream application performs I/O operations, it may not be the right fit. Alpakka Kafka uses flow-control optimizations, which makes it well suited to building streams in which I/O operations take place. For a detailed comparison of Kafka Streams and Alpakka Kafka, refer to this Medium story.
Alpakka Kafka uses Akka under the hood and offers a rich Scala and Java API. However, the Clojure story is pretty lackluster: using Alpakka Kafka from Clojure essentially means doing a whole lot of interop with Alpakka Kafka's Java API.
I have created a Clojure wrapper that abstracts away the interop with the Alpakka Kafka Java API and lets library consumers use Alpakka Kafka without much hassle.
Below, I demo setting up an Alpakka Kafka stream using this Clojure wrapper library:
- Import the required dependencies in the (Clojure deps) project.
net.clojars.fr33m0nk/clj-alpakka-kafka {:mvn/version "0.1.6"}
org.apache.kafka/kafka-clients {:mvn/version "3.3.2"}
- Import the required namespaces in the demo namespace alpakka-kafka-demo. The later snippets also use clojure.string and CompletableFuture, so they are required and imported here too.
(ns alpakka-kafka-demo
  (:require
    [clojure.string :as str]
    [fr33m0nk.akka.actor :as actor]
    [fr33m0nk.akka.stream :as s]
    [fr33m0nk.alpakka-kafka.committer :as committer]
    [fr33m0nk.alpakka-kafka.consumer :as consumer]
    [fr33m0nk.alpakka-kafka.producer :as producer]
    [fr33m0nk.utils :as utils])
  (:import
    [java.util.concurrent CompletableFuture]
    [org.apache.kafka.common.serialization StringDeserializer StringSerializer]))
- We will create a new stream topology.
  - This stream topology will consume messages from a Kafka topic, transform them, and then publish them to another Kafka topic.
(defn test-stream-with-producer
  [actor-system consumer-settings committer-settings producer-settings consumer-topics producer-topic]
  (-> (consumer/->committable-source consumer-settings consumer-topics)
      (s/map-async 2
                   (fn [message]
                     (let [_key (consumer/key message)
                           value (consumer/value message)
                           committable-offset (consumer/committable-offset message)
                           message-to-publish (producer/->producer-record producer-topic (str/upper-case value))]
                       (producer/single-producer-message-envelope committable-offset message-to-publish))))
      (s/to-mat (producer/committable-sink producer-settings committer-settings) consumer/create-draining-control)
      (s/run actor-system)))
s/map-async executes the mapping function with up to 2 messages being processed in parallel.
Then we publish the messages to another topic and commit offsets to Kafka via s/to-mat and producer/committable-sink.
Finally, we run the stream with our actor-system using s/run.
- Let's create the required dependencies.
(def actor-system (actor/->actor-system "test-actor-system"))

(def committer-settings (committer/committer-settings actor-system {:batch-size 1}))

(def consumer-settings
  (consumer/consumer-settings actor-system
                              {:group-id "a-test-consumer"
                               :bootstrap-servers "localhost:9092"
                               :key-deserializer (StringDeserializer.)
                               :value-deserializer (StringDeserializer.)}))

(def producer-settings
  (producer/producer-settings actor-system
                              {:bootstrap-servers "localhost:9092"
                               :key-serializer (StringSerializer.)
                               :value-serializer (StringSerializer.)}))
- Let's run the stream and see it in action.
(def consumer-control
  (test-stream-with-producer actor-system
                             consumer-settings
                             committer-settings
                             producer-settings
                             ["testing_stuff"]
                             "output-topic"))
- We can shut this Alpakka Kafka stream down as follows.
;; shut down the stream using the consumer-control var
@(consumer/drain-and-shutdown
   consumer-control
   (CompletableFuture/supplyAsync (utils/->fn0 (fn [] ::done)))
   (actor/get-dispatcher actor-system))
utils/->fn0 wraps a zero-arity Clojure function in a reified java.util.function.Supplier, which is the type CompletableFuture/supplyAsync expects.
- We can shut down the Akka actor-system as well.
@(actor/terminate actor-system)
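To make the earlier note about utils/->fn0 more concrete, here is a minimal sketch of how a zero-arity Clojure function can be wrapped in a java.util.function.Supplier using only the JDK and reify. The supplier-sketch namespace and the ->supplier helper are hypothetical names used for illustration; this is not the library's source, and the actual implementation of utils/->fn0 may differ.

```clojure
(ns supplier-sketch
  (:import [java.util.concurrent CompletableFuture]
           [java.util.function Supplier]))

;; Hypothetical helper (an assumption, not the library's code):
;; wraps a zero-arity Clojure function in a java.util.function.Supplier.
(defn ->supplier
  [f]
  (reify Supplier
    (get [_] (f))))

;; supplyAsync expects a Supplier, so the Clojure fn is wrapped first.
(def done-future
  (CompletableFuture/supplyAsync (->supplier (fn [] ::done))))

;; Dereferencing blocks until the future completes with the value
;; returned by the wrapped function.
(println @done-future)
```

In the shutdown call above, the future built this way is simply the value that drain-and-shutdown hands back once the stream has drained, so any zero-arity function returning a marker value would do.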
I hope this story has introduced Alpakka Kafka as a viable alternative to Kafka Streams for Clojure users. I have documented more examples of using Alpakka Kafka with my Clojure wrapper library here.