Ashish Singh

Building Redis Server in Rust: Part 1

In this blog post, we will build a Redis server in the Rust programming language. If you are new to Rust and want to learn the language, this is a good starting point. No prior knowledge of the language is assumed, although it helps if you have read the Rust book or a starter tutorial. If you have already written some basic Rust, this is a good way to dive deeper by building a complete program.

The second part of this blog series.

The final implementation will have a fully functioning Redis server with get and set commands. You can get the code for this blog here.

This is a two-part blog series:

  1. In this first post, we will create a basic single-threaded client-server interaction, and store and retrieve data on the server.
  2. In the second post, we will implement multi-threaded client-server interaction and use channels to notify clients of a server shutdown.

Prerequisite

You must have Rust installed on your computer. If you do not, follow the instructions in the official installation guide.

Project Setup

Here we will create the application boilerplate, install all the necessary dependencies, and configure the project.

To get started, create a new Rust project with the following commands.

$ cargo new my_redis
$ cd my_redis  

Open the my_redis project in Visual Studio Code or any other editor of your choice to start coding.

You will get 2 main files in the project: Cargo.toml and src/main.rs. Cargo.toml contains the metadata of the project, such as the project name and, most importantly, the dependencies used in the project. Later, we will add libraries under the [dependencies] section of this file.

Dependencies used

  1. Clap: Used to read command line arguments
  2. Tokio: Used for async implementation.

main.rs contains the main function, which is the entry point of our Rust program whenever we run the command cargo run from the terminal.

Redis consists of 2 parts: a server and a client. The client sends requests to the server to save or retrieve data. The server is responsible for storing the data and retrieving it. So, in the terminal, we will have 2 windows, one running the server and one running the client.

By default, src/main.rs is the package's only executable. Since we need 2 executables, one for the server and one for the client, we will put them in the src/bin folder, which we will create soon. Cargo treats every file in this folder as a separate executable, which is exactly what we need.

So, let's create a new folder inside src called bin. Next, create server.rs and client.rs inside the bin folder. The folder structure at this point should be:

src/
├── main.rs
└── bin/
    ├── server.rs
    └── client.rs

Add a main() function to both files to get rid of the error that complains about a missing main function. Since these are executable files, each must contain a main function, which is the entry point of a Rust program.

To run these executables, use the following commands:

$ cargo run --bin server
$ cargo run --bin client   # in a new terminal window

The --bin argument tells Cargo which executable to run when there are multiple executables.

After this step, we don't need the main.rs file since we have our separate binaries ready to be executed. So go ahead and delete the main.rs file.

Server and client interaction will happen with a socket connection. Server will listen for incoming socket connection requests. Client will try to connect to server.

Intro to Tokio

Tokio provides an asynchronous runtime and utilities for Rust. In this blog post, we will mainly use the TcpListener and TcpStream types from the tokio::net module. Their I/O methods are async functions, so they return futures that we drive with .await. In short, whenever we use .await on an async function call, the current task is suspended until that call has completed, and the runtime is free to run other tasks in the meantime.

To use tokio, add the tokio dependency in Cargo.toml file.
File Cargo.toml

[dependencies]
tokio = {version="1", features = ["full"]}

Server implementation

Now, add the #[tokio::main] macro right before the async fn main() line. This macro transforms the async main function into a synchronous main function and runs the original body inside an async block. We need this because using .await on Tokio functions requires the enclosing function to be async, but Rust expects the entry point main function to be synchronous. #[tokio::main] does that transformation for us.

#[tokio::main]
async fn main() {
    println!("in main");
}

// gets transformed to roughly
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // block_on runs an async block to completion,
    // so any `.await` inside the block is fine.
    rt.block_on(async {
        println!("in main");
    })
}

Always remember, if you want to use .await inside a function or block of code then the function or block of code should be async type.

Next, we want to listen for incoming socket connections in the server.rs file. Import TcpListener from the tokio crate with use tokio::net::TcpListener.

Now, in the main function, call the TcpListener::bind() method and supply it an address with a port. Since bind is an async method, we use .await to wait until it returns a value. The ? operator unwraps the value inside an Ok result, or returns early from the function and propagates the error to the caller. You can read more about it here.
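To make the behaviour of the ? operator concrete, here is a minimal sketch that uses only the standard library. The function parse_port is a hypothetical helper invented for this illustration; it is not part of the server code.

```rust
use std::num::ParseIntError;

/// Hypothetical helper: parses a port number from text.
/// On `Err`, `?` returns the error to the caller immediately;
/// on `Ok`, it unwraps the value and execution continues.
fn parse_port(text: &str) -> Result<u16, ParseIntError> {
    let port = text.trim().parse::<u16>()?;
    Ok(port)
}
```

Calling parse_port("8081") yields Ok(8081), while text that is not a number yields an Err that the caller has to handle.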

File bin/server.rs

use tokio::net::TcpListener;

#[tokio::main]
pub async fn main() -> Result<(), std::io::Error> {
    let listener = TcpListener::bind("127.0.0.1:8081").await?;
    loop {
        let (mut socket, _) = listener.accept().await?;
        println!("connection accepted {:?}", socket);
    }
    Ok(())
}

At this point, please note that we have changed the return type of the main function to Result, which is std::result::Result<T, E>. The main function needs this return type because the ? operator propagates errors to the caller, as explained above.

If we run the server with the command $ cargo run --bin server, it will run the program and wait at listener.accept() for new connections. With this, our server is listening for socket connections on a single thread.

Now, let's move to the client for establishing a socket connection with server.

Since the client initiates the socket connection, we will use TcpStream from the tokio::net module. We will make the main function async and annotate it with the #[tokio::main] macro, just as we did in the server setup.

To create a new connection, use the same host address and port as the server.

File bin/client.rs

use tokio::net::TcpStream;

#[tokio::main]
pub async fn main() -> Result<(), std::io::Error> {
    let mut stream = TcpStream::connect("127.0.0.1:8081").await?;

    Ok(())
}

As in the server, the main function returns Result so that the ? operator can propagate errors to the caller.

Let's check this minimal server and client interaction. We expect the server to print the connection accepted message once the client connects, and then keep waiting for further connections.

Run the server and the client in separate terminal windows with the commands shared above, and you will notice that the server prints something like this.

connection accepted PollEvented { io: Some(TcpStream { addr: 127.0.0.1:8081, peer: 127.0.0.1:61643, fd: 10 }) }

This means our connection is established and the socket is ready to start sending and receiving data.

Writing to socket from client

The first thing we want to do is send a command from the client to the server. For the sake of simplicity, we will send the data as a string (in actuality, as bytes) on the socket.

Let's use the write_all() method on the stream, which takes the bytes to write to the socket. It comes from the tokio::io::AsyncWriteExt trait, so that trait must be in scope.

use tokio::io::AsyncWriteExt; // provides write_all

stream.write_all(b"set foo bar").await?;

In Rust, prefixing a string literal with b makes it a byte string literal, that is, a sequence of bytes. If you want to understand the difference between a string literal, a String, and a byte sequence, check out this awesome post.
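As a small illustration of the byte string literal, the sketch below compares it with the bytes of an ordinary string literal. Both function names are hypothetical and exist only for this example.

```rust
/// Returns the command from the post as raw bytes.
fn command_bytes() -> &'static [u8] {
    // b"..." has type &'static [u8; 11], which coerces to &[u8].
    b"set foo bar"
}

/// The same text written as a string slice, viewed as its UTF-8 bytes.
fn command_str_bytes() -> &'static [u8] {
    "set foo bar".as_bytes()
}
```

For ASCII text the two functions return identical bytes, which is why the client can pass either form to write_all.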

That's all we need to write data to the socket connection.

Reading from socket in server

To read data from the socket on the server side, we will use the read_buf method, which comes from the tokio::io::AsyncReadExt trait. This method takes one argument, a buffer, and copies data from the socket into it.

We shall create the buffer as a BytesMut, which comes from the bytes crate. First, add the bytes dependency under [dependencies] in the Cargo.toml file.

[dependencies]
bytes = { version = "1" }

Then import BytesMut from the bytes crate in the server.rs file with use bytes::BytesMut.

Let's create a mutable BytesMut with an initial capacity. The buf variable is mutable because we will write bytes into it. Next, read the data from the socket into the buffer using the read_buf method.
BytesMut::with_capacity() takes the number of bytes to allocate for the buffer. The buffer will automatically grow as needed, but it is more efficient to start with a generous size.
For example, "foo" takes 3 bytes, but "ƒoo" takes 4 bytes. Let's start with 1024 bytes as the initial size of our buffer.

use tokio::io::AsyncReadExt; // provides read_buf

let mut buf = BytesMut::with_capacity(1024);
socket.read_buf(&mut buf).await?;
println!("buffer {:?}", buf); // printing the data in buffer
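The byte counts mentioned above can be checked with the standard library alone. This is a minimal sketch; utf8_len and char_count are hypothetical helper names used only here.

```rust
/// Number of bytes a string occupies when encoded as UTF-8.
/// `str::len` counts bytes, not characters.
fn utf8_len(text: &str) -> usize {
    text.len()
}

/// Number of Unicode scalar values (roughly, characters).
fn char_count(text: &str) -> usize {
    text.chars().count()
}
```

The buffer capacity is measured in bytes, so it is the first of these two numbers that matters when sizing it.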

This is all we need to read data from the socket on the server. Let's start the server and the client to see if our program is working as expected.

You will get buffer b"set foo bar" as output on the server terminal.

If you have reached this point, congratulations are in order: you have successfully sent data from a client to a server, and in Rust at that.
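For readers who want to see the whole round trip in one place, here is a rough sketch of the same write-then-read exchange. Note the assumptions: it uses the blocking std::net types and a thread instead of tokio::net, it binds to port 0 (any free port) rather than 8081, and the function name round_trip is invented for this illustration.

```rust
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

/// Sends `payload` over a loopback TCP connection and returns
/// the bytes that the server side read from its socket.
fn round_trip(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    // Port 0 asks the operating system for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // Server side: accept one connection and read until the client closes it.
    let server = thread::spawn(move || -> std::io::Result<Vec<u8>> {
        let (mut socket, _) = listener.accept()?;
        let mut buf = Vec::with_capacity(1024);
        socket.read_to_end(&mut buf)?;
        Ok(buf)
    });

    // Client side: connect, write the command, then close the connection.
    let mut stream = TcpStream::connect(addr)?;
    stream.write_all(payload)?;
    drop(stream); // closing lets read_to_end on the server see end-of-stream

    server.join().expect("server thread panicked")
}
```

Calling round_trip(b"set foo bar") returns the same bytes that were written, mirroring what the server prints in the Tokio version.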

Command matching

Now we have read the data from the client, which is a command like "set foo bar". The first element is the command name, and the next 2 are the key and the value.
Depending on the command, we will perform either a get or a set.

So, let's get started by converting the buffer to a Vector (a sort of array) of Strings. Then we can take the first string from the vector and map it to a variant of the Command enum. We will create the Command enum later in this post.

Let's create a function for this conversion called buffer_to_array. Create a new file src/helper.rs and put this function in that file. Since this file is in the src folder, we need to create another file called lib.rs and declare the helper module, which corresponds to the new file helper.rs. Add pub mod helper; to the src/lib.rs file. This exposes the helper module as part of the library crate. In Rust, main.rs is the binary executable and lib.rs holds the shared code. If you are creating a library, lib.rs is where you declare all your modules.

File src/helper.rs

use bytes::{Buf, BytesMut}; // Buf provides get_u8 and has_remaining

// pub so that server.rs can call it through the library crate
pub fn buffer_to_array(buf: &mut BytesMut) -> Vec<String> {
    let mut vec = vec![];
    let mut word = String::new();

    // get_u8 consumes one byte from the front of the buffer on each call
    while buf.has_remaining() {
        match buf.get_u8() {
            b' ' => {
                // a space ends the current word
                vec.push(std::mem::take(&mut word));
            }
            // any other byte extends the current word
            other => word.push(other as char),
        }
    }
    // push the final word, which is not followed by a space
    if !word.is_empty() {
        vec.push(word);
    }
    vec
}

The get_u8 method comes from the bytes::Buf trait. Since BytesMut implements the Buf trait, we can call get_u8 on a BytesMut.

The buffer_to_array function essentially breaks the string at every space encountered (b' ') and returns a vector of strings, Vec<String>. Let's call this function in the server.rs file inside the loop to get a list of strings, e.g. ["set", "foo", "bar"].
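The same splitting idea can be sketched with the standard library only, which makes it easy to experiment with. This is an illustrative variant, not the helper from this post: it takes a plain byte slice instead of a BytesMut, the name split_command is hypothetical, and split_whitespace collapses repeated spaces, which buffer_to_array does not do.

```rust
/// Splits a request such as b"set foo bar" into words.
/// Invalid UTF-8 is replaced rather than rejected
/// (an assumption made to keep the sketch short).
fn split_command(buf: &[u8]) -> Vec<String> {
    String::from_utf8_lossy(buf)
        .split_whitespace()
        .map(String::from)
        .collect()
}
```

An empty request produces an empty vector, so a caller should check the length before indexing into the result.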

// my_redis is the package name we chose with cargo new
use my_redis::helper::buffer_to_array;

// inside the loop in main()
loop {
    // ... previous code
    let attrs = buffer_to_array(&mut buf);
}

The next step is to find out the command. For this, take the first string from the vector and compare it with the list of known commands, in our case get and set.

Let's create another file, src/cmd.rs. We will create an enum for the commands inside this file. These are the commands we will be implementing.

pub enum Command {
  Get,
  Set,
  Invalid,
}

Now, we will match the first string and return the corresponding command. Let's add a function in src/cmd.rs, as part of the implementation of Command, that returns the command (an enum variant) based on the first string.

impl Command {
    // takes &str so that both &String and string literals can be passed
    pub fn get_command(name: &str) -> Command {
        match name {
            "set" => Command::Set,
            "get" => Command::Get,
            _ => Command::Invalid,
        }
    }
}
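To see how the matching behaves on a few inputs, here is a self-contained copy of the enum and function. The Debug and PartialEq derives are an addition for this sketch so that results can be compared and printed; the enum in this post does not derive them.

```rust
#[derive(Debug, PartialEq)]
pub enum Command {
    Get,
    Set,
    Invalid,
}

impl Command {
    /// Maps the first word of a request to a command.
    /// Matching is exact, so "SET" is treated as invalid here.
    pub fn get_command(name: &str) -> Command {
        match name {
            "set" => Command::Set,
            "get" => Command::Get,
            _ => Command::Invalid,
        }
    }
}
```

Any word other than the lowercase set and get, including commands we have not implemented, falls through to Command::Invalid.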

Since this file is in src folder, let's import the code in lib.rs.

File lib.rs

pub mod cmd;
pub use cmd::Command;

Let's use this function to get the command.

loop {
    // ... previous code
    let attrs = buffer_to_array(&mut buf);
    let command = Command::get_command(&attrs[0]);
}

Persisting key and value

For our use case, we need to store key-value pairs in the database. A data structure that is good for storing key-value pairs is HashMap. We will create an instance of the db once, when the server starts. Every new connection will then use a reference to this db instance to either write (set) data into it or read (get) data from it.

Let's create a struct called Db which will hold our key-value pairs inside a HashMap. This struct should be added in its own file, src/db.rs.

Let's declare this module in our src/lib.rs like we did previously for cmd.rs and helper.rs.

File lib.rs

pub mod db;
pub use db::Db;

File src/db.rs

use bytes::Bytes;
use std::collections::HashMap;

pub struct Db {
    entries: HashMap<String, Bytes>,
}

impl Db {
    pub fn new() -> Db {
        Db { entries: HashMap::new() }
    }
}

Next, let's create a new instance of this Db struct in the main function of server.rs. We do this before the loop {} in which the server accepts socket connection requests, because we want the database object to live for the whole scope of the main function. If we created db inside the loop, the object would be dropped at the end of each iteration, and the stored data would be lost. That's not what we want.

File server.rs

let mut db = Db::new(); // mut because set commands modify it
loop {
    let (mut socket, _) = listener.accept().await?;
}

Let's create a new function process_query() in the server.rs file. It accepts the command, attrs, socket, and db parameters and, for a set command, writes the attributes to the db object.

// inside the loop in main(), after the previous code...
process_query(command, attrs, &mut socket, &mut db).await?;

async fn process_query(
    command: Command,
    attrs: Vec<String>,
    socket: &mut TcpStream,
    db: &mut Db,
) -> std::io::Result<()> {
    match command {
        Command::Get => {
            // implemented later in this post
            Ok(())
        }
        Command::Set => {
            let resp = db.write(&attrs);

            match resp {
                Ok(result) => {
                    println!("set result: {}", result);
                    socket.write_all(result.as_bytes()).await?;
                }
                Err(_err) => {
                    // an empty response signals a failure to the client
                    socket.write_all(b"").await?;
                }
            }

            Ok(())
        }
        Command::Invalid => Ok(()),
    }
}

In the process_query() function, we match the command against Command::Get, Command::Set, and Command::Invalid.

In Command::Set, we call db.write(&attrs). As of now, this write method on db is not implemented, so let's go ahead and do that.

File src/db.rs

impl Db {
    pub fn write(&mut self, arr: &[String]) -> Result<&'static str, &'static str> {
        // a set command needs both a key and a value
        if arr.len() < 3 {
            return Err("set requires a key and a value");
        }
        let key = &arr[1];
        let value = &arr[2];

        // insert returns the previous value if the key already existed
        match self.entries.insert(key.clone(), Bytes::from(value.clone())) {
            Some(_old) => Ok("r Ok"),
            None => Ok("Ok"),
        }
    }
}

We clone the value because Bytes::from() takes ownership of its argument: among string types it accepts an owned String or a &'static str, while value here is only a borrowed &String whose owner lives outside this function.

insert() returns None if the key was not present in the hashmap, and Some(old_value) if the key was already present. We return "r Ok" when the key was already present and "Ok" when it was not. This helps the client side show an appropriate message to the user.

Some and None are the two variants of the Option type. Option is used when a value is optional, meaning there will either be a value, as Some(value), or no value, as None.
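The behaviour of insert() can be tried in isolation with a plain HashMap. This is a minimal sketch under stated assumptions: it stores String values instead of Bytes so that it needs only the standard library, and the free function set is a hypothetical stand-in for the write method.

```rust
use std::collections::HashMap;

/// Stores a pair and reports whether the key was new or replaced,
/// using the same "Ok" / "r Ok" strings as the post.
fn set(map: &mut HashMap<String, String>, key: &str, value: &str) -> &'static str {
    match map.insert(key.to_string(), value.to_string()) {
        Some(_previous) => "r Ok", // key existed, old value replaced
        None => "Ok",              // key was not present before
    }
}
```

Setting the same key twice returns "Ok" the first time and "r Ok" the second time, and the map keeps only the latest value.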

Now, coming back to bin/server.rs, notice that we store the result of db.write(&attrs) in resp. We use match to handle the response from db.write(...) and write the value directly to the socket using socket.write_all(...).await?;. This sends the bytes over the socket, to be read by the client on its end of the connection.

Client side result

File bin/client.rs

// previous main function code...
let mut buf = BytesMut::with_capacity(1024);
let _length = stream.read_buf(&mut buf).await?;
match std::str::from_utf8(&buf) {
    Ok(resp) => {
        if resp == "r Ok" {
            println!("key updated");
        } else if resp == "Ok" {
            println!("key set");
        }
    }
    Err(err) => {
        // failed to convert bytes into string slice (a Utf8Error)
        println!("error: {}", err);
    }
}

We create buf of type BytesMut and read the data from the socket into the buffer, just as we did on the server.

Next, we match on the string data from the socket and print a message based on whether the key was freshly written or updated. We also handle the error from std::str::from_utf8(...), which returns a Utf8Error if it fails to convert the byte slice into a string slice.

At this point, our implementation of the set command is more or less complete. We can now test this from the command line.

Let's go ahead and test this by running the client, hoping to see a success message. It will be either "key set" or "key updated".

$ cargo run --bin client
# response should be "key set" in terminal.

So far, we have set a key-value pair in our server. In the next section, let's implement the get command and fetch the corresponding value.

Read data from server

To fetch the value associated with the key, we will create another socket connection in main() of client.rs. On this socket we will write the data get foo.

File bin/client.rs

let mut stream = TcpStream::connect("127.0.0.1:8081").await?;
stream.write_all(b"get foo").await?;

Next, we will change bin/server.rs to handle Command::Get in the match expression of the process_query function.
File bin/server.rs

match command {
    Command::Get => {
        let result = db.read(&attrs);
        match result {
            Ok(result) => {
                socket.write_all(&result).await?;
            }
            Err(err) => {
                println!("no key found {:?}", err);
                socket.write_all(b"").await?;
            }
        }
        Ok(())
    }
    Command::Set => {
        // already implemented above
    }
    Command::Invalid => Ok(()),
}

The server returns the response by writing the value from the db to the socket. Next, we will update the src/db.rs file to implement the read() method.

// inside impl Db
pub fn read(&self, arr: &[String]) -> Result<&Bytes, &'static str> {
    // a get command needs a key
    let key = arr.get(1).ok_or("get requires a key")?;
    self.entries.get(key).ok_or("no such key found")
}

The error returned is a string slice, &str. The 'static lifetime means that the value can live for the entire duration of the program. String literals such as "no such key found" are stored in the program's binary, so they are never dropped and always have the type &'static str. Writing &'static str in the signature tells the compiler that the returned error does not borrow from self or from arr. Rust lifetimes are not a simple concept; check out this awesome tutorial for a better understanding.
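The point about 'static can be demonstrated with a small standard-library sketch. The assumptions here are that the map stores String values instead of Bytes, and that read and error_after_drop are hypothetical free functions written only for this illustration.

```rust
use std::collections::HashMap;

/// Looks up `key`. The success value borrows from the map, while the
/// error is a string literal with the `'static` lifetime.
fn read<'a>(map: &'a HashMap<String, String>, key: &str) -> Result<&'a String, &'static str> {
    map.get(key).ok_or("no such key found")
}

/// The error can be returned after the map is gone,
/// because it never borrowed from the map.
fn error_after_drop() -> &'static str {
    let map: HashMap<String, String> = HashMap::new();
    let err = read(&map, "missing").unwrap_err();
    drop(map);
    err
}
```

The second function compiles only because the error type is &'static str; a value borrowed from the map could not outlive it.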

The final step is to read the response in the client.rs file and print the value in the command line.

let mut buf = BytesMut::with_capacity(1024);
let _length = stream.read_buf(&mut buf).await?;
println!("buffer: {:?}", &buf);
match std::str::from_utf8(&buf) {
    Ok(resp) => {
        // the server sends an empty response when the key is missing
        if resp == "" {
            println!("no such key found");
        } else {
            println!("value: {}", resp);
        }
    }
    Err(err) => {
        println!("error: {}", err);
    }
}

This is all we need to implement a basic single-threaded Redis server with set and get commands.

If you run the server and then the client, you will get the response value: bar in the terminal. Hurray!!!

Clap implementation

So far, we have hardcoded our Redis commands in the client.rs file. That's not how a client should send data. So, let's accept the command from the command line instead.

Clap is a command line argument parser crate for Rust. It makes accepting command line arguments super easy.

First things first, add the dependency in Cargo.toml.

clap = { version = "3.1.18", features = ["derive"] }

Now, let's import this in the client.rs file and start adding the needed code. We are changing the way we send commands from the client to the server. Previously, we created two socket connections from the client file. Now, we will create only one socket connection per run of the client, using the command line arguments.

File bin/client.rs

use bytes::BytesMut;
use clap::{Parser, Subcommand};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[derive(Parser, Debug)]
struct Cli {
    #[clap(subcommand)]
    command: Command,
}

#[derive(Subcommand, Debug)]
enum Command {
    Get {
        key: String,
    },
    Set {
        key: String,
        value: String,
    },
}

#[tokio::main]
pub async fn main() -> Result<(), std::io::Error> {
    let args = Cli::parse();

    let mut stream = TcpStream::connect("127.0.0.1:8081").await?;
    match args.command {
        Command::Set { key, value } => {
            // send "set <key> <value>"
            stream.write_all(b"set").await?;
            stream.write_all(b" ").await?;

            stream.write_all(key.as_bytes()).await?;
            stream.write_all(b" ").await?;

            stream.write_all(value.as_bytes()).await?;
            let mut buf = BytesMut::with_capacity(1024);
            let _length = stream.read_buf(&mut buf).await?;
            match std::str::from_utf8(&buf) {
                Ok(resp) => {
                    if resp == "r Ok" {
                        println!("updated key");
                    } else if resp == "Ok" {
                        println!("key set");
                    }
                }
                Err(err) => {
                    // failed to convert bytes into string slice
                    println!("error: {}", err);
                }
            }
        }
        Command::Get { key } => {
            // send "get <key>"
            stream.write_all(b"get").await?;
            stream.write_all(b" ").await?;

            stream.write_all(key.as_bytes()).await?;

            let mut buf = BytesMut::with_capacity(1024);
            let _length = stream.read_buf(&mut buf).await?;
            match std::str::from_utf8(&buf) {
                Ok(resp) => {
                    if resp == "" {
                        println!("no such key found");
                    } else {
                        println!("key: {} => value: {}", key, resp);
                    }
                }
                Err(err) => {
                    println!("error: {}", err);
                }
            }
        }
    }

    Ok(())
}

In the code above, we define the command line interface with a clap subcommand. For this, we create an enum of commands, where each variant lists its parameters and their types.

We then match on the command supplied from the command line and write the appropriate command to the socket connection for the server to read and execute. For example, $ cargo run --bin client set foo bar stores the pair, and $ cargo run --bin client get foo reads it back.
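The client above sends each request in several small writes. As one possible simplification, the request could be assembled first and sent with a single write_all call. The sketch below shows only the assembling step with the standard library; encode_set and encode_get are hypothetical helper names, not part of the code in this post.

```rust
/// Hypothetical helper: builds the bytes of a set request,
/// e.g. "set foo bar", so it can be sent with a single write.
fn encode_set(key: &str, value: &str) -> Vec<u8> {
    format!("set {} {}", key, value).into_bytes()
}

/// Hypothetical helper: builds the bytes of a get request.
fn encode_get(key: &str) -> Vec<u8> {
    format!("get {}", key).into_bytes()
}
```

Because the server splits requests on spaces, this simple format cannot carry keys or values that themselves contain spaces.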

What next?

The current implementation is a single-threaded program that handles one connection at a time. In reality, this is not what we want. Imagine that 5000 requests arrive at our Redis server: the current program waits for the first request to complete before it processes the next one. However, we want all the requests to be handled without any wait time. This can be achieved by handling connections concurrently. In the next post, we will change this single-threaded implementation to a multi-threaded one. That is where Tokio shines the most, with tokio::spawn and the tokio::select! macro.

We will also implement a shutdown mechanism powered by Tokio channels such as broadcast and mpsc. Imagine there are 5000 active connections: what happens if the server has to shut down? A graceful shutdown for all of them will be covered in the next post.

Happy Programming!

Top comments (2)

Tinashe Munda • Edited

Hi,
I am following along the article and I am wondering where you are defining the fn new() that you then use to instantiate Db in the server.rs file?
let db = Db::new();

Tinashe Munda

Okay, I think I might have figured it out, at least to get it to work.

impl Db {
    pub fn new() -> Db {
        Db {
            entries: HashMap::new(),
        }
    }
}

PS: I am new to Rust.