DEV Community

Cover image for Rust: Simple usage of bb8 and tokio_postgres
Maximilien Di Dio
Maximilien Di Dio

Posted on • Edited on • Originally published at blog.protocod.dev

Rust: Simple usage of bb8 and tokio_postgres

Introduction

Hello, world!

We will see how very easy it's to create a simple todolist using bb8 and tokio_postgres for the data layer side.

Disclaimer: This example doesn't use any user interface. Feel free to add a library like egui or iced or to setup a REST API and a web front-end. I won't show you how to setup a TLS session for the database connections. It's easy, but I'll keep this thing for another article.

bb8 is a full-featured pool connection designed for asynchronous connections.
The name of this crate can be explained easily because it's originally based on r2d2.

Dependencies

In order to perform asynchronous statements to the database, I use the tokio_postgres crate.

I found an interesting crate called bb8_postgres which provides both crates (bb8 and tokio_postgres) in order to make our lives easier using a named struct PostgresConnectionManager.

Let's get started by install some dependencies in your Cargo.toml.

[dependencies]
bb8-postgres = "0.7.0"
once_cell = "1.9.0"
snafu = "0.6"

[dependencies.postgres-types]
version = "0.2.2"
features = ["derive"]

[dependencies.uuid]
version = "0.8.2"
features = ["v4", "serde"]

[dependencies.tokio]
version = "1"
features = ["full"]

[dependencies.chrono]
version = "0.4.19"
features = ["serde"]

[dependencies.tokio-postgres]
version = "0.7.5"
features = ['with-chrono-0_4', 'with-uuid-0_8', 'with-serde_json-1']
Enter fullscreen mode Exit fullscreen mode

Chrono is used to manage datetime creation and timestamptz SQL serialization and deserialization through serde. I have decided to identify a todo using UUIDs, because they are unique and useful to ensure collision avoidance if I have to merge databases of todos.

I added once_cell, a rusty way to implement a thread-safe singleton pattern to get a unique instance of the bb8 pool during the execution of our program.

Snafu will help us to handle errors using context.

Implement the DBManager

use bb8_postgres::bb8::{Pool, PooledConnection, RunError};
use bb8_postgres::PostgresConnectionManager;
use chrono::{DateTime, Utc};
use once_cell::sync::OnceCell;
use snafu::{ResultExt, Snafu};
use tokio_postgres::types::{FromSql, ToSql};
use tokio_postgres::{NoTls, Row, ToStatement};
use uuid::Uuid;

// Thread-safe instance of DBManager
static DB_MANAGER_INSTANCE: OnceCell<DBManager> = OnceCell::new();

// Alias to represent a postgres database connection
pub type DBConnection<'a> = PooledConnection<'a, PostgresConnectionManager<NoTls>>;

// Alias to represent a database pool connections
pub type DBPool = Pool<PostgresConnectionManager<NoTls>>;

// It can occur when your not able to get a connection from the pool
pub type PostgresConnectionError = RunError<tokio_postgres::error::Error>;

// Provide a contexts for better error handling
#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("ConnectionError: {}", source))]
    ConnectionError { source: PostgresConnectionError },

    #[snafu(display("PostgresError: {}", source))]
    PostgresError { source: tokio_postgres::Error },
}
Enter fullscreen mode Exit fullscreen mode

We start with some basics. Import the stuff we need and create type alias to work with. I also define here the main Error struct.
As you see, we will create a struct called DBManager. The idea is to setup a struct to manage bb8 pool connections and call tokio_postgres functions to prepare statements and perform queries to the database. This must be a singleton because we don't want to create several pool of connections, that doesn't make sense. DBManager is a kind of service pattern.

pub struct DBOptions {
    // see https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html"
    pub pg_params: String,
    pub pool_max_size: u32,
}

// We call the DBManager when required
// like a kind of singleton
pub struct DBManager {
    pool: DBPool,
}

impl DBManager {
    // Get an instance of DBManager
    pub async fn get() -> &'static DBManager {
        DB_MANAGER_INSTANCE.get().unwrap()
    }

    // Create the DBManager instance using DBOptions
    async fn new(config: DBOptions) -> Result<Self, Error> {
        let DBOptions {
            pg_params,
            pool_max_size,
        } = config;

        let manager = PostgresConnectionManager::new_from_stringlike(pg_params, NoTls)
            .expect("unable build PostgresConnectionManager");

        let pool = Pool::builder()
            .max_size(pool_max_size)
            .build(manager)
            .await
            .context(PostgresError)?;

        Ok(Self { pool })
    }

    // Helper to get a connection from the bb8 pool
    pub async fn connection(&self) -> Result<DBConnection<'_>, Error> {
        let conn = self.pool.get().await.context(ConnectionError)?;
        Ok(conn)
    }

    // Perform a query from a fetched bb8 connection
    pub async fn query<T>(
        &self,
        statement: &T,
        params: &[&(dyn ToSql + Sync)],
    ) -> Result<Vec<Row>, Error>
    where
        T: ?Sized + ToStatement,
    {
        let conn = self.connection().await?;
        let rows = conn.query(statement, params).await.context(PostgresError)?;
        Ok(rows)
    }

    // Perform a query_one from a fetched bb8 connection
    pub async fn query_one<T>(
        &self,
        statement: &T,
        params: &[&(dyn ToSql + Sync)],
    ) -> Result<Row, Error>
    where
        T: ?Sized + ToStatement,
    {
        let conn = self.connection().await?;
        let row = conn
            .query_one(statement, params)
            .await
            .context(PostgresError)?;
        Ok(row)
    }
}
Enter fullscreen mode Exit fullscreen mode

DBOptions represent required parameters to initialize the DBManager. I've added some method to perform secure SQL statement based on query and query_one method from tokio_postgres.

The code will rely on this struct to do any operations with the postgres database.
Another quite interesting thing, DBManager can be mockup to interact with a fake database for unit testing.

Add the Todo entity and operations

The database contains a todo table and a priority level enumeration.


todo.png

create extension if not exists "uuid-ossp";

create type priority_level as enum (
    'Low',
    'Medium',
    'High'
);

create table todo (
    id uuid primary key default uuid_generate_v4(), 
    task text,
    priority priority_level,
    created_at timestamptz not null,
    expired_at timestamptz,
    completed_at timestamptz
);
Enter fullscreen mode Exit fullscreen mode

A simple database for a simple example I'm thinking about writing a second article on this blog using a more complex database. But this example is enough to understand the basics.

#[derive(Debug, ToSql, FromSql)]
#[postgres(name = "priority_level")]
pub enum PriorityLevel {
    Low,
    Medium,
    High,
}

#[derive(Debug)]
pub struct Todo {
    id: uuid::Uuid,
    task: String,
    priority: PriorityLevel,
    created_at: DateTime<Utc>,
    expired_at: Option<DateTime<Utc>>,
    completed_at: Option<DateTime<Utc>>,
}
Enter fullscreen mode Exit fullscreen mode

I use entity pattern here. A SQL table is represented by a rust struct, a SQL enumeration is represented by a rust enumeration. Quite simple.
We deal with objects firstly. These struct can derivate serde struct for JSON serialization.

impl Todo {
    pub fn new(task: String, priority: PriorityLevel, expired_at: Option<DateTime<Utc>>) -> Self {
        Self {
            id: Uuid::new_v4(),
            task,
            priority,
            created_at: chrono::offset::Utc::now(),
            expired_at,
            completed_at: None,
        }
    }

    // Get all todo from database
    pub async fn get_all() -> Result<Vec<Self>, Error> {
        let select_one_todo = "
        select
            id as todo_id,
            task as todo_task,
            priority as todo_priority,
            created_at as todo_created_at,
            expired_at as todo_expired_at,
            completed_at as todo_completed_at
            from todo;";

        let rows = DBManager::get().await.query(select_one_todo, &[]).await?;

        let todo_list: Vec<Self> = rows
            .iter()
            .map(|row| Self::try_from(row).unwrap())
            .collect();

        Ok(todo_list)
    }

    // get a todo by id from database
    pub async fn get_by_id(id: &Uuid) -> Result<Self, Error> {
        let select_one_todo = "
        select
            id as todo_id,
            task as todo_task,
            priority as todo_priority,
            created_at as todo_created_at,
            expired_at as todo_expired_at,
            completed_at as todo_completed_at
            from todo where id = $1;";

        let row = DBManager::get()
            .await
            .query_one(select_one_todo, &[id])
            .await?;

        Ok(Self::try_from(&row)?)
    }

    // Toggle completed_at, if None the todo is not completed,
    pub fn toggle_complete(&mut self) {
        self.completed_at = match self.completed_at {
            Some(_) => None,
            None => Some(chrono::offset::Utc::now()),
        }
    }

    // Method to persist the object in database
    // can be calls to create or update an existing object in database
    pub async fn save(&self) -> Result<&Self, Error> {
        let insert_new_todo = "
            insert into todo (id, task, priority, created_at, expired_at, completed_at)
            values ($1, $2, $3, $4, $5, $6)
            ON CONFLICT (id)
            DO UPDATE SET
                task = EXCLUDED.task,
                priority = EXCLUDED.priority,
                created_at = EXCLUDED.created_at,
                expired_at = EXCLUDED.expired_at,
                completed_at = EXCLUDED.completed_at;";

        let _ = DBManager::get()
            .await
            .query(
                insert_new_todo,
                &[
                    &self.id,
                    &self.task,
                    &self.priority,
                    &self.created_at,
                    &self.expired_at,
                    &self.completed_at,
                ],
            )
            .await?;
        Ok(self)
    }

    // Be carefull, it's not a soft-delete.
    // this will remove the data of the object from the database. 
    // But the object himself is not dropped. So you can continue to
    // interact with it.
    async fn delete(&self) -> Result<&Self, Error> {
        let delete_todo = "delete from todo where id = $1;";
        let _ = DBManager::get()
            .await
            .query(delete_todo, &[&self.id])
            .await?;

        Ok(self)
    }
}
Enter fullscreen mode Exit fullscreen mode

The methods save and delete are quite special, I was inspired by ORM like Doctrine or Eloquent.
You can create and manipulate the object and save it in the database when you're ready to store it.
I provide static methods (get_by_id and get_all) to fetch todos from the database. These act like methods from a repository.

But you should have seen something special, I call try_from method to convert a &Row into an instance of Todo. So as you imagine, we have to implement the TryFrom<T> trait.

The final touch!

impl<'a> TryFrom<&'a Row> for Todo {
    type Error = Error;

    fn try_from(row: &'a Row) -> Result<Self, Self::Error> {
        let id = row.try_get("todo_id").context(PostgresError)?;
        let task = row.try_get("todo_task").context(PostgresError)?;
        let created_at = row.try_get("todo_created_at").context(PostgresError)?;
        let expired_at = row.try_get("todo_expired_at").context(PostgresError)?;
        let completed_at = row.try_get("todo_completed_at").context(PostgresError)?;
        let priority = row.try_get("todo_priority").context(PostgresError)?;

        Ok(Self {
            id,
            task,
            created_at,
            expired_at,
            completed_at,
            priority,
        })
    }
}
Enter fullscreen mode Exit fullscreen mode

Have you noticed the names of the columns? The column name starts with the name of the table (todo_) and finishes with the name of the field.
That's the trick. When you select data from PostgreSQL, each field must start with the name of the entity.

Like this:

        select
            id as todo_id,
            task as todo_task,
            priority as todo_priority,
            created_at as todo_created_at,
            expired_at as todo_expired_at,
            completed_at as todo_completed_at
            from todo;
Enter fullscreen mode Exit fullscreen mode

This naming system prevents you from getting wrong fields if you join some tables because many tables can have common named fields.

#[tokio::main]
async fn main() -> Result<(), Error> {
    // TODO: You can improve this by using clap to
    // get database settings from CLI or ENV VAR
    let options = DBOptions {
        pg_params: String::from(
            "postgres://postgres:test@localhost:5432/postgres?connect_timeout=10",
        ),
        pool_max_size: 8u32,
    };

    // Create the unique instance of DBManager
    let _ = DB_MANAGER_INSTANCE.set(DBManager::new(options).await?);

    // Create a new todo
    let mut todo_finish_this_draft = Todo::new(String::from("Publish this draft"), PriorityLevel::High, None);

    // Persist this todo in database,
    // this insert the data of the object
    // into the todo table
    todo_finish_this_draft.save().await?;

    // Show the todo object
    println!("{:?}", todo_finish_this_draft);

    // Mutate the state of this todo make it completed!
    todo_finish_this_draft.toggle_complete();

    // Then, persist the object again.
    // This update the object in database because
    // the id of this object already exist.
    todo_finish_this_draft.save().await?;

    // Display the updated todo
    println!("{:?}", todo_finish_this_draft);

    // Fetch all todo from the database
    let todo_list = Todo::get_all().await?;

    // As you see, there is only 1 todo in the database
    // That's normal, we persist 2 times the same object.
    println!("{:?}", todo_list);

    // Remote the object data from the database
    // but it does not drop the rust object.
    todo_finish_this_draft.delete().await?;

    // As you see, there is no more todo in database
    let new_todo_list = Todo::get_all().await?;
    println!("{:?}", new_todo_list);

    Ok(())
}
Enter fullscreen mode Exit fullscreen mode

Honestly, it would be better to write unit tests, but I want to keep things simple for now.
The main function calls all the implemented functions to test them.

The complete code is available on this repository:
https://github.com/prx0/todolist-bb8-postgres/tree/main/src

Top comments (0)