Title: Building a RabbitMQ Consumer Base Class
Table of Contents:
- Introduction
- Code
- Code Explanation
- Conclusion
Introduction
In this blog post, we will explore a code snippet that implements a base class for a RabbitMQ consumer. The RabbitMQConsumerBase
class provides a foundation for building custom RabbitMQ consumers by defining the necessary methods and functionality to consume messages from a queue.
Code
from abc import ABC, abstractmethod
from connection import RabbitMQConnection
from config import Config
class RabbitMQConsumerBase(ABC):
def __init__(self, queue_name, binding_key, host, port, username, password):
self.queue_name = queue_name
self.binding_key = binding_key
self.host = host
self.port = port
self.username = username
self.password = password
@abstractmethod
def process_message(self, channel, method, properties, body):
pass
def on_message_callback(self, channel, method, properties, body):
self.process_message(channel, method, properties, body)
channel.basic_ack(delivery_tag=method.delivery_tag)
def consume(self):
config = Config()
with RabbitMQConnection(self.host, self.port, self.username, self.password) as connection:
channel = connection.get_channel()
channel.exchange_declare(exchange=config.EXCHANGE_NAME, exchange_type="topic")
channel.queue_declare(queue=self.queue_name)
channel.queue_bind(queue=self.queue_name, exchange=config.EXCHANGE_NAME, routing_key=self.binding_key)
channel.basic_consume(
queue=self.queue_name,
auto_ack=False,
on_message_callback=self.on_message_callback,
)
print(f"Started consuming messages from queue: {self.queue_name}")
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
print("Consumer stopped...")
Code Explanation
The code snippet introduces the RabbitMQConsumerBase
class, which serves as a base class for implementing RabbitMQ consumers. Here's a detailed explanation of the code:
- Class Structure
The RabbitMQConsumerBase
class is defined as an abstract base class (ABC
) using the abc
module. This means it cannot be instantiated directly and must be subclassed to provide specific implementations.
The class constructor (__init__
) accepts parameters such as queue_name
, binding_key
, host
, port
, username
, and password
, which define the necessary connection and queue details.
- Abstract Method
The class defines an abstract method called process_message
. Subclasses must override this method to provide custom processing logic for received messages. The method takes the channel
, method
, properties
, and body
as parameters.
- Message Processing
The on_message_callback
method is responsible for invoking the process_message
method of the subclass and acknowledging the message. It receives the channel
, method
, properties
, and body
as parameters.
Within the on_message_callback
method, the process_message
method is called, passing the received parameters. After processing the message, the consumer acknowledges it using channel.basic_ack
to inform RabbitMQ that the message has been successfully processed.
- Consumption Logic
The consume
method encapsulates the logic for consuming messages from the RabbitMQ queue.
- It creates an instance of the
Config
class to retrieve configuration settings. - Within a
with
block, aRabbitMQConnection
instance is created, using the provided connection details. - The
get_channel
method is called to obtain
a channel for communication with RabbitMQ.
- The necessary exchange, queue, and binding configurations are performed using the channel.
- The
basic_consume
method is invoked to start consuming messages from the specified queue. - The consumer then enters a consuming loop using
channel.start_consuming()
. - The loop can be interrupted by a keyboard interrupt (
KeyboardInterrupt
), which triggers thechannel.stop_consuming()
method. - Once the loop is exited, a message is printed to indicate that the consumer has stopped.
Conclusion
The RabbitMQConsumerBase
class provides a foundation for implementing RabbitMQ consumers. By subclassing this base class and overriding the process_message
method, you can define custom logic for processing received messages. The class abstracts away the complexities of establishing a RabbitMQ connection, declaring exchanges and queues, and handling message consumption, allowing you to focus on implementing the specific business logic of your consumer.
Top comments (0)