Here's a comprehensive example of a Kafka integration with a Golang service. It includes a Producer, a Consumer Group, error and success channel handling, and logging with Uber's Zap (popular for high-performance, structured JSON logging, and often used in production). The example covers configuration, retry logic, metadata, and logging, and follows Kafka best practices to minimize message loss.
To prepare for this section, you can refer to Kafka Quick Setup and Common Kafka Commands.
Project Structure
kafka-example/
├── config/
│   ├── config.go                 # Parsed config from YAML
│   └── config.yaml               # YAML config file
├── constant/
│   └── constant.go               # App constants
├── consumer/
│   └── main.go                   # Entry point for the consumer
├── kafka/
│   ├── kafka_consumer_group.go   # Consumer Group implementation
│   └── kafka_producer.go         # Producer implementation
├── listener/
│   └── listener.go               # Async Producer listener implementation
├── logger/
│   └── logger.go                 # Zap logger configuration
├── logs/                         # Folder to store generated log files
├── producer/
│   └── main.go                   # Entry point for the producer
├── go.mod
└── go.sum
1. Configuration (config/config.yaml)
The configuration file will store Kafka credentials, log rotation settings, and producer/consumer options.
kafka:
  brokers:
    - "localhost:29092"
  username: "dev-user"
  password: "dev-password"
  topic: "latestMsgToRedis"
  retries: 10
  producer_return_successes: true
log:
  rotation_size: 50 # 50MB
  rotation_count: 7 # 7 days
  level: "info"
2. Config Parsing (config/config.go)
These structs read the configuration from config.yaml:
package config

import (
	"log"
	"os"

	"gopkg.in/yaml.v3"
)

// KafkaConfig mirrors the "kafka" section of config.yaml.
type KafkaConfig struct {
	Brokers                 []string `yaml:"brokers"`
	Username                string   `yaml:"username"`
	Password                string   `yaml:"password"`
	Topic                   string   `yaml:"topic"`
	Retries                 int      `yaml:"retries"`
	ProducerReturnSuccesses bool     `yaml:"producer_return_successes"`
}

// LogConfig mirrors the "log" section of config.yaml.
type LogConfig struct {
	RotationSize  int `yaml:"rotation_size"`  // Max log file size in MB before rotation
	RotationCount int `yaml:"rotation_count"` // Days to keep rotated log files
}

type Config struct {
	Kafka KafkaConfig `yaml:"kafka"`
	Log   LogConfig   `yaml:"log"`
}

// LoadConfig opens the YAML file at configPath and decodes it into a Config.
// A missing file terminates the process; other failures are returned to the caller.
func LoadConfig(configPath string) (*Config, error) {
	_, err := os.Stat(configPath)
	if os.IsNotExist(err) {
		log.Fatalf("Config file does not exist: %v", err)
	}

	file, err := os.Open(configPath)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	var cfg Config
	decoder := yaml.NewDecoder(file)
	if err := decoder.Decode(&cfg); err != nil {
		return nil, err
	}
	return &cfg, nil
}
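A parsed config can still be unusable, for example when the broker list is empty, and that would only show up later as a connection error. Below is a minimal validation sketch that uses only the standard library. The Validate method and its rules are assumptions for illustration and are not part of the original project; the struct is a trimmed local copy of KafkaConfig so the snippet compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// KafkaConfig is a trimmed local copy of the struct from config/config.go.
type KafkaConfig struct {
	Brokers []string
	Topic   string
	Retries int
}

// Validate is a hypothetical helper: it reports the first setting that
// would otherwise fail later, when the producer or consumer connects.
func (c KafkaConfig) Validate() error {
	if len(c.Brokers) == 0 {
		return errors.New("kafka.brokers must list at least one broker")
	}
	if c.Topic == "" {
		return errors.New("kafka.topic must not be empty")
	}
	if c.Retries < 0 {
		return fmt.Errorf("kafka.retries must be >= 0, got %d", c.Retries)
	}
	return nil
}

func main() {
	good := KafkaConfig{Brokers: []string{"localhost:29092"}, Topic: "latestMsgToRedis", Retries: 10}
	fmt.Println("good config:", good.Validate())

	bad := KafkaConfig{Topic: "latestMsgToRedis"}
	fmt.Println("bad config:", bad.Validate())
}
```

One possible place to call such a check is right after LoadConfig returns, before any Kafka client is created.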
3. Constant (constant/constant.go)
package constant

// Define custom key types to avoid key collisions
type ContextKey string

const (
	OperationID ContextKey = "operationID" // For tracking and debugging
	OpUserID    ContextKey = "opUserID"    // For identifying the user across microservices
)
4. Logger Setup (logger/logger.go)
Configuring Uber Zap for file logging with rotation:
package logger

import (
	"fmt"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"gopkg.in/natefinch/lumberjack.v2"
)

// NewLogger initializes a new zap.Logger with log rotation settings
func NewLogger(processID string, rotationSize int, rotationCount int) *zap.Logger {
	// Configure lumberjack to handle log rotation by size and age
	w := zapcore.AddSync(&lumberjack.Logger{
		Filename: fmt.Sprintf("./logs/%s.log", processID), // Log file path based on processID
		MaxAge:   rotationCount,                            // Number of days to retain old log files
		MaxSize:  rotationSize,                             // Rotate log when it reaches rotationSize MB
	})

	// Set up the core logging configuration
	core := zapcore.NewCore(
		zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), // Use JSON format for log entries
		w,                 // Set log writer with rotation settings
		zapcore.InfoLevel, // Set minimum log level to Info
	)

	// Return the logger with caller information enabled
	return zap.New(core, zap.AddCaller())
}
5. Kafka Producer Implementation (kafka/kafka_producer.go)
package kafka

import (
	"context"
	"errors"
	"fmt"
	"time"

	"kafka-example/config"
	"kafka-example/constant"

	"github.com/IBM/sarama"
	"go.uber.org/zap"
)

type MProducer struct {
	// producer sarama.AsyncProducer
	producer sarama.SyncProducer
	topic    string
	logger   *zap.Logger
	config   *config.KafkaConfig
}

func NewProducer(cfg *config.KafkaConfig, topic string, log *zap.Logger) (*MProducer, error) {
	saramaConfig := sarama.NewConfig()

	// Maximum number of times to retry sending a message (sarama's default is 3).
	// With this setting the producer stops retrying a message after 5 failed retries.
	// The message can then be lost unless other safeguards (error handling, dead-letter queues) are in place.
	saramaConfig.Producer.Retry.Max = 5

	// WaitForAll waits for all in-sync replicas to commit before responding.
	// The minimum number of in-sync replicas is configured on the broker via the `min.insync.replicas` configuration key.
	saramaConfig.Producer.RequiredAcks = sarama.WaitForAll

	// The hash partitioner applies a hash function to the message key, so messages with the same key
	// always go to the same partition. This preserves ordering per key.
	// Every message should therefore carry a key; if the key is nil, a random partition is chosen.
	saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner

	// sarama.SyncProducer requires Producer.Return.Successes = true; NewSyncProducer returns a
	// configuration error otherwise. The SyncProducer reads the success channel internally so that
	// SendMessage can return the partition and offset of each delivered message.
	saramaConfig.Producer.Return.Successes = true

	if cfg.Username != "" && cfg.Password != "" {
		saramaConfig.Net.SASL.Enable = true
		saramaConfig.Net.SASL.User = cfg.Username
		saramaConfig.Net.SASL.Password = cfg.Password
	}

	// The following applies only to AsyncProducer, where errors and successes are handled asynchronously:
	// saramaConfig.Producer.Return.Errors = true
	// saramaConfig.Producer.Return.Successes = cfg.ProducerReturnSuccesses
	// listener.ListenAsyncProducerStatus(asyncProducer, log)

	// Try to create the producer up to cfg.Retries+1 times, one second apart.
	var prod sarama.SyncProducer
	var err error
	for i := 0; i <= cfg.Retries; i++ {
		// prod, err = sarama.NewAsyncProducer(cfg.Brokers, saramaConfig)
		prod, err = sarama.NewSyncProducer(cfg.Brokers, saramaConfig)
		if err == nil {
			break
		}
		log.Error("Failed to create producer", zap.Int("tryTime", i), zap.Error(err))
		time.Sleep(time.Second)
	}
	if err != nil {
		log.Error("Failed to create producer after many tries", zap.Error(err))
		return nil, err
	}
	log.Info("Producer created successfully")

	// The main differences between sarama.SyncProducer and sarama.AsyncProducer:
	// Message delivery mechanism:
	//   SyncProducer: sends messages synchronously. Each SendMessage call blocks until the broker acknowledges, which preserves delivery order.
	//   AsyncProducer: sends messages through channels (Input() for messages, Errors() for errors, and optionally Successes() for successful deliveries). It is non-blocking and faster for high-throughput needs.
	// Use cases:
	//   SyncProducer: low-throughput scenarios where delivery guarantees and ordering are critical.
	//   AsyncProducer: high-throughput applications where latency matters and handling acknowledgments and errors asynchronously is feasible.
	return &MProducer{producer: prod, topic: topic, config: cfg, logger: log}, nil
}

// GetMQHeaderWithContext builds the headers that carry context data from producer to consumers.
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
	operationID, ok := ctx.Value(constant.OperationID).(string)
	if !ok {
		return nil, errors.New("ctx missing operationID")
	}
	opUserID, ok := ctx.Value(constant.OpUserID).(string)
	if !ok {
		return nil, errors.New("ctx missing userID")
	}
	return []sarama.RecordHeader{
		{Key: []byte(constant.OperationID), Value: []byte(operationID)},
		{Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
	}, nil
}

func (p *MProducer) SendMessage(ctx context.Context, key, msgValue string) error {
	header, err := GetMQHeaderWithContext(ctx)
	if err != nil {
		// The message is still sent, just without headers.
		p.logger.Error("Failed to get Header", zap.Error(err))
	}

	kafkaMsg := &sarama.ProducerMessage{
		Topic:   p.topic,
		Key:     sarama.StringEncoder(key),
		Value:   sarama.StringEncoder(msgValue),
		Headers: header,
	}

	partition, offset, err := p.producer.SendMessage(kafkaMsg)
	if err != nil {
		p.logger.Error("Failed to send message", zap.Error(err))
		return err
	}

	fmt.Println("[Message Sent] ", "topic:", p.topic, " - key:", key, " - msg:", msgValue, " - partition:", partition, " - offset:", offset)
	// Logging message sent
	// p.logger.Info("Message sent",
	// 	zap.String("topic", p.topic),
	// 	zap.String("key", key),
	// 	zap.String("msg", msgValue),
	// 	zap.Int32("partition", partition),
	// 	zap.Int64("offset", offset),
	// )
	return nil
}

func (p *MProducer) Close() error {
	return p.producer.Close()
}
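To make the hash partitioner comment more concrete, here is a small standard-library sketch of the idea: hash the key, then reduce the hash modulo the partition count. The partitionFor function is hypothetical and only illustrates the principle. It is not sarama's code, so it should not be used to predict the partition numbers a real cluster reports.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition in [0, numPartitions):
// the same key always yields the same partition.
func partitionFor(key string, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error

	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p // the int32 conversion can produce a negative value
	}
	return p
}

func main() {
	for _, key := range []string{"msg-key-1", "msg-key-2", "msg-key-1"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionFor(key, 3))
	}
}
```

The first and third lines of output show the same partition because they share a key, which is the property that keeps per-key ordering intact.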
5. Producer Entry Point (producer/main.go)
The producer sends a message every 3 seconds and attaches headers such as OperationID and UserID for tracking.
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"kafka-example/config"
	"kafka-example/constant"
	"kafka-example/kafka"
	"kafka-example/logger"

	"go.uber.org/zap"
)

func startProducer(ctx context.Context, cfg *config.Config, log *zap.Logger) error {
	producer, err := kafka.NewProducer(&cfg.Kafka, cfg.Kafka.Topic, log)
	if err != nil {
		return err
	}
	defer producer.Close()

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	// Send a message to Kafka every 3 seconds.
	for counter := 1; ; counter++ {
		<-ticker.C
		key := fmt.Sprintf("msg-key-%d", counter)
		value := fmt.Sprintf("Counter message %d", counter)
		// SendMessage logs its own failures, so the loop simply moves on to the next message.
		_ = producer.SendMessage(ctx, key, value)
	}
}

func main() {
	cfg, err := config.LoadConfig("config/config.yaml")
	if err != nil {
		// Without a config there is no logger yet, so fail loudly.
		panic(fmt.Errorf("load config: %w", err))
	}
	log := logger.NewLogger("producer", cfg.Log.RotationSize, cfg.Log.RotationCount)

	// Attach the values that SendMessage turns into Kafka headers.
	ctx := context.Background()
	opID := fmt.Sprintf("op-%d", rand.Intn(1000))
	ctx = context.WithValue(ctx, constant.OperationID, opID)
	ctx = context.WithValue(ctx, constant.OpUserID, "user-396")

	if err := startProducer(ctx, cfg, log); err != nil {
		log.Fatal("Failed to start producer", zap.Error(err))
	}
}
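A ticker loop like the one in startProducer runs until the process is killed. If the producer should stop when its context is cancelled, the loop can select on ctx.Done() as well. The sketch below uses only the standard library; sendEvery and demo are hypothetical names, and the send callback stands in for producer.SendMessage.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sendEvery calls send once per interval until ctx is cancelled.
// It returns the number of sends that succeeded.
func sendEvery(ctx context.Context, interval time.Duration, send func(counter int) error) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	sent := 0
	for counter := 1; ; counter++ {
		select {
		case <-ctx.Done():
			return sent
		case <-ticker.C:
			if err := send(counter); err != nil {
				fmt.Println("send failed:", err)
				continue
			}
			sent++
		}
	}
}

// demo cancels the context from inside the callback after the third message.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	return sendEvery(ctx, 10*time.Millisecond, func(counter int) error {
		fmt.Println("sending message", counter)
		if counter == 3 {
			cancel()
		}
		return nil
	})
}

func main() {
	fmt.Println("messages sent:", demo())
}
```

With this shape the deferred producer.Close() in the caller gets a chance to run, because the loop actually returns.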
6. Kafka Consumer Group Implementation (kafka/kafka_consumer_group.go)
package kafka

import (
	"context"
	"errors"
	"time"

	"kafka-example/config"
	"kafka-example/constant"

	"github.com/IBM/sarama"
	"go.uber.org/zap"
)

type MConsumerGroup struct {
	config *config.KafkaConfig
	topic  string
	group  sarama.ConsumerGroup
	logger *zap.Logger
}

func NewConsumerGroup(cfg *config.KafkaConfig, topic string, groupId string, consumerId string, logger *zap.Logger) (*MConsumerGroup, error) {
	saramaConfig := sarama.NewConfig()

	// When the group has no committed offset yet, start from the oldest offset
	// that is still available on the broker for each partition.
	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	// If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
	saramaConfig.Consumer.Return.Errors = true

	// GroupStrategies controls how partitions are assigned to the consumers in the group.
	// The Range strategy gives each consumer a consecutive range of partitions, which keeps the assignment predictable.
	// The split is even when the partition count is divisible by the consumer count.
	// Alternatives such as RoundRobin distribute partitions more evenly when the counts do not match.
	saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}

	if cfg.Username != "" && cfg.Password != "" {
		saramaConfig.Net.SASL.Enable = true
		saramaConfig.Net.SASL.User = cfg.Username
		saramaConfig.Net.SASL.Password = cfg.Password
	}

	group, err := sarama.NewConsumerGroup(cfg.Brokers, groupId, saramaConfig)
	if err != nil {
		logger.Error("Failed to create consumer group", zap.Error(err))
		return nil, err
	}
	logger.Info("Created or joined consumer group", zap.String("consumerID", consumerId))

	// Handle errors in consumer group
	go func() {
		// The Errors() channel of the consumer group delivers consume errors
		// because Consumer.Return.Errors is enabled above. It is closed when the group is closed.
		for err := range group.Errors() {
			logger.Error("Consumer group error", zap.Error(err))
		}
	}()

	return &MConsumerGroup{config: cfg, topic: topic, group: group, logger: logger}, nil
}

// GetContextFromMsg rebuilds the context values that the producer sent as headers.
func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
	ctx := context.Background()
	for _, recordHeader := range cMsg.Headers {
		// Match headers by key rather than by position, so extra or reordered headers are safe.
		switch key := constant.ContextKey(recordHeader.Key); key {
		case constant.OperationID, constant.OpUserID:
			ctx = context.WithValue(ctx, key, string(recordHeader.Value))
		}
	}
	return ctx
}

// RegisterHandlerAndConsumeMessages blocks until ctx is cancelled or the group is closed.
// The caller is responsible for calling Close.
func (c *MConsumerGroup) RegisterHandlerAndConsumeMessages(ctx context.Context, handler sarama.ConsumerGroupHandler) {
	for {
		// Consume returns after each rebalance, so it has to be called in a loop.
		if err := c.group.Consume(ctx, []string{c.topic}, handler); err != nil {
			if errors.Is(err, sarama.ErrClosedConsumerGroup) {
				return
			}
			c.logger.Error("Error consuming messages", zap.Error(err))
			time.Sleep(2 * time.Second) // retry delay
		}
		if ctx.Err() != nil {
			return // context cancelled: stop instead of retrying forever
		}
	}
}

func (c *MConsumerGroup) Close() error {
	return c.group.Close()
}
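The header round trip between producer and consumer can be tried out without a broker. The sketch below uses only the standard library. The header type is a local stand-in for sarama.RecordHeader, and headersFromContext and contextFromHeaders are hypothetical names. Unlike GetMQHeaderWithContext, this version skips missing values instead of returning an error, which is an assumption made to keep the example short.

```go
package main

import (
	"context"
	"fmt"
)

type ContextKey string

const (
	OperationID ContextKey = "operationID"
	OpUserID    ContextKey = "opUserID"
)

// header is a local stand-in for sarama.RecordHeader.
type header struct {
	Key, Value []byte
}

// headersFromContext copies the known keys from ctx into headers.
func headersFromContext(ctx context.Context) []header {
	var headers []header
	for _, key := range []ContextKey{OperationID, OpUserID} {
		if v, ok := ctx.Value(key).(string); ok {
			headers = append(headers, header{Key: []byte(key), Value: []byte(v)})
		}
	}
	return headers
}

// contextFromHeaders restores known keys by name and ignores everything else.
func contextFromHeaders(headers []header) context.Context {
	ctx := context.Background()
	for _, h := range headers {
		switch key := ContextKey(h.Key); key {
		case OperationID, OpUserID:
			ctx = context.WithValue(ctx, key, string(h.Value))
		}
	}
	return ctx
}

func main() {
	ctx := context.WithValue(context.Background(), OperationID, "op-42")
	ctx = context.WithValue(ctx, OpUserID, "user-396")

	// Simulate an extra header added by some other component.
	headers := append(headersFromContext(ctx), header{Key: []byte("traceparent"), Value: []byte("x")})

	restored := contextFromHeaders(headers)
	fmt.Println(restored.Value(OperationID), restored.Value(OpUserID))
}
```

Looking headers up by key means an unrelated header, or a different header order, cannot shift a value onto the wrong context key.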
7. Kafka Consumer Entry Point (consumer/main.go)
Each consumer instance uses a unique clientID and joins a common consumer group.
package main

import (
	"context"
	"fmt"
	"math/rand"
	"os"
	"os/signal"
	"syscall"
	"time"

	"kafka-example/config"
	"kafka-example/kafka"
	"kafka-example/logger"

	"github.com/IBM/sarama"
	"go.uber.org/zap"
)

type ConsumerGroupHandler struct {
	clientID      string
	Logger        *zap.Logger
	consumerGroup *kafka.MConsumerGroup
}

func (handler ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (handler ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim processes the messages of one claimed partition until its channel is closed.
func (handler ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		ctx := handler.consumerGroup.GetContextFromMsg(msg)
		fmt.Println("[Message Received] ", " timeStamp:", msg.Timestamp.Format("2006-01-02 15:04:05"), "consumerId:", handler.clientID, "context:", ctx, " - topic:", msg.Topic, " - key:", string(msg.Key), " - msgValue:", string(msg.Value), " - partition:", msg.Partition, " - offset:", msg.Offset)
		// handler.Logger.Info("Message received",
		// 	zap.String("consumerId", handler.clientID),
		// 	zap.Any("context", ctx),
		// 	zap.String("topic", msg.Topic),
		// 	zap.ByteString("key", msg.Key),
		// 	zap.ByteString("value", msg.Value),
		// 	zap.Int32("partition", msg.Partition),
		// 	zap.Int64("offset", msg.Offset),
		// 	zap.Time("timestamp", msg.Timestamp),
		// )

		// Mark the message as processed so its offset can be committed.
		session.MarkMessage(msg, "")
	}
	return nil
}

func startConsumer(ctx context.Context, cfg *config.Config, log *zap.Logger) error {
	clientID := fmt.Sprintf("consumer-%d", rand.Intn(1000))
	group, err := kafka.NewConsumerGroup(&cfg.Kafka, cfg.Kafka.Topic, "my-consumer-group", clientID, log)
	if err != nil {
		return err
	}
	defer group.Close()

	handler := ConsumerGroupHandler{Logger: log, clientID: clientID, consumerGroup: group}
	group.RegisterHandlerAndConsumeMessages(ctx, handler)
	return nil
}

func main() {
	cfg, err := config.LoadConfig("config/config.yaml")
	if err != nil {
		// Without a config there is no logger yet, so fail loudly.
		panic(fmt.Errorf("load config: %w", err))
	}
	log := logger.NewLogger("consumer", cfg.Log.RotationSize, cfg.Log.RotationCount)

	// Start Consumer in Background
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan struct{})
	go func() {
		defer close(done)
		if err := startConsumer(ctx, cfg, log); err != nil {
			log.Fatal("Failed to start consumer", zap.Error(err))
		}
	}()

	// Handle graceful shutdown
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
	cancel()

	// Give the consumer up to 2 seconds to leave the group and close.
	select {
	case <-done:
	case <-time.After(2 * time.Second):
	}
	log.Info("Shutting down gracefully")
}
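Shutdown handling of this kind can also be written with signal.NotifyContext from the standard library, which folds the signal channel and the cancel call into a single context. The sketch below is one possible shape under that assumption. runUntilDone and demo are hypothetical names, the worker stands in for the consumer loop, and the demo cancels itself with a short timeout so it ends without a real signal.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runUntilDone starts worker in a goroutine, waits for ctx to be cancelled,
// then gives the worker up to grace to return.
// It reports whether the worker finished in time.
func runUntilDone(ctx context.Context, grace time.Duration, worker func(context.Context)) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		worker(ctx)
	}()

	<-ctx.Done()
	select {
	case <-done:
		return true
	case <-time.After(grace):
		return false
	}
}

func demo() bool {
	// ctx is cancelled on SIGINT or SIGTERM.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	// Demo only: also cancel after 100ms so the example terminates by itself.
	ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
	defer cancel()

	return runUntilDone(ctx, time.Second, func(ctx context.Context) {
		<-ctx.Done() // stand-in for the consume loop
		fmt.Println("worker: cleaning up")
	})
}

func main() {
	fmt.Println("clean shutdown:", demo())
}
```

Waiting on a done channel with an upper bound lets the process exit as soon as the worker has cleaned up, while still guaranteeing that a stuck worker cannot block shutdown forever.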
8. Kafka Async Producer Listener (Optional) (listener/listener.go)
These background jobs listen to the Errors and Successes channels of the AsyncProducer (enabled through Return.Errors and Return.Successes) and log the status of each message.
package listener

import (
	"github.com/IBM/sarama"
	"go.uber.org/zap"
)

// ListenAsyncProducerStatus is only needed when working with an AsyncProducer.
func ListenAsyncProducerStatus(producer sarama.AsyncProducer, log *zap.Logger) {
	go func() {
		for err := range producer.Errors() {
			// Convert sarama.Encoder to []byte, then to string (the value may be nil)
			var valueBytes []byte
			if err.Msg != nil && err.Msg.Value != nil {
				valueBytes, _ = err.Msg.Value.Encode()
			}
			log.Error("Producer error", zap.Error(err.Err), zap.String("msg", string(valueBytes)))
		}
	}()

	go func() {
		for msg := range producer.Successes() {
			log.Info("Message acknowledged", zap.String("topic", msg.Topic), zap.Int32("partition", msg.Partition), zap.Int64("offset", msg.Offset))
		}
	}()
}
9. Run Producer & Consumer
Run the Producer, then check the console output and the auto-generated log files in the logs folder:
go run producer/main.go
Run the Consumer in another terminal window:
go run consumer/main.go
This setup provides a reusable Kafka producer and consumer service with error handling, retries, logging to both file and console, and metadata passed through headers. It demonstrates a Kafka producer and Consumer Group integration using sarama, with logging via uber/zap. The components are modular, which makes it easy to extend the functionality to new Kafka topics and to reuse the logger setup.
If you found this helpful, let me know by leaving a like or a comment, and if you think this post could help someone, feel free to share it. Thank you very much!