Hello, amazing people and welcome back to my blog! Today we will build a real-time chat application in Rust using the Rocket framework.
In the past, I've written a tutorial on a chat application, but this one is way different and feels more modern. It also includes a UI whereas my previous one was CLI-based.
Introduction
One of the most popular server backend frameworks in Rust is Rocket, and one of the great things about Rocket is the documentation and examples repository, so I was inspired to create this project after checking this repo.
Let's start:
After you create your Rust project, let's open up cargo.toml and add Rocket as a dependency. We'll also add the rand
crate as a dev dependency. This is going to be useful later when we implement tests.
[dependencies]
rocket = { features = ["json"] }
[dev-dependencies]
rand = "0.8"
Main.rs
Let's go to main.rs and import rocket
. We're importing rocket
explicitly with the macro_use
attribute so that all the rocket
macros are imported globally. This means you can use rocket
macros anywhere in your application, which is important because the rocket
framework uses macros extensively.
#[macro_use] extern crate rocket;
fn rocket()
After that, the first thing we'll do is to create a state and a rocket server instance.
#[launch]
fn rocket() -> _ {
rocket::build()
.manage(channel::<Message>(1024).0)
}
As you see from the code above, the manage
method allows us to add state to our rocket server instance, which all handlers have access to. The state we want to add is a channel
. Rocket uses Tokyo
as an async runtime and channels are a way to pass messages between different async tasks.
Let's add Tokio
:
use rocket::tokio::sync::broadcast::{channel, Sender, error::RecvError};
So, back to the fn rocket()
, we're creating a channel and specifying what type of messages we'd like to send across the channel. In this case, a Message
struct, which we haven't implemented yet. We also pass in a capacity (1024)
, which is the amount of messages a channel can retain at a given time. The return value of calling the channel function is a tuple containing a sender and receiver end. At the end of this call, we write .0
to get the first element in the tuple because we only want to store the sender end in state.
struct Message
Now that we have our state set up, let's implement the Message
struct. The Message
struct has three fields: a room
name, a username
, and a message
, all of which are strings
. Also, some extra validation is added to room
and username
. The room
name can only be up to 29
characters long and username
can only be up to 19
characters long.
#[derive(Debug, Clone, FromForm, Serialize, Deserialize)]
#[cfg_attr(test, derive(PartialEq, UriDisplayQuery))]
#[serde(crate = "rocket::serde")]
struct Message {
#[field(validate = len(..30))]
pub room: String,
#[field(validate = len(..20))]
pub username: String,
pub message: String,
}
This struct is also deriving a few traits.
#[derive(Debug, Clone, FromForm, Serialize, Deserialize)]
#[serde(crate = "rocket::serde")]
Debug
so this struct could be printed out with debug format,clone
so we can duplicate messages,FromForm
so we can take form data and transform it into a message struct, andserialize
anddeserialize
, which will allow this data structure to be serialized and deserialized. Serialization will happen via crate and the next attribute states that we want to use the crate defined in rocket.#[serde(crate = "rocket::serde")]
The Message
struct defines the type of messages we want to send. Our real-time chat application is going to have rooms, users, and messages, so these three fields make sense. Now that we have our message defined, there's only one last thing to do, which is to implement our endpoints. Our chat application needs two endpoints, one endpoint to post messages and another endpoint to receive messages.
Post messages
This route #[post("/message", data = "<form>")]
matches against post requests to the message path and accepts form data.
#[post("/message", data = "<form>")]
fn post(form: Form<Message>, queue: &State<Sender<Message>>) {
// A send 'fails' if there are no active subscribers. That's okay.
let _res = queue.send(form.into_inner());
}
use rocket::form::Form;
The function handler accepts two arguments, the form data, which is going to be converted to the Message
struct, and the server state, which is going to be a sender. Inside the body, we send the message to all receivers queue.send(form.into_inner());
The send method returns a result
type because sending a message could fail if there are no receivers. In this project, I don't care about that case, so we're going to ignore it. 🙂
Receive messages
This route #[get("/events")]
handles get
requests to the events path. The return type is an infinite stream of server-sent events EventStream
. EventStream
allow clients to open a long-lived connection with the server, and then the server can send data to the clients whenever it wants. This is similar to WebSockets
, except it only works in one direction. The server can send data to clients, but the clients can't send data back to the server.
#[get("/events")]
async fn events(queue: &State<Sender<Message>>, mut end: Shutdown) -> EventStream![] {
.
.
}
Unlike the other handler functions we implemented, notice that this function is prefixed with async
, that's because server-sent events are produced asynchronously. The handler takes two arguments, queue
, which is our server state, and end
, which is of type Shutdown
. Shutdown
is a feature which resolves when our server instance is Shutdown
.
Inside the handler, the first thing we do is call queue.subscribe()
to create a new receiver. This will allow us to listen for messages when they're sent down the channel. Next, we use generator syntax to yield an infinite series of server-sent events.
#[get("/events")]
async fn events(queue: &State<Sender<Message>>, mut end: Shutdown) -> EventStream![] {
let mut rx = queue.subscribe();
EventStream! {
loop {
let msg = select! {
msg = rx.recv() => match msg {
Ok(msg) => msg,
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(_)) => continue,
},
_ = &mut end => break,
};
yield Event::json(&msg);
}
}
}
use rocket::{State, Shutdown};
use rocket::response::stream::{EventStream, Event};
use rocket::tokio::select;
Inside this infinite loop, the first thing we do is use the select!
macro. select!
waits on multiple concurrent branches and returns as soon as one of them completes. In this case, we only have two branches. The first one is calling receive on our receiver (rx.recv()
), which waits for new messages. When we get a new message, we map it to msg
and then match
against that. recv
returns a result enum
:
If we get the
Ok
variant, we simply return the message inside of it (Ok(msg) => msg
)If we get the error variant and the error is closed (
Err(RecvError::Closed)
), that means there are no more senders so we can break out of the infinite loopIf we get the error variant and the error is lagged (
Err(RecvError::Lagged(_)) => continue,
), that means our receiver lagged too far behind and was forcibly disconnected. In that case, we simply skip to the next iteration of the loop.
The second branch looks a little odd ( _ = &mut end => break
), but what this is doing is waiting for the Shutdown
feature to resolve. The Shutdown
feature resolves when our server is notified to shutdown, at which point we can break
out of this infinite loop.
Assuming we don't hit one of these break or continue statements, the select
macro will return the message we got from our receiver, at which point we can yield a new server-sent event, passing in our message (yield Event::json(&msg);
).
Yeah! Now both our routes are complete. But we're not yet done! 🤓
Finish fn rocket()
The last thing we need to do is mount
these routes
. Let's go back to fn rocket()
. We'll mount post
and events
to the rootes
path.
.mount("/", routes![post, events])
Our backend is complete, but we also need a frontend. Before we start adding HTML files, let's mount
a handler that will serve static
files.
.mount("/", FileServer::from(relative!("static")))
Your rocket()
should look like this:
#[launch]
fn rocket() -> _ {
rocket::build()
.manage(channel::<Message>(1024).0)
.mount("/", routes![post, events])
.mount("/", FileServer::from(relative!("static")))
}
Front-end
Our static
files need to be stored in a folder called static
, so I'll create that folder. I'm not going to go into detail about the implementation of the frontend as it's out of the scope of this tutorial, but in the end you'll find the full source code.
As a quick overview, the front end consists of a simple HTML
page, a couple CSS
files, and a vanilla JavaScript
file. The JavaScript
file uses the EventSource
object to establish a new connection with our server and listen for new messages. When a new message is received, it's parsed into JSON
and appended to the DOM
.
To send messages, a simple post request is dispatched.
if (STATE.connected) {
fetch("/message", {
method: "POST",
body: new URLSearchParams({ room, username, message }),
}).then((response) => {
if (response.ok) messageField.value = "";
});
}
Run the App
Hit cargo run
into your terminal. Your server will run a localhost similar to this 8000
. Open up two web browsers and navigate to your localhost (mine is 8000
) on both. Then pick two usernames for the field 'user'. Finally, send a message!
If you run this with me, you can see, the message appeared for user 1 and user 2 instantly. One thing to note is that we didn't implement any type of persistence, so if we refresh one of these web pages, all the messages will be lost. You could consider this a bug or a neat little security feature. 😏
Source code
Find the code here:
EleftheriaBatsou / chat-app-rocket-rust
Realtime chat application in Rust using the Rocket framework
Realtime Chat Application in Rust
Run it:
cargo run
instructions:
Open 2 browsers, type the user name, and send a message! If you refresh the page the messages will get lost.
Tutorial here.
Credits here.
Happy Rust Coding! 🦀
👋 Hello, I'm Eleftheria, Community Manager, developer, public speaker, and content creator.
🥰 If you liked this article, consider sharing it.
Top comments (0)