Manav Kushwaha

Learning Microservices With Go(Part 5). Asynchronous Communication. (Kafka)

Intro

This post is the fifth part of the "Learning Microservices with Go" series. I'm writing these posts as I learn the concepts. If you haven't checked out Part 4, here's the link for it: Part 4.
Link for Git Repo: Github

In the last part, we added gRPC code to our project and used it for synchronous communication between services.

In this part we'll learn about asynchronous communication: what its benefits are, where to use it, and how to implement it (using Kafka).

Let's start!

SYNCHRONOUS VS ASYNCHRONOUS COMMUNICATION

You might be asking what the difference between the two types of communication is.

  • In synchronous communication, the sender sends a request and waits for the reply from the receiver. Only when the reply arrives (or the request times out) does the sender continue with its work.
  • In asynchronous communication, the sender sends a request and doesn't wait for the reply from the receiver (it only waits for confirmation that the message was sent successfully). The receiver can then choose to send a reply (or not), depending on the implementation.

A useful analogy is a phone call vs an email.
In a phone call, both parties need to be actively sending and receiving at the same time. With email, the receiver can reply at its own pace or not reply at all.
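To make the difference concrete, here is a minimal sketch in plain Go (no Kafka involved). The names processSync, sendAsync and startWorker are made up for this illustration, and a buffered channel stands in for whatever transports the message.

```go
package main

import (
	"fmt"
	"time"
)

// processSync simulates a receiver that needs some time to answer.
func processSync(req string) string {
	time.Sleep(50 * time.Millisecond)
	return "done: " + req
}

// sendAsync hands the request to a queue and returns as soon as the
// queue has accepted it. It does not wait for the request to be processed.
func sendAsync(queue chan<- string, req string) {
	queue <- req
}

// startWorker consumes requests from the queue in the background and
// publishes the results whenever they are ready.
func startWorker(queue <-chan string) <-chan string {
	results := make(chan string, cap(queue))
	go func() {
		defer close(results)
		for req := range queue {
			results <- processSync(req)
		}
	}()
	return results
}

func main() {
	// Synchronous: the caller is blocked until the reply arrives.
	fmt.Println(processSync("phone call"))

	// Asynchronous: the caller only waits for the queue to accept the message.
	queue := make(chan string, 8)
	results := startWorker(queue)
	sendAsync(queue, "email")
	fmt.Println("sender continues with other work")
	close(queue)

	// The reply (if there is one) is picked up later.
	for r := range results {
		fmt.Println(r)
	}
}
```

The sender in the async case gets control back right after the message is queued, which is the "confirmation that the message was sent" from the description above.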

BENEFITS AND CHALLENGES TO ASYNCHRONOUS COMMUNICATION

Benefits

  1. Streamlined approach to processing messages:

    • Eg: A service that processes and sends messages can focus on the processing, without waiting for responses from other services.
    • It just needs confirmation that the message was sent.
  2. Decoupled sending and processing of requests:

    • Eg: When one service asks a server to download a video, the download may take minutes or hours.
    • With the asynchronous model, the service just sends the task to the server and gets an acknowledgement back. The service can then continue its work while the download happens in the background.
    • When the task is complete, the service receives a notification of completion.
  3. Better Load Balancing:

    • Some applications don't have an even request load and may be prone to spikes.
    • With synchronous communication, the service would have to handle all of those requests at the same time.
    • Eg: Trying to handle thousands of calls at the same time. The service would be unable to handle the requests properly, and clients would see delayed responses.
    • With asynchronous communication, the requests wait in a queue and the service works through them at its own pace, so a spike doesn't overload it.
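The load-levelling idea from the last benefit can be sketched with a buffered channel acting as the queue. This is only an illustration under that assumption; absorbSpike and drain are hypothetical names, and a real broker would persist the queue instead of keeping it in memory.

```go
package main

import (
	"fmt"
	"sync"
)

// drain processes queued tasks one at a time and returns how many it handled.
func drain(queue <-chan int, handle func(int)) int {
	n := 0
	for task := range queue {
		handle(task)
		n++
	}
	return n
}

// absorbSpike enqueues a burst of tasks while a single worker drains them
// at its own pace. It returns the number of tasks the worker processed.
func absorbSpike(burst int) int {
	queue := make(chan int, burst) // the buffer plays the role of the broker
	var wg sync.WaitGroup
	processed := 0

	wg.Add(1)
	go func() {
		defer wg.Done()
		processed = drain(queue, func(int) { /* do the actual work here */ })
	}()

	for i := 0; i < burst; i++ {
		queue <- i // returns immediately while the buffer has room
	}
	close(queue)
	wg.Wait()
	return processed
}

func main() {
	fmt.Println("processed:", absorbSpike(1000))
}
```

The sender finishes enqueuing quickly, and the worker never has to deal with more than one task at a time.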

PLACES TO USE ASYNC COMMUNICATION

Some tasks do require an immediate response, so they need synchronous communication. That's why we should know when async is the right choice. Here are some places where async communication is recommended:

  1. Long running tasks
    Eg: Video processing. The caller doesn't need to wait and is notified when the task completes or fails.

  2. Send once, processed by multiple components
    Certain types of messages, eg: status reports, can be processed by multiple independent components.
    Instead of sending the message to each client separately, we can publish it once to a component that everyone can consume from.

  3. High performance sequential tasks:
    Some operations are more efficient when performed sequentially and/or in batches.
    Eg: Writing to an HDD. Sequential writes avoid disk seeks, so they give much higher throughput than random writes.
    Async communication makes this possible, as the server can process incoming requests at its own speed, one after the other.

CHALLENGES

Asynchronous communication also comes with some challenges. Some of these are:

  1. Complex Error Handling
    If a message is lost, we need to think about many cases: Was the message sent properly? Was the message received? Did the response get lost?
    With synchronous communication, we'd have known immediately if this happened and could have retried or taken other measures.

  2. Additional Components for Message delivery
    Certain use cases require additional software for delivering the messages.
    This software also performs additional operations, eg: message batching and storing.
    This increases the complexity of the system.

  3. Async data flow may seem non-intuitive and more complex
    Unlike synchronous communication, where every request is followed by a response, in async there might not be any response.
    Or the response may require additional steps to be taken.

TECHNIQUES AND PATTERNS OF ASYNC COMMUNICATION

Let's describe some patterns and concepts that'll help us with asynchronous interactions.

Message Broker

A message broker is an intermediary component. It can be used for:

  • Message delivery: Delivering a message to one or more receivers.
  • Message transformation: Transforming the message into a format that other services can consume.
  • Message aggregation: Combining multiple messages into a single one to make delivery and processing more efficient.
  • Message routing: Routing messages to their appropriate destinations using a set of rules.

Eg: When sending an email, the mail platform (eg: Google for Gmail) acts as the message broker.
We don't need to know how to deliver the message to the other person. Google takes care of it.

Guarantees for message delivery:
A message broker can provide different types of guarantees for message delivery. These may be:

  • At least once: The message is always delivered, but may be delivered more than once. On failure the broker simply resends it.
  • Exactly once: The message is delivered exactly once. The broker needs to perform additional checks to guarantee this.
  • At most once: The message is delivered 0 or 1 times.

Based on the guarantees, we can also classify a message broker as lossy or lossless.

  • Lossy: The broker can sometimes (on failures) lose messages. Eg: at-most-once.
  • Lossless: The broker will not lose messages. Eg: at-least-once, exactly-once.
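With at-least-once delivery the consumer has to cope with duplicates. One common way to do that is to make the consumer idempotent, for example by remembering the IDs of messages it has already applied. The sketch below assumes every message carries a unique ID; Message and DedupConsumer are hypothetical types for illustration, not part of any Kafka client.

```go
package main

import "fmt"

// Message is a hypothetical broker message with a unique ID.
type Message struct {
	ID    string
	Value int
}

// DedupConsumer remembers processed IDs so that a redelivered message
// is applied only once.
type DedupConsumer struct {
	seen  map[string]bool
	total int
}

// NewDedupConsumer creates an empty consumer.
func NewDedupConsumer() *DedupConsumer {
	return &DedupConsumer{seen: make(map[string]bool)}
}

// Handle applies the message and returns true, or returns false
// if the message was already applied earlier.
func (c *DedupConsumer) Handle(m Message) bool {
	if c.seen[m.ID] {
		return false
	}
	c.seen[m.ID] = true
	c.total += m.Value
	return true
}

// Total returns the sum of all applied message values.
func (c *DedupConsumer) Total() int { return c.total }

func main() {
	c := NewDedupConsumer()
	// At-least-once delivery: message "m1" arrives twice after a retry.
	deliveries := []Message{{"m1", 5}, {"m2", 4}, {"m1", 5}}
	for _, m := range deliveries {
		fmt.Println(m.ID, "applied:", c.Handle(m))
	}
	fmt.Println("total:", c.Total()) // prints "total: 9"
}
```

The second delivery of m1 is ignored, so the result is the same as if every message had arrived exactly once.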

Publisher-Subscriber model

This is a model of communication between microservices (though not limited to them) where every microservice can publish messages or subscribe to the relevant ones.
Eg: Following someone on Twitter subscribes you to them. Whenever they publish (post) something, you'll get it in your feed.

The pub-sub model provides a flexible solution for sending messages in a system where messages can be produced and consumed by multiple components.
The publisher doesn't need to worry about delivering the messages or about any failures.
The subscriber can just subscribe to the relevant messages and get them delivered without needing to talk to the publishers directly.
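Here is a toy in-memory version of the pub-sub model, just to show the shape of the interaction. Broker, Subscribe and Publish are hypothetical names for this sketch. Note that it drops messages when a subscriber's buffer is full, so it behaves like a lossy (at-most-once) broker.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a toy in-memory pub-sub broker.
type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

// NewBroker creates a broker without any subscribers.
func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers interest in a topic and returns a channel on which
// messages published to that topic will arrive.
func (b *Broker) Subscribe(topic string, buffer int) <-chan string {
	ch := make(chan string, buffer)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish sends msg to every subscriber of the topic and returns
// the number of subscribers that received it.
func (b *Broker) Publish(topic, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs[topic] {
		select {
		case ch <- msg:
			delivered++
		default: // subscriber buffer is full: the message is dropped
		}
	}
	return delivered
}

func main() {
	broker := NewBroker()
	feedA := broker.Subscribe("status", 4)
	feedB := broker.Subscribe("status", 4)
	other := broker.Subscribe("billing", 4)

	fmt.Println("delivered to:", broker.Publish("status", "service up"))
	fmt.Println(<-feedA, "|", <-feedB)
	fmt.Println("billing messages waiting:", len(other))
}
```

The publisher only talks to the broker and never needs to know who the subscribers are.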

APACHE KAFKA

Apache Kafka is an open source message broker system. It was developed at LinkedIn and is very widely used around the world.

Kafka Basics

  • The component that produces messages is called a producer.
  • Messages are published in sequential order to objects called topics.
  • Each message has a unique numerical offset within its topic partition.
  • Kafka provides APIs for consuming messages from existing topics (the consuming component is called a consumer).
  • Topics can be partitioned to allow multiple consumers to consume from them at the same time (eg: parallel data processing).
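To picture how topics, partitions and offsets fit together, here is a toy in-memory model. It is not how Kafka is implemented: Topic, Record, Append and ReadFrom are hypothetical names, and the FNV hash is only a stand-in for the partitioner a real Kafka client would use.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// Record is a message stored in a partition together with its offset.
type Record struct {
	Offset int
	Key    string
	Value  string
}

// Topic is a toy model of a topic: a fixed number of append-only partitions.
type Topic struct {
	partitions [][]Record
}

// NewTopic creates a topic with n empty partitions.
func NewTopic(n int) *Topic {
	return &Topic{partitions: make([][]Record, n)}
}

// Append hashes the key to pick a partition, appends the record there and
// returns the partition and the offset assigned within that partition.
func (t *Topic) Append(key, value string) (partition, offset int) {
	h := fnv.New32a()
	h.Write([]byte(key))
	partition = int(h.Sum32() % uint32(len(t.partitions)))
	offset = len(t.partitions[partition])
	t.partitions[partition] = append(t.partitions[partition], Record{offset, key, value})
	return partition, offset
}

// ReadFrom returns the records of a partition starting at the given offset.
func (t *Topic) ReadFrom(partition, offset int) []Record {
	p := t.partitions[partition]
	if offset < 0 || offset >= len(p) {
		return nil
	}
	return p[offset:]
}

func main() {
	ratings := NewTopic(3)
	p, first := ratings.Append("user-105", "rating 5")
	_, second := ratings.Append("user-105", "rating 4")
	fmt.Println("offsets:", first, second) // prints "offsets: 0 1"

	// A consumer that has already processed offset 0 resumes from offset 1.
	for _, r := range ratings.ReadFrom(p, 1) {
		fmt.Println(r.Offset, r.Value)
	}
}
```

Messages with the same key land in the same partition and keep their order there, while different partitions can be read by different consumers in parallel.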

Benefits

  • High write and read throughput: Kafka is optimized for highly performant reads and writes. It does this by doing as many sequential writes and reads as possible, which lets it make good use of HDDs, and by sending large batches of data sequentially over the network.
  • Scalability: Using topic partitioning, devs can achieve parallel processing of their data.
  • Flexible durability: Devs can configure policies for storing data in Kafka, eg: message retention. Messages can be stored for a fixed duration (eg: 1 day) or indefinitely (as long as storage is available).

ADOPTING KAFKA FOR OUR MOVIE-APP

  • We now want to work with a ratings provider (eg: IMDB), which will provide movie ratings to us.
  • We need to ingest these ratings alongside the rating API that we created earlier. The two can work side by side.
  • We can make this communication async, as we don't need to process this bulk information immediately.
  • Our rating service could do this periodically, or immediately whenever new data is published.
  • We'll use Apache Kafka as our message broker.

First we'll implement:

  1. Application to produce the ratings data.
  2. Consumption of data from Apache Kafka and storing those ratings in our ratings repository.

Let's assume that the data provider will provide us JSON formatted data (most external APIs use JSON).

[
 {"userId":"105","recordId":"1","recordType":"movie","ratingValue":5,"providerId":"test-provider","eventType":"put"},
 {"userId":"105","recordId":"2","recordType":"movie","ratingValue":4,"providerId":"test-provider","eventType":"put"}
]

We'll start by adding a Go model for these rating records to the src/rating/pkg/model/rating.go file, which already contains the rating structure.

// RatingEvent defines an event containing a rating record
type RatingEvent struct {
    RecordID   RecordID        `json:"recordId"`
    RecordType RecordType      `json:"recordType"`
    UserID     UserID          `json:"userId"`
    Value      RatingValue     `json:"ratingValue"`
    EventType  RatingEventType `json:"eventType"`
}

// RatingEventType defines a type of rating event
type RatingEventType string

// Rating Event Types
const (
    RatingEventTypePut    = RatingEventType("put")
    RatingEventTypeDelete = RatingEventType("delete")
)
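The JSON keys in the provider data have to match the struct tags, otherwise the field silently stays at its zero value. The standalone sketch below shows this with local copies of the model types. The underlying types (string for the IDs and record type, int for the rating value) are assumptions made so the example compiles on its own, and decodeEvents is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins for the types in the rating model package.
// Their underlying types are assumptions made for this example.
type (
	RecordID        string
	RecordType      string
	UserID          string
	RatingValue     int
	RatingEventType string
)

// RatingEvent mirrors the struct shown above.
type RatingEvent struct {
	RecordID   RecordID        `json:"recordId"`
	RecordType RecordType      `json:"recordType"`
	UserID     UserID          `json:"userId"`
	Value      RatingValue     `json:"ratingValue"`
	EventType  RatingEventType `json:"eventType"`
}

// decodeEvents parses a JSON array of rating events.
func decodeEvents(data []byte) ([]RatingEvent, error) {
	var events []RatingEvent
	if err := json.Unmarshal(data, &events); err != nil {
		return nil, err
	}
	return events, nil
}

func main() {
	data := []byte(`[{"userId":"105","recordId":"1","recordType":"movie","ratingValue":5,"providerId":"test-provider","eventType":"put"}]`)
	events, err := decodeEvents(data)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Printf("%+v\n", events[0])
}
```

Keys without a matching field, like providerId here, are ignored by encoding/json. A record that used "value" instead of "ratingValue" would decode without an error but with Value set to 0.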

Now we'll add an example application that reads the data from a given file and publishes it to Kafka.
Create a file cmd/ratingingester/main.go with the following content:

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "movieexample.com/rating/pkg/model"
)

// The main function creates a Kafka producer, reads rating events from a file, and sends them to a Kafka topic.
func main() {
    fmt.Println("Creating a Kafka producer")

    producer, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        log.Fatalf("Failed to create producer: %s\n", err)
    }
    defer producer.Close()

    const fileName = "ratingsdata.json"
    log.Println("Reading data from file", fileName)

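    ratingEvents, err := readRatingEvents(fileName)
    if err != nil {
        log.Fatalf("Failed to read rating events: %s\n", err)
    }
    // Log the data only after we know that reading succeeded
    log.Printf("RatingData read from file: %v\n", ratingEvents)

    const topic = "ratings"
    log.Println("Sending rating events to topic", topic)

    if err := produceRatingEvents(producer, topic, ratingEvents); err != nil {
        log.Fatalf("Failed to produce rating events: %s\n", err)
    }

    const timeout = 10 * time.Second
    log.Printf("Waiting for %s to send rating events\n", timeout)

    // Flush returns the number of events that are still waiting to be delivered
    if remaining := producer.Flush(int(timeout.Milliseconds())); remaining > 0 {
        log.Printf("%d rating events were not delivered within %s\n", remaining, timeout)
    }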
}

func readRatingEvents(fileName string) ([]model.RatingEvent, error) {
    f, err := os.Open(fileName)
    if err != nil {
        return nil, err
    }
    defer f.Close()

    var ratingEvents []model.RatingEvent
    if err := json.NewDecoder(f).Decode(&ratingEvents); err != nil {
        return nil, err
    }

    return ratingEvents, nil
}

func produceRatingEvents(producer *kafka.Producer, topic string, ratingEvents []model.RatingEvent) error {
    for _, ratingEvent := range ratingEvents {
        encodedEvent, err := json.Marshal(ratingEvent)
        if err != nil {
            return err
        }

        message := &kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          encodedEvent,
        }
        if err := producer.Produce(message, nil); err != nil {
            return err
        }
    }
    return nil
}

We initialized a kafka.Producer via NewProducer, read the data file, and produced the rating events to Kafka.

Note: We're using the Confluent Kafka client for Go for this. See the import in the above snippet.

Key points about the code that we just wrote:

  • We create a Kafka producer with NewProducer, using localhost as the Kafka address.
  • When producing the events, we specify the topic (ratings) they are produced to. If we send a message to a particular topic, only the services subscribed to that topic will get that message.
  • We flush the producer at the end of the program to make sure that all the messages are sent to Kafka before it exits.

Also add the ratingsdata.json file in the same directory with the following content:

[
    {
        "userId":"105",
        "recordId":"1",
        "recordType": "movie",
        "ratingValue": 5,
        "providerId":"test-provider",
        "eventType":"put"
    },
    {
        "userId":"105",
        "recordId":"2",
        "recordType": "movie",
        "ratingValue": 4,
        "providerId":"test-provider",
        "eventType":"put"
    }
]

We've now completed the logic for publishing events to Kafka. Next we'll write the logic for consuming these events.
Create a file rating/internal/ingester/kafka/ingester.go with the following content:

package kafka

import (
    "context"
    "encoding/json"
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "movieexample.com/rating/pkg/model"
)

// Ingester defines a Kafka ingester
type Ingester struct {
    consumer *kafka.Consumer
    topic    string
}

// NewIngester creates a new Kafka ingester
func NewIngester(addr string, groupID string, topic string) (*Ingester, error) {
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": addr,
        "group.id":          groupID,
    })
    if err != nil {
        return nil, err
    }
    return &Ingester{consumer, topic}, nil
}

// Ingest starts ingesting from Kafka and returns a channel of rating events
// representing the data consumed from topics
func (i *Ingester) Ingest(ctx context.Context) (<-chan model.RatingEvent, error) {
    if err := i.consumer.SubscribeTopics([]string{i.topic}, nil); err != nil {
        return nil, err
    }

    ch := make(chan model.RatingEvent, 1)
    go func() {
        // Release the channel and the consumer when the goroutine exits
        defer close(ch)
        defer i.consumer.Close()
        for {
            // Stop as soon as the context is cancelled
            if ctx.Err() != nil {
                return
            }

            // Poll with a timeout so that cancellation is noticed regularly
            msg, err := i.consumer.ReadMessage(time.Second)
            if err != nil {
                // A timeout only means that no message arrived in time
                if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrTimedOut {
                    continue
                }
                log.Println("Consumer error:", err)
                continue
            }
            var event model.RatingEvent
            if err := json.Unmarshal(msg.Value, &event); err != nil {
                log.Println("Failed to unmarshal event:", err)
                continue
            }
            // Don't block forever on the send if nobody is reading anymore
            select {
            case ch <- event:
            case <-ctx.Done():
                return
            }
        }
    }()

    return ch, nil
}

We implemented a NewIngester function to create a new Kafka ingester. Its Ingest method consumes from a topic in the background and returns a Go channel with the rating events.

To use this ingester, we'll add the following to our rating/internal/controller/rating/controller.go:

type ratingIngester interface {
    Ingest(ctx context.Context) (<-chan model.RatingEvent, error)
}

// Controller defines a rating service controller
type Controller struct {
    repo     ratingRepository
    ingester ratingIngester
}

// New creates a rating service controller
func New(repo ratingRepository, ingester ratingIngester) *Controller {
    return &Controller{repo, ingester}
}

We'll also modify rating/cmd/main.go to use this new Controller struct.

    repo := memory.New()
    ingester, err := kafka.NewIngester("localhost", "rating", "ratings")
    if err != nil {
        log.Fatalf("Failed to create ingester: %s\n", err)
    }
    ctrl := rating.New(repo, ingester)

    // Start the ingester in a separate goroutine.
    // It stores every rating event as soon as it arrives on the channel
    // and stops when the channel is closed.
    go func() {
        ratingEventChannel, err := ingester.Ingest(ctx)
        if err != nil {
            log.Fatalf("Failed to ingest: %s\n", err)
        }
        for ratingEvent := range ratingEventChannel {
            // Put the rating event in the repository
            recordId := ratingEvent.RecordID
            recordType := ratingEvent.RecordType
            rating := &model.Rating{
                RecordID:   ratingEvent.RecordID,
                RecordType: ratingEvent.RecordType,
                UserID:     ratingEvent.UserID,
                Value:      ratingEvent.Value,
            }
            log.Printf("Putting rating: %v for recordId: %s recordType: %s\n", rating, recordId, recordType)
            err := ctrl.PutRating(ctx, recordId, recordType, rating)
            if err != nil {
                log.Fatalf("Failed to put rating: %s\n", err)
            }
        }
    }()

We've now completed the modifications, and our rating service is able to consume data using both synchronous and asynchronous communication.

But we still haven't installed Kafka. For this we'll use a Docker image of Kafka.

Pull the Docker image of Kafka. Note: The latest version might be different from the one used here. Check the website for the latest version.

docker pull apache/kafka:3.7.0

Then run it using

docker run -p 9092:9092 apache/kafka:3.7.0

This will get Kafka up and running, with the broker reachable on port 9092 of localhost.

Testing

Now we'll test our implementation.

  • First run all the services and the Consul service discovery container, as we've done multiple times previously.
  • Now add the movie metadata.

  • If you now try to get the rating using GetMovieDetails, it'll show you "No rating found for the record".

  • Now we'll run the rating ingester. Go to the rating ingester directory (cmd/ratingingester/) and run go run main.go. This will publish the rating events to Kafka.

These rating events will be added to our repository automatically, because the rating service keeps consuming from the ratings topic in the background and stores every event it receives.

Now let's repeat the same Postman request for getting movie details. This time the result is a bit different.

We can see that the rating is the same as the one in the file, because there is only one rating record for this movie in the repository.

Tadaaaa, we've completed this part. We learnt about asynchronous communication: its benefits, drawbacks and use-cases. We also incorporated Kafka into our movie-app to put what we'd learnt into practice.

If you found something useful, do Like and Share. It really helps the article.

Here's the github repo: Github

See you!!
