New to this series? Check out Parts 1 and 2 first.
Last time, I wrote about how coroutines help enable concurrency, and how they work in JavaScript. I spent some time after that trying to implement coroutines in Ruby and PHP, but inspired by these two articles, I decided to go bigger—implement an event loop.
What's an event loop?
An event loop is a loop that runs your code and does things based on some events. That's vague, I know, but it'll become clearer as we go.
It's called a loop because it really is a loop. In fact, this is basically what happens in an event loop:
function runEventLoop() {
  runUserCode();
  while (taskQueue.isNotEmpty()) {
    runNextTask();
  }
}
First, it executes your code, statement-by-statement. Then, if there are any tasks in the task queue (which your code might have added), it executes them one-by-one. It'll keep looping until there are no more tasks, and then it exits. For instance, take this JS code:
// index.js
const fs = require("fs");
const x = 3 + 6;
setTimeout(() => {
  console.log("I'm a task!");
}, 2000);
// Node-style callbacks receive the error first, then the result
fs.readFile("some-file.txt", (err, contents) => {
  console.log("Also a task!");
});
console.log(x);
When you run this with node index.js, it executes each statement in the file. By the time that's done, there are now two tasks in the queue: the tasks added by setTimeout() and fs.readFile(). The event loop waits until those tasks are ready and executes them. When they're completed, the task queue is empty, and the event loop exits, ending the script.
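To make that order of execution concrete, here's a minimal Ruby sketch of the pseudocode above. It assumes a plain array as the task queue and ignores timers and I/O entirely; run_event_loop, task_queue and log are names I made up for this illustration, not part of the event loop we build later.

```ruby
# A toy event loop: run the user's code once, then drain the queue in FIFO order.
# (Hypothetical helper; real event loops also wait on timers and I/O.)
def run_event_loop(task_queue)
  yield                                         # 1. run the user's code
  task_queue.shift.call until task_queue.empty? # 2. run queued tasks, oldest first
end

task_queue = []
log = []

run_event_loop(task_queue) do
  log << "start"
  task_queue << -> { log << "task 1" }
  task_queue << -> {
    log << "task 2"
    # Tasks can add more tasks; the loop keeps going until the queue is empty
    task_queue << -> { log << "task 3 (added by task 2)" }
  }
  log << "end"
end

puts log
```

Both "start" and "end" are logged before any task runs, because queued tasks only get their turn once the user code has finished.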
JavaScript's event loop is implicit, so you don't have to run it manually. We'll be implementing an event loop in Ruby, so we'll have to start the loop explicitly and put our code inside it. Here's what using our event loop will look like:
loop = EventLoop.new
loop.run do
  # do stuff
  loop.set_timeout(2000) do
    puts "I'm a task!"
  end
  # do stuff
end
Task queues
As we've seen, event loops are powered by a task queue. This queue is typically FIFO (first in, first out), so tasks that get added earlier will get executed first. Let's implement our EventLoop class: its constructor creates a new task queue, while its run method executes the block that it's given, then waits until the task queue is empty.
class EventLoop
  def initialize
    @queue = TaskQueue.new
  end
  def run(&entrypoint)
    entrypoint.call
    until @queue.empty?
      callback = @queue.next
      callback.call if callback
    end
  end
end
(We're going to be passing blocks and procs around a lot with and without the & notation; check out my article on function handles if you're not familiar with those in Ruby.)
Now, let's implement our TaskQueue. Normally, you'd use a simple array and manipulate it as needed, but there's an important detail: not all tasks are equal. For example, a task on a timer (like setTimeout()) should be executed when the timer runs out, even if it was added to the queue last. To make things worse, JavaScript has a "microtask" queue; tasks on this queue get run before any other tasks. And then there's process.nextTick() in Node.js, which runs before the microtask queue. That's a lot of queues!
Node.js' event loop handles this by running in phases: there's a "timers" phase where it checks for any timers that are due, a "pending callbacks" phase where it checks for I/O callbacks (like the callback you passed to fs.readFile()), and more. The phases have a specific order, so that the tasks with higher priority get executed first.
However, we'll keep things simple: we'll still expose a single queue to our event loop, but that queue actually holds two sub-queues: one for timers, and one for any other callbacks.
class TaskQueue
  def initialize
    @callbacks = []
    @timers = []
  end
  def empty?
    @timers.empty? && @callbacks.empty?
  end
  # Get the next task from the queue
  def next
    # TODO
  end
end
Timers
Let's see if we can implement our own version of JavaScript's setTimeout. As we saw earlier, it should take a time and a block to be executed when the time is due:
loop.set_timeout(2000) do
  puts "This task will be executed after 2s"
end
Okay, implementation. Running set_timeout should add a timer task to the queue, telling the queue the time when the task should be executed:
class EventLoop
  # ...
  def set_timeout(timeout, &callback)
    current_time = Time.now.to_f * 1000
    @queue.add_timer(current_time + timeout, callback)
  end
end
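A side note on the clock: Time.now is wall-clock time, which can jump if the system clock is adjusted. The sketch below shows one possible alternative, Ruby's monotonic clock via Process.clock_gettime. This is my suggestion rather than something the event loop in this article uses, and the variable names are made up for the example.

```ruby
# Wall-clock time in milliseconds (what set_timeout above uses)
wall_ms = Time.now.to_f * 1000

# Monotonic time in milliseconds: the absolute value is meaningless,
# but differences between two readings are reliable for measuring intervals
mono_ms = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)

timeout = 50                    # hypothetical 50ms timer
scheduled = mono_ms + timeout   # the moment the timer becomes due

sleep 0.06                      # wait a little longer than the timeout

now = Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond)
due = now >= scheduled
puts "Timer due? #{due}"
```

If you went this route, the same clock would have to be used both when scheduling a timer and when checking whether it's due.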
Now, back to the queue. We know that timers have higher priority than other tasks: they should be executed when the execution time is due, even if there are tasks in the queue ahead of them. This is why they have their separate queue. But they also have priority amongst themselves. If I run set_timeout(2000) and then set_timeout(1000), I'd expect the second timer to be executed first, since it has a shorter timeout. There's a data structure for this: a priority queue.
A priority queue is like a regular queue, but it sorts items by their priority. When adding an item you specify its priority; when you ask for the next item, it gives you the one with the highest priority. This fits our use case—we can use the scheduled time as the priority for each timer, so when we ask the queue for a timer, we get the timer that's supposed to fire next.
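Here's a toy illustration of that idea. TinyPriorityQueue is a deliberately naive stand-in I wrote for this example (an array scanned with max_by), not how a real priority queue is implemented; proper ones use a heap so that they stay fast with many items.

```ruby
# A naive max-priority queue backed by an array.
# (Hypothetical class for illustration only; finding the max is O(n) here.)
class TinyPriorityQueue
  def initialize
    @items = []
  end

  def push(item, priority)
    @items << [priority, item]
  end

  # Peek at the highest-priority item without removing it
  def next
    entry = @items.max_by { |priority, _item| priority }
    entry && entry.last
  end

  # Remove and return the highest-priority item
  def pop
    entry = @items.max_by { |priority, _item| priority }
    return nil unless entry
    @items.delete_at(@items.index(entry))
    entry.last
  end

  def empty?
    @items.empty?
  end
end

timers = TinyPriorityQueue.new
# Negating the scheduled time makes the earliest timer the "highest" priority
timers.push("fires at t=2000", -2000)
timers.push("fires at t=1000", -1000)
timers.push("fires at t=1500", -1500)

first_up = timers.next # peeking doesn't remove anything
order = []
order << timers.pop until timers.empty?
puts order
```

The timers come out in the order 1000, 1500, 2000, regardless of the order they were added in.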
Rather than implementing my own, I'll be using the priority queue from this library. We'll replace our @timers array with a priority queue. Here's how our queue will work:
- We add timers to the timers queue using the inverse (negative) of their scheduled time (-scheduled_time) as priority. We're using the inverse because this PriorityQueue gives us highest priority items next, but we want items with the smallest time first (i.e. the timers that are happening soon).
- Whenever next() is called on our TaskQueue, we fetch the next timer. If its time is due, then we return it; otherwise we return the next callback, or nil, and try again later.
require 'algorithms'
class TaskQueue
  def initialize
    @callbacks = []
    @timers = Containers::PriorityQueue.new
  end
  def empty?
    @timers.empty? && @callbacks.empty?
  end
  def next
    next_timer = @timers.next
    current_time = Time.now.to_f * 1000
    if next_timer && (current_time >= next_timer[:scheduled_time])
      # Timer's due; remove it from the queue
      @timers.pop
      return next_timer[:callback]
    end
    # No timer is due; shift returns the oldest callback, or nil if there are none
    @callbacks.shift
  end
  def add_timer(scheduled_time, callback)
    priority = -scheduled_time
    @timers.push({ scheduled_time: scheduled_time, callback: callback }, priority)
  end
end
All good!
Coroutines
Let's head back to coroutines, to see how they can be used to implement "asynchronous" operations with an event loop. Here's a Ruby version of our synchronous copy-and-stringify example from last time:
require 'json'
array = 8_000_000.times.map { |i| {a: i} }
copied = [*array]
puts "Copied"
stringified = copied.to_json
puts "Stringified"
Now let's convert this to a coroutine. Ruby doesn't have generators, but it has something called fibers that lets us achieve the same thing (pause and resume execution of a block). Here's how you'd write a fiber:
example_fiber = Fiber.new do
  puts "Executing Part 1"
  Fiber.yield
  puts "Executing Part 2"
  Fiber.yield
  puts "Executing Part 3"
  Fiber.yield
  # The block's last value is what the final resume returns
  # (an explicit `return` isn't allowed inside a fiber's block)
  "Finished"
end
example_fiber.resume # Prints "Executing Part 1"
example_fiber.resume # Prints "Executing Part 2"
example_fiber.resume # Prints "Executing Part 3"
You create a fiber by passing in a block to Fiber.new. Inside that block, you call Fiber.yield to pause execution, like JavaScript's yield. The caller can start or resume the fiber by calling resume.
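Fibers can also pass values back and forth, which we'll rely on later. Here's a small sketch (counter and the other variable names are made up for the example): the first resume's arguments become the block's parameters, each resume returns whatever was passed to Fiber.yield, and the last resume returns the block's final value.

```ruby
require 'fiber' # Fiber#alive? needs this on Ruby < 3.1; harmless on newer versions

counter = Fiber.new do |start|
  # `start` receives the argument of the first resume call
  Fiber.yield start + 1 # pause; this resume returns start + 1
  Fiber.yield start + 2 # pause again
  :done                 # the block's last value goes to the final resume
end

results = []
results << counter.resume(10) # runs up to the first Fiber.yield
results << counter.resume     # runs up to the second Fiber.yield
alive_before_end = counter.alive? # still paused inside the block
results << counter.resume     # runs to the end of the block
alive_after_end = counter.alive?  # nothing left to run

p results
p [alive_before_end, alive_after_end]
```

Note that the fiber still counts as alive after its last Fiber.yield; it only finishes once it's resumed one more time and the block runs to its end.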
Here's how we would write our copying and stringifying tasks as coroutines using fibers.
copy_fiber = Fiber.new do |arr|
  copied_array = []
  arr.each_with_index do |item, index|
    if index > 0 && (index % 50_000).zero?
      Fiber.yield # Pause every 50_000 items
    end
    copied_array.push item
  end
  puts "Done with copying"
  copied_array
end
stringify_fiber = Fiber.new do |arr|
  result = "["
  arr.each_with_index do |item, index|
    if index > 0 && (index % 50_000).zero?
      Fiber.yield # Pause every 50_000 items
    end
    result += item.to_json
    result += ","
  end
  result[result.length - 1] = "]"
  puts "Done with stringifying"
  result
end
Now let's implement the dispatcher as part of our event loop. We can add a new run_coroutine method to our EventLoop class:
class EventLoop
  # ...
  def run_coroutine(fiber, *args)
    fiber.resume(*args)
    if fiber.alive?
      @queue.add { self.run_coroutine(fiber) }
    end
  end
end
Key details:
- the run_coroutine method takes in the fiber and any arguments to be passed to it, then starts the fiber by running fiber.resume with those arguments
- we use the fiber.alive? method to check if there's still more of the fiber left to execute. If there is, then we add a new task to the task queue.
With that, we've achieved concurrency. By adding a new task to the queue, we're handing control to the event loop to continue the coroutine whenever it's free.
Let's implement the add method in our TaskQueue. Its job is very simple: add a new task to the @callbacks array:
class TaskQueue
  def add(&callback)
    @callbacks << callback
  end
end
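To see the interleaving in isolation, here's a stripped-down, self-contained sketch. MiniLoop and make_worker are hypothetical names for this example, and a plain array stands in for our TaskQueue (no timers), but run_coroutine follows the same resume-then-requeue pattern as above.

```ruby
require 'fiber' # only needed for Fiber#alive? on Ruby < 3.1

# Just enough of an event loop to run coroutines.
# (Hypothetical class; a plain array replaces TaskQueue.)
class MiniLoop
  def initialize
    @queue = []
  end

  def run
    yield
    @queue.shift.call until @queue.empty?
  end

  def run_coroutine(fiber, *args)
    fiber.resume(*args)
    # Not finished? Queue up the next slice instead of looping here
    @queue << -> { run_coroutine(fiber) } if fiber.alive?
  end
end

log = []
make_worker = lambda do |name|
  Fiber.new do |steps|
    steps.times do |i|
      log << "#{name} step #{i + 1}"
      Fiber.yield # hand control back to the loop after each step
    end
  end
end

mini = MiniLoop.new
mini.run do
  mini.run_coroutine(make_worker.call("A"), 2)
  mini.run_coroutine(make_worker.call("B"), 2)
end

puts log
```

The log alternates between A and B: each worker does one step, yields, and goes to the back of the queue, so neither has to finish before the other makes progress.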
Let's test out what we have so far:
loop = EventLoop.new
loop.run do
  loop.run_coroutine(copy_fiber, array)
  loop.run_coroutine(stringify_fiber, array)
  loop.set_timeout(1200) { puts "Executed after 1.2s" }
  loop.set_timeout(800) { puts "Executed next" }
  loop.set_timeout(2200) { puts "Executed last" }
  puts "Executed first"
end
Here it is in action (note: I reduced the size of the array because the Ruby version on repl.it is somewhat slow). Click "▸" to run main.rb; click the "Files" icon to see event_loop.rb:
Sweet!
Continuations
We've implemented coroutines, but we can go a step further: what if we want to run a piece of code after the coroutine is done? Think about fs.readFile() in Node.js: you pass in the file you want to read, and then a function you want to execute when that is done (known as a callback or continuation).
fs.readFile(fileName, (err, contents) => {
  // do stuff with the file
});
This is helpful in case we have some other operation that depends on the result of the coroutine (for example, our stringifying operation could use the result of the copy operation).
To do this, we add another argument to our run_coroutine method to represent the continuation. Once the coroutine finishes (fiber.alive? returns false), we capture its return value and add a new task to the queue to run the continuation later, passing it the return value.
class EventLoop
  # ...
  def run_coroutine(fiber, *args, &continuation)
    return_val = fiber.resume(*args)
    if fiber.alive?
      @queue.add { self.run_coroutine(fiber, &continuation) }
    else
      if continuation
        @queue.add { continuation.call(return_val) }
      end
    end
  end
end
Now we can do this:
loop.run do
  loop.run_coroutine(copy_fiber, array) do |copied|
    # Continuation after copy
    puts "Copy is DONE."
    loop.run_coroutine(stringify_fiber, copied) do
      # Continuation after stringify
      puts "Stringify is DONE."
    end
  end
end
If we wanted to avoid the extra indentation and potential "callback hell", we could wrap this in a custom Promise implementation, complete with .then/.catch and even async/await, but I'll stop here.
For completeness, we can add an add_callback method to the event loop to provide an easy way for external code to directly add new tasks to the queue; it'll come in handy later.
def add_callback(&callback)
  @queue.add(&callback)
end
Handling requests
Now let's make our event loop actually useful: we'll write a basic HTTP server. Ruby's default execution model is similar to PHP's (blocking and synchronous), so a single-threaded server would have to finish handling one request before it can accept new ones. For example, here's a simple HTTP server:
require 'socket'
server = TCPServer.new("127.0.0.1", 5678)
puts "Listening on localhost:5678"
while socket = server.accept # Wait for a new connection
  request = socket.gets.strip
  puts "Started handling req #{request}: #{Time.now}"
  sleep 5
  puts "Responding 5 seconds later: #{Time.now}"
  # A blank line separates the HTTP headers from the body
  socket.puts <<~HTTP
    HTTP/1.1 200 OK
    Content-Type: text/html

    Hii 👋
  HTTP
  socket.close
end
Here's what I get when I test this with autocannon making three simultaneous requests (autocannon --connections 3 --amount 3 --timeout 10000 --no-progress http://localhost:5678/):
Listening on localhost:5678
Started handling req GET / HTTP/1.1: 2021-04-29 10:23:31 +0100
Responding 5 seconds later: 2021-04-29 10:23:36 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 10:23:36 +0100
Responding 5 seconds later: 2021-04-29 10:23:41 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 10:23:41 +0100
Responding 5 seconds later: 2021-04-29 10:23:46 +0100
As expected, requests are handled sequentially. Each request does something that takes 5 seconds, and the thread has to wait for that to finish before handling the next request, so it takes 15 seconds in total. Let's change this to a concurrent setup using our event loop.
def handle_next_request(server, &handler)
  begin
    socket = server.accept_nonblock
    request = socket.gets.strip
    puts "Started handling req #{request}: #{Time.now}"
    handler.call(socket)
    add_callback { handle_next_request(server, &handler) }
  rescue IO::WaitReadable, Errno::EINTR
    # No connection waiting yet; check again on a later pass of the loop
    add_callback { handle_next_request(server, &handler) }
  end
end
run_loop do
  server = TCPServer.new("127.0.0.1", 5678)
  puts "Listening on localhost:5678"
  handle_next_request(server) do |socket|
    set_timeout(5000) do
      puts "Responding 5 seconds later: #{Time.now}"
      # A blank line separates the HTTP headers from the body
      socket.puts <<~HTTP
        HTTP/1.1 200 OK
        Content-Type: text/html

        Hii 👋
      HTTP
      socket.close
    end
    # A few extra timers, to look cool 😎
    set_timeout(0) { puts "0ms later: #{Time.now}" }
    set_timeout(1500) { puts "1.5s later: #{Time.now}" }
  end
end
It looks a little complicated at first, but it's not. Here are the important bits:
- begin/rescue is Ruby's try/catch
- server.accept_nonblock is key here. server.accept (the earlier version) checks to see if there's a new incoming request, and waits until there is one. accept_nonblock, however, raises an error if there's no incoming connection (this is why we use the begin/rescue), giving you the freedom to do other things before checking again. And that's what we're doing here: if there isn't any new connection, we add a task to check again, allowing the event loop to execute any pending tasks first. On the flip side, if there's a request, handle_next_request will call the handler block you passed in, allowing you to respond to the request.
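If you'd like to see accept_nonblock's behaviour on its own, here's a small self-contained sketch. It assumes you can open sockets on localhost; binding to port 0 (letting the OS pick a free port) and the variable names are choices I made for the example. It also shows the exception: false option, which makes accept_nonblock return :wait_readable instead of raising.

```ruby
require 'socket'

server = TCPServer.new("127.0.0.1", 0) # port 0: let the OS pick a free port
port = server.addr[1]

# No client has connected yet, so accept_nonblock raises right away
begin
  server.accept_nonblock
  first_attempt = :accepted
rescue IO::WaitReadable
  first_attempt = :nothing_waiting
end

# With exception: false, we get a symbol back instead of an exception
second_attempt = server.accept_nonblock(exception: false)

# Now connect a client and try again
client = TCPSocket.new("127.0.0.1", port)
IO.select([server], nil, nil, 1) # give the connection up to 1s to show up
connection = server.accept_nonblock(exception: false)

p first_attempt
p second_attempt
p connection.class

connection.close if connection.respond_to?(:close)
client.close
server.close
```

In both of the first two attempts the call returns immediately rather than waiting, which is exactly what lets our event loop go and do other work in the meantime.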
Note: I made the code a little cleaner by using some Ruby abstractions to hide the creation of the loop, and to make our set_timeout, run_coroutine, and add_callback methods global. See the full code in this gist.
Now, when I try making three concurrent requests:
Listening on localhost:5678
Started handling req GET / HTTP/1.1: 2021-04-29 11:02:24 +0100
0ms later: 2021-04-29 11:02:24 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 11:02:24 +0100
0ms later: 2021-04-29 11:02:24 +0100
Started handling req GET / HTTP/1.1: 2021-04-29 11:02:24 +0100
0ms later: 2021-04-29 11:02:24 +0100
1.5s later: 2021-04-29 11:02:26 +0100
1.5s later: 2021-04-29 11:02:26 +0100
1.5s later: 2021-04-29 11:02:26 +0100
Responding 5 seconds later: 2021-04-29 11:02:29 +0100
Responding 5 seconds later: 2021-04-29 11:02:29 +0100
Responding 5 seconds later: 2021-04-29 11:02:29 +0100
Sweet! The server starts handling one request, schedules the timeouts and switches to another, so no connection is kept waiting. And, in this case, we get a speed boost—the server responds to all requests in 5s total (rather than 15s), since it doesn't wait for one to finish.
Why use an event loop?
You might already be convinced, but let's formally answer this. Why use an event loop? Why try to make the language behave differently? The answer is concurrency (duh).
Event loops are a way to achieve concurrency without having to add more threads or processes. Remember how, in Part 1 of this series, we saw that a single-threaded server in PHP has zero concurrency? With an event loop, we can split up handling one request into separate tasks, allowing our app to switch over to handling a new request. In fact, the example of a concurrent webserver in PHP uses Amp's event loop, along with asynchronous coroutines like \Amp\File\get():
\Amp\Loop::run(function () {
    $sockets = [
        Server::listen("0.0.0.0:8080"),
    ];
    $server = new HttpServer($sockets, new CallableRequestHandler(function (Request $request) {
        $file = yield \Amp\File\get(__FILE__);
        return new Response(Status::OK, ["content-type" => "text/plain"], "Hello, World!");
    }), new NullLogger);
    yield $server->start();
});
Many other popular async libraries implement an event loop, including ReactPHP, async (Ruby), and EventMachine (Ruby).
But it's not just the idea of doing multiple things at once that makes event loops cool. It's the ability to utilise "idle time". For example, with server.accept, we spend a good deal of time waiting for the next connection. What if we could do something else while waiting? server.accept_nonblock + the event loop allows for that. If we go further and implement asynchronous I/O like file reads, database queries and network calls, we'll see the difference even more: in the time our code spends waiting for the OS/database/remote server to return results, we can switch to doing something else.
Event loops shine in event-driven contexts. New events (like new requests or completed database queries) add new items to the task queue, and the event loop is able to switch over and handle those. Nginx uses an event loop, and so do many GUI systems.
I've mostly mentioned webservers in this series, but concurrency is needed in a lot of places. For instance, games have to process user input while rendering the game world and making a bunch of calculations. Mobile apps have to handle user input, fetch data from remote APIs, draw UIs and more. An event loop is one way of making this work.