Amin Arria

Scalable Broadcasting Using WebRTC

The Idea

WebRTC is a free, open project that provides browsers and mobile applications with Real-Time Communications (RTC) capabilities via simple APIs. The WebRTC components have been optimized to best serve this purpose.

Our idea is pretty simple: use WebRTC to create video broadcasting (1 user sends video to N users). For this we will first look into how to design it, create a proof of concept, and finally implement it in Erlang.

WebRTC Problem

As you may know, WebRTC uses a peer-to-peer connection to transmit video. This works incredibly well for one-to-one calls, or even for small group calls.

[Figure: WebRTC group call]

The problem is that we are building a broadcasting feature, which means there could be 100, 1000, or even 7 billion users connected. If we use WebRTC as is, the broadcaster has to establish that many peer connections and send the video to each one of them. So, unless the broadcaster has a lot of CPU power and bandwidth to spare, it will crash and burn trying.

[Figure: WebRTC broadcast]

How can we solve this issue? All of our research gave us the same basic solution: the broadcaster sends video to a central server and this server sends it to all listeners, so the central server is the one carrying the CPU/bandwidth burden. Sounds simple, right? Unfortunately, doing this with WebRTC is not that simple, because it is meant to be peer-to-peer. This means that we can't simply create a central server and relay the video to the clients.

[Figure: scalable broadcast architecture]
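To put rough numbers on the difference, here is a back-of-the-envelope sketch. The 1.5 Mbps bitrate is an assumed figure picked for illustration, not a measurement, and broadcaster_upload_mbps is a hypothetical helper.

```javascript
// Rough upload bandwidth the broadcaster needs, in Mbps.
// bitrateMbps is an assumed per-stream video bitrate (illustrative only).
function broadcaster_upload_mbps(viewers, bitrateMbps, viaServer) {
  // Peer-to-peer: one copy of the stream per viewer.
  // Via a central server: a single copy, whatever the audience size.
  const streams = viaServer ? 1 : viewers;
  return streams * bitrateMbps;
}

for (const viewers of [1, 100, 1000]) {
  const p2p = broadcaster_upload_mbps(viewers, 1.5, false);
  const relayed = broadcaster_upload_mbps(viewers, 1.5, true);
  console.log(`${viewers} viewers: ${p2p} Mbps peer-to-peer, ${relayed} Mbps via a server`);
}
```

The cost does not disappear with a central server; it moves from the broadcaster to the server, which is much easier to provision.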

Solution

Janus is a WebRTC Server developed by Meetecho conceived to be a general purpose one. As such, it doesn't provide any functionality per se other than implementing the means to set up a WebRTC media communication with a browser, exchanging JSON messages with it, and relaying RTP/RTCP and messages between browsers and the server-side application logic they're attached to. Any specific feature/application is provided by server side plugins, that browsers can then contact via Janus to take advantage of the functionality they provide.

Great! We have something that looks like it can solve all our issues regarding a central server using WebRTC, but before declaring this "the cure to all diseases" we decided to create a PoC to check two things:

  • That broadcaster/listener WebRTC connections behave as expected (only one per peer, to Janus).
  • That we can use Janus without its JavaScript library.

Why focus on not using Janus's JS library? Because we plan on using our backend as a wrapper for all things Janus, saving the frontend from having to deal with Janus's extra complexity.

Making the PoC

Now the first hurdle in this new journey: Creating a JS client for Janus. Looking into the demos and documentation we found out we needed to use the video room plugin.

First things first, Janus works by creating a session to identify clients and a handle (sort of a session inside the session) to interact with a specific plugin, so let's create them:

// Create a session: Janus uses it to identify this client.
function start_session() {
  const msg = JSON.stringify({janus: 'create', transaction: 'create_session'});
  janusConnection.send(msg);
}

// Attach a handle to the video room plugin inside the session.
function start_handle() {
  const msg = JSON.stringify({janus: 'attach', transaction: 'create_handle', plugin: 'janus.plugin.videoroom', session_id: sessionId});
  janusConnection.send(msg);
}

IMPORTANT: The Janus WebSocket uses the sub-protocol janus-protocol. In JS it is just another argument to the WebSocket constructor, but if you need to set it by hand, it goes in the Sec-WebSocket-Protocol header field as per RFC 6455.
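As a minimal sketch of that extra argument: the helper name open_janus_connection and the injectable constructor are assumptions made so the snippet can run outside a browser, and the URL is the same local Janus address used by the Erlang code later in this post.

```javascript
// Open a WebSocket to Janus, requesting the "janus-protocol" sub-protocol.
// WebSocketImpl is injectable (hypothetical parameter) so this sketch can be
// exercised without a browser; in a page, the default WebSocket is used.
function open_janus_connection(url, WebSocketImpl = WebSocket) {
  // The second constructor argument is sent as the Sec-WebSocket-Protocol header.
  return new WebSocketImpl(url, 'janus-protocol');
}

// In a browser:
// janusConnection = open_janus_connection('ws://localhost:8188');
```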

Next thing is for our broadcaster to create the room, join it, and start broadcasting:

function make_room() {
  const msg = JSON.stringify({janus: 'message', transaction: 'create_room', body: {request: 'create'}, session_id: sessionId, handle_id: handleId});
  janusConnection.send(msg);
}

function join_room_publisher() {
  roomId = parseInt(roomInput.value, 10);
  const msg = JSON.stringify({janus: 'message', transaction: 'join_publisher', body: {request: 'join', ptype: 'publisher', room: roomId}, session_id: sessionId, handle_id: handleId});
  janusConnection.send(msg);
}

function start_broadcast() {
  peerConnection = new RTCPeerConnection(CONFIG);
  peerConnection.onicecandidate = on_ice_candidate;
  peerConnection.ontrack = on_track;

  navigator.mediaDevices.getUserMedia(CONFIG).then(function(stream) {
    localVideo.srcObject = stream;
    // addStream() is deprecated, so add each track individually.
    stream.getTracks().forEach(function(track) {
      peerConnection.addTrack(track, stream);
    });
    return peerConnection.createOffer();
  }).then(function(offer) {
    // Apply the local description before sending the offer to Janus.
    return peerConnection.setLocalDescription(offer).then(function() {
      const msg = JSON.stringify({janus: 'message', transaction: 'publish', body: {request: 'publish'}, jsep: offer, session_id: sessionId, handle_id: handleId});
      janusConnection.send(msg);
    });
  }).catch(console.error);
}

As you can see, we have started using WebRTC. It's important to know that Janus responds to the publish request with the WebRTC answer in the jsep field. Also, by default Janus includes its ICE candidates inside the SDP. You can change it to send them using trickle, but for this PoC it was fine as is.

That's pretty much it for the broadcaster side of things. Two things are still missing (sending ICE candidates and keepalive), but we will talk about them later as they are the same for broadcasters and viewers.

Now, the viewers. We already have the session and handle taken care of, so we only need to go into the room and start receiving the feed from the broadcaster. To do this we need two things: the room ID and the feed ID for that broadcaster. Why is there a feed ID? Because Janus doesn't impose a fixed limit on the number of broadcasters in a room (you can set it when creating the room), so a viewer has to say which broadcaster's feed it wants.

function join_room_subscriber() {
  roomId = parseInt(roomInput.value, 10);
  feedId = parseInt(feedInput.value, 10);

  peerConnection = new RTCPeerConnection(CONFIG);
  peerConnection.onicecandidate = on_ice_candidate;
  peerConnection.ontrack = on_track;

  const msg = JSON.stringify({janus: 'message', transaction: 'join_subscriber', body: {request: 'join', ptype: 'subscriber', room: roomId, feed: feedId}, session_id: sessionId, handle_id: handleId});
  janusConnection.send(msg);
}

// Called with Janus's response to join_subscriber: payload.jsep is Janus's offer.
function join_subscriber(payload) {
  peerConnection.setRemoteDescription(new RTCSessionDescription(payload.jsep)).then(function() {
    return peerConnection.createAnswer();
  }).then(function(answer) {
    return peerConnection.setLocalDescription(answer).then(function() {
      // Send our answer and ask Janus to start relaying the feed.
      const msg = JSON.stringify({janus: 'message', transaction: 'start', body: {request: 'start'}, jsep: answer, session_id: sessionId, handle_id: handleId});
      janusConnection.send(msg);
    });
  }).catch(console.error);
}
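The snippets so far only send requests. Responses arrive on the same WebSocket and can be matched to their request through the transaction value. A minimal sketch of that matching follows, assuming the response shapes are the ones the Erlang handler later in this post pattern-matches on; route_janus_message and the returned object layout are hypothetical.

```javascript
// Map a parsed Janus message to the next step, keyed on the transaction
// string chosen when the request was sent. Returns a plain description so the
// caller decides what to do with it.
function route_janus_message(msg) {
  // Asynchronous requests are acknowledged first; the real result follows
  // later as a separate message carrying the same transaction.
  if (msg.janus === 'ack') return {type: 'ignore'};
  if (msg.janus === 'error') return {type: 'error', error: msg.error};

  switch (msg.transaction) {
    case 'create_session':
      return {type: 'session', sessionId: msg.data.id};
    case 'create_handle':
      return {type: 'handle', handleId: msg.data.id};
    case 'create_room':
      return {type: 'room', roomId: msg.plugindata.data.room};
    case 'join_publisher':
      return {type: 'joined', publisherId: msg.plugindata.data.id};
    case 'publish':
      // jsep is the answer to our offer: give it to setRemoteDescription().
      return {type: 'answer', jsep: msg.jsep};
    case 'join_subscriber':
      // jsep is Janus's offer: hand the message to join_subscriber(payload).
      return {type: 'offer', jsep: msg.jsep};
    default:
      // Anything else (for example unsolicited events) is ignored here.
      return {type: 'ignore'};
  }
}
```

In the PoC this would sit in the WebSocket's onmessage handler, fed with JSON.parse(event.data).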

Now that the broadcaster and viewer sides are done, let's send our ICE candidates:

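function on_ice_candidate(event) {
  // A null candidate marks the end of gathering; Janus expects {completed: true} in that case.
  const candidate = event.candidate || {completed: true};
  const msg = JSON.stringify({janus: 'trickle', transaction: 'candidate', candidate: candidate, session_id: sessionId, handle_id: handleId});
  janusConnection.send(msg);
}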

One final, and important, thing missing is the keepalive. By default, Janus terminates any session that has not sent a request for 60 seconds. Remember, sessions and websockets are different things: an open websocket does not keep the session alive.

function keepalive() {
  const msg = JSON.stringify({janus: 'keepalive', transaction: 'keepalive', session_id: sessionId});
  janusConnection.send(msg);
}
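The function above sends a single keepalive, so something has to call it periodically. One way to do that is sketched below; the 30-second interval is an assumption chosen to stay well under the 60-second timeout, and start_keepalive is a hypothetical helper.

```javascript
// Send a keepalive for the session at a fixed interval until stopped.
// send is any function that writes a string to the Janus WebSocket.
function start_keepalive(send, sessionId, intervalMs = 30000) {
  const timer = setInterval(function() {
    send(JSON.stringify({janus: 'keepalive', transaction: 'keepalive', session_id: sessionId}));
  }, intervalMs);
  // Call the returned function when the session is torn down.
  return function stop_keepalive() {
    clearInterval(timer);
  };
}

// Usage in the PoC (names from the snippets above):
// const stop = start_keepalive(function(m) { janusConnection.send(m); }, sessionId);
```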

So after all this we have a working PoC for our broadcast using Janus :D

The Project

First off, dependencies:

  • Erlang (we worked with OTP 21).
  • Docker and docker-compose
  • Cowboy: to handle the client-server websocket connection.
  • websocket_client: to handle the server-Janus websocket connection.
  • jsx: JSON decode/encode, all our websockets communicate using JSON.

Let's start by creating a module in charge of server-Janus communication, janus_handler.erl:

-module(janus_handler).

-behaviour(websocket_client_handler).

-export([init/2,
         websocket_handle/3,
         websocket_info/3,
         websocket_terminate/3,
         start_link/1]).

start_link(From) ->
  Opts = [{extra_headers, [{<<"Sec-WebSocket-Protocol">>, <<"janus-protocol">>}]}],
  websocket_client:start_link("ws://localhost:8188", ?MODULE, From, Opts).

% Websocket Handler
init(From, _ConnState) ->
  {ok, #{from => From}}.

websocket_handle({text, Msg}, _ConnState, State) ->
  DecodedMsg = jsx:decode(Msg, [return_maps]),
  process(DecodedMsg, State).

websocket_info(create_session, _ConnState, State) ->
  Msg = jsx:encode(#{janus => create, transaction => create_session}),
  {reply, {text, Msg}, State};

websocket_info({create_handle, SessionId}, _ConnState, State) ->
  Msg = jsx:encode(#{janus => attach,
                     plugin => <<"janus.plugin.videoroom">>,
                     transaction => create_handle,
                     session_id => SessionId}),
  {reply, {text, Msg}, State};

websocket_info({create_room, SessionId, HandleId}, _ConnState, State) ->
  Msg = jsx:encode(#{janus => message,
                     transaction => create_room,
                     body => #{request => create},
                     session_id => SessionId,
                     handle_id => HandleId}),
  {reply, {text, Msg}, State};

websocket_info({join_publisher, SessionId, HandleId, RoomId}, _ConnState, State) ->
  Body = #{request => join, ptype => publisher, room => RoomId},
  Msg = jsx:encode(#{janus => message,
                     transaction => join_publisher,
                     body => Body,
                     session_id => SessionId,
                     handle_id => HandleId}),
  {reply, {text, Msg}, State};

websocket_info({join_subscriber, SessionId, HandleId, RoomId, FeedId}, _ConnState, State) ->
  Body = #{request => join, ptype => subscriber, room => RoomId, feed => FeedId},
  Msg = jsx:encode(#{janus => message,
                     transaction => join_subscriber,
                     body => Body,
                     session_id => SessionId,
                     handle_id => HandleId}),
  {reply, {text, Msg}, State};

websocket_info({send_offer, SessionId, HandleId, Offer}, _ConnState, State) ->
  Msg = jsx:encode(#{janus => message,
                     transaction => send_offer,
                     body => #{request => publish},
                     jsep => Offer,
                     session_id => SessionId,
                     handle_id => HandleId}),
  {reply, {text, Msg}, State};

websocket_info({send_answer, SessionId, HandleId, Answer}, _ConnState, State) ->
  Msg = jsx:encode(#{janus => message,
                     transaction => send_answer,
                     body => #{request => start},
                     jsep => Answer,
                     session_id => SessionId,
                     handle_id => HandleId}),
  {reply, {text, Msg}, State};

websocket_info({trickle, SessionId, HandleId, Candidate}, _ConnState, State) ->
  Msg = jsx:encode(#{janus => trickle,
                     transaction => trickle,
                     candidate => Candidate,
                     session_id => SessionId,
                     handle_id => HandleId}),
  {reply, {text, Msg}, State};

websocket_info({keepalive, SessionId}, _ConnState, State) ->
  Msg = jsx:encode(#{janus => keepalive,
                     transaction => keepalive,
                     session_id => SessionId}),
  {reply, {text, Msg}, State}.

websocket_terminate(_Reason, _ConnState, _State) ->
  ok.

% Internal
process(#{<<"transaction">> := <<"create_session">>,
          <<"data">> := #{<<"id">> := SessionId}}, State) ->
  Msg = #{session_id => SessionId},
  send_msg(Msg, State);
process(#{<<"transaction">> := <<"create_handle">>,
          <<"data">> := #{<<"id">> := HandleId}}, State) ->
  Msg = #{handle_id => HandleId},
  send_msg(Msg, State);
process(#{<<"transaction">> := <<"create_room">>,
          <<"plugindata">> := #{<<"data">> := #{<<"room">> := RoomId}}}, State) ->
  Msg = #{room_id => RoomId},
  send_msg(Msg, State);
process(#{<<"transaction">> := <<"join_publisher">>,
          <<"plugindata">> := #{<<"data">> := #{<<"id">> := PubId}}}, State) ->
  Msg = #{publisher_id => PubId},
  send_msg(Msg, State);
process(#{<<"transaction">> := <<"join_subscriber">>,
          <<"jsep">> := Offer}, State) ->
  Msg = #{jsep => Offer},
  send_msg(Msg, State);
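% The offer is sent with the send_offer transaction (see websocket_info/3),
% so Janus's answer comes back under that same transaction.
process(#{<<"transaction">> := <<"send_offer">>,
          <<"jsep">> := Answer}, State) ->
  Msg = #{jsep => Answer},
  send_msg(Msg, State);
% Acks and any event we don't act on are ignored instead of crashing the
% handler with a function_clause error.
process(_Other, State) ->
  {ok, State}.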

send_msg(Msg, #{from := From} = State) ->
  From ! {reply, Msg},
  {ok, State}.

You might wonder what that From stored in State is. In the end it corresponds to the process handling the client-server websocket connection. That way we establish a one-to-one relationship between the two processes, and they can pass requests and responses to one another.

Since we plan for our server-Janus connections to be established on demand, let's create a supervisor for them and add it to our app's (janus_erlang_wrapper) supervision tree.

-module(janus_erlang_wrapper_sup).

-behaviour(supervisor).

-export([init/1,
         start_link/0]).

-export([]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
  JanusSup =
    {janus_sup, {janus_sup, start_link, []},
     permanent, infinity, supervisor, [janus_sup]
    },
  {ok, {{one_for_all, 10, 10}, [JanusSup]}}.

-module(janus_sup).

-behaviour(supervisor).

-export([create_janus_connection/0,
         init/1,
         start_link/0]).


start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, []).

create_janus_connection() ->
  supervisor:start_child(?MODULE, [self()]).

init(_) ->
  JanusHandler =
    {janus_handler, {janus_handler, start_link, []},
     transient, 5000, worker, [janus_handler]
    },
  {ok, {{simple_one_for_one, 10, 10}, [JanusHandler]}}.

Great! Now we can dynamically establish connections from our server to Janus and communicate with it. The final step is to create the API our clients will use. For this we create a websocket handler for Cowboy:

-module(ws_handler).

-export([init/2,
         websocket_handle/2,
         websocket_info/2,
         websocket_init/1]).

init(Req, State) ->
  {cowboy_websocket, Req, State}.

websocket_init(State) ->
  {ok, State}.

websocket_handle({text, Msg}, State) ->
  DecodedMsg = jsx:decode(Msg, [return_maps]),
  process(DecodedMsg, State);
websocket_handle(_, State) ->
  {ok, State}.

websocket_info({reply, Msg}, State) ->
  EncodedMsg = jsx:encode(Msg),
  {reply, {text, EncodedMsg}, State}.

% Internal
process(#{<<"request">> := <<"create_session">>}, State) ->
  {ok, JanusWs} = janus_sup:create_janus_connection(),
  JanusWs ! create_session,
  {ok, State#{janus_ws => JanusWs}};

process(#{<<"request">> := <<"create_handle">>,
          <<"session_id">> := SessionId}, #{janus_ws := JanusWs} = State) ->
  JanusWs ! {create_handle, SessionId},
  {ok, State};

process(#{<<"request">> := <<"create_room">>,
          <<"session_id">> := SessionId,
          <<"handle_id">> := HandleId}, #{janus_ws := JanusWs} = State) ->
  JanusWs ! {create_room, SessionId, HandleId},
  {ok, State};

process(#{<<"request">> := <<"join_publisher">>,
          <<"session_id">> := SessionId,
          <<"handle_id">> := HandleId,
          <<"room_id">> := RoomId}, #{janus_ws := JanusWs} = State) ->
  JanusWs ! {join_publisher, SessionId, HandleId, RoomId},
  {ok, State};

process(#{<<"request">> := <<"join_subscriber">>,
          <<"session_id">> := SessionId,
          <<"handle_id">> := HandleId,
          <<"room_id">> := RoomId,
          <<"feed_id">> := FeedId}, #{janus_ws := JanusWs} = State) ->
  JanusWs ! {join_subscriber, SessionId, HandleId, RoomId, FeedId},
  {ok, State};

process(#{<<"request">> := <<"publish">>,
          <<"session_id">> := SessionId,
          <<"handle_id">> := HandleId,
          <<"jsep">> := Offer}, #{janus_ws := JanusWs} = State) ->
  JanusWs ! {send_offer, SessionId, HandleId, Offer},
  {ok, State};

process(#{<<"request">> := <<"listen">>,
          <<"session_id">> := SessionId,
          <<"handle_id">> := HandleId,
          <<"jsep">> := Answer}, #{janus_ws := JanusWs} = State) ->
  JanusWs ! {send_answer, SessionId, HandleId, Answer},
  {ok, State};

process(#{<<"request">> := <<"trickle">>,
          <<"session_id">> := SessionId,
          <<"handle_id">> := HandleId,
          <<"candidate">> := Candidate}, #{janus_ws := JanusWs} = State) ->
  JanusWs ! {trickle, SessionId, HandleId, Candidate},
  {ok, State};

process(#{<<"request">> := <<"keepalive">>,
          <<"session_id">> := SessionId}, #{janus_ws := JanusWs} = State) ->
  JanusWs ! {keepalive, SessionId},
  {ok, State}.

As you might notice, we have one API request for each Janus request. This was done for simplicity's sake, but you could easily write an API request that performs multiple Janus requests.
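From the browser's side, the wrapper API is then a flat set of JSON requests and replies. The sketch below shows the first steps of a broadcaster. The field names mirror the keys ws_handler pattern-matches on and the maps janus_handler sends back; api_request, apply_reply, and the ID values are hypothetical placeholders.

```javascript
// Build a request for the wrapper API. Field names mirror the keys that
// ws_handler pattern-matches on; api_request itself is a hypothetical helper.
function api_request(request, fields = {}) {
  return JSON.stringify({request: request, ...fields});
}

// Merge a reply from the wrapper ({session_id: ...}, {handle_id: ...},
// {room_id: ...}, {publisher_id: ...} or {jsep: ...}) into the client's state.
function apply_reply(state, rawReply) {
  return {...state, ...JSON.parse(rawReply)};
}

// The broadcaster's first steps. The reply strings stand in for what the
// wrapper would send back; the IDs are made-up placeholders.
let state = {};
console.log(api_request('create_session'));
state = apply_reply(state, '{"session_id": 1}');
console.log(api_request('create_handle', {session_id: state.session_id}));
state = apply_reply(state, '{"handle_id": 2}');
console.log(api_request('create_room', {session_id: state.session_id, handle_id: state.handle_id}));
```

Note that the client never sees a transaction value or a plugin name: that part of the Janus protocol stays inside the wrapper.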

Considerations

Multiplexing websockets

As you saw, we create one server-Janus websocket per client. In actual live usage this might not scale properly due to the overhead of establishing the websocket connection on demand, so we designed a possible solution: multiplex multiple sessions through a set of previously created websockets. Something like this:

  1. Create a fixed set of server-Janus websockets on start using a worker pool.
  2. Create a unique ID for each request; it should identify the process that must receive Janus's response.
  3. Add the request's unique ID to the transaction value.
  4. When handling a response from Janus, decode the request ID to figure out which process to send the response to.

NOTE: Sessions have to be unique, one per client.
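Steps 2 to 4 can be sketched as follows. This is written in JavaScript to match the PoC snippets; in the Erlang server the registered callback would be the pid of the client's process. The "<id>:<name>" transaction layout and all function names are assumptions of this sketch, not something Janus prescribes.

```javascript
// Registry mapping an ID to whoever must receive Janus's responses.
const clients = new Map();
let nextClientId = 0;

// Step 2: create a unique ID that identifies the receiving process.
function register_client(onResponse) {
  const id = String(++nextClientId);
  clients.set(id, onResponse);
  return id;
}

// Step 3: embed the ID in the transaction value before sending to Janus.
function tag_request(clientId, message) {
  return {...message, transaction: `${clientId}:${message.transaction}`};
}

// Step 4: decode the ID from a response and forward the response to its owner,
// restoring the original transaction name. Returns false if nobody owns it.
function dispatch_response(response) {
  const transaction = response.transaction || '';
  const separator = transaction.indexOf(':');
  if (separator < 0) return false;
  const onResponse = clients.get(transaction.slice(0, separator));
  if (!onResponse) return false;
  onResponse({...response, transaction: transaction.slice(separator + 1)});
  return true;
}
```

Janus events that carry no transaction at all would still need another routing key, such as the session ID, which this sketch leaves out.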

Problems inside Docker

I don't think I've mentioned this, but we are running Janus inside a Docker container. Normally this wouldn't be an issue: add it to our docker-compose.yml, expose the ports, and that's it.

Sadly no, things aren't so easy. STUN servers have one simple job: to tell a client its public IP. That way, if a client is behind a NAT and only knows its private IP, it can find out its public IP. We are using our own set of STUN/TURN servers (inside the same docker-compose as Janus), which becomes an issue when Janus creates its ICE candidates, given that Janus and STUN are on the same LAN: the STUN server reports, as Janus's public IP, the one Janus has inside the Docker containers' network, which is a private IP. If you don't know a lot about networking, just know that this is REALLY bad, because clients outside that network cannot reach a private IP.
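One way to spot this situation is to look at the address inside the ICE candidates Janus sends. The sketch below only checks the IPv4 private ranges from RFC 1918; the candidate line is a made-up example and both helper names are hypothetical.

```javascript
// True when an IPv4 address falls in one of the RFC 1918 private ranges.
function is_private_ipv4(ip) {
  const [a, b] = ip.split('.').map(Number);
  return a === 10 || (a === 172 && b >= 16 && b <= 31) || (a === 192 && b === 168);
}

// Extract the connection address from an ICE candidate line, which has the form:
// candidate:<foundation> <component> <transport> <priority> <address> <port> typ <type> ...
function candidate_address(candidateLine) {
  return candidateLine.split(' ')[4];
}

// Made-up candidate of the kind described above: server-reflexive, yet private.
const exampleCandidate = 'candidate:1 1 udp 1677722111 172.18.0.3 40000 typ srflx raddr 172.18.0.3 rport 40000';
if (is_private_ipv4(candidate_address(exampleCandidate))) {
  console.log('Candidate advertises a private address; remote clients cannot reach it');
}
```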

Thankfully Janus has ways to handle this sort of thing. You can have it replace the host IP on all ICE candidates with an IP of your choosing:

  • Pass the command-line argument -1 <public IP> (long form: --nat-1-1=<public IP>)
  • In janus.jcfg, inside the nat section, set nat_1_1_mapping = "<public IP>"

Conclusion

Great! Now we have working and scalable broadcasting using WebRTC. So many new things learned, a bit of over-engineering here and there, but we made it. :D

You can find the code for all this on GitHub, in AminArria / janus_erlang_wrapper_demo:

Demo for an Erlang app wrapping Janus

Demo: Erlang wrapper for Janus

This is a demo Erlang app to demonstrate the usage of Janus. For further information regarding "why this demo?" I encourage you to read my blog post Scalable Broadcasting Using WebRTC.

The demo will expose the following endpoints:

NOTE: This demo was tested using Firefox 65 and Chrome 72.

Requirements

  • Erlang/OTP (tested with Erlang/OTP 21)
  • Docker Compose

Running Demo

First turn on Janus

make ops

Start Erlang app

make run

To stop Janus

make ops_stop

Demo

Both clients, janus-client and api-client, present the same interface and are used the same way, so everything explained below applies to both.

As broadcaster

  1. Start session
  2. Start handle
  3. Make room (This will fill the Room ID field)
  4. Join as publisher (This will…
