Often, you create a startup function in your service that creates data objects in various resources while the service initializes. A common example is creating the database tables that various components of your service expect to be present before they can read and write data. The SQLAlchemy library in Python lets you interact with the database programmatically through a database-agnostic Object Relational Mapper (ORM). Let's see how to create a database and its tables using the SQLAlchemy ORM with the async APIs.
Note: All examples below use the async API and are written for SQLAlchemy 2.0+.
Setup
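I'm using a PostgreSQL server running in Docker. Each environment variable needs its own -e flag, and the port is published so that the examples can reach the server on localhost:5432.
$ docker run --name postgres-server -e POSTGRES_USER=admin -e POSTGRES_PASSWORD=admin -p 5432:5432 -d postgres:latest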
Database Creation
You may want the startup hook in your service to create the database if it does not exist yet. Here's how:
import contextlib

from sqlalchemy import text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.ext.asyncio import create_async_engine


async def init_app() -> None:
    async with create_async_engine(
            url='postgresql+asyncpg://admin:admin@localhost:5432/postgres',
            isolation_level='AUTOCOMMIT').begin() as conn:
        # If db already exists, suppress the exception.
        with contextlib.suppress(ProgrammingError):
            await conn.execute(text(
                'create database testdatabase owner admin'))


if __name__ == '__main__':
    import asyncio
    asyncio.run(init_app())
Here's what's happening:
- The ProgrammingError exception is suppressed in case the database already exists.
- We connect as a superuser to the default database postgres to execute the create database query.
- Postgres does not allow you to create a database inside a transaction, and SQLAlchemy always tries to run queries in a transaction. To get around this, isolation_level is set to AUTOCOMMIT.
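Suppressing every ProgrammingError also hides unrelated failures, such as a typo in the statement or a missing privilege. A narrower check is sketched below. It assumes the driver-level exception is reachable as exc.orig and exposes PostgreSQL's error code through a sqlstate attribute; I believe the asyncpg dialect does this, but verify it against your versions. The helper name is_duplicate_database is hypothetical, and 42P04 is PostgreSQL's code for duplicate_database.

```python
# PostgreSQL's SQLSTATE for "duplicate_database".
DUPLICATE_DATABASE = "42P04"


def is_duplicate_database(exc: BaseException) -> bool:
    """Return True if exc looks like a 'database already exists' error.

    Assumption: the original driver exception is stored on ``exc.orig``
    and carries the server's error code as ``sqlstate``.
    """
    orig = getattr(exc, "orig", None)
    return getattr(orig, "sqlstate", None) == DUPLICATE_DATABASE


# Possible use inside init_app (sketch, not executed here):
#     try:
#         await conn.execute(text('create database testdatabase owner admin'))
#     except ProgrammingError as exc:
#         if not is_duplicate_database(exc):
#             raise
```

With this in place, only the "already exists" case is swallowed and any other error still surfaces during startup.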
Tables
Declare Models
Here is an example adapted from SQLAlchemy's documentation:
import typing

from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import (DeclarativeBase,
                            Mapped,
                            mapped_column,
                            relationship)


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String, nullable=False)
    fullname: Mapped[typing.Optional[str]] = mapped_column(String)
    addresses: Mapped[list["Address"]] = relationship(
        back_populates="user", cascade="all, delete-orphan"
    )


class Address(Base):
    __tablename__ = "address"

    id: Mapped[int] = mapped_column(primary_key=True)
    email_address: Mapped[str] = mapped_column(String, nullable=False)
    user_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    user: Mapped["User"] = relationship(back_populates="addresses")
Creating An Async Engine
Whether you created the database dynamically or manually, pass its URL to the engine.
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

async_engine: AsyncEngine = create_async_engine(
    url='postgresql+asyncpg://admin:admin@localhost:5432/testdatabase')
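The URL above embeds the credentials directly, which works for admin:admin but breaks once a password contains characters such as @ or /. Here is a minimal sketch of building the URL string with the credentials percent-encoded. The function build_database_url and its defaults are hypothetical and simply mirror the values used in this post.

```python
from urllib.parse import quote


def build_database_url(
    database: str,
    user: str = "admin",
    password: str = "admin",
    host: str = "localhost",
    port: int = 5432,
) -> str:
    """Build a postgresql+asyncpg URL with percent-encoded credentials."""
    # safe="" also encodes "/" and "@", which would otherwise be read
    # as URL delimiters.
    safe_user = quote(user, safe="")
    safe_password = quote(password, safe="")
    return (
        f"postgresql+asyncpg://{safe_user}:{safe_password}"
        f"@{host}:{port}/{database}"
    )
```

The result can be passed as the url argument of create_async_engine, for example build_database_url("testdatabase").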
Creating Tables
async def init_app() -> None:
    async with async_engine.begin() as conn:
        await conn.run_sync(lambda sync_conn: Base.metadata.create_all(
            bind=sync_conn, checkfirst=True))
Here's what's happening:
- We use the run_sync method because calling create_all directly on an AsyncConnection or AsyncEngine object is not currently supported, as there is not yet an awaitable form. So we pass the synchronous connection sync_conn to the bind argument.
- The checkfirst argument makes create_all skip tables that already exist.
Use Case
Usually, services are not given superuser access. But if you have a service that onboards new tenants in a multi-tenant cluster, it may need to create a database dynamically while onboarding each tenant. You can combine dynamic database and table creation here.
When creating tables, you will have to create another engine object that points to your newly created database. You don't have to pass isolation_level to this object unless you want to change the default one. The default is READ COMMITTED for Postgres.
import contextlib

from sqlalchemy import text
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.ext.asyncio import create_async_engine

from .models import Base


async def init_app():
    async with create_async_engine(
            url='postgresql+asyncpg://admin:admin@localhost:5432/postgres',
            isolation_level='AUTOCOMMIT').begin() as conn:
        # If db already exists, suppress the exception.
        with contextlib.suppress(ProgrammingError):
            # The name must match the database in the second engine's URL.
            await conn.execute(text(
                'create database testdatabase owner admin'))

    # Create another engine object which points to the newly created database.
    async with create_async_engine(
            url='postgresql+asyncpg://admin:admin@localhost:5432/testdatabase'
    ).begin() as conn:
        await conn.run_sync(lambda sync_conn: Base.metadata.create_all(
            bind=sync_conn, checkfirst=True))


if __name__ == '__main__':
    import asyncio
    asyncio.run(init_app())
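In a multi-tenant setup the database name is usually not a constant. As far as I know, create database is a DDL statement whose name cannot be sent as a bind parameter, so it has to be embedded in the SQL string. Below is a sketch of one conservative way to do that, assuming tenant databases follow a simple naming rule. The function create_database_sql and the naming pattern are hypothetical choices for illustration, not part of SQLAlchemy.

```python
import re

# Assumed naming rule: a lowercase letter first, then lowercase letters,
# digits, or underscores. 63 is PostgreSQL's default identifier length limit.
_VALID_NAME = re.compile(r"[a-z][a-z0-9_]{0,62}")


def create_database_sql(name: str, owner: str) -> str:
    """Return a create database statement for validated identifiers."""
    for identifier in (name, owner):
        if not _VALID_NAME.fullmatch(identifier):
            raise ValueError(f"unsafe identifier: {identifier!r}")
    # Double quotes make PostgreSQL treat the names as exact identifiers.
    return f'create database "{name}" owner "{owner}"'
```

The returned string can be wrapped in text(...) and executed on the AUTOCOMMIT connection, for example text(create_database_sql("tenant_42", "admin")). Rejecting anything outside the pattern keeps untrusted tenant input from altering the statement.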