Hello world! Let's get into the topic: the best way to use PyZMQ.
A few days ago I started using PyZMQ, following a blog tutorial on dev.to.
I also found some code online, optimized for Flask, that I thought I could reuse. I reused it and it worked fine. After that I started coding other functions, and while testing I found that the PUB-SUB pattern
in PyZMQ was not suitable for my task. I explain the task at the bottom of this post.
This is how a messaging queue works.
There are two things we need to remember:
1. Client
2. Server
The client sends a message to the server.
The server processes the message (DB actions, sending emails, etc.).
Let's code
client.py
# Import ZMQ
import zmq

# Create a context
context = zmq.Context()

# Create a PUSH socket and connect it to the server
socket = context.socket(zmq.PUSH)
socket.connect('tcp://localhost:5555')


# Function that sends data to the server
def send_data(data):
    # We're sending a string to the server for now
    socket.send_string(f'{data}')
    # socket.send_json({"data": [1, 2, 3, 4]})
    # You can send many other types; check out the official ZMQ docs


# Close the connection (optional extra)
def close_connection():
    socket.close()
    context.term()


try:
    while True:
        # Get data from the user to send to the server
        send_data(input("Enter data to send: "))
except (KeyboardInterrupt, EOFError):
    # Ctrl+C or end of input stops the loop so the cleanup below can run
    pass
finally:
    close_connection()
Server.py
import time

# Import ZMQ
import zmq

# Create a context
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5555')

# Create a poller and register the socket for incoming messages
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)

while True:
    socks = dict(poller.poll())
    # If the poller reports a new message on the socket, pull it
    if socket in socks:
        message = socket.recv_string()
        # message = socket.recv_json()
        # You can receive many other types; check out the official ZMQ docs
        print("sleeping 20 seconds")
        time.sleep(20)  # or call some other function to process the message
        print(message)
        # If you're using SQLAlchemy:
        # session.commit() or db.session.commit()
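The commented-out `send_json` / `recv_json` calls above can be tried without opening two terminals. What follows is only a minimal sketch, not code from the project: it runs a PUSH and a PULL socket inside one process over the `inproc` transport. The function name `push_pull_roundtrip` and the endpoint `inproc://demo-tasks` are made up for illustration.

```python
import zmq


def push_pull_roundtrip(payloads):
    """Send each payload over PUSH and read it back over PULL, in order."""
    context = zmq.Context()
    pull = context.socket(zmq.PULL)
    push = context.socket(zmq.PUSH)
    received = []
    try:
        # Both sockets share one context, which the inproc transport requires.
        pull.bind("inproc://demo-tasks")      # hypothetical endpoint name
        push.connect("inproc://demo-tasks")

        # Send everything first; the messages wait in the queue.
        for payload in payloads:
            push.send_json(payload)

        # Then read them back one by one, waiting up to 1 second for each.
        for _ in payloads:
            if pull.poll(1000):
                received.append(pull.recv_json())
    finally:
        push.close(linger=0)
        pull.close(linger=0)
        context.term()
    return received


if __name__ == "__main__":
    print(push_pull_roundtrip([{"data": [1, 2, 3, 4]}, {"task": "email"}]))
```

All payloads are sent before the first one is read, and they still arrive in order, because the PUSH side queues messages for the connected PULL side.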
The tutorial ends here. You can stop here if you want to.
Why and when I used a messaging queue:
I'm working on a project that distributes a task to like-minded people. It fetches all the like-minded users that match some filters and sends the task to those targeted users.
We decided to do this with ZMQ, and I implemented it. But I soon found that while one task was being processed, the other tasks triggered during that time were rejected or never captured. I was using the PUB-SUB
model in ZMQ.
The code for that is below (correct me if I'm wrong):
sender.py
# import time
# import zmq
# HOST = '127.0.0.1'
# PORT = '6666'
# _context = zmq.Context()
# _publisher = _context.socket(zmq.PUB)
# url = 'tcp://{}:{}'.format(HOST, PORT)
# # socket = _context.socket(zmq.REQ)
# # socket.connect('tcp://{}:{}'.format(HOST, PORT))
# def publish_message(message):
#     try:
#         _publisher.bind(url)
#         # time.sleep(1)
#         # print(url, message)
#         _publisher.send_string(message)
#         # socket.send_string(message)
#     except Exception as e:
#         print("error {}".format(e))
#     # finally:
#     #     _publisher.unbind(url)
# while 1:
#     publish_message('response..')
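One thing that stands out in `publish_message` above is that it calls `bind` on every call. As far as I can tell, the second `bind` on an already-bound endpoint raises `zmq.ZMQError`, the `except` block prints it, and `send_string` is never reached. The sketch below illustrates this under some assumptions: the helper names `make_publisher` and `second_bind_fails` are hypothetical, and it uses an `inproc` endpoint instead of the TCP address so it does not need a free port.

```python
import zmq


def make_publisher(context, url):
    """Create a PUB socket and bind it exactly once."""
    publisher = context.socket(zmq.PUB)
    publisher.bind(url)
    return publisher


def second_bind_fails(url="inproc://pub-demo"):
    """Return True if binding an already-bound endpoint raises ZMQError."""
    context = zmq.Context()
    publisher = make_publisher(context, url)
    try:
        # This repeats what publish_message() does on its second call.
        publisher.bind(url)
    except zmq.ZMQError:
        return True
    else:
        return False
    finally:
        publisher.close(linger=0)
        context.term()


if __name__ == "__main__":
    print("second bind raised ZMQError:", second_bind_fails())
```

If that is what happened here, binding once at startup (as `make_publisher` does) and only calling `send_string` inside `publish_message` would be the smaller fix, though the PUSH-PULL version earlier in this post avoids the problem altogether.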
server.py
# import sys
# import time
# import logging
# import os
# import zmq
# HOST = '127.0.0.1'
# PORT = '6666'
# logging.basicConfig(filename='subscriber.log', level=logging.INFO)
# class ZClient(object):
#     def __init__(self, host=HOST, port=PORT):
#         self.host = host
#         self.port = port
#         self._context = zmq.Context()
#         self._subscriber = self._context.socket(zmq.SUB)
#         # print("Client Initiated")
#     def receive_message(self):
#         """Start receiving messages"""
#         self._subscriber.connect('tcp://{}:{}'.format(self.host, self.port))
#         self._subscriber.setsockopt(zmq.SUBSCRIBE, b"")
#         while True:
#             print("LISTENING:-")
#             # print('listening on tcp://{}:{}'.format(self.host, self.port))
#             self.message = self._subscriber.recv_string()
#             print(self.message)
#             logging.info(
#                 '{} - {}'.format(self.message, time.strftime("%Y-%m-%d %H:%M")))
#             # time.sleep(1)
#             self.result()
#     def result(self):
#         time.sleep(10)
#         print(self.message)
# if __name__ == '__main__':
#     zc = ZClient()
#     zc.receive_message()
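A second possible reason for missing messages with PUB-SUB is that a PUB socket does not keep messages for subscribers that are not connected yet; anything published before the subscription is in place is simply discarded. The following is a small sketch of that behaviour, not a reproduction of the project setup: the function name `first_message_seen` and the `inproc` endpoint are assumptions made for the demo.

```python
import zmq


def first_message_seen(url="inproc://slow-joiner-demo"):
    """Publish 'early' before any subscriber exists, then 'late' afterwards.

    Returns the first message the subscriber actually receives.
    """
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    subscriber = context.socket(zmq.SUB)
    try:
        publisher.bind(url)
        # No subscriber is connected yet, so PUB drops this message.
        publisher.send_string("early")

        subscriber.connect(url)
        subscriber.setsockopt(zmq.SUBSCRIBE, b"")

        # Keep publishing until the subscription has reached the publisher.
        for _ in range(100):
            publisher.send_string("late")
            if subscriber.poll(50):  # wait up to 50 ms for a message
                return subscriber.recv_string()
        return None
    finally:
        publisher.close(linger=0)
        subscriber.close(linger=0)
        context.term()


if __name__ == "__main__":
    print(first_message_seen())
```

The subscriber never sees `early`. For a job queue where every task must be processed, that is a reason to prefer PUSH-PULL, which holds messages for the receiving side instead of dropping them.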