TL;DR – I built a python event driven application leveraging a publicly available real-time stream of COVID-19 data updates using Solace PubSub+ Event Broker as the messaging infrastructure. Check out the source code in this github repo under samples/python
SolaceLabs / covid19-stream-processors
Stream Information & Example Applications for Processing JHU and CovidTracking.com COVID-19 data available as streams over Solace
Phew! Now that the TL;DR elephant in the room is out of the way, let’s get to the core of it 🐘
Right now with the situation the world is currently in, some of us are probably thinking “hmm what can I do to contribute back to society? If I can't directly contribute something, what can I learn during quarantine to polish my skills and at the same time build something useful?” I don’t know about you, but I definitely enhanced my baking skills with the amount of banana bread loaves I baked (and ate). Apart from baking, I took this situation as an opportunity to learn new technologies and have something relatable to implement it on (i.e. COVID). I chose Python as the programming language of choice to learn and complement all what I know in event driven architectures.
In a previous blog post, I discussed how I built an event-driven NodeJS application on real-time COVID-19 data streams. I also covered some background on what event-driven is, why is it important and how to use the publish/subscribe messaging pattern to approach building and EDA application, so I recommend reading it if you haven’t already 🤗 . If JavaScript/NodeJs is not your cup of tea, then worry not! In this blogpost, I will be sharing how I used Python 🐍 to build a simple event driven application using the same COVID-19 streams of data with the same three basic requirements.
Tech Stack
Before building any application and work on a project, I start by drafting my architecture design, the different tools to be used and the programming language I need to successfully get an MVP up and running. With this approach in mind, I will be using the following tech stack:
- Programming Language: Python
- Messaging Protocol: MQTT
- Message Broker: Solace PubSub+
- Data source: COVID-19 data provided by the publisher application as per this documentation.
Note that in my tech stack, I didn’t list any REST API or client/server architecture 🤝 Done. Next!
1. Environment Setup & Prerequisites
To setup my Python environment, I will be using virtualenv. In a nutshell, virtualenv is a tool used to isolate Python environments by creating a directory structure encapsulating all the installed packages you need in the application you’re building. This way, you wont be contaminating your global Python packages with anything required by your application. You can read more about it here and on why it is a recommended approach for Python development.
Note: If you come from a NodeJS background, think in terms of node_modules.
In a new terminal, execute the following command to create a new directory for your project, setup the Python virtual environment and activate it:
mkdir covid_python && cd "$_" && virtualenv env && source env/bin/activate
Now install the paho-mqtt python package:
pip install paho_mqtt
2. Skeleton code setup
Create a new covid.py
file and initialize it with the following:
import paho.mqtt.client as mqtt
# Solace Broker Info
url = "mr2r9za6fwi0wf.messaging.solace.cloud"
username = "covid-public-client"
password = "covid19"
def on_connect(client, data, flags, rc):
assert (rc == 0), "Error Connecting. Return code: " + str(rc)
print("Connected to the COVID PubSub+ Solace Broker!")
client = mqtt.Client()
client.username_pw_set(username=username, password=password)
client.on_connect = on_connect
client.connect(url)
client.loop_forever()
What you did above is import the paho-mqtt
package and initialized an mqtt client using the configuration parameters.
It’s important to note the following:
The
client.loop_forever()
is a blocking function call that processes network traffic, dispatched callbacks and handles reconnecting. You can read more about other loop*() functions in the main documentation page..on_connect
is a function called when the broker responds to our connection request. We assign this callback to theon_connect(client, userdata, flags, rc)
function that will get executed upon a successful connection.
You now have a runnable skeleton to test out our connection. From your terminal execute the following
python covid.py
You should see Connected to the COVID PubSub+ Solace Broker!
on your terminal. Saweeet! Connection established 🎉. To stop the execution of the program, press CMD+c
(or Ctrl+c
if you swim in the Windows domain).
3. Handle events
To handle the incoming messages from the PubSub+ Broker in an event-driven manner, we have to first subscribe to the topic of interest (according to the available topics) and handle the messages with the second type of callback function: .on_message
.
import paho.mqtt.client as mqtt
def on_connect(client, data, flags, rc):
assert (rc == 0), "Error Connecting. Return code: " + str(rc)
print("Connected to the COVID PubSub+ Solace Broker!")
for t in topics:
print("Subscribing to: " + t)
client.subscribe(t)
def on_message(client, data, msg):
print("Received message on: %s\n %s" % (msg.topic, msg.payload.decode('ascii')))
# Broker Info
url = "mr2r9za6fwi0wf.messaging.solace.cloud"
username = "covid-public-client"
password = "covid19"
topics = [
"jhu/csse/covid19/test/cases/+/update/Canada/Ontario/#",
"com/covidtracking/states/current/update/New York",
]
client = mqtt.Client()
client.username_pw_set(username=username, password=password)
client.on_connect = on_connect
client.on_message = on_message
client.connect(url)
client.loop_forever()
In the above code program:
- I added an
on_message(client, data, msg)
function to handle the.on_message
callback from the client. Note the decoding of the received message payload to ascii since we receive the payload in binary format. - There are subscriptions to two different topics
- All cases updates in Ontario, Canada obtained from the John Hopkins University data source. I used a test stream.
- All cases updates in New York obtained from The Covid Tracking project data source. Note you can use the topic
com/covidtracking/states/current/get/raw
to get raw updates from all the states in the United States
Run the code and wait for updates to come in!
A note on JSON
Since the returned format of the message payload is in JSON, you can use the python json package to handle, manipulate and BEAUTIFY your output. Here is the full code for your application
import paho.mqtt.client as mqtt
import json
def on_connect(client, data, flags, rc):
assert (rc == 0), "Error Connecting. Return code: " + str(rc)
print("Connected to the COVID PubSub+ Solace Broker!")
for t in topics:
print("Subscribing to: " + t)
client.subscribe(t)
def on_message(client, data, msg):
msg_json_object = json.loads(msg.payload.decode('ascii'))
msg_json_pretty = json.dumps(msg_json_object, indent=2)
print("Received message on: %s\n %s" % (msg.topic, msg_json_pretty))
# Broker Info
url = "mr2r9za6fwi0wf.messaging.solace.cloud"
username = "covid-public-client"
password = "covid19"
topics = [
"jhu/csse/covid19/test/cases/+/update/Canada/Ontario/#",
"com/covidtracking/states/current/update/New York",
]
client = mqtt.Client()
client.username_pw_set(username=username, password=password)
client.on_connect = on_connect
client.on_message = on_message
client.connect(url)
client.loop_forever()
What’s Next 🤔
I would like to explore more front-end frameworks that I can use in conjunction with Python. So ideally, I will have my Python application communicate with the Solace PubSub+ Event Broker as the backend piece of the application infrastructure and a cool front-end framework to present it. Hey what’s up Django?
Closing Remarks
So here is the real question, what did YOU learn during quarantine? Share in the comments below and let’s start a discussion!
Oh and if you have any recommendations for front end frameworks I would love to 👀 what you have and 👂 your suggestions.
And to end on a good note, I’d like to leave you with one of my favorite gifs
Top comments (2)
Nice! Thanks for sharing. I need to update my python game as well.
Thanks! There's alot I want to learn in python as well 🦄