DEV Community

Cover image for 🐹 Golang Integration with Kafka and Uber ZapLog πŸ“¨
Truong Phung
Truong Phung

Posted on

🐹 Golang Integration with Kafka and Uber ZapLog πŸ“¨

Here’s a comprehensive example for a Kafka integration with a Golang service that includes a Producer, a Consumer Group, error and success channel handling, and logging with Uber’s Zap (Popular for high-performance, structured JSON logging, often used in production). The example includes configuration, retry logic, metadata, and logging, while minimizing message loss with Kafka best practices.

To prepare for this section you can refer to Kafka Quick Setup and Common Kafka Commands

Project Structure

kafka-example/
β”œβ”€β”€ config/
β”‚   β”œβ”€β”€ config.go     # Parsed config from YAML
β”‚   β”œβ”€β”€ config.yaml   # YAML config file
β”œβ”€β”€ constant/
β”‚   β”œβ”€β”€ constant.go   # App constants
β”œβ”€β”€ consumer/
β”‚   β”œβ”€β”€ main.go       # Entry point for the consumer 
β”œβ”€β”€ kafka/
β”‚   β”œβ”€β”€ kafka_consumer_group.go  # Consumer Group implementation
β”‚   β”œβ”€β”€ kafka_producer.go        # Producer implementation
β”œβ”€β”€ listener/
β”‚   β”œβ”€β”€ listener.go    # Async Producer listener implementation
β”œβ”€β”€ logger/
β”‚   β”œβ”€β”€ logger.go      # Zap logger configuration
β”œβ”€β”€ logs/              # Folder to store generated log files
β”œβ”€β”€ producer/
β”‚   β”œβ”€β”€ main.go        # Entry point for the producer  
β”œβ”€β”€ go.mod
β”œβ”€β”€ go.sum
Enter fullscreen mode Exit fullscreen mode

1. Configuration (config/config.yaml)

The configuration file will store Kafka credentials, log rotation settings, and producer/consumer options.

kafka:
  brokers:
    - "localhost:29092"
  username: "dev-user"
  password: "dev-password"
  topic: "latestMsgToRedis"
  retries: 10
  producer_return_successes: true
log:
  rotation_size: 50 # 50MB
  rotation_count: 7 # 7 days
  level: "info"
Enter fullscreen mode Exit fullscreen mode

2. Config Parsing (config/config.go)

These structs read the configuration from config.yaml:

package config

import (
    "log"
    "os"

    "gopkg.in/yaml.v3"
)

type KafkaConfig struct {
    Brokers                 []string `yaml:"brokers"`
    Username                string   `yaml:"username"`
    Password                string   `yaml:"password"`
    Topic                   string   `yaml:"topic"`
    Retries                 int      `yaml:"retries"`
    ProducerReturnSuccesses bool     `yaml:"producer_return_successes"`
}

type LogConfig struct {
    RotationSize  int `yaml:"rotation_size"`
    RotationCount int `yaml:"rotation_count"`
}

type Config struct {
    Kafka KafkaConfig `yaml:"kafka"`
    Log   LogConfig   `yaml:"log"`
}

func LoadConfig(configPath string) (*Config, error) {
    _, err := os.Stat(configPath)
    if os.IsNotExist(err) {
        log.Fatalf("Config file does not exist: %v", err)
    }

    file, err := os.Open(configPath)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    var cfg Config
    decoder := yaml.NewDecoder(file)
    if err := decoder.Decode(&cfg); err != nil {
        return nil, err
    }

    return &cfg, nil
}
Enter fullscreen mode Exit fullscreen mode

3. Constant (constant/constant.go)

package constant

// Define custom key types to avoid key collisions
type ContextKey string

const (
    OperationID ContextKey = "operationID" // For tracking, debugging
    OpUserID    ContextKey = "opUserID"    // For indentifying user accross micro services
)
Enter fullscreen mode Exit fullscreen mode

4. Logger Setup (logger/logger.go)

Configuring Uber Zap for file logging with rotation:

package logger

import (
    "fmt"

    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
    "gopkg.in/natefinch/lumberjack.v2"
)

// NewLogger initializes a new zap.Logger with log rotation settings
func NewLogger(processID string, rotationSize int, rotationCount int) *zap.Logger {
    // Configure lumberjack to handle log rotation by size and age
    w := zapcore.AddSync(&lumberjack.Logger{
        Filename: fmt.Sprintf("./logs/%s.log", processID), // Log file path based on processID
        MaxAge:   rotationCount,                           // Number of days to retain old log files
        MaxSize:  rotationSize,                            // Rotate log when it reaches rotationSize MB
    })

    // Set up the core logging configuration
    core := zapcore.NewCore(
        zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), // Use JSON format for log entries
        w,                 // Set log writer with rotation settings
        zapcore.InfoLevel, // Set minimum log level to Info
    )

    // Return the logger with caller information enabled
    return zap.New(core, zap.AddCaller())
}
Enter fullscreen mode Exit fullscreen mode

5. Kafka Producer Inplementation (kafka/kafka_producer.go)

package kafka

import (
    "context"
    "errors"
    "fmt"
    "kafka-example/config"
    "kafka-example/constant"
    "time"

    "github.com/IBM/sarama"
    "go.uber.org/zap"
)

type MProducer struct {
    // producer sarama.AsyncProducer
    producer sarama.SyncProducer
    topic    string
    logger   *zap.Logger
    config   *config.KafkaConfig
}

func NewProducer(cfg *config.KafkaConfig, topic string, log *zap.Logger) (*MProducer, error) {
    saramaConfig := sarama.NewConfig()

    // The total number of times to retry sending a message (default 3)
    // the producer will stop retrying to send the message after 5 failed attempts.
    // This means the message could be dropped if it hasn't successfully been sent after these retries, potentially resulting in message loss unless other safeguards (like error handling or dead-letter queues) are in place.
    saramaConfig.Producer.Retry.Max = 5

    // WaitForAll waits for all in-sync replicas to commit before responding.
    // The minimum number of in-sync replicas is configured on the broker via the `min.insync.replicas` configuration key.
    saramaConfig.Producer.RequiredAcks = sarama.WaitForAll

    // Setting saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner configures the Kafka producer to use a hash-based partitioner for determining which partition a message should go to.
    // This partitioner applies a hash function to the message key, ensuring messages with the same key consistently go to the same partition. This is useful for maintaining ordering for specific keys, as all messages with that key will always be sent to the same partition.
    // When sending a message, we must specify the key value of the message. If there is no key, the partition will be selected randomly
    saramaConfig.Producer.Partitioner = sarama.NewHashPartitioner

    // In sarama.SyncProducer, setting Producer.Return.Successes = true is required to receive message acknowledgments after successful sends.
    // Without this, SyncProducer won’t wait for broker acknowledgment, making it impossible to return partition and offset information for sent messages. Setting this option ensures that SendMessage can confirm successful delivery with metadata, enhancing reliability.
    saramaConfig.Producer.Return.Successes = true

    if cfg.Username != "" && cfg.Password != "" {
        saramaConfig.Net.SASL.Enable = true
        saramaConfig.Net.SASL.User = cfg.Username
        saramaConfig.Net.SASL.Password = cfg.Password
    }

    // Following only for working with AsyncProducer, where we handle Errors and Succcess asynchronously
    // saramaConfig.Producer.Return.Errors = true
    // saramaConfig.Producer.Return.Successes = cfg.Kafka.ProducerReturnSuccesses
    // ListenAsyncProducerStatus(asyncProcuder,log)

    var prod sarama.SyncProducer
    var err error

    for i := 0; i <= cfg.Retries; i++ {
        // prod, err := sarama.NewAsyncProducer(cfg.Kafka.Brokers, saramaConfig)
        prod, err = sarama.NewSyncProducer(cfg.Brokers, saramaConfig)
        if err == nil {
            break
        } else {
            log.Error("Failed to create producer", zap.Int("tryTime", i), zap.Error(err))
        }

        time.Sleep(time.Duration(1) * time.Second)
    }

    if err != nil {
        log.Error("Failed to create producer after many tries", zap.Error(err))
        return nil, err
    }

    log.Info("Success to create producer")

    // The main differences between sarama.SyncProducer and sarama.AsyncProducer are:
    // Message Delivery Mechanism:
    // SyncProducer: Sends messages synchronously. Each SendMessage call waits for the broker’s acknowledgment, making it blocking and ensuring delivery order.
    // AsyncProducer: Sends messages asynchronously through channels (Input() for messages, Errors() for errors, and optionally Successes() for successful deliveries). It’s non-blocking and faster for high-throughput needs.
    // Use Cases:
    // SyncProducer: Ideal for low-throughput scenarios where message delivery guarantees and ordering are critical.
    // AsyncProducer: Suitable for high-throughput applications where latency is prioritized, and managing message acknowledgment and error handling is feasible.

    return &MProducer{producer: prod, topic: topic, config: cfg, logger: log}, nil
}

// Send context Data between producer consumers via Header
func GetMQHeaderWithContext(ctx context.Context) ([]sarama.RecordHeader, error) {
    operationID, ok := ctx.Value(constant.OperationID).(string)
    if !ok {
        err := errors.New("ctx missing operationID")
        return nil, err
    }
    opUserID, ok := ctx.Value(constant.OpUserID).(string)
    if !ok {
        err := errors.New("ctx missing userID")
        return nil, err
    }
    return []sarama.RecordHeader{
        {Key: []byte(constant.OperationID), Value: []byte(operationID)},
        {Key: []byte(constant.OpUserID), Value: []byte(opUserID)},
    }, nil
}

func (p *MProducer) SendMessage(ctx context.Context, key, msgValue string) error {

    header, err := GetMQHeaderWithContext(ctx)
    if err != nil {
        p.logger.Error("Failed to get Header", zap.Error(err))
    }

    kafkaMsg := &sarama.ProducerMessage{
        Topic:   p.topic,
        Key:     sarama.StringEncoder(key),
        Value:   sarama.StringEncoder(msgValue),
        Headers: header,
    }

    partition, offset, err := p.producer.SendMessage(kafkaMsg)
    if err != nil {
        p.logger.Error("Failed to send message", zap.Error(err))
        return err
    }

    fmt.Println("[Message Sent] ", "topic:", p.topic, " - key:", key, " - msg:", msgValue, " - partition:", partition, " - offset:", offset)

    // Logging message sent
    // p.logger.Info("Message sent",
    //  zap.String("topic", p.topic),
    //  zap.String("key", key),
    //  zap.String("msg", msgValue),
    //  zap.Int32("partition", partition),
    //  zap.Int64("offset", offset),
    // )

    return nil
}

func (p *MProducer) Close() error {
    return p.producer.Close()
}
Enter fullscreen mode Exit fullscreen mode

5. Producer Entry Point (producer/main.go)

The producer sends a message every 3 seconds and includes headers like OperationID and UserID for management.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "kafka-example/config"
    "kafka-example/constant"
    "kafka-example/kafka"
    "kafka-example/logger"
    "time"

    "go.uber.org/zap"
)

func startProducer(ctx context.Context, cfg *config.Config, log *zap.Logger) error {

    producer, err := kafka.NewProducer(&cfg.Kafka, cfg.Kafka.Topic, log)
    if err != nil {
        return err
    }
    defer producer.Close()

    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    // Send Messages: A loop that sends messages to Kafka every 3 seconds.
    for counter := 1; ; counter++ {
        <-ticker.C

        producer.SendMessage(ctx, fmt.Sprintf("msg-key-%d", counter), fmt.Sprintf("Counter message %d", counter))

    }
}

func main() {
    cfg, _ := config.LoadConfig("config/config.yaml")
    log := logger.NewLogger("producer", cfg.Log.RotationSize, cfg.Log.RotationCount)

    ctx := context.Background()

    opID := fmt.Sprintf("op-%d", rand.Intn(1000))

    ctx = context.WithValue(ctx, constant.OperationID, opID)
    ctx = context.WithValue(ctx, constant.OpUserID, "user-396")

    if err := startProducer(ctx, cfg, log); err != nil {
        log.Fatal("Failed to start producer", zap.Error(err))
    }
}
Enter fullscreen mode Exit fullscreen mode

6. Kafka Consumer Group Inplementation (kafka/kafka_consumer_group.go)

package kafka

import (
    "context"
    "kafka-example/config"
    "kafka-example/constant"
    "time"

    "github.com/IBM/sarama"
    "go.uber.org/zap"
)

type MConsumerGroup struct {
    config *config.KafkaConfig
    topic  string
    group  sarama.ConsumerGroup
    logger *zap.Logger
}

func NewConsumerGroup(cfg *config.KafkaConfig, topic string, groupId string, consumerId string, logger *zap.Logger) (*MConsumerGroup, error) {
    saramaConfig := sarama.NewConfig()

    // OffsetOldest stands for the oldest offset available on the broker for a partition.
    // We can send this to a client's GetOffset method to get this offset, or when calling ConsumePartition to start consuming from the oldest offset that is still available on the broker.
    saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

    // If enabled, any errors that occurred while consuming are returned on the Errors channel (default disabled).
    saramaConfig.Consumer.Return.Errors = true

    // Setting saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()} specifies how Kafka partitions are assigned to consumers within a consumer group.
    // The Range strategy (NewBalanceStrategyRange) divides partitions among consumers by assigning consecutive partitions to each consumer.
    // This ensures a balanced distribution of partitions, especially when the number of partitions is divisible by the number of consumers. This strategy is often used to maintain a predictable partition assignment.
    // Alternative strategies, like RoundRobin, distribute partitions more evenly in cases with mismatched partition-consumer counts.
    saramaConfig.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}

    if cfg.Username != "" && cfg.Password != "" {
        saramaConfig.Net.SASL.Enable = true
        saramaConfig.Net.SASL.User = cfg.Username
        saramaConfig.Net.SASL.Password = cfg.Password
    }

    group, err := sarama.NewConsumerGroup(cfg.Brokers, groupId, saramaConfig)
    if err != nil {
        logger.Error("Failed to create consumer group", zap.Error(err))
        return nil, err
    }

    logger.Info("Success to create or connect to existed consumerGroup", zap.String("consumerID", consumerId))

    // Handle errors in consumer group
    go func() {
        // Handle Errors: Listen for errors in the consumer group by checking the Errors() method on the consumer group session, which provides error events.
        for err := range group.Errors() {
            logger.Error("Consumer group error", zap.Error(err))
        }
    }()

    return &MConsumerGroup{config: cfg, topic: topic, group: group, logger: logger}, nil
}

func (mc *MConsumerGroup) GetContextFromMsg(cMsg *sarama.ConsumerMessage) context.Context {
    var values []string
    for _, recordHeader := range cMsg.Headers {
        values = append(values, string(recordHeader.Value))
    }
    mapper := []constant.ContextKey{constant.OperationID, constant.OpUserID}
    ctx := context.Background()
    for i, value := range values {
        ctx = context.WithValue(ctx, mapper[i], value)
    }
    return ctx
}

func (c *MConsumerGroup) RegisterHandlerAndConsumeMessages(ctx context.Context, handler sarama.ConsumerGroupHandler) {
    defer c.group.Close()
    for {
        if err := c.group.Consume(ctx, []string{c.topic}, handler); err != nil {
            c.logger.Error("Error consuming messages", zap.Error(err))
            time.Sleep(2 * time.Second) // retry delay
        }
    }
}

func (c *MConsumerGroup) Close() error {
    return c.group.Close()
}
Enter fullscreen mode Exit fullscreen mode

7. Kafka Consumer Entry Point (consumer/main.go)

Each consumer instance uses a unique clientID and joins a common consumer group.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "kafka-example/config"
    "kafka-example/kafka"
    "kafka-example/logger"
    "syscall"
    "time"

    "github.com/IBM/sarama"
    "go.uber.org/zap"
)

type ConsumerGroupHandler struct {
    clientID      string
    Logger        *zap.Logger
    consumerGroup *kafka.MConsumerGroup
}

func (handler ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (handler ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (handler ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        ctx := handler.consumerGroup.GetContextFromMsg(msg)

        fmt.Println("[Message Recieved] ", " timeStamp:", msg.Timestamp.Format("2006-01-02 15:04:05"), "consumerId:", handler.clientID, "context:", ctx, " - topic:", msg.Topic, " - key:", string(msg.Key), " - msgValue:", string(msg.Value), " - partition:", msg.Partition, " - offset:", msg.Offset)

        // handler.Logger.Info("Message received",
        //  zap.String("consumerId", handler.clientID),
        //  zap.Any("context", ctx),
        //  zap.String("topic", msg.Topic),
        //  zap.ByteString("key", msg.Key),
        //  zap.ByteString("value", msg.Value),
        //  zap.Int32("partition", msg.Partition),
        //  zap.Int64("offset", msg.Offset),
        //  zap.Time("timestamp", msg.Timestamp),
        // )
        session.MarkMessage(msg, "")
    }
    return nil
}

func startConsumer(ctx context.Context, cfg *config.Config, log *zap.Logger) error {
    clientID := fmt.Sprintf("consumer-%d", rand.Intn(1000))

    group, err := kafka.NewConsumerGroup(&cfg.Kafka, cfg.Kafka.Topic, "my-consumer-group", clientID, log)
    if err != nil {
        return err
    }
    defer group.Close()

    handler := ConsumerGroupHandler{Logger: log, clientID: clientID, consumerGroup: group}
    group.RegisterHandlerAndConsumeMessages(ctx, handler)

    return nil
}

func main() {
    cfg, _ := config.LoadConfig("config/config.yaml")
    log := logger.NewLogger("consumer", cfg.Log.RotationSize, cfg.Log.RotationCount)

    // Start Consumer in Background
    ctx, cancel := context.WithCancel(context.Background())
    go func() {
        if err := startConsumer(ctx, cfg, log); err != nil {
            log.Fatal("Failed to start consumer", zap.Error(err))
        }
    }()

    // Handle graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan
    cancel()
    time.Sleep(2 * time.Second)
    log.Info("Shutting down gracefully")
}
Enter fullscreen mode Exit fullscreen mode

8. Kafka Async Producer Listener (Optional) (listener/listener.go)

The background jobs listens to Return.Errors and Return.Successeschannels of Async Producer, logging message status.

package listener

import (
    "github.com/IBM/sarama"
    "go.uber.org/zap"
)

// In case work with asyncProducer
func ListenAsyncProducerStatus(producer sarama.AsyncProducer, log *zap.Logger) {
    go func() {
        for err := range producer.Errors() {
            // Convert sarama.Encoder to []byte, then to string
            valueBytes, _ := err.Msg.Value.Encode()
            log.Error("Producer error", zap.Error(err.Err), zap.String("msg", string(valueBytes)))
        }
    }()

    go func() {
        for msg := range producer.Successes() {
            log.Info("Message acknowledged", zap.String("topic", msg.Topic), zap.Int32("partition", msg.Partition), zap.Int64("offset", msg.Offset))
        }
    }()
}
Enter fullscreen mode Exit fullscreen mode

9. Run Producer & Comsumer

Run Producer, check to see console log and auto generated logging files in logs folder

go run producer/main.go
Enter fullscreen mode Exit fullscreen mode

Run Comsumer in another Terminal Window

go run consumer/main.go
Enter fullscreen mode Exit fullscreen mode

This setup includes a reusable Kafka producer and consumer service with error handling, retry, logging to both file and console, and metadata (headers). Demonstrating a Kafka producer and Consumer Group integration using sarama and logs withuber/zap. The components are modular, making it easy to extend the functionality to new Kafka topics and reuse the logger setup.

If you found this helpful, let me know by leaving a πŸ‘ or a comment!, or if you think this post could help someone, feel free to share it! Thank you very much! πŸ˜ƒ

Top comments (1)

Collapse
 
jiejaitt profile image
黄英捷 • Edited

Hello, excuse me, can I ask if you use Go language in your company? I'm trying to find some job opportunities.

I am also a Go language developer.