DEV Community

Marcio Lopes de Faria


Acme Pub Sub, an Elixir exercise

Recently I was rejected by a company that uses Elixir extensively, after putting a lot of energy into the interview process, so I was looking for a way to get something positive out of it. I completed the exercise, but one hour is a small time-box for building something solid, and I think the way I finished it was not what they expected. So here I am, learning in public and attempting a different approach.

The exercise asks you to turn the echo server from https://elixir-lang.org/getting-started/mix-otp/task-and-gen-tcp.html#echo-server into a broadcast pub-sub server.

My first approach was to use an Agent to keep track of every accepted socket. When a serving task receives a message from its client, it iterates over all the other accepted sockets stored in the Agent and writes the message to each of them. Pretty simple!

These are the changes that make this first attempt work.

# in lib/acme_pub_sub/application.ex
  def start(_type, _args) do
    port = String.to_integer(System.get_env("PORT") || "4040")

    children = [
      {Task.Supervisor, name: AcmePubSub.TaskSupervisor},
      %{
        id: AcmePubSub.ClientStorage,
        start: {Agent, :start_link, [fn -> [] end, [name: AcmePubSub.ClientStorage]]}
      }, # A supervised Agent was included to hold all clients
      Supervisor.child_spec({Task, fn -> AcmePubSub.accept(port) end}, restart: :permanent)
    ]

    opts = [strategy: :one_for_one, name: AcmePubSub.Supervisor]
    Supervisor.start_link(children, opts)
  end

# in lib/acme_pub_sub.ex

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    Logger.info("Accepted new connection")

    {:ok, pid} = Task.Supervisor.start_child(AcmePubSub.TaskSupervisor, fn -> serve(client) end)

    # All clients are stored here.
    Agent.update(
      AcmePubSub.ClientStorage, 
      fn clients -> 
        [client | clients] 
      end
    )

    :ok = :gen_tcp.controlling_process(client, pid)

    loop_acceptor(socket)
  end

  defp serve(socket) do
    received_message = read_line(socket)
    Logger.info("received message #{received_message}")

    # get all clients
    clients = Agent.get(
      AcmePubSub.ClientStorage, 
      &Function.identity/1
    )

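    # iterate over all clients except the sender, broadcasting the message
    for client <- clients,
        socket != client do
      write_line(received_message, client)
    end

    # keep serving this client with a plain tail call; recursing from an
    # `after` block would re-enter serve/1 even after read_line/1 has failed
    serve(socket)
  end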

This first version is on github under the first-version branch:

https://github.com/marciol/acme_pub_sub/tree/first-version

A better approach

It would be better, though, to encapsulate access to each client socket, so that only the processes serving that client read from and write to it.

As an aside: writing tests for this kind of application is challenging, because every receiving test client must be connected before the broadcast starts. With Elixir Tasks, the receiving clients can be started asynchronously and hand their received messages back to the calling test process, and a little trick keeps all the moving parts synchronized.
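The post does not show the trick itself, so the following is only a minimal sketch of one way to get that synchronization. Each receiving client runs in a Task, tells the calling process that it is connected, and only then blocks on the socket; the caller waits for all of those notifications before it sends anything. The module name `SyncSketch` and the sequential stand-in server are assumptions made so that the snippet runs on its own; in the real test suite the clients would connect to AcmePubSub instead.

```elixir
defmodule SyncSketch do
  @opts [:binary, packet: :line, active: false]

  # Returns the lines read by `count` receiving clients after one broadcast.
  def run(count, message \\ "hello\n") do
    {:ok, listen} = :gen_tcp.listen(0, [reuseaddr: true] ++ @opts)
    {:ok, port} = :inet.port(listen)
    server = Task.async(fn -> stand_in_server(listen, count) end)
    parent = self()

    receivers =
      for _ <- 1..count do
        Task.async(fn ->
          {:ok, socket} = :gen_tcp.connect({127, 0, 0, 1}, port, @opts)
          # Tell the caller this client is connected before blocking on recv.
          send(parent, {:connected, self()})
          {:ok, line} = :gen_tcp.recv(socket, 0, 1_000)
          line
        end)
      end

    # Synchronization point: wait for every receiver before broadcasting.
    for %Task{pid: pid} <- receivers do
      receive do
        {:connected, ^pid} -> :ok
      after
        1_000 -> raise "receiver did not connect in time"
      end
    end

    {:ok, sender} = :gen_tcp.connect({127, 0, 0, 1}, port, @opts)
    :ok = :gen_tcp.send(sender, message)

    # The received lines come back directly to the calling process.
    lines = Task.await_many(receivers)
    Task.await(server)
    :gen_tcp.close(listen)
    lines
  end

  # Hypothetical stand-in for the server under test: accepts the receivers,
  # then the sender, and forwards one line from the sender to every receiver.
  defp stand_in_server(listen, count) do
    receivers = for _ <- 1..count, do: accept!(listen)
    sender = accept!(listen)
    {:ok, line} = :gen_tcp.recv(sender, 0, 1_000)
    Enum.each(receivers, &:gen_tcp.send(&1, line))
  end

  defp accept!(listen) do
    {:ok, socket} = :gen_tcp.accept(listen, 1_000)
    socket
  end
end
```

Against the real server, a successful `:gen_tcp.connect/3` does not guarantee that the acceptor has already registered the socket, so a test would probably also need to wait until the client list held by the Agent reaches the expected size before broadcasting.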

With a working solution in place, the next improvement is a way to broadcast messages without each serving task writing directly to the other clients' sockets.

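One interesting approach is to split the work between two kinds of servers: an input server that reads the messages arriving from its client and dispatches them to the output servers of the other clients, and an output server that writes the dispatched messages to its own client's socket.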

The input server must know its corresponding output server, so that when the client closes the socket it can send a close signal to that output server.

# in lib/acme_pub_sub.ex

  defp loop_acceptor(socket) do
    {:ok, client_socket} = :gen_tcp.accept(socket)
    Logger.info("Accepted new connection")

    # the output server task
    {:ok, output_pid} =
      Task.Supervisor.start_child(
        AcmePubSub.TaskSupervisor,
        fn ->
          output_server(client_socket)
        end
      )

    # the input server task
    {:ok, input_pid} =
      Task.Supervisor.start_child(
        AcmePubSub.TaskSupervisor,
        fn ->
          input_server({client_socket, output_pid})
        end
      )

    # all connected input and output corresponding clients
    Agent.update(
      AcmePubSub.ConnectedClients,
      fn clients ->
        [{input_pid, output_pid} | clients]
      end
    )

    loop_acceptor(socket)
  end

  defp input_server({client_socket, output_pid}) do
    case read_line(client_socket) do
      {:ok, data} ->
        # it will dispatch the message to all other clients
        dispatch(data)
        input_server({client_socket, output_pid})

      {:error, :closed} ->
        # when closed, it will remove the client tasks
        Agent.update(
          AcmePubSub.ConnectedClients,
          fn clients ->
            List.delete(clients, {self(), output_pid})
          end
        )

        # it will send the close signal to the
        # corresponding output server
        send(output_pid, :close)

        :ok
    end
  end

  # dispatching function
  defp dispatch(data) do
    clients = Agent.get(AcmePubSub.ConnectedClients, &Function.identity/1)

    # for each input/output process, different from itself
    # it will dispatch the message
    for {input_pid, output_pid} <- clients,
        input_pid != self() do
      send(output_pid, data)
    end
  end


  # the output server
  defp output_server(client_socket) do
    receive do
      # it will close if the input server was closed
      :close ->
        :ok

      # receive the dispatched message, 
      # sending out to client
      data ->
        write_line(data, client_socket)
        output_server(client_socket)
    end
  end

An alternative would be to encapsulate all of this in a GenServer, which I leave as an exercise for the reader. Still, I like this last solution because it shows how much is possible with just the Elixir primitives Agent and Task.
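For readers who want a starting point for the GenServer variant, here is a minimal sketch of one possible shape. It is not the code from the repository: the module name `BroadcasterSketch` and its functions `subscribe/2` and `broadcast/3` are hypothetical. The GenServer takes over the role of the Agent, and it monitors its subscribers, so a dead output process is dropped from the set without an explicit removal step.

```elixir
defmodule BroadcasterSketch do
  use GenServer

  # Client API (every name in this module is hypothetical).
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Registers `pid` (for example an output server) as a subscriber.
  def subscribe(server, pid), do: GenServer.call(server, {:subscribe, pid})

  # Sends `data` to every subscriber except `except`.
  def broadcast(server, data, except), do: GenServer.cast(server, {:broadcast, data, except})

  @impl true
  def init(:ok), do: {:ok, MapSet.new()}

  @impl true
  def handle_call({:subscribe, pid}, _from, subscribers) do
    # Monitor the subscriber so it can be removed when it exits.
    Process.monitor(pid)
    {:reply, :ok, MapSet.put(subscribers, pid)}
  end

  @impl true
  def handle_cast({:broadcast, data, except}, subscribers) do
    for pid <- subscribers, pid != except, do: send(pid, data)
    {:noreply, subscribers}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, _reason}, subscribers) do
    {:noreply, MapSet.delete(subscribers, pid)}
  end
end
```

Under these assumptions, the acceptor would call `BroadcasterSketch.subscribe(server, output_pid)` for each new connection, and the input server would replace `dispatch(data)` with `BroadcasterSketch.broadcast(server, data, output_pid)`, so the sender's own output server is skipped.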

The final solution is on the main branch of this repo:

https://github.com/marciol/acme_pub_sub

Top comments (2)

Peter Johnston

That sounds like a super hard exercise for an interview! This feels like one of those companies that complains about how hard it is to hire elixir devs but then throws this kind of stuff at you

Josef Strzibny

It's ridiculous.