DEV Community

SSK
SSK

Posted on • Updated on

How to use PyZMQ / PUB-SUB /PUSH-PULL

Hello world, Let's get into the topic, The Best way to Use pyZMQ.
A few days ago I started using PyZMQ from the blog tutorial in dev.to,
and also I found some code online with flask optimized I thought of reusing, I have reused the code and it worked fine, after that, I started coding for other functions when testing I found that the method PUB-SUB in PYZMQ is not suitable for that task, I will explain about the task in the bottom.

This is how Messaging Queue Works:

Two things we need to remember
1.Client
2.Server

The Client sends a message to the server.
The Server process the message(DB actions or Sending emails etc... )

Let's code

client.py

#importing ZMQ
import zmq
#Creating a context
context = zmq.Context()

#creating and connceting to a socket.
socket = context.socket(zmq.PUSH)
socket.connect('tcp://localhost:5555')

#function that sends data to the server
def send_data(data):
    # we're sending string to the server as of now..
    socket.send_string(f'{data}')

    #socket.send_json({"data":[1,2,3,4]})
    #you can send lot of things, check out ZMQ official docs

#Closing connection(We don't need this one, EXTRA).
def exit():
    socket.close()
    context.term()

while 1:
#getting data from user to send to there server
    send_data(input("Enter data to send: "))
exit()
Enter fullscreen mode Exit fullscreen mode

Server.py

import time
#importing
import zmq
#creating context
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5555')
#Creating poller
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    socks = dict(poller.poll())
    #if any new message in poller it will PULL that message form the poller.
    if socket in socks:
        message = socket.recv_string()

        #message = socket.recv_json()
        #you can reccieve lot of things, check out ZMQ official docs
        print("sleeping 20 seconds")
        time.sleep(20)#or Call some other function to change or modify
        print(message)

        #If you're using SQLAlachemy
        #session.commit() or db.session.commit()

Enter fullscreen mode Exit fullscreen mode

for saving and sharing links online..

checkout sharely.in

The Tutorial ends here. You can stop here, if you want to.


Why and when I used messaging Queue:-

I'm working in a project which is transferring a task to the like-minded people, It fetches all the like-minded users with some filters and sends this task to those targeted users,
So we decided to do this in with ZMQ I did that, but suddenly I found that when doing one task it rejects/not capturing all the triggering tasks at that particular time. I have the PUB-SUB model in ZMQ.
The code for that one is(Correct me if I'm wrong)
sender.py

# import time
# import zmq

# HOST = '127.0.0.1'
# PORT = '6666'

# _context = zmq.Context()
# _publisher = _context.socket(zmq.PUB)
# url = 'tcp://{}:{}'.format(HOST, PORT)

# # socket =_context.socket(zmq.REQ)
# # socket.connect('tcp://{}:{}'.format(HOST, PORT))

# def publish_message(message):

#     try:
#         _publisher.bind(url)
#         # time.sleep(1)
#         # print(url , message)
#         _publisher.send_string(message)
#         # socket.send_string(message)

#     except Exception as e:
#         print ("error {}".format(e))

#     # finally:
#         # _publisher.unbind(url)
#while 1:
    #publish_message('response..')
Enter fullscreen mode Exit fullscreen mode

server.py

# import sys
# import time
# import logging
# import os
# import zmq
# import time

# HOST = '127.0.0.1'
# PORT = '6666'

# logging.basicConfig(filename='subscriber.log', level=logging.INFO)


# class ZClient(object):

#     def __init__(self, host=HOST, port=PORT):

#         self.host = host
#         self.port = port
#         self._context = zmq.Context()
#         self._subscriber = self._context.socket(zmq.SUB)
#         # print ("Client Initiated")


#     def receive_message(self):
#         """Start receiving messages"""
#         self._subscriber.connect('tcp://{}:{}'.format(self.host, self.port))
#         self._subscriber.setsockopt(zmq.SUBSCRIBE, b"")

#         while True:
#             print("LISTENING:-")
#             # print( 'listening on tcp://{}:{}'.format(self.host, self.port))
#             self.message = self._subscriber.recv_string()
#             print(self.message)
#             logging.info(
#             '{}   - {}'.format(self.message, time.strftime("%Y-%m-%d %H:%M")))
#             # time.sleep(1)
#             self.result()


#     def result(self):
#         time.sleep(10)
#         print(self.message)


# if __name__ == '__main__':
#     zc = ZClient()
#     zc.receive_message()
Enter fullscreen mode Exit fullscreen mode

correct the code if it's wrong...

Top comments (0)