The server is a HTTP/2 router
Server Sent Events are "simple" HTTP requests. The browser has a build-in interface, and all we need to make a SSE server is to provide a GET endpoint that sends SSEs. The Javascript front-end will connect and listen to SSEs via a simple call new EventSource(back-endpoint)
(see client code at the end).
We follow the example of Plug base HTTP server to produce a Docker image of a simple mock SSE server written in Elixir with the webserver Cowboy.
To overcome the limitations of SSE over HTTP1.1, we will upgrade from HTTP to HTTP/2 via HTTPS (this is the browsers constraint).
❗ You discover that there is a limit of 50 simultaneous open connections even over HTTP/2. The need for sophistication is therefor questionable. Even if some "more serious" libraries are shown at the end, this seems an important limitation and the prefered protocol should be websocket instead of HTTP since this does not suffer from this low limit, at the expense of easy scalability.
In this simple example, the server will expose 3 endpoints: two GET and one POST. The first GET will be a mock of a periodic signal: we will emit periodically a letter of the alphabet. Why not! The second GET will emit back the message that has been posted to the POST endpoint. The event bus will use Phoenix.PubSub
to pass the state - the payload - between endpoints, so that this can be used in any part of the app. This can even be distributed if needed.
defmodule SSE do
import Plug.Conn
use Plug.Router
# define the front-end urls that are permitted to reach the back-end to the CORS Plug.
@front1 http://localhost:3000
@front2 https://front-end.surge.sh
plug(:match)
# set CORS between the front-end and back-end
plug(CORSPlug, origin: [@front1, @front2])
plug(Plug.SSL, rewrite_on: [:x_forwarded_host, :x_forwarded_port, :x_forwarded_proto])
plug(Plug.Parsers, parsers: [:json], pass: ["text/*", "application/json"], json_decoder: Jason)
plug(:dispatch)
#source emits a random letter every 5 seconds
get "/sse" do
prepare_sse(conn)
|> send_letter()
end
# message posting endpoint that we broadcast on the topic "post"
post "/post" do
with params <- conn.params,
msg <- make_message(params) do
Phoenix.PubSub.broadcast(SSE.PubSub, "post", {:post, msg})
conn |> resp(303, "broadcasted") |> send_resp()
end
end
#source emits an SSE every time a message is received on a topic "post"
get "/post" do
Phoenix.PubSub.subscribe(SSE.PubSub, "post")
prepare_sse(conn)
receive do
{:post, data} ->
chunk(conn, data)
end
conn
end
#function plug
defp prepare_sse(conn) do
conn
|> Plug.Conn.put_resp_header("connection", "keep-alive")
|> Plug.Conn.put_resp_header("content-type", "text/event-stream")
|> send_chunked(200)
end
defp make_message(params) do
data = Jason.encode!(params)
uuid = uuid4()
"event: message\ndata: #{data}\nid: #{uuid}\nretry: 6000\n\n"
end
# we send a letter of the alphabet every 5 seconds
defp send_letter(conn, x \\ "a") do
msg = make_message(%{msg: x})
{:ok, _conn} = chunk(conn, msg)
:timer.sleep(5_000)
send_letter(conn, get_random())
end
defp get_random() do
Enum.map(?a..?z, fn x -> <<x::utf8>> end)
|> Enum.random()
end
def uuid4() do
:uuid.get_v4() |> :uuid.uuid_to_string()|> to_string()
end
end
The application is supervised and the start
function is defined below.
defmodule SSE.Application do
use Application
def start(_type, _args) do
plug_options = [
port: app_port(),
compress: true,
cipher_suite: :strong,
certfile: "priv/cert/sse+2.pem",
keyfile: "priv/cert/sse+2-key.pem",
otp_app: :sse,
protocol_options: [idle_timeout: :infinity]
]
children = [
{Plug.Cowboy, scheme: :https, plug: SSE.Router, options: plug_options},
{Phoenix.PubSub, name: SSE.Pubsub}
]
Supervisor.start_link(children, strategy: :one_for_one, name: SSE.Supervisor)
end
defp app_port do
System.get_env()
|> Map.get("PORT", "4043")
|> String.to_integer()
end
end
The mix file is:
{:plug_cowboy, "~> 2.5"},
{:plug_crypto, "~> 1.2"},
{:cors_plug, "~> 3.0"},
{:jason, "~>1.3"},
{:uuid, ">= 2.0.4", [hex: :uuid_erl]},
{:phoenix_pubsub, "~> 2.0"},
{:credo, "~> 1.6", only: [:dev, :test], runtime: false},
{:httpoison, "~> 1.8", only: [:dev, :test]}
Test it
We will mostly test the interface. We have three ways to test: with cURL
, with Elixir code and with the browser.
Curl
Just cURL
in another terminal.
curl https://localhost:4000/sse
curl -H 'Content-type: application/json' \
-d {"test": "sent me via SSE"}' \ https://localhost:4000/post
Elixir client
Since we are just using HTTP requests, we will use HTTPoison to consume SSEs as a client. You can use HTTPoison.AsyncChunk
to receive SSEs and we keep the connection indefinitely open. Note that this worked with HTTP1.1 but I could not make it work with the :ssl
options despite curl
hitting the https endpoint without any problem.
The way we emit random letters on HTTP connection doesn't help to automate this test. Since we are mainly concerned by the interface, instead of making a "_test.exs" file, just run an iex session to test the interface:
> iex -S mix
# check if the broadcasted message is sent by SSE on "/post"
iex> Test.is_broadcasted("hello") === %{"test"=> "hello"}
# check if the stream is sent by SSE on "/sse"
iex> Test.sse_receiver(20000)
# a list with letters builds up...during 20s
# run 50 simultaneous open connections during 20s
iex> Enum.each(1..50, fn _ -> Test.sse_receiver(20000) end)
❗ This module only works with HTTP
defmodule Test do
@moduledoc false
require Logger
@headers [{"Content-Type", "application/json"}]
@url_post "http://localhost:4043/post"
@url_sse "http://localhost:4043/sse"
defp is_posted(text \\ "ok") do
Phoenix.PubSub.subscribe(SSE.PubSub, "post")
{:ok, msg} = Jason.encode(%{test: text})
case HTTPoison.post!(@url_post, msg, @headers) do
%HTTPoison.Response{status_code: code} ->
code
end
end
def is_broadcasted(text) do
case is_posted(text) do
303 ->
receive do
{:post, data} ->
regexme(data)
end
end
end
def sse_receiver(time) do
m = []
Task.start(fn ->
Logger.info("starting test")
Task.start(fn ->
HTTPoison.get!(@url_sse, [],
recv_timeout: :infinity,
stream_to: self()
)
receiver(m)
end)
Process.sleep(time)
Logger.info("end of test")
Process.exit(pid, :kill)
end)
end
defp receiver(m) do
receive do
%HTTPoison.AsyncChunk{chunk: chunk} ->
data = regexme(chunk)
m = [data["msg"] | m]
Logger.debug(m)
receiver(m)
end
end
defp regexme(text) do
text |> String.split("\n") |> Enum.at(1) |> String.split(" ") |> Enum.at(1) |> Jason.decode!()
end
end
Client code
You can quickly scaffold a React app "create-react-app" and add the tiny component described at the end which will be reactive to Server Sent Events. With CORS enabled, you may not need HTTPS end-to-end in dev mode.
you can set up a secure front-end quickly with Surge: just build the code and run
surge ./build
with it's CLI once it's installed. You will get an url such ashttps://demo.surge.sh
.HTTP2 set-up. For the back-end, we can use a reverse proxy for the TLS termination. For example, Nginx Proxy Manager or Caddy Server automate the certificates for you. Caddy automatically uses HTTP2. We can alternatively terminate the connection directly to the webserver Cowboy: we then need to add self-signed certificates to it (in dev mode). You can use mkcert or the Elixir package X509: generate self-signed certificates with
mix x509.gen.selfsigned
.
We used Valtio to get dynamic rendering, but we could have used useEffect
as well. Use something like the component below in a React scaffold.
const { proxy, useSnapshot } from 'valtio'
const { derive } from 'valtio/utils'
const state = proxy({messages: {letter: null, post: null})
const sse = derive({
getMsg: (get) => {
const evtSource1 = new EventSource(process.env.REACT_APP_SSE_URL_SSE);
evtSource1.addEventListener('message', (e) =>
get(state.messages).letter = e.data
);
const evtSource2 = new EventSource(process.env.REACT_APP_SSE_URL_POST);
evtSource2.addEventListener('message', (e) =>
get(state.messages).post = e.data
);
}
})
const SSE = () => {
const { messages: {letter, post} } = useSnapshot(state)
return <>{letter}{" "}{post}</>
}
Run it
We can do MIX_END=prod PORT=4000 mix run --no-halt
but the idea is to use the mock as a pre-build Docker image.
Dockerfile
We ship a container to deploy the server code. We build a release - so we use a multi-stage Dockerfile - to produce a tiny image (20M) of our Elixir SSE server.
ROM bitwalker/alpine-elixir:latest AS build
ARG NAME
ARG PORT
ENV ENV=${MIX_ENV:-prod}
WORKDIR /opt/app
RUN mix do local.hex --force, local.rebar --force
COPY mix.exs mix.lock ./
COPY config /.config
RUN mix do deps.get --only ${ENV}
COPY lib ./lib
COPY priv ./priv
COPY rel ./rel/
RUN MIX_ENV=prod mix release ${NAME} --quiet
FROM alpine:latest AS app
ARG NAME
ENV PORT=${PORT}
WORKDIR /opt/app
RUN apk --update --no-cache add openssl ncurses-libs libstdc++ libgcc
RUN chown -R nobody: /opt/app
USER nobody
EXPOSE 4043
ENV HOME=/app
ENV NAME=${NAME}
ENV MIX_ENV=prod
COPY --from=build --chown=nobody:root /opt/app/_build/${MIX_ENV}/rel/${NAME} ./
CMD ./bin/${NAME} start
and then build (pass the NAME) and run the image (set the mandatory env PORT)
docker build --build-arg NAME=myapp -t myapp:v1 .
docker run -it --rm --env PORT=4043 -p 443:4043 myapp:v1
SSE libraries
https://github.com/mustafaturan/sse
https://github.com/codenoid/elixir-sse-example/blob/master/lib/sse_example_web/helpers/ticker.ex
https://github.com/CrowdHailer/server_sent_event.ex
Oldest comments (0)