
Akarshan Gandotra


Asynchronous Database Sessions in FastAPI with SQLAlchemy

FastAPI is a modern, fast web framework for building APIs with Python, based on standard Python type hints. It works well with SQLAlchemy, a SQL toolkit and Object-Relational Mapping (ORM) library, which handles the database operations.

In this blog post, we'll explore how to use asynchronous database sessions in SQLAlchemy with FastAPI. We'll focus on creating an AsyncSession and managing its lifecycle using the asyncio module, along with demonstrating the use of dependency injection for cleaner and more maintainable code.

Understanding SQLAlchemy's Asynchronous Engine and Session

create_async_engine

The create_async_engine function is responsible for creating an asynchronous database engine. In the context of FastAPI and SQLAlchemy, this engine handles database connections and communication. Let's break down its parameters:

  1. database_url: The URL specifying the database connection details, passed as the first positional argument. It typically follows the format "dialect+driver://username:password@host:port/database". For an asynchronous engine, the driver must itself be asynchronous (for example, asyncpg for PostgreSQL).

  2. pool_size: The number of connections the pool keeps open. In our case, it's set to 100.

  3. max_overflow: The maximum number of connections to allow in the connection pool above pool_size. It's set to 0, meaning no overflow connections are allowed, so the engine never holds more than 100 connections.

  4. pool_pre_ping: If True, the pool tests each connection when it is checked out and replaces it if it has gone stale. In our example, it's set to False.
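To make the URL format concrete, here is a small sketch that splits a made-up connection URL into its parts using only the standard library. The URL, the credentials, and the postgresql+asyncpg pairing are placeholders chosen for illustration, not values from this post, and SQLAlchemy uses its own parser internally.

```python
from urllib.parse import urlsplit

# Hypothetical connection URL; every value here is a placeholder.
database_url = "postgresql+asyncpg://app_user:s3cret@localhost:5432/app_db"

parts = urlsplit(database_url)

# The scheme carries "dialect+driver".
dialect, _, driver = parts.scheme.partition("+")

components = {
    "dialect": dialect,
    "driver": driver,
    "username": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,
    "database": parts.path.lstrip("/"),
}

for name, value in components.items():
    print(f"{name}: {value}")
```

Seeing the pieces separately makes it easier to spot a missing driver or a wrong port before the URL is handed to create_async_engine.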

async_sessionmaker

async_sessionmaker is a factory that produces AsyncSession objects with a fixed configuration. It's similar to the traditional sessionmaker but adapted for asynchronous operations. Let's explore its parameters:

  1. autocommit: A legacy flag kept for backwards compatibility. SQLAlchemy 2.0 accepts only False, so transactions must always be committed explicitly. In our case, it's set to False.

  2. autoflush: If True, the session automatically flushes pending changes to the database before running a query. Here, it's set to False, so changes are sent only on an explicit flush or commit.

  3. bind: The database engine to which the session will be bound. In our example, it's bound to the asynchronous engine created using create_async_engine.

async_scoped_session

async_scoped_session wraps a session factory in a registry that returns the same session for every call made within one scope, such as a request in FastAPI. Here are the parameters:

  1. session_factory: The factory that creates new sessions, here the one built with async_sessionmaker.

  2. scopefunc: A function that returns a hashable value identifying the scope. In our case, we use current_task, tying the scope to the current asyncio task. Because the registry keeps one entry per task, SQLAlchemy's documentation recommends awaiting the registry's remove() method when the task is done; otherwise the task and its session stay in memory.
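The idea behind scopefunc can be sketched without a database. The toy registry below is a simplified stand-in, not SQLAlchemy's implementation: FakeSession and TaskScopedRegistry are hypothetical names, and remove() is synchronous here, whereas the real async_scoped_session.remove() must be awaited.

```python
import asyncio


class FakeSession:
    """Stand-in for AsyncSession so the sketch runs without a database."""


class TaskScopedRegistry:
    """Toy registry: one object per scope, keyed by whatever scopefunc returns."""

    def __init__(self, factory, scopefunc):
        self._factory = factory
        self._scopefunc = scopefunc
        self._registry = {}

    def __call__(self):
        key = self._scopefunc()
        if key not in self._registry:
            self._registry[key] = self._factory()
        return self._registry[key]

    def remove(self):
        # Drop the entry for the current scope so it does not linger in memory.
        self._registry.pop(self._scopefunc(), None)

    def size(self):
        return len(self._registry)


scoped = TaskScopedRegistry(FakeSession, scopefunc=asyncio.current_task)


async def worker():
    first = scoped()
    second = scoped()  # same task, so the same object comes back
    scoped.remove()    # clean up before the task finishes
    return first, second


async def main():
    # gather() runs each worker in its own task, so each gets its own scope.
    (a1, a2), (b1, b2) = await asyncio.gather(worker(), worker())
    return a1 is a2, b1 is b2, a1 is not b1, scoped.size()


results = asyncio.run(main())
print(results)
```

Each task sees one object no matter how often it asks, two tasks never share an object, and the registry is empty once both have called remove().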

from asyncio import current_task

from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    async_scoped_session,
    async_sessionmaker,
    create_async_engine,
)


class DatabaseSessionManager:
    def __init__(self):
        self.engine: AsyncEngine | None = None
        self.session_maker = None
        self.session = None

    def init_db(self, database_url: str):
        # The connection parameters are elided in the original post; here the
        # caller passes the finished URL in.

        # Creating an asynchronous engine
        self.engine = create_async_engine(
            database_url, pool_size=100, max_overflow=0, pool_pre_ping=False
        )

        # Creating a factory for AsyncSession objects
        self.session_maker = async_sessionmaker(
            autocommit=False, autoflush=False, bind=self.engine
        )

        # Creating a session registry scoped to the current asyncio task
        self.session = async_scoped_session(self.session_maker, scopefunc=current_task)

    async def close(self):
        # Disposing of the engine, which closes its pooled connections
        if self.engine is None:
            raise Exception("DatabaseSessionManager is not initialized")
        await self.engine.dispose()

# Create the manager; call init_db() once at startup before using sessions
sessionmanager = DatabaseSessionManager()

Initializing and Closing the Database Session

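In FastAPI, it's crucial to initialize and close the database session properly. The get_db function provides a scoped database session for each request: it sets the search path, yields the session, rolls back if an exception propagates, and always closes the session afterwards. It never calls commit, so the code that uses the session has to commit its own changes: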

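from typing import AsyncIterator

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def get_db() -> AsyncIterator[AsyncSession]:
    # Check before calling: the registry is None until init_db() has run
    if sessionmanager.session is None:
        raise Exception("DatabaseSessionManager is not initialized")
    session = sessionmanager.session()
    try:
        # Setting the search path and yielding the session...
        # SCHEMA is defined elsewhere (not shown); it is interpolated into the
        # SQL text, so it must be a trusted constant
        await session.execute(
            text(f"SET search_path TO {SCHEMA}")
        )
        yield session
    except Exception:
        await session.rollback()
        raise
    finally:
        # Closing the session after use...
        await session.close()
        # Dropping the task's entry from the scoped registry
        await sessionmanager.session.remove()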

Using Dependency Injection

FastAPI supports dependency injection, making it easier to manage dependencies across different parts of your application. To use the database session as a dependency, you can utilize the Depends function. Here's an example of how you can inject the AsyncSession into your FastAPI route:

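from fastapi import APIRouter, Depends, FastAPI
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()
router = APIRouter()

@router.get("/some-route")  # hypothetical path, chosen for illustration
async def some_route(session: AsyncSession = Depends(get_db)):
    # Your route logic using the database session...
    ...

# Using the route with dependency injection: include_router expects an
# APIRouter, not the route function itself
app.include_router(router)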

Practical Example in Scripts

Now, let's see how you can use the asynchronous database session in practical scenarios, such as in scripts:

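from typing import List

# clear_users_from_tables is the author's helper; its definition is not shown.

async def process_user_data(user_ids: List[int]):
    async for db_session in get_db():
        async with db_session as session:
            # get_db does not commit, so commit here unless the helper does
            await clear_users_from_tables(session, user_ids)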

In the above example, get_db ensures that you get a scoped session, and the clear_users_from_tables function can use this session to interact with the database.
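One caveat when driving an async generator such as get_db with async for: an exception raised in the loop body is not thrown back into the generator, so its except branch does not run at that point. A possible alternative for scripts is to wrap the generator with contextlib.asynccontextmanager, which does forward the exception. The sketch below shows this with a fake session so it runs without a database; FakeSession, get_fake_db, and fake_db_context are hypothetical names, and get_fake_db only mirrors the try/except/finally shape of get_db.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class FakeSession:
    """Stand-in for AsyncSession that records lifecycle calls."""

    def __init__(self):
        self.calls = []

    async def rollback(self):
        self.calls.append("rollback")

    async def close(self):
        self.calls.append("close")


async def get_fake_db(session: FakeSession) -> AsyncIterator[FakeSession]:
    # Same try/except/finally shape as get_db, minus the real database.
    try:
        yield session
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


# Wrapping the generator function makes it usable with `async with`.
fake_db_context = asynccontextmanager(get_fake_db)


async def failing_script(session: FakeSession) -> None:
    async with fake_db_context(session):
        raise ValueError("simulated failure inside the script body")


async def main() -> list:
    session = FakeSession()
    try:
        await failing_script(session)
    except ValueError:
        pass  # the error still reaches the caller after cleanup
    return session.calls


calls = asyncio.run(main())
print(calls)
```

With the context manager, the failure triggers the rollback first and the close second, and the original exception still propagates to the caller.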

By following these practices, you can build FastAPI applications that use SQLAlchemy's asynchronous sessions effectively. The event loop stays free to serve other requests while a query waits on the database, and the get_db dependency makes sure every session is rolled back on error and closed when the request ends.

Top comments (1)

Karan Gupta

Hi,

Do we still need to do session.begin in the crud functions with this approach?

So say if I have a route, that does 2 crud calls at 2 points in the function, so should they have different transactions (with session.begin)? In that case, I believe 2 commits will be done Vs only 1 commit with approach described in this article where session.commit happens at get_db essentially after request has completed and exited route function.

Could you let me know which approach is better?