
Rolling your own MOM, or how I did it - Project setup

cppchedy · Updated · 14 min read

Introduction

If you found the previous article boring, sorry about that; unfortunately, it was required background. From now on, however, we get to the concrete stuff, where you can test and follow along if you want.

The main point of this part is to present the entry point of the middleware, the top-level class of the code. We will build up to that goal through an introduction to Seastar, then the project structure, and finally the code itself.

Seastar

Seastar is an open-source C++ framework designed to help build high-performance server applications. It is used by ScyllaDB, a high-performance NoSQL database.

One of the key features of Seastar is its event-driven nature, which makes it easy for its users to write non-blocking, asynchronous code. Moreover, Seastar's architecture is based on the following points:

  • Shared-nothing Design: Seastar uses a shared-nothing model that shards all requests onto individual cores.

  • Futures and Promises: An advanced new model for concurrent applications that offers C++ programmers both high performance and the ability to create comprehensible, testable high-quality code.

  • High-performance networking: Seastar offers a choice of network stacks, including conventional Linux networking for ease of development, DPDK for fast user-space networking on Linux, and native networking on OSv.

  • Message passing: A design for sharing information between CPU cores without time-consuming locking.

A little detour

Given that I will be using Seastar throughout this series, I thought I should include a "How to use Seastar" section. It makes sense to take this detour before going further, because it will make the explanations of the MOM code much simpler.

I assume that you are already familiar with the concepts of futures and promises. If you would like to dig deeper into this amazing framework and learn it properly, I recommend these: Tutorial, Demos and "real world" apps.

To keep things short, we will see only basic examples and present the skeleton of a Seastar program.

Program Structure

Let's start with the general structure of a Seastar program:

#include <seastar/core/app-template.hh>
// other includes...

namespace bpo = boost::program_options;

int main(int argc, char** argv) {
    seastar::app_template app;
    // if you want to add options for your application
    app.add_options()
        ("foo", bpo::value<std::string>(), "foo arg")
        ;
    return app.run(argc, argv, [] {
            // write your logic here
            // ... you could retrieve the args
            return seastar::make_ready_future<>(); // return a future
    });
}

Generally, every Seastar program you write will follow this structure: you instantiate an object of type app_template and then call its run method, which takes three arguments. The last one is the most relevant for us: it's a lambda containing the logic of our application.
The app_template manages the configuration needed by the Seastar engine: logging, CPU and memory settings, etc. When we start our app, we can tweak those settings as needed. For example, you can control the number of threads by passing -cN, where N is the number of threads you want. Note that if you choose a number higher than the number of hardware threads on your machine, you will get a runtime exception. I like this choice; it reduces the overhead caused by thread switches.

As for run, it starts the runtime (scheduler, event loop, ...) that takes care of executing handlers. We will discuss this later on.

Examples

Although using Seastar brings many advantages, such as a performance boost, Seastar's asynchronous programming model comes with its own troubles: we write code in an unconventional way and have to pay attention to new things. Many assumptions we usually hold no longer apply, because of Seastar's asynchronous, non-blocking nature. The following examples show some of the issues we need to be aware of and how we can address them.

The first example is an introduction: it just sleeps for about two seconds and prints out "this is not the right way to learn Seastar."

#include <seastar/core/app-template.hh>
#include <seastar/core/sleep.hh>
#include <iostream>

int main(int argc, char** argv) {
    seastar::app_template app;
    app.run(argc, argv, [] {
        using namespace std::chrono_literals;
        return seastar::sleep(2s).then([] {
            std::cout << "this is not the right way to learn Seastar.\n";
        });
    });
}


Note that you can see the code in Compiler Explorer.

In this program, sleep sets up a timer and returns a future that will resolve in 2s. We chain on this future a lambda that prints "this is..." to std::cout.

The problem we constantly have to think about is object lifetime, or scope. Unlike Node.js/JavaScript closures, C++ lambdas don't spare us from dealing with object and variable lifetimes. Depending on what we are doing, we can access invalid state, producing wrong results, or we can get a runtime error. The following example demonstrates a common mistake that new Seastar users make:

// includes ...

seastar::future<> dummy_fn(int offset) {
  int bad = 8;
  return seastar::make_ready_future<int>(offset)
      .then([&bad](int nb) { bad += nb; })  // dangling reference!
      .then([&bad] {
        bad++;
        std::cout << bad << '\n';
      });
}

int main(int argc, char **argv) {
  seastar::app_template app;
  return app.run(argc, argv, [] {
    auto ftr1 = dummy_fn(12);
    auto ftr2 = dummy_fn(46);

    return seastar::when_all(std::move(ftr1), std::move(ftr2))
        .then([](std::tuple<seastar::future<>, seastar::future<>> tf) {
            std::cout << "done computing";
            return 0;
        });
  });
}

You can check out the code in Compiler Explorer.

First, let's analyze main: as usual, we have an app_template and we call run to start the Seastar engine and execute the task. Next, the lambda: we call dummy_fn twice. This function does nothing special; its only purpose is to show the mistake. For now, let's only keep in mind that it returns a future. The last statement starts with when_all. This means that we will wait for the futures ftr1 and ftr2 to resolve, so that we can carry on with the next task (printing "done computing").
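To get a feel for the join semantics, here is a rough plain-C++ analogue (my own sketch, not Seastar's API): two independent computations, both finished before we continue. Note that std::future::get blocks, whereas Seastar would chain a non-blocking continuation instead.

```cpp
#include <future>
#include <iostream>

// Two independent computations joined before continuing -- the same shape
// as when_all(ftr1, ftr2).then(...), but with blocking std::futures.
// launch::deferred is used for simplicity; launch::async would run them
// on separate threads.
int compute_both() {
    auto ftr1 = std::async(std::launch::deferred, [] { return 12 + 8; });
    auto ftr2 = std::async(std::launch::deferred, [] { return 46 + 8; });
    int a = ftr1.get();  // blocks; Seastar would schedule a continuation
    int b = ftr2.get();
    std::cout << "done computing\n";
    return a + b;
}
```

The key point carried over to Seastar: nothing after the join runs until both futures have resolved.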

Now let's take a look at dummy_fn: we have a variable bad on which some calculations are performed. These calculations are dispatched in lambdas chained one after another. Before that, we make a ready future (one that resolves immediately); it resolves to an int taken from the argument offset. The problem is the capture of bad by reference: remember that we are executing asynchronously and without blocking, which means we are submitting tasks to the Seastar scheduler, not executing them right away. Therefore, by the time the lambdas run, bad will already have been destroyed, because we returned from dummy_fn, and we will get wrong results.

So, as a general rule: if you capture by reference, you will mostly get in trouble unless you know what you are doing. The default here should be capture by value.
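The same hazard exists in plain C++ whenever a lambda outlives the function whose locals it captures by reference. A minimal sketch, no Seastar involved (the function names are mine):

```cpp
#include <functional>

// BUG: returns a lambda holding a reference to `local`, which is destroyed
// when make_task returns -- calling the result is undefined behavior.
std::function<int()> make_task() {
    int local = 8;
    return [&local] { return local + 1; };
}

// Fix: capture by value; the lambda owns its own copy of the state.
std::function<int()> make_task_safe() {
    int local = 8;
    return [local] { return local + 1; };
}
```

Deferring a Seastar continuation is morally the same as returning make_task here: by the time the scheduler runs it, the stack frame is gone.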

The Seastar developers have already thought about this problem and supply us with the necessary utilities.

To correct the previous example, we can use the do_with function. dummy_fn then becomes:

seastar::future<> dummy_fn(int offset) {
  return seastar::do_with(int{8}, [offset](int &bad) {
    return seastar::make_ready_future<int>(offset)
        .then([&bad](int nb) { bad += nb; })
        .then([&bad] {
          bad++;
          std::cout << bad << '\n';
          return seastar::make_ready_future();
        });
  });
}

We solve the dangling-reference problem by using do_with. This utility manages the lifetime of an object for us until we no longer need it. In short, it allocates memory, moves the values we will be using there, and frees the memory once the lambda finishes executing. You should note that Seastar manages its own memory, which is pre-allocated at startup. Also note that you can tweak the memory allocation size.
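To see what do_with buys us, here is a toy synchronous analogue (my own sketch, not Seastar code): the state is moved to the heap and kept alive until the work that references it is done.

```cpp
#include <memory>
#include <utility>

// Toy analogue of do_with: heap-allocate the state, hand the worker a
// reference, and free the allocation only after the work has completed.
// (Seastar does this across a *future* chain; here the work is synchronous.)
template <typename T, typename F>
auto with_state(T value, F&& work) {
    auto holder = std::make_unique<T>(std::move(value));
    auto result = std::forward<F>(work)(*holder);
    return result;  // holder is released here, after the work finished
}
```

For example, with_state(8, [](int& bad) { bad += 12; return ++bad; }) yields 21, mirroring the corrected dummy_fn with an offset of 12.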

In addition to the previous solution, we can use reference counting to keep the variable alive until we finish computing. Using seastar::shared_ptr, dummy_fn becomes:

seastar::future<> dummy_fn(int offset) {
  auto bad = seastar::make_shared<int>(8);
  return seastar::make_ready_future<int>(offset)
      .then([bad](int nb) { *bad += nb; })
      .then([bad] {
        (*bad)++;
        std::cout << *bad << '\n';
      });
}


Seastar offers its own shared_ptr, a non-thread-safe version of std::shared_ptr.
In dummy_fn, we make a Seastar shared_ptr by calling make_shared and capture it by copy in all the lambdas. This way, we no longer have the lifetime problem.

Generally, we will use shared_ptr, but when a variable is only needed within a specific scope, we may use do_with.
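The reference-counting idea can be demonstrated with std::shared_ptr and ordinary deferred callables (again a sketch of mine, not Seastar's scheduler): each queued task copies the pointer, so the int outlives the function that created it.

```cpp
#include <functional>
#include <memory>
#include <vector>

// Each task copies the shared_ptr, bumping the refcount; the int stays
// alive until the last queued task has run, even though make_tasks has
// long since returned -- exactly the property the Seastar version relies on.
std::vector<std::function<int()>> make_tasks(int offset) {
    auto bad = std::make_shared<int>(8);
    std::vector<std::function<int()>> tasks;
    tasks.push_back([bad, offset] { return *bad += offset; });
    tasks.push_back([bad] { return ++*bad; });
    return tasks;
}
```

Running the two tasks from make_tasks(12) later produces 20 and then 21, the values the fixed dummy_fn would print.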

Our final example is oriented more toward exposing Seastar constructs than showing a specific problem. With this sample, we introduce looping. Seastar offers elegant solutions for looping: keep_doing, repeat, repeat_until_value, etc. The following is an example of do_until. We get a string via the --message option, then print it char by char:

#include <seastar/core/app-template.hh>
#include <iostream>
#include <string>

namespace bst = boost::program_options;

int main(int argc, char **argv) {
  seastar::app_template app;
  app.add_options()("message", bst::value<std::string>(), "Message to print");
  return app.run(argc, argv, [&app] {
    auto &&config = app.configuration();
    // return the future produced by do_with so the engine waits for the loop
    return seastar::do_with(config["message"].as<std::string>(), std::size_t{0},
                     [](std::string &msg, std::size_t &i) {
                       return seastar::do_until(
                           [&i, &msg] { return i >= msg.size(); },
                           [&i, &msg] {
                               std::cout << msg[i] << '\n';
                               ++i;
                               return seastar::make_ready_future();
                           });
                     });
  });
}

Compiler Explorer link

We begin by adding the message option by calling add_options. In the main lambda, we start by getting the configuration and passing the value of the --message option to do_with, along with a counter initialized to 0.

I should point out that the lambda passed to do_with must respect the order and types of the other parameters: for example, if we want do_with to manage an int and a float, we write:
do_with(int{9}, float{13.01f}, [](int &arg1, float &arg2) { ... });
I hope that point is clear.

Now, let's get to the looping part: do_until. This function repeats a task until some condition is met. It accepts two lambdas; the first is a predicate and the second is the body of the loop. On each iteration, we check the value of the counter against the string size. The loop ends when i becomes greater than or equal to the size of msg.
In the body, we print a char and advance the counter.
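Stripped of futures, do_until is just a predicate-driven loop. A synchronous sketch (my own naming, not Seastar's implementation) of what the snippet above does:

```cpp
#include <functional>
#include <string>

// Synchronous sketch of do_until: run `body` until `stop` returns true.
void do_until_sync(const std::function<bool()>& stop,
                   const std::function<void()>& body) {
    while (!stop()) body();
}

// Consumes the characters of `msg` one by one, like the printing loop above.
std::string spell_out(const std::string& msg) {
    std::string out;
    std::size_t i = 0;
    do_until_sync([&] { return i >= msg.size(); },
                  [&] { out += msg[i]; ++i; });
    return out;
}
```

The Seastar version has the same shape, except that the body returns a future, so other tasks can run between iterations.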

We can now go back to the MOM with a basic understanding of Seastar.

First steps

Setting up the project

First, we will write the CMake files and prepare our dependencies with conanfile.txt. I am assuming that you have installed CMake and Conan, so let's proceed to the project structure:

As you can see from the picture, we have three main folders: one for CMake scripts, one containing the application itself, src, and a folder containing tests.

To integrate clang-format and clang-tidy with CMake, I found these cmake-scripts (note that there are other goodies there; all credit goes to the maintainers). Copy the files into the cmake folder.

Let's take care of conanfile.txt:

[requires]
gtest/1.8.0@lasote/stable
spdlog/1.3.1@bincrafters/stable 

[generators]
cmake

We use Google's GTest library for our tests and spdlog for logging, and we specify cmake as our generator.

root CMakeLists.txt:

cmake_minimum_required(VERSION 3.10)
project(Moza)

# include cmake scripts
include(cmake/c++-standards.cmake)
include(cmake/compiler-options.cmake)
include(cmake/sanitizers.cmake)
include(cmake/formatting.cmake)
include(cmake/tools.cmake)

# require Seastar and use its Boost dependency
find_package(Seastar REQUIRED)
include_directories(${Boost_INCLUDE_DIR})

# include the Conan-generated file for cmake
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup(TARGETS)

# instruct Seastar to use std::optional, std::variant, etc., instead of its own implementations
add_definitions(-DSEASTAR_USE_STD_OPTIONAL_VARIANT_STRINGVIEW)

# clang format and clang-tidy integration
file(GLOB_RECURSE FFILES *.[chi]pp *.[chi]xx *.cc *.hh *.ii *.[CHI] )
clang_format(format ${FFILES})

clang_tidy(-format-style=file -checks=* -header-filter='${CMAKE_SOURCE_DIR}/*')

# we add only src folder
add_subdirectory(src)


Depending on the way you compiled and installed Seastar, you may need to pass extra arguments to cmake so that it can find the required libraries. I will show how I compile the project later in this article.

Next, let's look at the CMakeLists.txt inside the src folder:

file(GLOB SRC_CORE lib/core/*.cpp lib/core/*.hh)


add_library(MozaLib ${SRC_CORE})
target_link_libraries(MozaLib Seastar::seastar CONAN_PKG::spdlog)


file(GLOB SRC_SERV server/*.cpp)
add_executable(Moza main.cpp ${SRC_SERV})
target_link_libraries(Moza Seastar::seastar MozaLib CONAN_PKG::spdlog)

We are done with the preparation, so let's get to the source folder. I structured src in a way that separates modules from the actual logic of the MOM: we have a lib subfolder containing our modules, a server folder containing the source code of the server class, and a main.cpp file.

Main

This is the listing of main.cpp:

#include <seastar/core/app-template.hh>
#include <seastar/core/distributed.hh>

#include "server/server.hh"

int main(int ac, char **av) {
  using namespace seastar;
  using namespace net;
  distributed<server_tcp> tcp_server;
  app_template app;
  return app.run(ac, av, [&] {
    engine().at_exit([&] { return tcp_server.stop(); });
    return tcp_server.start_single().then(
        [&tcp_server] { return tcp_server.invoke_on(0, &server_tcp::start); });
  });
}

As mentioned earlier, Seastar is an event-driven framework built around the Reactor pattern. Each Seastar application has a reactor (an event loop) for each logical core (hardware thread), assuming the default configuration (no -c flag). These event loops are completely independent from each other (shared-nothing design).

To help us shard our logic over the available cores, the Seastar developers introduced seastar::distributed. This utility takes care of distributing the service (in our case server_tcp), passed as a template argument, to all hardware threads.

We start by getting the local reactor instance of the executing core and attaching a lambda that shuts down our service on exit. distributed requires the service to have a stop method; that's the only requirement.

Next, we call start_single, then invoke server_tcp::start on shard (core) 0. start_single activates only one shard.

Previously I said that I wasn't using Seastar's full potential. You can see that here: I am using only one shard. This is actually a mistake in my design: I didn't account for sharding from the beginning; I totally forgot. I was stressed because deadlines were approaching, so even when I saw my mistake I didn't take the time to fix it. I thought of a couple of solutions, but they required changes to the protocol, something I didn't want to mess with. This will be a topic for another post.

Server class

Let's now focus on the server, our service class, and get an overview before digging into the code. Note that the move and copy constructors/assignment operators are deleted.

The following is the listing of src/server/server.hh :

#pragma once

#include <seastar/core/reactor.hh>
#include <seastar/core/temporary_buffer.hh>
#include <seastar/net/api.hh>

#include "spdlog/spdlog.h"

#include "lib/core/connection.hh"
#include "lib/core/topology.hh"
#include "lib/protocol/parser.hh"
#include "lib/identity/identity.hh"
#include "lib/identity/identity_generator.hh"

using namespace seastar;
using namespace net;


class server_tcp {
  Topology _tp;
  lw_shared_ptr<server_socket> _listener;
  std::vector<lw_shared_ptr<Connection>> connections;
  uint16_t _port = 11212;

  std::string handleRequest(Specification sp, socket_address sco);

public:
  server_tcp() =default;
  ~server_tcp() =default;

  server_tcp(const server_tcp&) =delete;
  server_tcp(server_tcp&& ) =delete;

  server_tcp& operator=(const server_tcp&) =delete;
  server_tcp& operator=(server_tcp&&) =delete;

  server_tcp(uint16_t port) : _port{port} { }

  void start();
  future<> stop();

  future<> handleConnection(lw_shared_ptr<Connection> conn);

};

The server stores a shared pointer to a listening socket, the port it will listen on, and an object of type Topology. Let's ignore the vector of connections for now.

Topology is the core of the server: it holds all the entities of our messaging model. We will discuss it in more detail in the next part of this series.

Let's focus on start and handleConnection.

void server_tcp::start() {
  spdlog::info("server is staring...");
  listen_options lo;
  lo.reuse_address = true;
  _listener = engine().listen(make_ipv4_address({_port}), lo);

  spdlog::info("listening on port {}", _port);
  // note: the futures from keep_doing and handleConnection are deliberately
  // not awaited here; the accept loop runs in the background
  keep_doing([this] {
    return _listener->accept().then(
        [this](connected_socket fd, socket_address addr) mutable {
          auto conn = make_lw_shared<Connection>(std::move(fd), addr);
          handleConnection(conn);
        });
  });
}

The start member function is simple: it creates a listener via the listen method of the reactor class (we get the local reactor instance through the engine function). Next, we start accepting connections with the listener's accept method and handle them, repeating the process with Seastar's keep_doing utility. Note that we store the file descriptor (connected_socket) and the client address (socket_address) in a dedicated class, Connection, which enables communication with the client in both directions. Let's look at the code:

//src/lib/core/connection.hh
#pragma once

#include <seastar/core/temporary_buffer.hh>
#include <seastar/net/api.hh>

using namespace seastar;
using namespace net;


class Connection {
public:
    connected_socket m_fd;
    input_stream<char> m_in;
    output_stream<char> m_out;
    socket_address m_address;
    bool m_enabled;

  public:
    Connection(connected_socket fd, socket_address addr)
      : m_fd{std::move(fd)}, m_in{m_fd.input()}, m_out{m_fd.output()}, m_address{addr}, m_enabled{true} {
      m_fd.set_keepalive(true);
      m_fd.set_nodelay(true);
    }

    Connection(Connection &&orig)
      : m_fd{std::move(orig.m_fd)}
      , m_in{std::move(orig.m_in)}
      , m_out{std::move(orig.m_out)}
      , m_address{orig.m_address}
      , m_enabled{orig.m_enabled} {
    }

    Connection &operator=(const Connection &rhs) = delete;


    Connection &operator=(Connection &&rhs) = default;

    ~Connection() {
      closeStream();
    }

    void closeStream() {
      m_enabled = false;
      m_fd.shutdown_input();
      m_fd.shutdown_output();
    }

    future<temporary_buffer<char>> readFromStream() {
      return m_in.read();
    }


    bool isAlive() {
      return m_enabled;
    }

    socket_address getRemoteAddress() const {
      return this->m_address;
    }
  };

The most important bits are output_stream and input_stream, with which we can read from and write to the client. We open the streams with input() and output() in the Connection constructor.

Let's move on to handleConnection. This method reads from the socket, parses the packet, and executes the task (and finally reports back). For now, we are missing many modules and haven't yet explained the messaging model or the protocol; therefore, we will write it such that it reads some bytes, logs the request, and sends a response. Expect it to change as we advance in the series.

seastar::future<> server_tcp::handleConnection(lw_shared_ptr<Connection> conn) {
  return conn->readFromStream()
      .then([this](temporary_buffer<char> buf) {
        std::string str;
        std::copy(std::begin(buf), std::end(buf), std::back_inserter(str));
        spdlog::info("received data : {}", str);
      })
      .then([this, conn] {
        if (conn->isAlive()) {
          return conn->m_out.write("WYSIWYG")
              .then([conn] { return conn->m_out.flush(); })//lambda 1
              .then([conn] { return conn->m_out.close(); })//lambda 2
              .then([conn] {});//lambda 3
        }
        return make_ready_future();
      });
}

readFromStream returns a future of temporary_buffer, which is passed to the next continuation in the chain when it's ready. temporary_buffer is an efficient, move-only container designed around the zero-copy idiom. We copy the buffer into a std::string and log the data. Next, we send a message back to the client.

Note the use of flush after write: we make sure the data is sent to the client before we close the connection.

Notice also that the last lambda in the chain is empty, but it captures conn by copy. This line is required to prevent access to a destroyed object (in our case m_out): calling conn->m_out.close() arranges for the connection to be closed and returns a future representing that event. Excluding lambda 3 from the chain would trigger the destruction of conn at the end of lambda 2. However, we need m_out to stay alive until closing the connection finishes. This is why we need lambda 3, even though it's empty.

For now, stop returns a ready future:

future<> server_tcp::stop() {
  return make_ready_future<>();
}

Now that we have finished start and handleConnection, we can move on to compiling the project.

Compiling the project

We will start by listing the commands you need to type in order to compile the project, then discuss some technicalities you need to be aware of.

To compile the project, first make a new folder called build in the root directory. It will contain all the results of compilation, the Conan output, etc.:

mkdir build

We cd into build, then type:

conan install ..

This will install all the required packages described in conanfile.txt, specifically gtest and spdlog.

Next, we build the project with cmake:

seastar_dir=/path/to/seastar
cd ..
CC=gcc-8 CXX=g++-8 cmake -DCMAKE_PREFIX_PATH="$seastar_dir/build/;$seastar_dir/build/_cooking/installed" -DCMAKE_MODULE_PATH=$seastar_dir/cmake -H. -Bbuild

You need to point cmake to Seastar's CMake modules (required for find_package) and to the location where you installed Seastar.

I should say that before building, you need to comment out the include lines for the files we haven't worked on yet, as well as the Topology attribute of server_tcp.

Also, note that I added my own profile for Conan, so when I run it I pass an extra argument: -pr=myProf. The profile file is located in ~/.conan/profiles if you kept the default configuration when you installed Conan. Let's take a look at the file:

[settings]
os=Linux
arch=x86_64
compiler=gcc
compiler.version=8.1
compiler.libcxx=libstdc++
build_type=Release
[options]
[build_requires]
[env]


I built Seastar using the cooking.sh approach from their guide on building Seastar.

Conclusion

In this post we learned how to use Seastar and wrote the first building blocks of our project. I hope you enjoyed reading.

Next Part

Next in this series, we will discuss the messaging model of our system, the core of the MOM.
