ActiveMQ is an open source message-oriented middleware which is used to transfer messages between multiple applications. It consists of a queue which will hold in the messages and will only transfer when the receiver is available, hence we can be rest assured that the messages won’t be lost if the server is down. The applications could be entirely different and hosted on heterogenous platforms, ActiveMQ can be used to communicate between such systems as well.
For example, large applications built in microservices architecture and written in disparate programming languages, where communication is often but an offline service should not cause any loss of messages.
Working of ActiveMQ:
There are two endpoints in ActiveMQ,
- Producers – who send the messages to the message broker.
- Consumers – who read these messages one at a time.
ActiveMQ stores the messages in the form of queue or topic.
- Queue: A message is sent to a single consumer and if the consumer is unavailable then it is held by activemq until the consumer is available to receive the message.
- Topic: A message copy is sent to all of the subscribers who are interested in a particular topic. Unavailable consumers will not be receiving the past messages.
Now let us see how to create this consumer code and subscribe to an ActiveMQ topic in Python.
Python provides a client library called stomp.py for connecting with any type of message brokers using STOMP protocol.
Step1: install the latest version of stomp available.
pip install stomp.py
Step2: The consumer script.
import stomp
#Establish a connection
con = stomp.StompConnection([(‘Host’,port)])
#listener class to be instantiated.
listener = MsgListener()
con.set_listener(‘name_of_listener’, listener)
#wait will ensure it waits till connection is established and acknowledged.
con.connect(‘usernanme’, ‘password’, wait=True)
#subscribe to a particular topic or queue by giving the path and headers if required by the server.
con.subscribe(‘topic/path’, headers={})
Step3: Listener class for your processing.
Class MsgListener(stomp.ConnectionListener):
def __init__(self):
# to keep the count of messages received
self.msg_recieved = 0
def on_error(self, message):
logger.error(‘received an error “%s”’ %message)
def on_message(self, message):
message = json.loads(message)
self.msg_received +=1
# add your logic based on the message received here
If the con.subscribe
is being done to a topic, then a published message will go to all the subscribers who have active subscription at the time of message published.
Whereas, if it is a queue only one consumer will receive one message. Messages are load balanced among the available consumers.
Top comments (0)