PREVIOUSLY, we created a small proof-of-concept OCaml project to show how easy it is to build small but helpful utilities. That project was a simple server that forked off a new process to handle each incoming connection, using nothing but OCaml's built-in libraries. It's a simple, brute-force method, but it quickly hits a scalability limit.
To get around this limit, many technologies have evolved over the years. However, as OCaml's Multicore edition (version 5.0) nears its ship date, we will leapfrog past all those technologies and go back to the future: direct-style nonblocking concurrent I/O, running in parallel threads.
To enable access to all these features, an exciting new library called Eio is being developed. It uses a new paradigm of direct-style concurrent I/O programming, without the need for monads or async/await, thus avoiding the function colour problem.
Let's update our example server to use Eio, with the following goals:
- Run multi-threaded (N parallel threads where N is the number of CPUs). OCaml's threads are called 'domains' and they map 1:1 to OS threads.
- Handle each incoming request in a non-blocking fiber in each domain.
Note: to better understand the rest of this post, I highly recommend reviewing the previous proof-of-concept project.
Fibers are essentially non-blocking green threads that run in each domain. A domain can run one fiber at a time. When a fiber gets blocked waiting for I/O, it yields and lets another one run. This achieves concurrency. Meanwhile, other fibers can run in parallel in other domains. This achieves parallelism. By combining the two, we can fully saturate all our CPU cores.
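As a tiny sketch of this (assuming Eio's `Fiber` API from the alpha release), two fibers in one domain interleave whenever they yield:

```ocaml
(* Two fibers sharing one domain: each yields after printing, letting the
   other run. Requires the eio_main package. *)
let () =
  Eio_main.run @@ fun _env ->
  Eio.Fiber.both
    (fun () ->
      for i = 1 to 3 do
        Eio.traceln "fiber A: step %d" i;
        Eio.Fiber.yield ()
      done)
    (fun () ->
      for i = 1 to 3 do
        Eio.traceln "fiber B: step %d" i;
        Eio.Fiber.yield ()
      done)
```

In a real server the yields are not explicit like this; they happen automatically whenever a fiber blocks on I/O.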
Now, let's install Multicore OCaml. It hasn't officially shipped yet, so we will get the alpha version. (If needed, refer to the Up and Running page to install opam, the OCaml package manager.) Then:
```sh
# 'mc' is just a short name we choose.
# We need to explicitly add the opam alpha repository with higher priority
# than the default repo, to allow installing some multicore-only libraries:
opam switch create mc 5.0.0~alpha1 --repositories=mc=git+https://github.com/kit-ty-kate/opam-alpha-repository.git,default
eval $(opam env)
opam install dune utop eio
```
Now, we create the statsd filter project almost exactly the same way as before:
```sh
mkdir ocaml_statsd_filter
cd ocaml_statsd_filter
```
The `dune-project` file:

```
(lang dune 3.4)
```

And the `dune` file:

```
(executable
 (name ocaml_statsd_filter)
 (libraries str eio_main))
```
Now, we come to the first interesting changes. A couple of the configs need to be expressed slightly differently. Here's the config module, `cfg.ml`:
```ocaml
let num_threads =
  try int_of_string (Sys.getenv "num_threads")
  with Not_found -> Domain.recommended_domain_count

let listen_port =
  try int_of_string (Sys.getenv "listen_port") with Not_found -> 8125

let target_host = try Sys.getenv "target_host" with Not_found -> "localhost"
let target_port = try Sys.getenv "target_port" with Not_found -> "8126"

let blocklist =
  try
    "blocklist"
    |> Sys.getenv
    |> String.split_on_char ','
    |> List.map Str.regexp_string
  with Not_found -> []
```
We add a new `num_threads` config variable to allow overriding the default, which is the 'recommended number of domains' from the standard library.

`target_host` can now be just a host name, as we are now using the `Eio.Net.getaddrinfo_stream` function (shown below) to look up the IP address.
The rest of the configs are unchanged.
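For illustration, these config variables can be overridden at run time via the environment (an invocation sketch; the values here are hypothetical):

```
# Hypothetical invocation: override the new config variables via the
# environment. Defaults apply for anything left unset.
num_threads=4 target_host=statsd.internal blocklist=foo,bar \
  dune exec ./ocaml_statsd_filter.exe
```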
Finally, we come to the main file, `ocaml_statsd_filter.ml`:
```ocaml
open Eio

let max_size = 8192
let listen_addr = `Tcp (Net.Ipaddr.V4.any, Cfg.listen_port)

let target_addr net =
  match Net.getaddrinfo_stream net Cfg.target_host ~service:Cfg.target_port with
  | [] -> invalid_arg Cfg.target_host
  | addr :: _ -> addr

let allow data =
  Cfg.blocklist
  |> List.exists (fun regexp -> Str.string_match regexp data 0)
  |> not

let on_error = traceln "Connection handling error: %a" Fmt.exn

let main net new_domain =
  Switch.run @@ fun sw ->
  let target = Net.connect ~sw net (target_addr net) in
  let listen_socket = Net.listen ~backlog:128 ~sw net listen_addr in
  traceln "Listening on :%d" Cfg.listen_port;
  let domain_loop () =
    new_domain @@ fun () ->
    let domain_id = (Domain.self () :> int) in
    Switch.run @@ fun sw ->
    while true do
      Net.accept_fork ~sw listen_socket ~on_error @@ fun client _ ->
      let buf_str =
        client
        |> Buf_read.parse_exn ~max_size Buf_read.take_all
        |> String.trim
      in
      if allow buf_str then begin
        Flow.copy_string buf_str target;
        traceln "Domain %d: sent: '%s'" domain_id buf_str
      end
      else traceln "Domain %d: did not send: '%s'" domain_id buf_str
    done
  in
  let domains = List.init Cfg.num_threads (fun _ -> domain_loop) in
  Fiber.all domains

let () =
  Eio_main.run @@ fun env ->
  main (Stdenv.net env) (Domain_manager.run @@ Stdenv.domain_mgr env)
```
This has quite a lot of new concepts. The first thing we notice is that the old `Unix` module open is replaced with `Eio`. Eio moves away from the old-style I/O, which was very much modelled on Unix/C system calls, and tries to introduce new, safer abstractions for raw I/O.
Another change is in how we express network addresses: we now use the types and functions in the `Eio.Net` module to construct them.
The allowlist data checking is unchanged, as it uses only the regular expressions module, `Str`. We introduce an error handler to report any exceptions that occur while handling requests.
Finally, we come to the meat of the server. This is a `main` function which takes two parameters: an `Eio.Net.t` 'capability', and a function which can spawn a new domain safely. Eio encourages a 'capabilities style' of programming, in which functions are passed only sufficient permissions to do the work they need to, and nothing else. This is explained quite extensively in the excellent readme documentation.
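As a small illustration of the capability style (a sketch; `resolve` is a hypothetical helper, not part of this project):

```ocaml
(* resolve only needs network access, so that is the only capability it
   receives, not the whole environment. *)
let resolve net host =
  match Eio.Net.getaddrinfo_stream net host ~service:"8126" with
  | [] -> None
  | addr :: _ -> Some addr

let () =
  Eio_main.run @@ fun env ->
  (* Hand over just the network capability extracted from env. *)
  match resolve (Eio.Stdenv.net env) "localhost" with
  | Some _ -> Eio.traceln "resolved"
  | None -> Eio.traceln "no address"
```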
With the capabilities given to the `main` function, we can now start a new 'switch' (essentially, a resource manager that safely disposes of all open resources when their containing scope ends), then use it to connect to the target (upstream StatsD) address and also bind to the listening address on the local host.
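A minimal sketch of switch-scoped cleanup (assuming the Eio alpha API; the port here is arbitrary):

```ocaml
let () =
  Eio_main.run @@ fun env ->
  Eio.Switch.run @@ fun sw ->
  (* The listening socket is attached to sw ... *)
  let socket =
    Eio.Net.listen ~backlog:8 ~sw (Eio.Stdenv.net env)
      (`Tcp (Eio.Net.Ipaddr.V4.loopback, 9099))
  in
  ignore socket;
  (* ... and is closed automatically when Switch.run's scope ends,
     even if an exception is raised. *)
  Eio.traceln "listening"
```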
We then define the 'domain loop' (my little pun on 'main loop') function. This function immediately spawns a new domain and returns. Asynchronously, in the new domain, we capture the domain ID (an integer) for logging purposes, then start handling requests from our clients. The key to this is the `Eio.Net.accept_fork` function, which handles each client connection in a new fiber. This is where the asynchronous I/O magic happens. In each domain we handle each request in a new fiber: so essentially, N domains * M fibers.
The other point of note here is `Eio.Flow`, which is an abstraction over byte streams. The actual shunting of data back and forth in requests and responses is done in terms of flows, e.g. `Flow.copy_string buf_str target`. And we no longer need to think in terms of 'read N bytes from that socket, then check that N bytes were read'; flows handle all that for us.
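For instance (a sketch using the Eio API), any byte sink is a valid target for `copy_string`, including standard output:

```ocaml
(* Flow.copy_string writes a whole string to any sink flow; here the sink
   is the process's stdout capability. *)
let () =
  Eio_main.run @@ fun env ->
  Eio.Flow.copy_string "gorets:1|c\n" (Eio.Stdenv.stdout env)
```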
In fact they handle it a little too well: a client could potentially send a gigantic request which would eat up all our memory. To prevent this, we use `Eio.Buf_read.parse_exn ~max_size Buf_read.take_all`, which takes all the characters from the source flow, up to our `max_size` limit (the same as before, 8192 bytes). The `max_size` parameter is explained in the `Eio.Buf_read` documentation.
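To see the parser in isolation (a sketch; `Flow.string_source` gives an in-memory flow, so no sockets are involved):

```ocaml
(* take_all reads the entire source, failing if it exceeds max_size. *)
let () =
  let src = Eio.Flow.string_source "gorets:1|c\n" in
  let data =
    Eio.Buf_read.parse_exn ~max_size:8192 Eio.Buf_read.take_all src
  in
  print_string data
```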
Notice that we output some trace log messages to prove to ourselves that multiple domains are actually running in parallel. We will show this in action later. We spin up the domains in the final line of the `main` function: `Fiber.all domains`, which runs each of the domain spawns concurrently in new fibers.
The final lines are about calling the `main` function with the required arguments. To get those, we call `Eio_main.run` and pass it a callback in which we set up the arguments. The `env` is the ultimate source of all the capabilities, and from it we extract only the exact permissions we need for our `main` function, namely accessing the network and running new domains.
You may have noticed a perhaps unexpected number of callbacks in this code. In most modern async I/O code we see almost no callbacks: everyone has switched to using monadic promises/futures (async/await style). So why does Eio reintroduce a seemingly retro style?
There are two main types of callbacks:
- Callbacks to access resources which the library then automatically cleans up for us, e.g. the Eio environment, switches
- Callbacks to delay execution of code which needs to run in new domains or fibers
In my opinion, the second type of callback is most likely unavoidable. This is because we are not using any specific effect system which can delay execution and move it to different fibers/domains for us. We're using direct style and need to delay such executions ourselves. Fortunately, the type system tells us exactly where callbacks are required.
The first type of callback may go away in future iterations of OCaml and Eio, if, say, we standardize on a let-operator like `let&` to indicate 'use a resource'. E.g., hypothetically:

```ocaml
let& sw = Switch.run in
...
let& env = Eio_main.run in
...
```
This remains to be seen, however, and for now a callback is the obvious way to do it. The main win here is that async I/O calls don't need callbacks: we read the client's request as a string directly, and send it to the target address, again directly. Eio takes care of the boring details of yielding the fiber when it's blocked on I/O, and resuming it when it's unblocked.
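Incidentally, OCaml's binding operators (available since 4.08) are already enough to sketch such a `let&` today. This is purely hypothetical and not part of Eio:

```ocaml
(* A 'use a resource' binding operator: let& x = scoped in body
   desugars to scoped (fun x -> body). *)
let ( let& ) scoped body = scoped body

let () =
  Eio_main.run @@ fun _env ->
  let& sw = Eio.Switch.run in
  ignore sw;
  Eio.traceln "running inside the switch's scope"
```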
Now, we need to actually run this to prove to ourselves that it works. We'll need three terminals:
1. A mock StatsD server on port 8126 (or optionally a real one if you prefer). I just spin up a Python HTTP server: `python3 -m http.server 8126`. It doesn't matter what type of server it is; we just need something listening on the correct port.
2. Run our filter: `OCAMLRUNPARAM=b blocklist=foo,bar dune exec ./ocaml_statsd_filter.exe`. This will immediately throw an exception and exit if the server in (1) is not running.
3. Finally, send a few requests to our filter:

   ```sh
   export i=0
   while [ $i -lt 50 ]; do
     echo "bar:$i|c" | nc localhost 8125
     i=$(expr $i + 1)
   done
   ```
This sends 50 requests to our filter and gives it a chance to go through multiple domains. If you check the trace logs, they look like this:
```
+Listening on :8125
+Domain 1: did not send: 'bar:0|c'
+Domain 2: did not send: 'bar:1|c'
+Domain 3: did not send: 'bar:2|c'
+Domain 4: did not send: 'bar:3|c'
+Domain 5: did not send: 'bar:4|c'
+Domain 6: did not send: 'bar:5|c'
+Domain 7: did not send: 'bar:6|c'
+Domain 8: did not send: 'bar:7|c'
+Domain 1: did not send: 'bar:8|c'
+Domain 2: did not send: 'bar:9|c'
+Domain 3: did not send: 'bar:10|c'
+Domain 4: did not send: 'bar:11|c'
+Domain 5: did not send: 'bar:12|c'
+Domain 6: did not send: 'bar:13|c'
+Domain 7: did not send: 'bar:14|c'
+Domain 8: did not send: 'bar:15|c'
+Domain 1: did not send: 'bar:16|c'
...
```
The domains are cycled through in a round-robin manner, which proves that they're all running in parallel (otherwise domains 2 through 8 would be blocked on 1, and the trace would print out only domain 1 for every request).
Now we come to the end of our little experiment. Hopefully this gives you some idea of what's coming in OCaml 5.0! For fun, if you want to compare this code to its original inspiration, the equivalent Rust project is here: https://github.com/askldjd/statsd-filter-proxy-rs/blob/main/src/server.rs