DEV Community

Masui Masanori
Masui Masanori

Posted on

[Go] Try Pion/WebRTC with SSE

Intro

This time, I will try Pion/WebRTC.

Because Pion already has good examples, I will create a sample based on SFU-WebSocket of example-webrtc-applications.

I will try changing these points.

  • Use SSE(Server-Sent Events) for signaling
  • Start connecting manually

I will add WebRTC functions into the last sample project.

I also referred to this post (especially for the client side).

Environments

  • Go ver.go1.18.2 windows/amd64
  • Node.js ver.18.1.0

Connecting with WebRTC SFU

When I tried WebRTC last time, the server-side application just worked for signaling.
After signaling, the clients were directly connected to each other.

Image description

This time they will only connect to the server-side application.
After connecting, the clients will send video tracks and audio tracks to the server-side application.
The server-side application then sends the other clients' tracks to each client as remote tracks.

Image description

Samples

This time, I publish the sample project on GitHub.

Client-side

Because the connection process is started by the server-side application, the client side only needs to handle offer and ICE candidate events.
And I will create a RTCPeerConnection on start.

main.page.ts

...
/**
 * Parses one raw message string and routes it by its "event" field.
 *
 * Payloads that do not pass checkIsClientMessage are dropped silently.
 * "text" goes to the view; "offer" and "candidate" carry a JSON-encoded
 * payload in "data" that is parsed again and handed to the WebRTC controller.
 * Any other event name is ignored.
 *
 * @param value JSON text of a single message.
 */
function handleReceivedMessage(value: string) {
    const parsed = JSON.parse(value);
    if(!checkIsClientMessage(parsed)) {
        return;
    }
    const eventName = parsed.event;
    if(eventName === "text") {
        view.addReceivedText({ user: parsed.userName, message: parsed.data });
    } else if(eventName === "offer") {
        webrtc.handleOffer(JSON.parse(parsed.data));
    } else if(eventName === "candidate") {
        webrtc.handleCandidate(JSON.parse(parsed.data));
    }
}
/**
 * Sends the local answer to the server as an "answer" message.
 *
 * Nothing is sent when hasAnyTexts(userName) is false
 * (presumably: no user name has been entered yet; verify against hasAnyTexts).
 *
 * @param data Session description to serialize into the message's "data" field.
 */
function sendAnswer(data: RTCSessionDescriptionInit) {
    if(hasAnyTexts(userName)) {
        const serialized = JSON.stringify(data);
        sse.sendMessage({userName, event: "answer", data: serialized});
    }
}
/**
 * Sends a locally found ICE candidate to the server as a "candidate" message.
 *
 * Nothing is sent when hasAnyTexts(userName) is false
 * (presumably: no user name has been entered yet; verify against hasAnyTexts).
 *
 * @param data ICE candidate to serialize into the message's "data" field.
 */
function sendCandidate(data: RTCIceCandidate) {
    if(hasAnyTexts(userName)) {
        const serialized = JSON.stringify(data);
        sse.sendMessage({userName, event: "candidate", data: serialized});
    }
}
/**
 * Type guard for values produced by JSON.parse on a server message.
 *
 * A value is accepted when it is a non-null object whose "event" and "data"
 * properties are both strings. No other property is inspected.
 *
 * @param value Any parsed JSON value (object, array, primitive or null).
 * @returns true when the value has string "event" and "data" properties.
 */
function checkIsClientMessage(value: any): value is ClientMessage {
    // All messages from the server-side application are sent as "ClientMessage".
    // JSON.parse can also yield numbers, strings and booleans. The `in` operator
    // throws a TypeError on primitives, so reject everything that is not an
    // object before looking at its properties.
    if(value == null || typeof value !== "object") {
        return false;
    }
    // A missing property reads as undefined, which is not a string either.
    if(typeof value["event"] !== "string") {
        return false;
    }
    if(typeof value["data"] !== "string") {
        return false;
    }
    return true;
}
// Call init() (defined outside this excerpt) as soon as this script is evaluated.
init();
Enter fullscreen mode Exit fullscreen mode

webrtc.controller.ts

export class WebRtcController {
    /** Local camera/microphone stream; assigned in init() once getUserMedia resolves. */
    private webcamStream: MediaStream|null = null; 
    /** Connection created by connect(); null until the local stream is available. */
    private peerConnection: RTCPeerConnection|null = null;
    /** Callback invoked by handleOffer() with the generated answer (where it is assigned is not shown in this excerpt). */
    private answerSentEvent: ((data: RTCSessionDescriptionInit) => void)|null = null;
    /** Callback invoked with each local ICE candidate reported by the connection (where it is assigned is not shown in this excerpt). */
    private candidateSentEvent: ((data: RTCIceCandidate) => void)|null = null;
    /**
     * Starts the local preview and, once the camera/microphone stream is
     * available, stores it and creates the peer connection.
     *
     * The "local_video" element is resized to a fixed width of 320 while
     * keeping the aspect ratio reported by the video itself. Failures of
     * getUserMedia are only logged to the console.
     */
    public init() {
        const preview = document.getElementById("local_video") as HTMLVideoElement;
        const fitPreview = () => {
            const width = 320;
            const shrinkRatio = preview.videoWidth / width;
            const height = preview.videoHeight / shrinkRatio;
            preview.setAttribute("width", width.toString());
            preview.setAttribute("height", height.toString());
        };
        preview.addEventListener("canplay", fitPreview, false);

        const useStream = (stream: MediaStream) => {
            preview.srcObject = stream;
            preview.play();
            this.webcamStream = stream;
            // The peer connection needs the local tracks, so it is created
            // only after the stream has been obtained.
            this.connect();
        };
        navigator.mediaDevices.getUserMedia({ video: true, audio: true })
            .then(useStream)
            .catch(err => console.error(`An error occurred: ${err}`));
    }
...
    /**
     * Handles a received offer and sends back an answer.
     *
     * The steps run strictly in sequence: apply the remote offer, create the
     * answer, apply it as the local description, then hand it to
     * answerSentEvent. The answer is therefore only sent after the local
     * description was accepted, and a failure in any step is logged instead
     * of surfacing as an unhandled promise rejection.
     *
     * @param data The offer; the call is a no-op when it is null/undefined or
     *             when no peer connection exists yet.
     */
    public handleOffer(data: RTCSessionDescription|null|undefined) {
        const peerConnection = this.peerConnection;
        if(peerConnection == null ||
                data == null) {
            return;
        }
        peerConnection.setRemoteDescription(data)
            .then(() => peerConnection.createAnswer())
            .then(answer => peerConnection.setLocalDescription(answer)
                // keep the answer flowing down the chain for the send step
                .then(() => answer))
            .then(answer => {
                if(this.answerSentEvent != null) {
                    this.answerSentEvent(answer);
                }
            })
            .catch(err => console.error(`Failed handling the offer: ${err}`));
    }
    /**
     * Adds a remote ICE candidate to the peer connection.
     *
     * addIceCandidate returns a promise; a rejection is logged here so it
     * does not become an unhandled rejection.
     *
     * @param data The candidate; the call is a no-op when it is null/undefined
     *             or when no peer connection exists yet.
     */
    public handleCandidate(data: RTCIceCandidate|null|undefined) {
        if(this.peerConnection == null ||
            data == null) {
            return;
        }
        this.peerConnection.addIceCandidate(data)
            .catch(err => console.error(`Failed adding the ICE candidate: ${err}`));
    }
    /**
     * Creates the RTCPeerConnection, wires its event handlers and attaches
     * every track of the local stream to it.
     *
     * Does nothing when the local stream has not been obtained yet.
     * Remote video tracks are shown in new <video> elements appended to
     * "remote_video_area"; remote audio tracks get no element of their own.
     */
    private connect() {
        const localStream = this.webcamStream;
        if(localStream == null) {
            return;
        }
        const connection = new RTCPeerConnection({
            iceServers: [{
                urls: `stun:stun.l.google.com:19302`,  // A STUN server
            }]
        });
        this.peerConnection = connection;

        // Show each incoming video track in its own element.
        connection.ontrack = (ev) => {
            const remoteStream = ev.streams[0];
            if(ev.track.kind === "audio" || remoteStream == null) {
                return;
            }
            const player = document.createElement("video");
            player.srcObject = remoteStream;
            player.autoplay = true;
            player.controls = true;
            const container = document.getElementById("remote_video_area") as HTMLElement;
            container.appendChild(player);
            ev.track.onmute = () => {
                player.play();
            };
            // Drop the element again when a track is removed from the stream.
            remoteStream.onremovetrack = () => {
                const parent = player.parentNode;
                if(parent) {
                    parent.removeChild(player);
                }
            };
        };

        for(const track of localStream.getTracks()) {
            connection.addTrack(track, localStream);
        }

        // Forward local ICE candidates to whoever registered candidateSentEvent.
        connection.onicecandidate = (ev) => {
            if(ev.candidate != null && this.candidateSentEvent != null) {
                this.candidateSentEvent(ev.candidate);
            }
        };
    }
}
Enter fullscreen mode Exit fullscreen mode

Server-side

sseClient.go

package main

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "log"
    "net/http"

    "github.com/pion/webrtc/v3"
)

// SSEClient ties one SSE response stream to the channels that carry
// PeerConnection events to the HTTP handler loop in registerSSEClient.
type SSEClient struct {
    // candidateFound carries ICE candidates reported by OnICECandidate.
    candidateFound        chan *webrtc.ICECandidate
    // changeConnectionState carries states reported by OnConnectionStateChange.
    changeConnectionState chan webrtc.PeerConnectionState
    // addTrack carries remote tracks reported by OnTrack.
    addTrack              chan *webrtc.TrackRemote
    // userName is the value of the "user" request parameter.
    userName              string
    // w is the response writer the SSE "data:" frames are written to.
    w                     http.ResponseWriter
}

// newSSEClient builds an SSEClient for the given user and response writer.
// All three event channels are created unbuffered.
func newSSEClient(userName string, w http.ResponseWriter) *SSEClient {
    client := new(SSEClient)
    client.userName = userName
    client.w = w
    client.candidateFound = make(chan *webrtc.ICECandidate)
    client.changeConnectionState = make(chan webrtc.PeerConnectionState)
    client.addTrack = make(chan *webrtc.TrackRemote)
    return client
}

// registerSSEClient opens an SSE stream for the user named by the "user"
// request parameter, creates that user's PeerConnection, registers it with
// the hub and then blocks, relaying PeerConnection events until the
// connection fails or closes, or the request is cancelled.
//
// On exit the client is unregistered, the PeerConnection is closed and the
// client's event channels are closed.
func registerSSEClient(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
    userName, err := getParam(r, "user")
    if err != nil {
        log.Println(err.Error())
        fmt.Fprint(w, "The parameters have no names")
        return
    }
    // SSE frames must be pushed with Flush(). Refuse the request up front when
    // the writer cannot flush, instead of calling Flush on a nil interface
    // later. This is checked before anything is created or registered, so
    // there is nothing to clean up on this path.
    flusher, ok := w.(http.Flusher)
    if !ok {
        log.Println("the ResponseWriter does not support flushing")
        http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
        return
    }
    newClient := newSSEClient(userName, w)
    ps, err := NewPeerConnectionState(newClient)
    if err != nil {
        log.Println(err.Error())
        fmt.Fprint(w, "Failed connection")
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    hub.register <- ps

    defer func() {
        hub.unregister <- ps
        if ps.peerConnection.ConnectionState() != webrtc.PeerConnectionStateClosed {
            if err := ps.peerConnection.Close(); err != nil {
                log.Println(err.Error())
            }
        }
        // NOTE(review): the PeerConnection callbacks send on these channels;
        // confirm that none of them can still fire after this point.
        close(newClient.candidateFound)
        close(newClient.changeConnectionState)
        close(newClient.addTrack)
    }()
    for {
        // handle PeerConnection events and close SSE event.
        select {
        case candidate := <-newClient.candidateFound:
            jsonValue, err := NewCandidateMessageJSON(newClient.userName, candidate)
            if err != nil {
                log.Println(err.Error())
                return
            }
            fmt.Fprintf(w, "data: %s\n\n", jsonValue)
            flusher.Flush()
        case track := <-newClient.addTrack:
            // Track bookkeeping is owned by the hub; just pass it on.
            hub.addTrack <- track
        case connectionState := <-newClient.changeConnectionState:
            switch connectionState {
            case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
                return
            }
        case <-r.Context().Done():
            // when "es.close()" is called, this loop operation will be ended.
            return
        }
    }
}
// sendSSEMessage reads a JSON-encoded ClientMessage from the request body and
// hands it to the hub's broadcast channel. The response body is the JSON form
// of GetSucceeded() on success or of GetFailed(...) when the body cannot be
// read or decoded.
func sendSSEMessage(w http.ResponseWriter, r *http.Request, hub *SSEHub) {
    w.Header().Set("Content-Type", "application/json")
    body, err := ioutil.ReadAll(r.Body)

    if err != nil {
        log.Println(err.Error())
        j, _ := json.Marshal(GetFailed("Failed reading values from body"))
        w.Write(j)
        return
    }
    // Decode into a value, not into a pointer. With a *ClientMessage target a
    // body consisting of the JSON literal null makes Unmarshal set the pointer
    // to nil without reporting an error, and dereferencing it would panic.
    var message ClientMessage
    err = json.Unmarshal(body, &message)
    if err != nil {
        log.Println(err.Error())
        j, _ := json.Marshal(GetFailed("Failed converting to ClientMessage"))
        w.Write(j)
        return
    }
    w.WriteHeader(http.StatusOK)
    hub.broadcast <- message
    data, _ := json.Marshal(GetSucceeded())
    w.Write(data)
}
Enter fullscreen mode Exit fullscreen mode

peerConnectionState.go

package main

import (
    "github.com/pion/webrtc/v3"
)

// PeerConnectionState pairs a server-side PeerConnection with the SSE client
// that is used for its signaling.
type PeerConnectionState struct {
    // peerConnection is the WebRTC connection to this client.
    peerConnection *webrtc.PeerConnection
    // client holds the user name, the SSE writer and the event channels.
    client         *SSEClient
}

// NewPeerConnectionState creates a PeerConnection for client, adds one
// receive-only transceiver each for video and audio, and forwards ICE
// candidate, connection-state and track events into the client's channels.
//
// It returns an error when the PeerConnection or one of the transceivers
// cannot be created; in the latter case the PeerConnection is closed again.
func NewPeerConnectionState(client *SSEClient) (*PeerConnectionState, error) {
    peerConnection, err := webrtc.NewPeerConnection(webrtc.Configuration{
        ICEServers: []webrtc.ICEServer{
            {
                URLs: []string{
                    "stun:stun.l.google.com:19302",
                },
            },
        },
    })
    if err != nil {
        return nil, err
    }
    for _, typ := range []webrtc.RTPCodecType{webrtc.RTPCodecTypeVideo, webrtc.RTPCodecTypeAudio} {
        if _, err := peerConnection.AddTransceiverFromKind(typ, webrtc.RTPTransceiverInit{
            Direction: webrtc.RTPTransceiverDirectionRecvonly,
        }); err != nil {
            // The caller only receives the error, so release the connection
            // here instead of leaking it.
            _ = peerConnection.Close()
            return nil, err
        }
    }
    // Add event handlers.
    // NOTE(review): the channels are unbuffered and are closed when
    // registerSSEClient returns; confirm these sends can neither block forever
    // nor hit a closed channel.
    peerConnection.OnICECandidate(func(i *webrtc.ICECandidate) {
        if i == nil {
            return
        }
        client.candidateFound <- i
    })
    peerConnection.OnConnectionStateChange(func(p webrtc.PeerConnectionState) {
        // avoid panic after closing channel
        // For the Closed state, first receive from the channel: once it has
        // been closed the receive yields ok == false and nothing is sent.
        if p == webrtc.PeerConnectionStateClosed {
            _, ok := <-client.changeConnectionState
            if ok {
                client.changeConnectionState <- p
            }
            return
        }
        client.changeConnectionState <- p
    })
    peerConnection.OnTrack(func(t *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
        client.addTrack <- t
    })

    return &PeerConnectionState{
        peerConnection: peerConnection,
        client:         client,
    }, nil
}
Enter fullscreen mode Exit fullscreen mode

sseHub.go

package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "time"

    "github.com/pion/rtcp"
    "github.com/pion/webrtc/v3"
)

// SSEHub keeps the registered clients and the tracks shared between them.
// Its channels are consumed by run().
type SSEHub struct {
    // clients is the set of registered peer connection states.
    clients     map[*PeerConnectionState]bool
    // broadcast carries messages posted through sendSSEMessage.
    broadcast   chan ClientMessage
    // register carries newly created client states.
    register    chan *PeerConnectionState
    // unregister carries client states whose SSE handler is ending.
    unregister  chan *PeerConnectionState
    // trackLocals maps a remote track's ID to the local track that relays it.
    trackLocals map[string]*webrtc.TrackLocalStaticRTP
    // addTrack carries remote tracks received from clients.
    addTrack    chan *webrtc.TrackRemote
}

// newSSEHub returns a hub with empty client and track maps and unbuffered
// channels. run() must be started separately to serve the channels.
func newSSEHub() *SSEHub {
    hub := new(SSEHub)
    hub.clients = map[*PeerConnectionState]bool{}
    hub.trackLocals = make(map[string]*webrtc.TrackLocalStaticRTP)
    hub.broadcast = make(chan ClientMessage)
    hub.register = make(chan *PeerConnectionState)
    hub.unregister = make(chan *PeerConnectionState)
    hub.addTrack = make(chan *webrtc.TrackRemote)
    return hub
}
// run is the hub's event loop. It never returns: it serves register,
// unregister, addTrack and broadcast requests and asks every client for a key
// frame every three seconds.
//
// The periodic key-frame request is handled inside the same select as the
// other events, so h.clients is not iterated by a second goroutine while this
// loop adds or removes entries.
func (h *SSEHub) run() {
    keyFrameTicker := time.NewTicker(time.Second * 3)
    defer keyFrameTicker.Stop()
    for {
        select {
        case <-keyFrameTicker.C:
            dispatchKeyFrame(h)
        case client := <-h.register:
            h.clients[client] = true
            signalPeerConnections(h)
        case client := <-h.unregister:
            if _, ok := h.clients[client]; ok {
                delete(h.clients, client)
                signalPeerConnections(h)
            }
        case track := <-h.addTrack:
            trackLocal, err := webrtc.NewTrackLocalStaticRTP(track.Codec().RTPCodecCapability,
                track.ID(), track.StreamID())
            if err != nil {
                // Skip only this track. Returning here would stop the loop,
                // and every later send to the hub's channels would then block
                // forever.
                log.Println(err.Error())
                continue
            }
            h.trackLocals[track.ID()] = trackLocal
            signalPeerConnections(h)
            // Copy RTP packets from the remote track to the local one in the background.
            go updateTrackValue(h, track)

        case message := <-h.broadcast:
            handleReceivedMessage(h, message)
        }
    }
}
// updateTrackValue copies RTP packets read from the remote track into the
// local track registered under the same ID, until reading or writing fails.
// When it stops, the local track is removed from the hub and the peers are
// re-signaled.
//
// The local track is looked up once, before the copy loop. This avoids one
// unsynchronised map read per packet and a nil dereference when the entry is
// missing.
// NOTE(review): the initial lookup and the deferred delete still touch
// h.trackLocals from this goroutine while run() may be using it; confirm
// whether that needs a lock or a channel.
func updateTrackValue(h *SSEHub, track *webrtc.TrackRemote) {
    trackLocal, ok := h.trackLocals[track.ID()]
    if !ok || trackLocal == nil {
        // Nothing registered for this track, so there is nothing to relay or clean up.
        return
    }
    defer func() {
        delete(h.trackLocals, track.ID())
        signalPeerConnections(h)
    }()

    buf := make([]byte, 1500)

    for {
        n, _, err := track.Read(buf)
        if err != nil {
            return
        }
        if _, err = trackLocal.Write(buf[:n]); err != nil {
            return
        }
    }
}
// handleReceivedMessage processes one message posted by a client.
//
//   - TextEvent: the message is re-encoded as JSON and written as an SSE
//     "data:" frame to every registered client.
//   - CandidateEvent: message.Data is decoded as an ICE candidate and added to
//     the PeerConnection of each client whose user name matches the sender.
//   - AnswerEvent: message.Data is decoded as a session description and set as
//     the remote description of each matching client's PeerConnection.
//
// Decoding and WebRTC errors are logged and end the handling of the message.
// Any other event is ignored.
// NOTE(review): registerSSEClient also writes to the same ResponseWriter from
// the request goroutine; confirm those writes cannot overlap with these.
func handleReceivedMessage(h *SSEHub, message ClientMessage) {
    switch message.Event {
    case TextEvent:
        m, err := json.Marshal(message)
        if err != nil {
            // Do not push an empty frame when the message cannot be encoded.
            log.Println(err)
            return
        }
        jsonText := string(m)

        for client := range h.clients {
            fmt.Fprintf(client.client.w, "data: %s\n\n", jsonText)
            // Only flush when the writer supports it; calling Flush on the
            // nil result of a failed assertion would panic.
            if flusher, ok := client.client.w.(http.Flusher); ok {
                flusher.Flush()
            }
        }
    case CandidateEvent:
        candidate := webrtc.ICECandidateInit{}
        if err := json.Unmarshal([]byte(message.Data), &candidate); err != nil {
            log.Println(err)
            return
        }
        for pc := range h.clients {
            if pc.client.userName == message.UserName {
                if err := pc.peerConnection.AddICECandidate(candidate); err != nil {
                    log.Println(err)
                    return
                }
            }
        }
    case AnswerEvent:
        answer := webrtc.SessionDescription{}
        if err := json.Unmarshal([]byte(message.Data), &answer); err != nil {
            log.Println(err)
            return
        }
        for pc := range h.clients {
            if pc.client.userName == message.UserName {
                if err := pc.peerConnection.SetRemoteDescription(answer); err != nil {
                    log.Println(err)
                    return
                }
            }
        }

    }
}
// signalPeerConnections brings every client's senders in line with the hub's
// current tracks and sends each client a fresh offer.
//
// attemptSync is called up to 25 times in a row and the function stops as
// soon as one call reports that no retry is needed. Retrying is how errors
// such as the following are ignored:
// InvalidModificationError: invalid proposed signaling state transition: have-local-offer->SetLocal(offer)->have-local-offer
// If all 25 attempts ask for a retry, another round is started from a new
// goroutine three seconds later. A key frame is requested from all clients
// whenever this function returns.
func signalPeerConnections(h *SSEHub) {
    defer dispatchKeyFrame(h)

    const maxAttempts = 25
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if needsRetry := attemptSync(h); !needsRetry {
            return
        }
    }
    // Still not in sync: back off instead of spinning, then try again.
    go func() {
        time.Sleep(time.Second * 3)
        signalPeerConnections(h)
    }()
}
// attemptSync shares the received tracks with all connected peers and sends
// each of them a new offer over its SSE stream.
//
// It returns true when the pass was aborted and should be repeated (a closed
// client was removed, or a WebRTC call or the offer encoding failed) and
// false when every client was processed.
func attemptSync(h *SSEHub) bool {
    for ps := range h.clients {
        if ps.peerConnection.ConnectionState() == webrtc.PeerConnectionStateClosed {
            delete(h.clients, ps)
            // We modified the map, start from the beginning
            return true
        }
        // IDs of tracks this peer already sends or receives.
        existingSenders := map[string]bool{}

        for _, sender := range ps.peerConnection.GetSenders() {
            if sender.Track() == nil {
                continue
            }
            existingSenders[sender.Track().ID()] = true

            // Stop sending tracks the hub no longer knows about.
            if _, ok := h.trackLocals[sender.Track().ID()]; !ok {
                if err := ps.peerConnection.RemoveTrack(sender); err != nil {
                    return true
                }
            }
        }
        // Tracks received from this peer are marked as existing too, so the
        // peer's own tracks are not sent back to it.
        for _, receiver := range ps.peerConnection.GetReceivers() {
            if receiver.Track() == nil {
                continue
            }
            existingSenders[receiver.Track().ID()] = true
        }
        // Add every hub track that this peer neither sends nor receives yet.
        for trackID := range h.trackLocals {
            if _, ok := existingSenders[trackID]; !ok {
                if _, err := ps.peerConnection.AddTrack(h.trackLocals[trackID]); err != nil {
                    return true
                }
            }
        }

        offer, err := ps.peerConnection.CreateOffer(nil)
        if err != nil {
            return true
        }
        messageJSON, err := NewOfferMessageJSON(ps.client.userName, offer)
        if err != nil {
            return true
        }

        if err = ps.peerConnection.SetLocalDescription(offer); err != nil {
            return true
        }
        // NOTE(review): the assertion result is not checked; Flush would panic
        // if the writer did not implement http.Flusher.
        flusher, _ := ps.client.w.(http.Flusher)

        // Push the offer to the client as one SSE frame.
        fmt.Fprintf(ps.client.w, "data: %s\n\n", messageJSON)
        flusher.Flush()
    }
    return false
}
// dispatchKeyFrame sends a Picture Loss Indication for every track currently
// received from every registered client. Receivers without a track are
// skipped and write errors are ignored.
func dispatchKeyFrame(h *SSEHub) {
    for state := range h.clients {
        pc := state.peerConnection
        for _, receiver := range pc.GetReceivers() {
            remoteTrack := receiver.Track()
            if remoteTrack == nil {
                continue
            }
            pli := &rtcp.PictureLossIndication{
                MediaSSRC: uint32(remoteTrack.SSRC()),
            }
            _ = pc.WriteRTCP([]rtcp.Packet{pli})
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Channels

I create channels in SSEClient and SSEHub.
I tried adding some channels into SSEClient to send messages from SSEHub first.

But when I did that, the application hung when I sent text messages after connecting WebRTC.
I think the cause was a circular reference, so I removed these channels and now send messages from SSEHub directly.

Resources

Discussion (0)