In this blog, we’ll dive into Kafka, a distributed streaming platform, and learn how to create a Producer and Consumer in .NET 6 using ASP.NET Core. We’ll cover Kafka’s core concepts, provide detailed explanations for each code snippet, and build a functional application that sends and receives messages.
What is Kafka?
Kafka is a high-throughput, distributed messaging system designed to handle real-time data streams. It has three key components:
- Producer: Sends data (messages) to Kafka topics.
- Consumer: Reads data (messages) from Kafka topics.
- Broker: A Kafka server that stores and manages incoming messages. Kafka typically runs in a cluster with multiple brokers.
Kafka organizes data into topics, which act as categories for storing messages. Messages are immutable, and their order is guaranteed within each partition of a topic.
Core Kafka Concepts
- Topic: Logical channel to which messages are sent.
- Partition: Each topic is divided into partitions for parallel processing. Partitions ensure scalability.
- Offset: Unique identifier for messages within a partition.
- Broker: Kafka server managing topics and partitions.
- Producer: Sends data to Kafka topics.
- Consumer: Reads data from Kafka topics and processes it.
- Consumer Group: Consumers are organized into groups to share the load, so each message is processed by exactly one consumer within the group (see the sketch below).
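To make keys, partitions, and offsets concrete, here is a minimal sketch using the Confluent.Kafka client that we install in Step 3, runnable as a stand-alone .NET 6 console program with top-level statements. The topic name fruit and the broker address localhost:9092 are the same assumptions used throughout this post.
using Confluent.Kafka;

// Minimal sketch: messages that share a key always land in the same partition,
// and every stored message is assigned a monotonically increasing offset.
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, string>(config).Build();

// Messages keyed with "apple" will all be routed to the same partition of the topic.
var result = await producer.ProduceAsync("fruit",
    new Message<string, string> { Key = "apple", Value = "red" });

// The broker reports exactly where the message was stored.
Console.WriteLine($"Stored in partition {result.Partition.Value} at offset {result.Offset.Value}");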
Prerequisites
- .NET 6 SDK: Download and install from the official .NET website.
- Kafka Installed: Follow the Kafka installation guide or use Docker to set up Kafka.
Getting Started with Kafka in .NET 6
Step 1: Set up Kafka
1. Start Zookeeper (Kafka's dependency):
zookeeper-server-start.bat ..\..\config\zookeeper.properties
2. Start Kafka:
kafka-server-start.bat ..\..\config\server.properties
3. Create a Kafka topic for this example:
kafka-topics.bat --create --topic fruit --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1
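If you would rather create the topic from code than from the command line, the Confluent.Kafka package we install in Step 3 also includes an admin client. A minimal sketch, assuming the same broker address and topic name:
using Confluent.Kafka;
using Confluent.Kafka.Admin;

// Sketch: create the "fruit" topic programmatically via the admin client.
var adminConfig = new AdminClientConfig { BootstrapServers = "localhost:9092" };
using var adminClient = new AdminClientBuilder(adminConfig).Build();

try
{
    await adminClient.CreateTopicsAsync(new[]
    {
        new TopicSpecification { Name = "fruit", NumPartitions = 1, ReplicationFactor = 1 }
    });
    Console.WriteLine("Topic 'fruit' created.");
}
catch (CreateTopicsException e)
{
    // An already-existing topic surfaces as a per-topic error.
    Console.WriteLine($"Could not create topic: {e.Results[0].Error.Reason}");
}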
Step 2: Create a .NET 6 Project
Run the following command to create an ASP.NET Core Web API project:
dotnet new webapi -n KafkaProducerConsumer
cd KafkaProducerConsumer
Step 3: Install Kafka Library
Kafka communication in .NET is enabled by the Confluent.Kafka library. Install it via NuGet:
dotnet add package Confluent.Kafka
dotnet add package Swashbuckle.AspNetCore
Kafka in .NET 6: Step-by-Step Implementation
We will build two services:
- Kafka Producer Service: Sends messages to Kafka topics.
- Kafka Consumer Service: Continuously listens to and processes messages from Kafka topics.
Step 4: Configure Kafka Settings
Add Kafka configurations in appsettings.json to simplify access throughout the application:
{
  "Kafka": {
    "BootstrapServers": "localhost:9092",
    "Topic": "fruit",
    "GroupId": "my-consumer-group"
  }
}
Explanation:
- BootstrapServers: Address of the Kafka broker.
- Topic: The topic where messages will be sent or received.
- GroupId: Identifies the consumer group for message processing.
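One convenient way to consume these settings is to bind the Kafka section to a small options class. The KafkaSettings class below is my own naming, not something the template generates:
// KafkaSettings.cs - a simple POCO that mirrors the "Kafka" section of appsettings.json
namespace KafkaExample;

public class KafkaSettings
{
    public string BootstrapServers { get; set; } = "localhost:9092";
    public string Topic { get; set; } = "fruit";
    public string GroupId { get; set; } = "my-consumer-group";
}

// In Program.cs, bind the section once so any service can inject IOptions<KafkaSettings>:
// builder.Services.Configure<KafkaSettings>(builder.Configuration.GetSection("Kafka"));
In this post the services read IConfiguration directly for brevity, but the options pattern scales better as settings grow.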
Step 5: Create the Kafka Producer Service
Create a folder named Services and add KafkaProducerService.cs.
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;

namespace KafkaExample.Services;

public interface IKafkaProducerService
{
    Task SendMessageAsync(string topic, string message);
}

public class KafkaProducerService : IKafkaProducerService
{
    private readonly IProducer<Null, string> _producer;

    // Constructor to initialize the Kafka producer from configuration
    public KafkaProducerService(IConfiguration configuration)
    {
        var config = new ProducerConfig
        {
            // Broker address comes from the "Kafka" section of appsettings.json
            BootstrapServers = configuration["Kafka:BootstrapServers"] ?? "localhost:9092"
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    // Method to send a message to a Kafka topic
    public async Task SendMessageAsync(string topic, string message)
    {
        try
        {
            // Send the message to the specified Kafka topic
            await _producer.ProduceAsync(topic, new Message<Null, string> { Value = message });
            Console.WriteLine($"Message '{message}' sent to topic '{topic}'.");
        }
        catch (Exception ex)
        {
            // Log any errors encountered while sending the message
            Console.WriteLine($"Error sending message to Kafka: {ex.Message}");
            throw;
        }
    }
}
Explanation:
- ProducerConfig: Configures the producer; the broker address is read from the Kafka section of appsettings.json.
- ProduceAsync: Sends a message to Kafka asynchronously.
- Null: Key is set to null since our example doesn’t use keyed messages.
- _producer: The Kafka producer instance sends messages to the specified topic.
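ProduceAsync awaits one broker acknowledgement per call, which is simple but not the fastest option. If you ever need to push large batches, here is a hedged sketch of the fire-and-forget pattern the Confluent.Kafka client also supports, as a method you could add to KafkaProducerService (not required for this example):
// Sketch: batch-style producing with a delivery callback instead of awaiting each message.
public void SendMany(string topic, IEnumerable<string> messages)
{
    foreach (var message in messages)
    {
        _producer.Produce(topic, new Message<Null, string> { Value = message },
            deliveryReport =>
            {
                if (deliveryReport.Error.IsError)
                    Console.WriteLine($"Delivery failed: {deliveryReport.Error.Reason}");
            });
    }

    // Block until all outstanding messages have been delivered (or the timeout expires).
    _producer.Flush(TimeSpan.FromSeconds(10));
}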
Step 6: Create the Kafka Consumer Service
Add KafkaConsumerService.cs to the Services folder.
using Confluent.Kafka;
using System;
using System.Threading;

namespace KafkaExample.Services
{
    public class KafkaConsumerService
    {
        private readonly IConsumer<Null, string> _consumer;

        public KafkaConsumerService()
        {
            var config = new ConsumerConfig
            {
                // These values could also be read from the "Kafka" section of appsettings.json
                BootstrapServers = "localhost:9092",
                GroupId = "my-consumer-group",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            _consumer = new ConsumerBuilder<Null, string>(config).Build();
        }

        // Blocking loop that reads messages until cancellation is requested
        public void ConsumeMessages(string topic, CancellationToken cancellationToken = default)
        {
            _consumer.Subscribe(topic);
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    var consumeResult = _consumer.Consume(cancellationToken);
                    Console.WriteLine($"Consumed message: {consumeResult.Message.Value}");
                }
            }
            catch (ConsumeException e)
            {
                Console.WriteLine($"Error consuming message: {e.Error.Reason}");
            }
            catch (OperationCanceledException)
            {
                // The application is shutting down
            }
            finally
            {
                // Leave the consumer group cleanly and commit final offsets
                _consumer.Close();
            }
        }
    }
}
Explanation:
- ConsumerConfig: Configures the consumer to connect to the broker and specify the consumer group.
- Subscribe: Subscribes the consumer to a topic.
- Consume: Reads messages from the topic.
- AutoOffsetReset.Earliest: Ensures the consumer starts reading from the beginning of the topic when the consumer group has no committed offsets yet.
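The configuration above relies on automatic offset commits, which happen in the background. If you need at-least-once processing, where an offset is committed only after your code has successfully handled the message, here is a minimal stand-alone sketch with manual commits (a variation, not part of the service above):
using Confluent.Kafka;

// Sketch: manual offset commits for at-least-once processing.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "my-consumer-group",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false // we decide when an offset is committed
};

using var consumer = new ConsumerBuilder<Null, string>(config).Build();
consumer.Subscribe("fruit");

while (true)
{
    var result = consumer.Consume();
    Console.WriteLine($"Processing: {result.Message.Value}");

    // Commit only after the message has been handled successfully.
    consumer.Commit(result);
}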
Step 7: Register Services in Program.cs
Register the Kafka producer service in Program.cs. (The consumer is wired up as a background worker; see the note after the code.)
using KafkaExample.Services;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
var builder = WebApplication.CreateBuilder(args);
// Register Kafka producer service with Dependency Injection
builder.Services.AddSingleton<IKafkaProducerService, KafkaProducerService>();
// Add controllers (required for API endpoints)
builder.Services.AddControllers();
// Add Swagger for API documentation
builder.Services.AddEndpointsApiExplorer(); // For Swagger UI
builder.Services.AddSwaggerGen(); // For Swagger UI
var app = builder.Build();
// Configure Swagger in development environment
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
// Use HTTPS Redirection
app.UseHttpsRedirection();
// Configure the HTTP request pipeline to use controllers
app.MapControllers();
app.Run();
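Note that Program.cs only registers the producer; KafkaConsumerService is never started by the code above. One way to run the consumer for the lifetime of the application is to wrap it in a BackgroundService. The KafkaConsumerHostedService class below is my own addition, a sketch rather than part of the original template (the topic name is hard-coded for brevity):
using KafkaExample.Services;
using Microsoft.Extensions.Hosting;

namespace KafkaExample;

// Sketch: runs the blocking Kafka consume loop on a background thread for the app's lifetime.
public class KafkaConsumerHostedService : BackgroundService
{
    private readonly KafkaConsumerService _consumerService;

    public KafkaConsumerHostedService(KafkaConsumerService consumerService)
    {
        _consumerService = consumerService;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // ConsumeMessages blocks, so push it onto the thread pool and let the host await it.
        return Task.Run(() => _consumerService.ConsumeMessages("fruit", stoppingToken), stoppingToken);
    }
}

// Register both classes in Program.cs next to the producer:
// builder.Services.AddSingleton<KafkaConsumerService>();
// builder.Services.AddHostedService<KafkaConsumerHostedService>();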
Step 8: Create an API Endpoint for Sending Messages
Add a KafkaController.cs in the Controllers folder to handle message requests.
using KafkaExample.Services;
using Microsoft.AspNetCore.Mvc;
namespace KafkaProducerConsumer.Controllers
{
[ApiController]
[Route("api/[controller]")]
public class KafkaController : ControllerBase
{
private readonly IKafkaProducerService _producerService;
public KafkaController(IKafkaProducerService producerService)
{
_producerService = producerService;
}
[HttpPost("send")]
public async Task<IActionResult> SendMessage([FromQuery] string topic, [FromQuery] string message)
{
if (string.IsNullOrEmpty(topic) || string.IsNullOrEmpty(message))
{
return BadRequest("Both 'topic' and 'message' query parameters are required.");
}
await _producerService.SendMessageAsync(topic, message);
return Ok($"Message '{message}' sent successfully to topic '{topic}'.");
}
}
}
Explanation:
This API accepts a message as a query parameter and passes it to the Kafka producer.
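If you prefer posting a JSON body instead of query parameters, here is a sketch of an alternative action you could add to the same controller (the SendMessageRequest record and the send-json route are my own names, not part of the template):
// Sketch: accepting the topic and message as a JSON body instead of query parameters.
public record SendMessageRequest(string Topic, string Message);

[HttpPost("send-json")]
public async Task<IActionResult> SendMessageFromBody([FromBody] SendMessageRequest request)
{
    if (string.IsNullOrEmpty(request.Topic) || string.IsNullOrEmpty(request.Message))
    {
        return BadRequest("Both 'Topic' and 'Message' are required.");
    }

    await _producerService.SendMessageAsync(request.Topic, request.Message);
    return Ok($"Message '{request.Message}' sent successfully to topic '{request.Topic}'.");
}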
At this point, your Solution Explorer should contain a Services folder with KafkaProducerService.cs and KafkaConsumerService.cs, and a Controllers folder with KafkaController.cs.
Step 9: Run and Test
1. Start the Zookeeper and Kafka servers.
2. Run the .NET application:
dotnet run
3. Use a REST client like Postman to send a message:
POST http://localhost:5292/api/kafka/send?topic=fruit&message=apple
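If you would rather test from code than from Postman, here is a minimal sketch using HttpClient in a .NET 6 console program (the port 5292 matches the URL above, but yours depends on launchSettings.json):
// Sketch: call the send endpoint from a small console program.
using var client = new HttpClient();

var response = await client.PostAsync(
    "http://localhost:5292/api/kafka/send?topic=fruit&message=apple", content: null);

Console.WriteLine($"{(int)response.StatusCode}: {await response.Content.ReadAsStringAsync()}");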
Kafka Workflow Recap
- The producer sends the message to the fruit topic.
- The Kafka broker receives and stores the message.
- The consumer reads the message from the topic and processes it.
This demonstrates the basic producer-consumer pattern in Kafka, integrated with a .NET 6 application.
The full source code is available on GitHub.
Conclusion
In this blog, we explored Kafka concepts and implemented a real-time producer-consumer application in .NET 6. Kafka is highly scalable, fault-tolerant, and suitable for distributed systems. This example can be extended to include advanced features like message keying, batch processing, error handling, and monitoring.
Happy coding!