Intro
By now almost everyone knows or at least has heard about "async/await" paradigm on JavaScript/Node.JS. But it seems that async processing is a lot less common in Python. Python feels like "old school" language when compared to JavaScript, it really feels old sometimes. But it's very clear, concise and developers can be really productive in it. It's also very similar to JS with its somewhat loose-but-not-so-loose typing, variable assignment etc.
One big difference is syntax - in JS whitespaces in the code (almost) don't matter while in Python these are important.
However, I try to keep this post short and will focus on a specific task:
- how to use MongoDB asynchronously in Python
Async/await - why bother at all?
First and foremost reason for me is concurrency. With async processing developer can run things in parallel without too much hassle and tinkering with threads or processes. It has its limits and risks but for simple cases it does the job. Async/await will come extra handy in those cases when there's a lot of I/O operations involved. Be it disk reads/writes or accessing the database. Here we are talking about accessing the database, and namely MongoDB.
MongoDB and Python
MongoDB itself feels very "javascriptish" so to say. If you're a seasoned Node.js developer like myself it feels very comfortable and cozy. MongoDB shell has the same language that your code runs in - JavaScript. You can copy-paste code and objects between your IDE/project and MongoDB shell or MongoDB IDE. And so on and on.
For Python-only developers it may be a bit hard to switch between languages-contexts but it's doable. My brain still has 2 halves but these are not called "left half" and "right half" any more but "JS half" and "Python half" ;) Half a brain still does the job, haaa:)
There's an old good Python API for MongoDB - it's called PyMongo. It gets the job done and even more but where's the fun in all that old school serial, non-async processing?
Motor to the rescue
There's a really good async driver API for MongoDB: Motor
Here are some examples.
Connecting to database
from motor.motor_asyncio import AsyncIOMotorClient
uri = "mongodb://dev:dev@localhost:27017/mydatabase?authSource=admin"
client = AsyncIOMotorClient(uri)
The same with some connection args
from motor.motor_asyncio import AsyncIOMotorClient
uri = "mongodb://dev:dev@localhost:27017/?authSource=admin"
connection_args = {
"zlibCompressionLevel": 7,
"compressors": "zlib"
}
client = AsyncIOMotorClient(uri, **connection_args)
Getting database and collection instance from client:
db = client.get_database("mydatabase")
collection = db.get_collection("mycollection")
Most common database operations
Find
async def find():
"""
This method finds items from MongoDB collection and
processes these by using another asynchronous method
:return:
"""
collection = db.get_collection("mycollection")
filter_ = {
"someField": "someValue"
}
projection_ = {
"_id": False # don't return the _id
}
cursor = collection.find(filter=filter_, projection=projection_)
# it gets interesting here now. Iterate over the cursor asynchronously
async for item in cursor:
await do_something_in_an_async_worker_method(item)
async def find_cursor_to_list():
"""
This method finds items from MongoDB collection and
asynchronously converts cursor to a list with items
:return:
"""
collection = db.get_collection("mycollection")
filter_ = {
"someField": "someValue"
}
projection_ = {
"_id": False # don't return the _id
}
cursor = collection.find(filter=filter_, projection=projection_)
# Convert the cursor to a list of items right away.
# NB! Dangerous with large result sets
items = await cursor.to_list(length=500)
return items
Update / upsert
from datetime import datetime
async def update():
"""
This method updates data in MongoDB asynchronously
:return:
"""
collection = db.get_collection("mycollection")
filter_ = {
"someField": "someValue"
}
data = {
"someNewField": "aNewValue"
}
update_ = {
"$set": data,
"$currentDate": {
"updatedAt": True # set field updatedAt to current date automagically. Good practice ;)
},
"$setOnInsert": {
"createdAt": datetime.utcnow()
# set field createdAt to current date automagically ONLY IF it's a new record
}
}
# if upsert=True and record is not found then it is created
await collection.update_one(filter_, update_, upsert=True)
Aggregation
It's also possible to run aggregation pipeline with Motor.
async def aggregate():
"""
This method run async aggregation
:return:
"""
collection = db.get_collection("mycollection")
pipeline = [
{
"$match": {
"foo": "bar"
}
},
{
"$group": {
"_id": None,
"total": {"$sum": 1}
}
},
{
"$sort": {"foo": -1}
}
]
# mind the use of **allowDisk** argument. It's necessary for all
# bigger result sets that are sorted
cursor = collection.aggregate(pipeline=pipeline, allowDiskUse=True)
results = await cursor.to_list(length=1000)
return results
Bulk writes
Motor can be used for bulk writes that would improve performance
when storing may items from big lists of results of some kind of processing.
Here's an example:
from pymongo import UpdateOne
from datetime import datetime
async def bulk_save(very_big_list: list = list()):
"""
:param very_big_list: List with a lot of items. Or an item generator ... ?
:return:
"""
collection = db.get_collection("mycollection")
# this defines batch size that will be added to database at once
BATCH_SIZE = 500
updates = list()
for item in very_big_list:
filter_ = {
"foo": item.get("foo")
}
update_ = {
"$set": item,
"$currentDate": {
"updatedAt": True
},
"$setOnInsert": {
"createdAt": datetime.utcnow()
}
}
u = UpdateOne(filter=filter_, update=update_, upsert=True)
updates.append(u)
# if list of updates is filled up to BATCH_SIZE push data to database
if len(updates) >= BATCH_SIZE:
await collection.bulk_write(updates, ordered=False)
# re-initialize list of updates
updates = list()
# add all remaining items to database
if len(updates) > 0:
await collection.bulk_write(updates, ordered=False)
Summary
Since Python is by default "synchronous" and not event-loop-based language it may be a bit hard to get accustomed to async/await pattern in Python.
In this article I gave an overview how to access MongoDB asynchronously in Python by giving examples on connecting the database with Motor client library, on querying and updating data, running aggregations and bulk updates.
Thanks for reading!
Top comments (0)