Showmen Dasgupta

From Setup to Implementation: Integrating Kafka with a .NET Web API (Part 1)

Introduction:

In today’s fast-paced digital landscape, real-time data processing has become essential for building responsive and scalable applications. Whether it’s tracking user interactions, processing transactions, or managing logs, the ability to handle large volumes of data in real-time is crucial for modern software systems.

What is Kafka?

Apache Kafka is an open-source distributed event streaming platform designed for high-throughput, low-latency data streaming. Originally developed by LinkedIn and later open-sourced under the Apache License, Kafka has quickly become the de facto standard for building real-time data pipelines and streaming applications.

Key Features of Kafka:

Scalability:
Kafka can handle high volumes of data with ease, allowing you to scale horizontally across multiple servers.

Durability:
Kafka ensures data is stored reliably by replicating it across different nodes in the cluster.

Fault-Tolerance:
Kafka’s distributed architecture is designed to continue operating seamlessly, even in the event of server failures.

Real-Time Processing:
Kafka can process streams of data in real-time, making it ideal for applications that require immediate insights.

Where is Kafka Used?

Kafka is used across various industries and by many leading tech companies to build robust, real-time data pipelines. Here are some common use cases:

Log Aggregation:
Collecting and storing logs from different sources in a central location for monitoring and analysis.

Real-Time Analytics:
Processing streams of data in real-time to generate analytics and insights for immediate decision-making.

Event Sourcing:
Capturing every change to an application state as an event stream, which can be replayed to reconstruct past states.

Stream Processing:
Continuous processing and transformation of data streams, often using Kafka Streams or other stream processing frameworks like Apache Flink.

Figure 1: Kafka Architecture Overview (brokers, producers, and consumers).

In the diagram above:

  • Producers are client applications that publish messages to Kafka topics.

  • Topics are categories to which messages are sent and stored. Kafka divides these topics into Partitions for scalability and parallel processing.

  • Brokers are Kafka servers that store the data and serve it to consumers. Kafka typically runs as a cluster of multiple brokers.

  • Consumers are client applications that read messages from Kafka topics, often processing the data or forwarding it to other systems.

Sources:

Official Documentation:

  1. Kafka: Apache Kafka Documentation
  2. ZooKeeper Documentation: Apache ZooKeeper’s official documentation explains its purpose and its application in distributed systems like Kafka. ZooKeeper Overview

Books:

  1. "Kafka: The Definitive Guide" by Neha Narkhede, Gwen Shapira, and Todd Palino – This book provides in-depth knowledge about Kafka.
  2. O'Reilly Media - Kafka: The Definitive Guide: This book is an excellent resource for understanding the architecture of Kafka, including the detailed explanation of ZooKeeper's function.

Articles and Tutorials:

  1. Confluent Blog: Offers numerous articles on Kafka and its integration with various technologies.
  2. Microsoft Learn: Microsoft Learn .NET is a great resource for tutorials and practical guides.
  3. Medium Articles and Technical Blogs: Various engineers and developers who work with Kafka often write about their experiences and the technical details of ZooKeeper and Kafka. Example: Kafka’s Evolution: ZooKeeper to KRaft

Series Overview:

This three-part series will guide you through the process of integrating Kafka with a .NET Web API application. By the end of the series, you’ll have a solid understanding of how to set up Kafka, produce and consume messages, and implement a real-world use case involving a product cart system.

Part 1: Setting up Kafka and your .NET Application

In this first part, we will lay the groundwork by setting up Apache Kafka on your local machine and configuring a .NET Web API project. Whether you're new to Kafka or just need a refresher, this section will guide you through the installation process and help you get started with the necessary tools and dependencies.

Figure 2: Kafka Setup Overview. Image source: Kafka Cluster Architecture Overview

Understanding Kafka ZooKeeper:
Kafka ZooKeeper plays a critical role in the architecture of Apache Kafka, acting as a centralised service for maintaining configuration information, naming, providing distributed synchronisation, and offering group services. ZooKeeper’s primary function within Kafka is to manage and coordinate the Kafka brokers, handle leader election for Kafka partitions, and track the status of distributed resources, making it essential for ensuring high availability and fault tolerance within Kafka clusters.

Role in Kafka Ecosystem

  1. Broker Management:
    ZooKeeper keeps track of all Kafka brokers within a cluster, ensuring that each broker is aware of the others. It helps manage the dynamic membership of the cluster by tracking which brokers are active or have failed.

  2. Leader Election:
    In Kafka, each partition of a topic has a leader broker that handles all reads and writes. ZooKeeper is responsible for the leader election process, ensuring that if a broker fails, a new leader is promptly elected from the available replicas.

  3. Topic Configuration and Metadata:
    ZooKeeper stores the metadata about Kafka topics, partitions, and the associated replicas. This centralized management helps Kafka brokers retrieve and update topic configurations efficiently.

  4. Consumer Group Coordination:
    In earlier versions of Kafka, ZooKeeper was used to manage consumer group coordination, including keeping track of offsets. This responsibility has since been migrated to Kafka brokers themselves in more recent versions, but ZooKeeper’s role was foundational in earlier implementations.

Importance and Evolution
ZooKeeper's role is indispensable in Kafka’s operation, particularly for ensuring consistency and fault tolerance across the distributed system. However, as Kafka has evolved, there has been a movement towards reducing the dependency on ZooKeeper, leading to the introduction of Kafka's own built-in consensus protocol called KRaft (Kafka Raft). KRaft aims to simplify the architecture by handling broker metadata management directly within Kafka itself, eliminating the need for an external ZooKeeper cluster.

Setting up Kafka:
Let's begin by walking through the steps to install Kafka on your local development environment. You’ll learn about the core components of Kafka, including topics, partitions, brokers, and how to configure them.

To do that, let's set up a docker-compose file in our project folder. It will spin up all the services we need to work with Kafka locally.

The project structure looks like this:

Project Structure

Let's look at the docker-compose file:

networks:
  kafka-net:
    driver: bridge

services:
  zookeeper-server:
    image: bitnami/zookeeper:latest
    networks:
      - kafka-net
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafdrop:
    image: obsidiandynamics/kafdrop:3.28.0
    networks:
      - kafka-net
    restart: "no"
    ports:
      - 9000:9000
    environment:
      KAFKA_BROKERCONNECT: PLAINTEXT://kafka-server:29092
      JVM_OPTS: -Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify
      SCHEMAREGISTRY_CONNECT: http://schema-registry:8081
    depends_on:
      - kafka-server
  kafka-server:
    image: bitnami/kafka:latest
    networks:
      - kafka-net
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka-server:29092,PLAINTEXT_HOST://127.0.0.1:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:29092,PLAINTEXT_HOST://:9092
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper-server
  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    networks:
      - kafka-net
    ports:
      - 8081:8081
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=PLAINTEXT://kafka-server:29092
      - SCHEMA_REGISTRY_HOST_NAME=localhost
      - SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081
    depends_on:
      - kafka-server

While writing this post, I drew inspiration from another blog that provides a clear guide to setting up the local environment:
https://thecloudblog.net/post/event-driven-architecture-with-apache-kafka-for-net-developers-part-1-event-producer/

The docker compose file includes:

  1. Apache ZooKeeper settings and image
  2. Kafka settings and image
  3. Kafdrop settings and image (an open-source Kafka web UI)
  4. Schema Registry settings and image

Now we will run docker compose to download all the images and start the containers locally:

docker-compose up -d

Once all the images have been downloaded and the containers are running, you can view them in Docker Desktop.

Docker images Kafka

Or you can check their status from your console using:

docker ps

Docker ps

We can also take a look at the Kafdrop UI running on localhost at port 9000, as shown in the screenshots below:

Kafdrop
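
If you prefer the command line to the Kafdrop UI, you can also inspect the broker with the Kafka scripts bundled in the Bitnami image. This is only a sketch: the container name is a placeholder, so check docker ps for the actual name of the kafka-server container.

# List the topics known to the broker (replace <kafka-server-container> with the real container name)
docker exec -it <kafka-server-container> \
  /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list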

Now that our Kafka environment is ready, let's install the necessary libraries for our project:

dotnet add package Confluent.Kafka
dotnet add package Confluent.SchemaRegistry.Serdes.Avro
dotnet restore

The following diagram represents our application's architecture:
kafka cart

In the image, you'll notice a topic named cart-item, which receives items, and another topic, cart-item-processed, that displays the status of processed cart items. The cart-item-group-id represents our consumer group ID.

Now let's set up a KafkaConfiguration class:

public static class KafkaConfiguration
{
    public static void ConfigureServices(IServiceCollection services, IConfiguration configuration)
    {
        // Add controllers and other services to the container
        services.AddControllers(static options =>
        {
            var formatter = options.InputFormatters.OfType<SystemTextJsonInputFormatter>()
                .First(static formatter => formatter.SupportedMediaTypes.Contains("application/json"));

            formatter.SupportedMediaTypes.Add("application/csp-report");
            formatter.SupportedMediaTypes.Add("application/reports+json");
        });

        services.AddEndpointsApiExplorer();
        services.AddSwaggerGen();

        // Register Kafka services with configurable settings
        services.AddSingleton<KafkaProducerService>(sp =>
        {
            var kafkaConfig = sp.GetRequiredService<IOptions<KafkaConfigSecrets>>().Value;
            return new KafkaProducerService(kafkaConfig.KafkaBroker, kafkaConfig.KafkaTopic, kafkaConfig.SchemaRegistry);
        });

        services.AddSingleton<KafkaConsumerService>(sp =>
        {
            var kafkaConfig = sp.GetRequiredService<IOptions<KafkaConfigSecrets>>().Value;
            return new KafkaConsumerService(kafkaConfig.KafkaBroker, kafkaConfig.KafkaGroupId, kafkaConfig.KafkaTopic, kafkaConfig.SchemaRegistry, kafkaConfig.KafkaProcessedTopic);
        });

        services.AddSingleton<IShoppingCartRepository, ShoppingCartRepositoryRepository>();
    }

    public static void ConfigureMiddleware(WebApplication application)
    {
        // Only use Swagger in development
        if (application.Environment.IsDevelopment())
        {
            application.UseSwagger();
            application.UseSwaggerUI();
        }

        application.UseHttpsRedirection();
        application.UseAuthorization();
        application.MapControllers();
    }

    public static async void InitiateKafkaConsumer(KafkaConsumerService kafkaConsumerService,
        CancellationTokenSource? cancellationTokenSource)
    {
        // will be implemented on the next part of the series
    }
}

Now we will implement a KafkaConfigSecrets model, which will be used to read the Kafka settings:

public class KafkaConfigSecrets
{
    public string SchemaRegistry { get; set; } = default!;
    public string KafkaBroker { get; set; } = default!;
    public string KafkaGroupId { get; set; } = default!;
    public string KafkaTopic { get; set; } = default!;
    public string KafkaProcessedTopic { get; set; } = default!;
}

Now, let's update Program.cs to apply the Kafka configuration when the application runs.

using ApacheKafkaBasics.Configuration;

var builder = WebApplication.CreateBuilder(args);

builder.Configuration.AddJsonFile("secret.json", true, true);
// Registering configuration section as a strongly typed class
builder.Services.Configure<KafkaConfigSecrets>(builder.Configuration.GetSection("KafkaConfig"));
// Configure services
KafkaConfiguration.ConfigureServices(builder.Services, builder.Configuration);

var app = builder.Build();
// Configure the HTTP request pipeline
KafkaConfiguration.ConfigureMiddleware(app);

app.Run();
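
For reference, a secret.json matching these settings might look like the sketch below. The broker address and Schema Registry URL come from the docker-compose file above, and the topic and group names match the architecture diagram; adjust them to your own environment.

{
  "KafkaConfig": {
    "KafkaBroker": "localhost:9092",
    "KafkaGroupId": "cart-item-group-id",
    "KafkaTopic": "cart-item",
    "KafkaProcessedTopic": "cart-item-processed",
    "SchemaRegistry": "http://localhost:8081"
  }
}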

Now let's set up a service for the Kafka producer:

public class KafkaProducerService : IKafkaProducerService
{
    private readonly IAdminClient _adminClient;
    private readonly ProducerConfig _producerConfig;
    private readonly CachedSchemaRegistryClient _schemaRegistry;
    private readonly string _topicName;
    private readonly object _queueLock = new();

    public KafkaProducerService(string brokerList, string kafkaTopic, string schemaRegistryUrl)
    {
        var adminConfig = new AdminClientConfig { BootstrapServers = brokerList };
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = schemaRegistryUrl };

        _producerConfig = new ProducerConfig
        {
            BootstrapServers = brokerList,
            // Enables delivery reports so we can confirm messages reached the topic.
            EnableDeliveryReports = true,
            ClientId = Dns.GetHostName()
        };

        // Kept as fields so the producer and its Avro serializer can be built from them later.
        _schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
        _adminClient = new AdminClientBuilder(adminConfig).Build();
        _topicName = kafkaTopic;
    }
}

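To preview where this service is heading, a produce call built on these fields might look roughly like the sketch below. This is not the final implementation (that comes in Part 2): the cartItem variable and its key are placeholders, CartItem is assumed to be an Avro-generated class from our project, and AvroSerializer comes from the Confluent.SchemaRegistry.Serdes.Avro package we installed earlier.

// Sketch only: build an Avro-aware producer from the stored configuration
// and publish a single cart item. cartItem is a placeholder instance here.
using var producer = new ProducerBuilder<string, CartItem>(_producerConfig)
    .SetValueSerializer(new AvroSerializer<CartItem>(_schemaRegistry))
    .Build();

var result = await producer.ProduceAsync(_topicName,
    new Message<string, CartItem> { Key = Guid.NewGuid().ToString(), Value = cartItem });

Console.WriteLine($"Delivered cart item to {result.TopicPartitionOffset}");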

We also need to implement a service for the Kafka consumer:

public class KafkaConsumerService : IKafkaConsumerService
{
    private readonly string _topicName;
    private readonly string _processedTopicName;
    private readonly ConsumerConfig _consumerConfig;
    private readonly CachedSchemaRegistryClient _schemaRegistryClient;
    private readonly object _queueLock = new();

    private record KafkaMessage(string? Key, int? Partition, CartItem Message);

    public KafkaConsumerService(string brokerList, string groupId, string topic, string schemaRegistryUrl,
        string processedTopic)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig { Url = schemaRegistryUrl };

        _consumerConfig = new ConsumerConfig
        {
            BootstrapServers = brokerList,
            GroupId = groupId,
            EnableAutoCommit = false,
            EnableAutoOffsetStore = false,
            SessionTimeoutMs = 10000, // 10 seconds
            // Read messages from the start if no committed offset exists.
            AutoOffsetReset = AutoOffsetReset.Earliest,
            MaxPollIntervalMs = 500000
        };

        // A single schema registry client is enough; it will back the Avro deserializer.
        _schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryConfig);

        _topicName = topic;
        _processedTopicName = processedTopic;
    }
}
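
Similarly, the consume loop that we will flesh out in the next part might look roughly like this sketch. AvroDeserializer also comes from Confluent.SchemaRegistry.Serdes, AsSyncOverAsync (from the Confluent.Kafka.SyncOverAsync namespace) adapts it to the synchronous interface the consumer expects, and the cancellationToken is assumed to be supplied by the caller (for example from InitiateKafkaConsumer).

// Sketch only: consume CartItem messages and commit offsets manually,
// since EnableAutoCommit is disabled in the configuration above.
using var consumer = new ConsumerBuilder<string, CartItem>(_consumerConfig)
    .SetValueDeserializer(new AvroDeserializer<CartItem>(_schemaRegistryClient).AsSyncOverAsync())
    .Build();

consumer.Subscribe(_topicName);

while (!cancellationToken.IsCancellationRequested)
{
    var result = consumer.Consume(cancellationToken);
    Console.WriteLine($"Received cart item at offset {result.TopicPartitionOffset}");

    // After processing the message, commit the offset explicitly.
    consumer.Commit(result);
}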

In the next installment of this blog series, we will further enhance these services.

In this first part, we covered the basics of Kafka, setting up Kafka locally using Docker, and configuring a .NET application to work with Kafka.

In the upcoming parts, we will implement a Product Cart API that integrates with Kafka to process messages using Kafka Producers and Consumers.

Happy coding! 😀
