This post is partially about unfinished project observerio that I don't feel I have enough free time to finish in these days.
I've been working on the project to get the the real time testing of the different strategies for the games and as result the idea came into the project that's having http api, tcp server, ember dashboard and core library based on go(using xgo compilation for cross platform support).
Planning to share the few examples that it could be useful for someone.
If someone asked me about the chosen Elixir language, I can't explain feelings why I decided to use it on the backend. Probably I was inspired by previous experience in my startup app musicfeed. Obviously for me Elixir has pretty cool syntax, community, documentation. But the concepts around gen_server and other tools are pretty complex for understanding if you decide to choose it as the first language to learn. If someone tell you that you don't need to learn erlang it would be partially true while the number of libraries for Elixir is growing so fast.
I've decided to use tcp server and client to send variables and logs from mobile games via internal sdk. When we receive the variables or logs tcp server publish messages via pubsub (gateway) to subscribed websocket handler. In case if websocket connected(alive clients) it will automatically publish the changes to ember dashboard. Probably it sounds like the idea to play more than to have it in production but I did it anyway for fun :)
Lets review tcp server implementation(please feel free to apply changes if you see the possible issues, it's opensourced):
- define supervisors for tcp server and client(should be used for create the communication between websocket via pubsub and tcp client)
defmodule Web.Tcp.ServerSupervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, [], name: :tcp_server_supervisor)
end
def init(_) do
children = [
worker(Web.Tcp.Server, [])
]
supervise(children, strategy: :one_for_one)
end
end
defmodule Web.Tcp.ClientSupervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, [], name: :tcp_client_supervisor)
end
def start_client(token) do
Supervisor.start_child(:tcp_client_supervisor, [token])
end
def init(_) do
children = [
worker(Web.Tcp.Client, [])
]
# We also changed the `strategy` to `simple_one_for_one`.
# With this strategy, we define just a "template" for a child,
# no process is started during the Supervisor initialization, just
# when we call `start_child/2`
supervise(children, strategy: :simple_one_for_one)
end
end
- define server worker and pass module name
Web.Tcp.Handler
as entrypoint on running processing for connected client
defmodule Web.Tcp.Server do
require Logger
def start_link do
Logger.debug("[tcp] starting server on port :#{_port()}")
opts = [port: _port()]
{:ok, _} = :ranch.start_listener(
:tcp, _acceptors_size(), :ranch_tcp, opts, Web.Tcp.Handler, [])
end
def _port do
String.to_integer Application.get_env(:web, :tcp_port)
end
def _acceptors_size do
String.to_integer Application.get_env(:web, :tcp_acceptors_size)
end
end
- define module that would work as worker for connected tcp client. On init it will subscribe to pubsub channel by api key(per user). It will allow to receive the messages from pubsub and send back to the tcp client socket.
defmodule Web.Tcp.Client do
require Logger
require Poison
alias Web.Pubsub
def start_link(token) do
GenServer.start_link(__MODULE__, token, name: String.to_atom(token))
end
def init(token) do
Pubsub.subscribe("#{token}:vars:callback")
{:ok, %{token: token, messages: []}}
end
def handle_info(%{vars: vars}, %{token: token, messages: messages} = state) do
Logger.debug("[tcp.client] received message: #{inspect(vars)}")
message = _pack(token, "vars", vars)
Logger.debug("[tcp.client] packed message: #{inspect(message)}")
messages = messages ++ [message]
Logger.debug("[tcp.client] begin send message: #{inspect(messages)}")
state = token |> _get_socket |> _send_back(messages, state)
Logger.debug("[tcp.client] done send message: #{inspect(messages)}")
Logger.debug("[tcp.client] messages: #{inspect(messages)}")
{:noreply, state}
end
def terminate(reason, status) do
Logger.debug("[tcp.client] reason: #{inspect(reason)}, status: #{inspect(status)}")
:ok
end
defp _send_back({:ok, socket}, messages, state) do
:ok = _send(socket, messages)
%{state | messages: []}
end
defp _send_back(:enqueue, messages, state) do
%{state | messages: messages}
end
defp _send(s, []), do: :ok
defp _send({socket, transport} = s, [message | messages]) do
transport.send(socket, message)
_send(s, messages)
end
def _pack(token, "vars", vars) do
vars = vars
|> Poison.encode!
|> Base.encode64
"v:#{token}:#{vars}\n"
end
defp _get_socket(token) do
Logger.debug("[tcp.socket] search for socket, transport by token: #{inspect(token)}")
response = case Registry.lookup(Registry.Sockets, token) do
[{_, socket}] -> {:ok, socket}
[] -> :enqueue
end
Logger.debug("[tcp.client] _get_socket: #{inspect(response)}")
response
end
end
- tcp handler on receive will register socket in registry and start the new client worker to subscribe to pubsub channel by api key and have communication between pubsub and tcp client.
defmodule Web.Tcp.Handler do
require Logger
alias Web.Pubsub
@moduledoc """
`Handler` is waiting lines separated by \n new line, in case if handler don't
see new line it starts to accumulate data until it receives new line.
`Registry.Sockets` contains api_key -> socket records for easy back communication
from dashboard page to tcp clients.
"""
def start_link(ref, socket, transport, opts) do
pid = spawn_link(__MODULE__, :init, [ref, socket, transport, opts])
{:ok, pid}
end
def init(ref, socket, transport, _opts = []) do
:ok = :ranch.accept_ack(ref)
case transport.peername(socket) do
{:ok, _peer} -> loop(socket, transport, "")
{:error, reason} -> Logger.error("[tcp.handler] init receive error reason: #{inspect(reason)}")
end
end
@timeout 5_000
def loop(socket, transport, acc) do
# Don't flood messages of transport, receive once and leave the remaining
# data in socket until we run recv again.
transport.setopts(socket, [active: :once])
# before to proceed with receive block on messages we should call
# once transport.messages() to ping ranch
{ok, closed, error} = transport.messages()
receive do
{ok, socket, data} ->
Logger.debug("[tcp.handler] received data: #{inspect(data)}")
acc <> data
|> String.split("\n")
|> Enum.map(&(String.trim(&1)))
|> _process(socket, transport)
loop(socket, transport, "")
{closed, socket} ->
Logger.debug("[tcp.handler] closed socket: #{inspect(socket)}")
{error, socket, reason} ->
Logger.error("[tcp.handler] socket: #{inspect(socket)}, closed becaose of the error reason: #{inspect(reason)}")
{:error, error} ->
Logger.error("[tcp.handler] error: #{inspect(error)}")
{'EXIT', parent, reason} ->
Logger.error("[tcp.handler] exit parent reason: #{inspect(reason)}")
Process.exit(self(), :kill)
message ->
Logger.debug("[tcp.handler] message on receive block: #{inspect(message)}")
end
end
defp _kill(), do: Process.exit(self(), :kill)
defp _process([], socket, transport), do: loop(socket, transport, "")
defp _process([""], socket, transport), do: loop(socket, transport, "")
defp _process([line, ""], socket, transport) do
_protocol(line, socket, transport)
loop(socket, transport, "")
end
defp _process([line], socket, transport), do: loop(socket, transport, line)
defp _process([line | lines], socket, transport) do
_protocol(line, socket, transport)
_process(lines, socket, transport)
end
defp _protocol(line, socket, transport) do
Logger.debug("[_protocol] line: #{line}")
case line |> Web.Tcp.Protocol.process do
{:verified, api_key} ->
_register_socket(api_key, socket, transport)
Web.Tcp.ClientSupervisor.start_client(api_key)
Logger.debug("[tcp.server] transport should respond with OK")
case transport.send(socket, "OK\n") do
{:error, reason} ->
Logger.error(inspect(reason))
_ ->
end
{:error, reason} ->
Logger.error("[tcp] #{inspect(reason)}")
:error ->
Logger.error("error on processing: #{inspect(line)}")
_ ->
end
end
def _register_socket(api_key, socket, transport) do
Logger.debug("[tcp.handler] _register_socket token: #{api_key}")
case Registry.register(Registry.Sockets, api_key, {socket, transport}) do
{:error, {:already_registered, _pid}} ->
Registry.update_value(Registry.Sockets, api_key, fn (_) -> {socket, transport} end)
{:error, reason} ->
Logger.error("[tcp] reason: #{inspect(reason)}")
_ ->
end
end
end
-
Web.Tcp.Protocol
defines the parser and handler of possible commands that's passing via tcp client.
Example:
l:logs:<base64 json messages>
i:vars:<base64 json variables>
v:api_key - client should pass api key to verify that we have the registered client
defmodule Web.Tcp.Protocol do
require Logger
require Poison
alias Web.Gateway
alias Web.Db.Users
@moduledoc """
Server messages:
- `l:logs`
logs - should come as json array and encoded base64
- `i:vars`
vars - should come as json dictionary and encoded by base64
- `v:api_key`
api_key - should verify key using our registry
Client messages:
- `i:s:name:value` - var set by name value inside of app
"""
def process("l:" <> <<api_key :: bytes-size(12)>> <> ":" <> logs) do
Logger.debug("[protocol] api_key: #{api_key}, logs: #{inspect(logs)}")
logs
|> Base.decode64!
|> Poison.decode!
|> Gateway.logs(api_key)
end
def process("i:" <> <<api_key :: bytes-size(12)>> <> ":" <> vars) do
Logger.debug("[protocol] api_key: #{api_key}, vars: #{inspect(vars)}")
vars
|> Base.decode64!
|> Poison.decode!
|> Gateway.vars(api_key)
end
def process("v:" <> <<api_key :: bytes-size(12)>>) do
if Users.verify_key(api_key) do
{:verified, api_key}
else
{:error, "not registered user with api_key: #{api_key}"}
end
end
def process(_), do: :error
end
- finally the test case for review the basic expected behaviour tcp_test.ex:
defmodule Web.TcpTest do
use ExUnit.Case
doctest Web.Tcp
alias Web.Db.Users
require RedisPoolex
require Logger
require Poison
require Tirexs.Query
alias RedisPoolex, as: Redis
setup do
Redis.query(["FLUSHDB"])
{:ok, api_key} = Users.register(%{email: "user1@example.com", password: "12345678"})
port = Application.get_env(:web, :tcp_port)
host = "127.0.0.1" |> String.to_char_list
{:ok, socket} = :gen_tcp.connect(host, port, [active: false])
{:ok, socket: socket, api_key: api_key}
end
test "should register user session", %{socket: socket, api_key: api_key} do
:ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
{:ok, reply} = :gen_tcp.recv(socket, 0, 1000)
assert reply == 'OK'
end
test "should bulk insert logs on tcp request", %{socket: socket, api_key: api_key} do
:ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
:ok = :gen_tcp.send(socket, "l:" <> api_key <> ":" <> ([%{message: "testing1", timestamp: 123123123}, %{message: "testing2", timestamp: 123123123}] |> Poison.encode! |> Base.encode64) <> "\n")
:timer.sleep(2000)
assert {:ok, 200, %{hits: %{hits: hits}} = response} = Tirexs.Query.create_resource([index: "logs-#{api_key}", search: ""])
assert Enum.count(hits) == 2
end
# TODO: add marker support to create separate sessions on multiple devices.
# we could have separate dashboards for the different devices.
test "should store vars on tcp request", %{socket: socket, api_key: api_key} do
:ok = :gen_tcp.send(socket, "v:" <> api_key <> "\n")
:ok = :gen_tcp.send(socket, "i:" <> api_key <> ":" <> ([%{name: "testing1", type: "string", value: "example"}, %{name: "testing2", type: "integer", value: "-1"}] |> Poison.encode! |> Base.encode64) <> "\n")
:timer.sleep(2000)
vars = Redis.query ["HKEYS", "#{api_key}:vs"]
assert Enum.count(vars) == 4
assert vars == ["testing1:type", "testing1:value", "testing2:type", "testing2:value"]
end
end
In the next examples I will share the module for Pubsub and Gateway implementation, it would be pretty easy to review.
Everything was done for fun, in case of usage the same approaches it should be done very carefully, don't practice elixir daily and as result it could be wrong direction in some places and could be improved.
Thank you for reading!
Top comments (1)
This is actually mega helpful for what I'm working on. Great writeup and thanks for taking the time to do it <3