Sylvain Kerkour

Originally published at kerkour.com

How to implement HTTP Long Polling in Rust

We will use axum, the new web framework developed by the tokio team. It is fast and simple to use. Also, please note that porting this code to another web framework is easy.

We will implement a simple chat server, as chat is the textbook application that benefits the most from long polling.

There are 3 tricks to make this implementation efficient, so stay attentive ;)

The Chat Service

The Chat Service is an object that encapsulates all our business logic. To keep the example simple, we will only make database calls.

Here is our first trick: in order to enable message ordering, we don't use a UUIDv4. Instead, we use a ULID, which sorts by creation time, and convert it to a UUID so that it serializes and deserializes without any special handling: let id: Uuid = Ulid::new().into();
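To see why a ULID keeps messages ordered, here is a minimal sketch that mimics the ULID bit layout (48 bits of millisecond timestamp followed by 80 bits of randomness) with plain std types. The make_id and as_uuid_string functions are hypothetical helpers written for this illustration, not part of the ulid or uuid crates, and the entropy is passed in by hand instead of being generated randomly.

```rust
// Sketch only: mimics the ULID bit layout with std types, not the `ulid` crate itself.

/// Packs a 48-bit millisecond timestamp into the top bits and 80 bits of
/// entropy into the low bits, which is the layout the ULID spec describes.
fn make_id(timestamp_ms: u64, entropy: u128) -> u128 {
    let ts = (timestamp_ms as u128) & 0xFFFF_FFFF_FFFF; // keep 48 bits
    let low = entropy & ((1u128 << 80) - 1); // keep 80 bits
    (ts << 80) | low
}

/// Formats the 128 bits the way a UUID is usually printed (8-4-4-4-12 hex digits).
fn as_uuid_string(id: u128) -> String {
    let hex = format!("{:032x}", id);
    format!(
        "{}-{}-{}-{}-{}",
        &hex[0..8],
        &hex[8..12],
        &hex[12..16],
        &hex[16..20],
        &hex[20..32]
    )
}

fn main() {
    let earlier = make_id(1_700_000_000_000, u128::MAX); // largest possible entropy
    let later = make_id(1_700_000_000_001, 0); // one millisecond later, smallest entropy

    // The timestamp sits in the high bits, so it dominates the comparison.
    assert!(earlier < later);
    // The hyphenated hex form sorts the same way as the numeric value.
    assert!(as_uuid_string(earlier) < as_uuid_string(later));

    println!("{}", as_uuid_string(earlier));
    println!("{}", as_uuid_string(later));
}
```

Because the timestamp occupies the most significant bits, any comparison that works byte by byte from the left orders the ids by creation time. Two ids created within the same millisecond are ordered by their random part, so the ordering is only as fine as the clock.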

chat.rs

impl ChatService {
    pub fn new(db: DB) -> Self {
        ChatService { db }
    }

    pub async fn create_message(&self, body: String) -> Result<Message, Error> {
        if body.len() > 10_000 {
            return Err(Error::InvalidArgument("Message is too large".to_string()));
        }

        let created_at = chrono::Utc::now();
        let id: Uuid = Ulid::new().into();

        let query = "INSERT INTO messages
            (id, created_at, body)
            VALUES ($1, $2, $3)";

        sqlx::query(query)
            .bind(id)
            .bind(created_at)
            .bind(&body)
            .execute(&self.db)
            .await?;

        Ok(Message {
            id,
            created_at,
            body,
        })
    }

Here is our second trick: notice the after.unwrap_or(Uuid::nil()), which returns a "zero" UUID (00000000-0000-0000-0000-000000000000) when after is None. Because every real id is greater than the nil UUID, WHERE id > $1 then returns all the messages.

It's useful to rehydrate the whole state of a client, for example.
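The cursor logic can be sketched without a database. In this illustration the Message struct and the in-memory store are stand-ins invented for the example, ids are plain u128 values, and 0 plays the role of the nil UUID.

```rust
// Sketch only: an in-memory stand-in for the `messages` table; ids are plain u128 values.

#[derive(Debug, Clone, PartialEq)]
struct Message {
    id: u128,
    body: String,
}

/// Mirrors `WHERE id > $1` with `after.unwrap_or(nil)`; 0 plays the role of the nil UUID.
fn find_messages(store: &[Message], after: Option<u128>) -> Vec<Message> {
    let cursor = after.unwrap_or(0);
    store.iter().filter(|m| m.id > cursor).cloned().collect()
}

fn main() {
    let store = vec![
        Message { id: 10, body: "hello".to_string() },
        Message { id: 20, body: "world".to_string() },
        Message { id: 30, body: "!".to_string() },
    ];

    // A fresh client has no cursor yet and receives everything.
    let all = find_messages(&store, None);
    assert_eq!(all.len(), 3);

    // Afterwards the client sends the greatest id it has seen so far.
    let cursor = all.iter().map(|m| m.id).max();
    assert!(find_messages(&store, cursor).is_empty());

    // A client that stopped at id 10 only gets the two newer messages.
    let newer = find_messages(&store, Some(10));
    assert_eq!(newer.len(), 2);
    assert_eq!(newer[0].body, "world");
}
```

A single code path serves both the first request (no cursor) and every later one, so the handler needs no special case for new clients.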

    pub async fn find_messages(&self, after: Option<Uuid>) -> Result<Vec<Message>, Error> {
        let query = "SELECT *
            FROM messages
            WHERE id > $1
            ORDER BY id";

        let messages: Vec<Message> = sqlx::query_as::<_, Message>(query)
            .bind(after.unwrap_or(Uuid::nil()))
            .fetch_all(&self.db)
            .await?;

        Ok(messages)
    }
}

The Web Server

Next, the boilerplate to run the web server.

Thanks to .layer(AddExtensionLayer::new(ctx)), ServerContext is injected into all the routes so we can call ChatService's methods.

struct ServerContext {
    chat_service: chat::ChatService,
}

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    std::env::set_var("RUST_LOG", "rust_long_polling=info");
    env_logger::init();

    let database_url = std::env::var("DATABASE_URL")
        .map_err(|_| Error::BadConfig("DATABASE_URL env var is missing".to_string()))?;

    let db = db::connect(&database_url).await?;
    db::migrate(&db).await?;

    let chat_service = chat::ChatService::new(db);
    let ctx = Arc::new(ServerContext { chat_service });

    let app = Router::new()
        .route(
            "/messages",
            get(handler_find_messages).post(handler_create_message),
        )
        .or(handler_404.into_service())
        .layer(AddExtensionLayer::new(ctx));

    log::info!("Starting server on 0.0.0.0:8080");
    axum::Server::bind(
        &"0.0.0.0:8080"
            .parse()
            .expect("parsing server's bind address"),
    )
    .serve(app.into_make_service())
    .await
    .expect("running server");

    Ok(())
}
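Wrapping the context in an Arc is what makes it cheap to hand to every request. The following std-only sketch shows the idea with threads standing in for request handlers; the handle function and the name field are hypothetical and exist only for this illustration.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-ins for the article's types.
struct ChatService {
    name: &'static str,
}

struct ServerContext {
    chat_service: ChatService,
}

/// A pretend request handler: it receives its own Arc pointing at the shared context.
fn handle(ctx: Arc<ServerContext>, request_id: u32) -> String {
    format!("request {} -> {}", request_id, ctx.chat_service.name)
}

fn main() {
    let ctx = Arc::new(ServerContext {
        chat_service: ChatService { name: "chat" },
    });

    // Each "request" gets a clone of the Arc, which copies a pointer, not the context.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let ctx = Arc::clone(&ctx);
            thread::spawn(move || handle(ctx, i))
        })
        .collect();

    for h in handles {
        println!("{}", h.join().unwrap());
    }
}
```

All handlers read the same ChatService, and the context is freed once the last Arc pointing at it is dropped.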

Long Polling

Finally, our third trick: long polling is a simple loop with tokio::time::sleep.

tokio::time::sleep suspends the task without blocking a thread, so an active connection barely uses any resources while waiting.

If new data is found, we immediately return it. Otherwise, we wait one second and check again.

After 10 attempts (roughly 10 seconds), we return an empty response.

main.rs

async fn handler_find_messages(
    Extension(ctx): Extension<Arc<ServerContext>>,
    query_params: Query<FindMessagesQueryParameters>,
) -> Result<Json<Vec<Message>>, Error> {
    let sleep_for = Duration::from_secs(1);

    // long polling: 10 secs
    for _ in 0..10u64 {
        let messages = ctx.chat_service.find_messages(query_params.after).await?;
        if !messages.is_empty() {
            return Ok(messages.into());
        }

        tokio::time::sleep(sleep_for).await;
    }

    // return an empty response
    Ok(Vec::new().into())
}
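The control flow of the handler can be isolated from axum and the database. The sketch below is a blocking, std-only analogue: poll_until is a hypothetical helper, and it uses std::thread::sleep where the handler above awaits tokio::time::sleep, so it shows the loop's logic rather than its resource usage.

```rust
use std::thread;
use std::time::Duration;

/// Calls `fetch` up to `attempts` times, sleeping `interval` after each empty result.
/// Returns the first non-empty batch, or an empty Vec once the attempts are used up.
fn poll_until<T>(
    mut fetch: impl FnMut() -> Vec<T>,
    attempts: u32,
    interval: Duration,
) -> Vec<T> {
    for _ in 0..attempts {
        let items = fetch();
        if !items.is_empty() {
            return items; // data arrived: answer immediately
        }
        // Blocking sleep for the sketch; the async handler awaits tokio's sleep instead.
        thread::sleep(interval);
    }
    Vec::new() // timed out: empty response, the client simply polls again
}

fn main() {
    // Hypothetical data source: empty for the first two calls, then one message.
    let mut calls = 0;
    let result = poll_until(
        || {
            calls += 1;
            if calls < 3 {
                Vec::new()
            } else {
                vec!["hi"]
            }
        },
        10,
        Duration::from_millis(5),
    );
    assert_eq!(result, vec!["hi"]);
    assert_eq!(calls, 3);

    // A source that never produces anything yields an empty response after all attempts.
    let empty: Vec<&str> = poll_until(|| Vec::new(), 3, Duration::from_millis(5));
    assert!(empty.is_empty());
}
```

The attempt count and the interval together set the trade-off: a shorter interval lowers latency for new messages but means more database queries per waiting client.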

The code is on GitHub

As usual, you can find the code on GitHub: github.com/skerkour/kerkour.com (please don't forget to star the repo 🙏).

Want to learn how to craft more advanced web applications in Rust (such as a WebAssembly frontend and a JSON API backend)? Take a look at my book: Black Hat Rust.
