DEV Community

Reza Alipour


Handling variable-length Kafka tasks using Python

Let's say you have a service that takes customers' requests, distributes them as events amongst several workers, waits for the workers to perform a long-running process for each request, and gathers the results to send back to customers. If you intend to use Apache Kafka as the medium to populate events and you are not careful, chances are you are going to encounter a certain predicament: Your requests will be processed more than once by different workers!

TL;DR: Consuming Kafka events is particularly challenging when processing each event takes a long and variable amount of time. This post discusses the challenge and possible solutions to it, and describes a simple implementation of the chosen solution, which is accessible via this link.

Introduction

Kafka is a prominent event-streaming infrastructure. In Kafka's world, events (or requests in our simple scenario) are generated by a multitude of entities called producers. Kafka classifies these events into a predefined set of semantically close groups called topics. Producers decide which topic each of their events belongs to. Consumers are entities that read these events and act on them accordingly. Each topic can have multiple producers and consumers.

Consumers can cooperate and form a group to share a topic and read from it collectively. Producers and consumers are meant to be fully decoupled, as they can write (push) or read (pull) events whenever they want. Kafka is responsible for managing and retaining those events. Since a topic can be read from and written to through more than one server, called a broker, Kafka further divides the events of a topic into partitions to avoid inconsistencies between the topic's consumers connected to different brokers. Within a consumer group, each topic-partition is assigned to exactly one consumer, while a consumer can own more than one topic-partition. After receiving an event, the consumer must send an acknowledgment (commit) to the broker so that Kafka doesn't deliver it to the group again.

Note that this design means a consumer group cannot usefully have more consumers than the topic has partitions: any extra consumer sits idle. Partitions are where Kafka respects the order of events and serves them to the consumer in a FIFO[1] manner. Ordering is not preserved at the topic level in Kafka.
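To make the consumer-to-partition relationship concrete, here is a toy sketch of spreading partitions over the consumers of one group. The function name and the round-robin rule are assumptions made for illustration; Kafka's real assignors are more elaborate, but the consequence for surplus consumers is the same.

```python
from typing import Dict, List


def assign_partitions(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over consumers round-robin (illustrative only)."""
    assignment: Dict[str, List[int]] = {name: [] for name in consumers}
    for index, partition in enumerate(partitions):
        owner = consumers[index % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# Three partitions and four consumers: the fourth consumer gets nothing to read.
print(assign_partitions([0, 1, 2], ["c1", "c2", "c3", "c4"]))
```

Each partition has exactly one owner in the group, and a consumer may own several partitions when there are fewer consumers than partitions.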

The problem

While you are in charge of defining topics, the number of partitions, and the number of consumers (up to the number of partitions), Kafka decides which topic-partition is assigned to which consumer in the consumer group. It constantly checks for live consumers and assigns topic-partitions to them. If a consumer disconnects from a broker, Kafka reassigns its topic-partitions to another consumer, a procedure called rebalancing.

The point is, Kafka needs to know which consumers are alive and which have disconnected from the broker to be able to perform rebalancing. Therefore, Kafka requires every consumer in the consumer group to poll for new events at least every max.poll.interval.ms milliseconds to affirm its presence; we will loosely call this sign of life a heartbeat in this post. (Current clients also send separate background heartbeats, governed by session.timeout.ms, but those do not relieve the consumer of the polling deadline.)

So far so good. If your consumers receive an event, perform a quick process on it, commit, and are ready to take the next event in a time much shorter than max.poll.interval.ms, congratulations! You are all set. The problem arises when your single-threaded application takes more than max.poll.interval.ms to process an event and consequently misses the heartbeat deadline.

Let's see why you don't want this to happen. Suppose event A is in topic t and topic-partition t-p, and two single-threaded consumers, alpha and beta, have formed a group to read from topic t. Also, suppose Kafka assigns t-p to alpha and it consumes event A immediately. If alpha takes more than max.poll.interval.ms to send the next heartbeat, Kafka assumes alpha has left the group and reassigns t-p to beta at the next rebalancing. Now, since there is no commit to exclude A, Kafka will resend A to beta. If the processing time for A is long enough to repeat the pattern for beta as well, your service will be stuck in a livelock. It's important to mention that if alpha tries to commit during or after the rebalancing, the commit will fail.
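The ping-pong between alpha and beta can be illustrated with a small, Kafka-free simulation. This is only a toy model under simplifying assumptions (a single event, a fixed processing time, and a hypothetical function name); it is not how a broker is implemented.

```python
from typing import List, Tuple


def simulate_redelivery(processing_ms: int,
                        max_poll_interval_ms: int,
                        rounds: int = 4) -> List[Tuple[str, bool]]:
    """Return (consumer, committed) for each delivery attempt of one event."""
    consumers = ["alpha", "beta"]
    attempts = []
    for attempt in range(rounds):
        owner = consumers[attempt % len(consumers)]
        # the commit only succeeds if the owner finished before the deadline
        committed = processing_ms <= max_poll_interval_ms
        attempts.append((owner, committed))
        if committed:
            break  # the offset is committed, so the event is not delivered again
    return attempts


# A fast task is processed once; a slow one bounces between the consumers.
print(simulate_redelivery(1_000, 5_000))
print(simulate_redelivery(10_000, 5_000))
```

With a processing time above the deadline, no attempt ever commits, which is the livelock described above.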

How to mitigate the situation

The trivial solution would be to set max.poll.interval.ms big enough to avoid such a situation altogether. There are two downsides to this quick patch. First, if the time to process your tasks varies significantly, as is indeed the case for many machine learning tasks where computation time depends on the size of the dataset or the algorithm, your chances of finding a threshold that guarantees all your tasks will fall below it are slim. In addition, as discussed here, if you want to introduce new consumers to the group, or you need a rebalance due to failed consumers, you will have to wait a significant amount of time.
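For reference, this is roughly what the quick patch looks like as a confluent_kafka consumer configuration. The broker address, the group id, and the 30-minute value are placeholders chosen for illustration; only the property names are actual client settings.

```python
# Hypothetical endpoint and group id; adjust to your own deployment.
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "long-task-workers",
    "enable.auto.commit": False,
    # allow up to 30 minutes between two poll() calls
    # (the client default is 300000 ms, i.e. 5 minutes)
    "max.poll.interval.ms": 30 * 60 * 1000,
}

print(consumer_config["max.poll.interval.ms"])
```

Whatever value is chosen here is also how long the group may have to wait before it notices a consumer that has stalled, which is the second downside mentioned above.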

Another somewhat obvious workaround would be to commit right after receiving an event and before processing it, but that reduces a processing acknowledgment to a mere delivery acknowledgment. If the process fails for any reason, the event is lost. Some might argue that another pipeline from the consumer to the producer could notify the producer of the lost event. That will not be of much help, since a failed event produces no response at all. That means you have to specify a duration threshold to distinguish in-process events from failed events and resend failed events to the broker, which is not easy when your tasks are variable-length.
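The difference between the two acknowledgment styles can be shown with an in-memory stand-in for a topic-partition. FakePartition and handle are hypothetical helpers written for this sketch, not part of any Kafka client.

```python
class FakePartition:
    """In-memory stand-in for one topic-partition (not a Kafka API)."""

    def __init__(self, events):
        self.events = list(events)
        self.committed = 0  # offset of the next event to deliver

    def fetch(self):
        return self.events[self.committed]

    def commit(self):
        self.committed += 1


def handle(partition, process, commit_first):
    """Process one event, committing either before or after the work."""
    event = partition.fetch()
    if commit_first:
        partition.commit()  # delivery acknowledgment only
    try:
        process(event)
    except RuntimeError:
        return False  # processing failed
    if not commit_first:
        partition.commit()  # processing acknowledgment
    return True


def always_fails(event):
    raise RuntimeError("worker crashed")


early = FakePartition(["A", "B"])
handle(early, always_fails, commit_first=True)
print(early.fetch())  # A was committed before it failed, so it is skipped

late = FakePartition(["A", "B"])
handle(late, always_fails, commit_first=False)
print(late.fetch())  # A was never committed, so it is delivered again
```

Committing first trades the duplicate-processing problem for silent event loss, which is why the post does not settle on it.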

The third solution expands on the previous one and assumes the application has a dedicated main thread for dealing with the Kafka broker and a bunch of worker threads to process the incoming events. The main thread receives events and caches them in a local queue, from which it can feed the workers. The main thread distributes events while sending constant heartbeats to the Kafka broker. It then gathers the results from workers, commits accomplished tasks, and reassigns failed tasks.

This is particularly tricky to implement for Kafka as Kafka doesn't accept commits of individual events. What you have to commit is just an offset that points to the last place where events have been processed. As a result, the main thread must ensure every event up to a certain point is processed before sending a commit of that point. Therefore, if the main thread crashes before all consecutive events up to a point are processed, those events will all be processed again after rebalance.
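The bookkeeping the main thread would need can be sketched as follows. The function name is hypothetical, and the sketch assumes the usual Kafka convention that a committed offset is the offset of the next event to be delivered.

```python
from typing import Set


def committable_offset(last_committed: int, done: Set[int]) -> int:
    """Return the highest offset that is safe to commit.

    last_committed is the offset of the next event Kafka would redeliver;
    done holds the offsets of events that workers have finished.
    """
    next_offset = last_committed
    # advance only across an unbroken run of finished events
    while next_offset in done:
        next_offset += 1
    return next_offset


# Offsets 5 and 6 are finished, 7 is still running, 8 is finished:
# only 7 can be committed, so event 8 would be reprocessed after a crash.
print(committable_offset(5, {5, 6, 8}))
```

A single slow event therefore holds back the commit of every faster event behind it in the same partition.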

A workaround for the above problem would be to turn every worker into a separate consumer and assign at least one topic-partition to each worker directly, instead of having them obtain events from the main thread. Every worker would have two threads: one responsible for accepting events and sending constant heartbeats, and another for processing events as they arrive. Events are acknowledged as soon as their processing completes, consumers don't have to occupy themselves with queue management, and separation of concerns follows. This is our solution of choice in this post.
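Stripped of Kafka, the two-thread worker boils down to the pattern below: one thread runs the long task while the other keeps signalling liveness until the task is done. The names run_with_keepalive and keep_alive are assumptions for this sketch; in a real worker, keep_alive would be a poll on a paused consumer.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_keepalive(task, keep_alive, interval_s=0.01):
    """Run task() in a helper thread, calling keep_alive() until it finishes."""
    with ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(task)
        while not future.done():
            keep_alive()  # stands in for polling a paused consumer
            time.sleep(interval_s)
        return future.result()


def slow_task():
    time.sleep(0.05)  # pretend this is the long-running process
    return "done"


beats = []
result = run_with_keepalive(slow_task, lambda: beats.append(time.time()))
print(result, len(beats) > 0)
```

The length of the task no longer matters to the liveness signal, because the two concerns run on different threads.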

Next, we are going to talk about how to implement this solution in Python.

Python Implementation

This implementation is not meant to be bug-free or production-ready; it is just a glimpse into what the proposed solution might look like. Additionally, we assume the producer application is already in place and produces events. The consumer application receives events as they are produced, processes them, commits them on Kafka, and sends a new event containing the results to a separate topic to inform the producer of the result of the process. We also use Protocol Buffers to serialize our communication.

What every worker is about to do is CPU-bound. Therefore, running workers as threads is not ideal: in CPython the global interpreter lock keeps threads from executing Python code in parallel, so threads suit I/O-bound tasks rather than CPU-bound ones. We need a module that provides a platform to spawn multiple worker instances as processes and execute them. The following module would do the job:

from pathos.multiprocessing import Pool
from tools.log.logger import Logger
from typing import Callable
import os
import signal


# Make child processes ignore SIGINT. The parent must take care of
# signal handling, e.g., keyboard interrupts
def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)



class WorkerPool(Logger):

    '''
    worker pool gets a worker routine function and spawns a certain 
    number of processes with that routine.

    worker_routine: the worker function to be spawned and executed
    number_of_workers: number of workers to be created and executed

    after instantiation call the execute method and pass along the argument
    you want to send to the worker. 
    '''

    def __init__(self, worker_routine: Callable, number_of_workers: int=os.cpu_count()) -> None:

        # instantiate a process-safe logger
        Logger.__init__(self, "worker_pool")

        # validate the inputs before proceeding
        if not isinstance(number_of_workers, int):
            raise TypeError("number of workers is not correct")

        if not callable(worker_routine):
            raise TypeError("no worker specified")

        # limit the number of worker to the number of cpus 
        self.__number_of_workers = min(number_of_workers, os.cpu_count())
        self.__worker_routine = worker_routine

        if self.__number_of_workers < number_of_workers:
            self.logger.warning(f"number of cpus limit dictated {self.__number_of_workers} workers instead of {number_of_workers} you provided")



    def execute(self, *args, **kwargs):

        self.logger.info(f"starting worker pool with {os.cpu_count()} cpus and {self.__number_of_workers} workers")

        # augment worker's arguments with its id
        def worker_routine_wrapper(worker_number):
            kwargs["worker_number"] = worker_number
            self.__worker_routine(*args, **kwargs)


        # Use initializer to ignore SIGINT in child processes
        with Pool(initializer=init_worker) as mp_pool:

            try:

                mp_pool.map(
                        worker_routine_wrapper,
                        range(self.__number_of_workers)
                    )

            except KeyboardInterrupt:
                self.logger.info("worker pool shutting down")
                return

Note that we have used the pathos fork to create the process pool. You can follow this discussion to understand the reasoning behind it. If we were to use the native multiprocessing, which relies on the standard pickle and cannot serialize nested functions, we would have had to define the worker routine inside the module instead of getting it as an argument.
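The limitation is easy to reproduce with the standard library alone. The sketch below builds a closure similar in shape to a routine returned by a factory function (make_routine is a made-up name) and tries to pickle it, which is what the native multiprocessing would need to do before sending it to a child process.

```python
import pickle


def make_routine(prefix):
    # a closure, like the routine returned by a handler factory
    def routine(worker_number):
        return f"{prefix}-{worker_number}"
    return routine


routine = make_routine("worker")

try:
    pickle.dumps(routine)
    picklable = True
except (pickle.PicklingError, AttributeError):
    # the standard pickle refuses functions defined inside other functions
    picklable = False

print(picklable)
```

pathos sidesteps this by serializing with dill, which can handle such nested functions.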

Next, we have to specify our worker routine. Therefore, we define a function that takes the Kafka configuration object, populates a template function object with the configuration, and returns the resultant function, which could be used as the routine for our WorkerPool. The config for our setup includes Kafka's broker endpoint, the topic where events will be sent, the consumer group's id, and a response topic to which consumers will write process results. We use pydantic to get this information from environment variables:

# pydantic v1 API; in pydantic v2, BaseSettings lives in the pydantic-settings package
from pydantic import BaseSettings

class KafkaConfig(BaseSettings):
    kafka_bootstrap_server: str
    kafka_request_topic: str
    kafka_ack_topic: str
    kafka_consumer_group_id: str

    class Config:
        env_prefix = ''
        case_sensitive = False
        allow_mutation = False

In addition, the Protocol Buffers definition that describes the communication is as follows:

syntax = "proto3";
package communication;

message Request {
    int32 sequence = 1;
    string payload = 2;
}

message Response {
    int32 sequence = 1;
    bool isprocessed = 2;
}

We use the confluent_kafka library, a prominent Python client, to connect to Kafka's broker. The worker routine creates a consumer, and a producer to send acknowledgments. It loops to poll events from Kafka. When one is received, a service thread is spawned and given the event for processing. The routine then keeps sending heartbeats to the Kafka broker while waiting for the service result. After receiving the result from the service thread, it resumes getting new messages. The handler function that returns the worker routine function looks like this:

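from concurrent.futures import ThreadPoolExecutor
from typing import Callable

from confluent_kafka import Consumer, Producer

# project-local modules, following the import paths used elsewhere in this post
from tools.kafka import config
from tools.log.logger import Logger
# communication_pb2 is the module protoc generates from the .proto file above;
# import it from wherever your build places it.


def KafkaHandler(config: config.KafkaConfig) -> Callable: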


    # the worker pool provides a worker_number to each worker 
    # for logging purposes. 
    def routine(worker_number, *args, **kwargs):

        # callback to be called when worker thread has done its work
        def callback(future):
            logger.info(f"{future.result()}")

        try:

            # instantiate a logger
            logger = Logger(f"kafka_handler[{worker_number}]").getLogger()

            # each worker is supposed to act as a unique consumer 
            # in the consumer group with one or more topic-partitions 
            # assigned to it 
            c = Consumer({
                    'bootstrap.servers': config.kafka_bootstrap_server,
                    'group.id': config.kafka_consumer_group_id,
                    'auto.offset.reset': 'earliest',
                    'enable.auto.commit': False,
                })

            # each worker will send acknowledgement events to producers 
            # to inform them of the process result. 
            p = Producer({
                'bootstrap.servers': config.kafka_bootstrap_server,
            })

            logger.info(f"starting kafka handler")

            # subscribe to the request topic to receive events
            c.subscribe([config.kafka_request_topic])

            # iterate indefinitely to poll new events and send heartbeats.
            # while an event is being processed, the service pauses
            # message reception, so polling merely keeps the consumer alive.
            while True:

                msg = c.poll(2.0)

                if msg is not None: 

                    if msg.error():
                        logger.error(f"consumer error: {msg.error()}")
                        continue

                    # spawn a single-thread executor for every event and
                    # get a future to check.
                    executor = ThreadPoolExecutor(max_workers=1)

                    # service is a Callable containing the business logic
                    # to process events. service will receive the event itself,
                    # the consumer, the producer, config, and every argument that
                    # is passed to the worker routine at execution time.
                    # Everything is passed positionally so that extra *args
                    # cannot collide with the named parameters of service.
                    future = executor.submit(
                        service,
                        msg,
                        c,
                        p,
                        config,
                        logger,
                        *args,
                        **kwargs
                        )

                    future.add_done_callback(callback)

                    # let the helper thread exit once the task is done
                    executor.shutdown(wait=False)


        # graceful shutdown
        finally:
            p.flush()
            c.close()  

    return routine

We define service alongside KafkaHandler in the same module. The service receives the event together with the consumer and producer. First, it pauses event reception at the worker, then it proceeds to parse the incoming protobuf message and process it. Upon successful completion of the process, it tries to send the result to the ack topic and waits for the Kafka broker to confirm that the ack has been registered. If the ack is sent without error, the service thread commits to the request topic; otherwise, it just terminates the service. The service function is as follows:

def service(msg, 
            consumer: Consumer, 
            producer: Producer, 
            config: config.KafkaConfig, 
            logger: Logger,
            *args, 
            **kwargs):

    # when worker is done processing the event, it has to send 
    # an acknowledgement event to another topic to inform producers
    # of the process result. The worker will commit the event only when
    # it is assured that this ack has been set in Kafka, otherwise the event
    # is not considered complete and worker won't commit. 
    # this is a callback function for the producer that sends the ack
    def acked(error, message):

        if error is None:

            # deserialize ack event
            parsedData = communication_pb2.Response()
            parsedData.ParseFromString(message.value())


            consumer.commit(asynchronous=False)
            logger.info(f"ack sent: {parsedData.sequence}")
        else:
            logger.error(f"error sending ack: {error}, will not commit")

    # main thread can pass arbitrary arguments to workers at
    # execution time to be used in workers. Note that these
    # arguments are not worker-specific: every worker receives
    # the same ones
    ### process defined arguments
    dummy = kwargs["dummy"]

    try:

        # pause event consumption until the process is complete
        consumer.pause(consumer.assignment())

        # deserialize incoming event 
        parsedData = communication_pb2.Request()
        parsedData.ParseFromString(msg.value())

        ### process data here 


        ### send the ack result to the kafka
        ack = communication_pb2.Response()
        ack.sequence = parsedData.sequence
        ack.isprocessed = True

        # each worker is also a producer that sends ack events
        # to a response topic. The original producers consume these
        # events to learn the outcome of their requests.
        producer.produce(
            config.kafka_ack_topic, 
            key=msg.key(), 
            value=ack.SerializeToString(),
            callback=acked
            )

        # wait a certain amount of time for the producer
        # to send the ack. if the operation fails somehow 
        # worker won't commit and event will be processed again 
        producer.poll(1)
        result = f"event has been processed: {parsedData.sequence}"

    except Exception as ex:
        result = f"error processing the event: {ex}"

    # resume reception of events
    finally:
        consumer.resume(consumer.assignment())

    return result

Now we are ready to fire up our consumer in the main module:

from controller.kafka.kafka_handler import KafkaHandler

from tools.pool.worker_pool import WorkerPool
from tools.kafka.config import KafkaConfig



def runner():

    config = KafkaConfig()

    handler = KafkaHandler(config)

    pool = WorkerPool(handler)

    # every argument for the service function needs to be passed here
    pool.execute(dummy="dummy")

if __name__ == "__main__":

    runner()


Thanks for reading.


  1. First-In First-Out 
