How to use Redis Pub/Sub in Go Chat Application (Part 3)

Jeroen de Kok. Originally published at whichdev.com. 12 min read.

In the third part of this tutorial series, we will add Redis Pub/Sub to our existing chat application (built in the previous parts). With the use of Redis Pub/Sub, we can scale our application by running multiple instances at the same time.

Preconditions

To follow along you should have completed part 1 and part 2 or grab the source from here.

What is Redis Pub/Sub?

Redis Pub/Sub is the Redis implementation of the Publish–subscribe pattern. This is a so-called “messaging pattern”, where senders of messages (publishers) don’t send their messages directly to receivers (subscribers) but publish their messages in a “channel”. Subscribers choose to subscribe to specific channels and will receive these published messages.

When we run multiple instances of the same application we can leverage these Pub/Sub channels to not only notify clients connected to the same instance but notify all clients connected to any instance.

Diagram of pub/sub subscriptions.

In our application every chat message is sent through a room, so we can give each room its own channel to publish and subscribe on. For each running room there will be one Pub/Sub channel (illustrated by the Room channels in the diagram above).
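To make the pattern concrete before bringing in Redis, here is a minimal in-process sketch of publish/subscribe using plain Go channels. The Broker type and its methods are hypothetical names used only for illustration; they are not part of Redis or of the chat application. Two subscribers on the same channel name play the role of two application instances.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a toy, in-process stand-in for Redis Pub/Sub (hypothetical type).
type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan string
}

func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers a subscriber on a channel name and returns a buffered
// Go channel that receives every message published from now on.
func (b *Broker) Subscribe(channel string) <-chan string {
	ch := make(chan string, 8)
	b.mu.Lock()
	b.subs[channel] = append(b.subs[channel], ch)
	b.mu.Unlock()
	return ch
}

// Publish delivers msg to all current subscribers of the channel and
// returns how many subscribers there were. Like Redis Pub/Sub, nothing is
// stored: a subscriber that joins later never sees this message.
// Note: this toy version blocks once a subscriber's buffer is full.
func (b *Broker) Publish(channel, msg string) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, ch := range b.subs[channel] {
		ch <- msg
	}
	return len(b.subs[channel])
}

func main() {
	broker := NewBroker()
	instanceA := broker.Subscribe("room-1")
	instanceB := broker.Subscribe("room-1")

	broker.Publish("room-1", "hello")

	fmt.Println(<-instanceA, <-instanceB) // prints "hello hello"
}
```

The publisher only knows the channel name, never the subscribers, which is what lets us add instances later without changing the sending code.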

We would like to have a list of all the online users on each server as well, to be able to start a private chat for example. For this, we will use a “general” channel, where the WsServer can publish and subscribe. Ok, let’s start coding!

Step 1: Adding a persistence layer

Because Pub/Sub won’t play back missed messages, we need some sort of persistence. If we scale our application after the service is running, the new instance needs a way to get all the existing data (rooms and users).

For this we will add a database. In this post we will keep it simple and use an SQLite database. Depending on your use case, you may want a different database engine. To make this swap easy we will use the Repository Pattern.

Install the needed package with:

go get github.com/mattn/go-sqlite3

// config/database.go
package config

import (
    "database/sql"
    "log"

    _ "github.com/mattn/go-sqlite3"
)

func InitDB() *sql.DB {
    db, err := sql.Open("sqlite3", "./chatdb.db")
    if err != nil {
        log.Fatal(err)
    }

    sqlStmt := `    
    CREATE TABLE IF NOT EXISTS room (
        id VARCHAR(255) NOT NULL PRIMARY KEY,
        name VARCHAR(255) NOT NULL,
        private TINYINT NULL
    );
    `
    _, err = db.Exec(sqlStmt)
    if err != nil {
        log.Fatalf("%q: %s\n", err, sqlStmt)
    }

    sqlStmt = ` 
    CREATE TABLE IF NOT EXISTS user (
        id VARCHAR(255) NOT NULL PRIMARY KEY,
        name VARCHAR(255) NOT NULL
    );
    `
    _, err = db.Exec(sqlStmt)
    if err != nil {
        log.Fatalf("%q: %s\n", err, sqlStmt)
    }

    return db
}


// main.go
..
import (
    ...
    "github.com/jeroendk/chatApplication/config"
    "github.com/jeroendk/chatApplication/repository"
)

func main() {
    ...
    db := config.InitDB()
    defer db.Close()
}

The code above will initialize the database when starting the Go application.

Room Repository

Next, we will add two repository files, first the roomRepository. To be able to use the room model in all our packages, we will create an interface for it in the models package. We add an interface for our roomRepository as well, this makes swapping out the implementation easier.

// models/room.go
package models

type Room interface {
    GetId() string
    GetName() string
    GetPrivate() bool
}

type RoomRepository interface {
    AddRoom(room Room)
    FindRoomByName(name string) Room
}

// repository/roomRepository.go

package repository

import (
    "database/sql"

    "github.com/jeroendk/chatApplication/models"
)

type Room struct {
    Id string
    Name string
    Private bool
}

func (room *Room) GetId() string {
    return room.Id
}

func (room *Room) GetName() string {
    return room.Name
}

func (room *Room) GetPrivate() bool {
    return room.Private
}

type RoomRepository struct {
    Db *sql.DB
}

func (repo *RoomRepository) AddRoom(room models.Room) {
    stmt, err := repo.Db.Prepare("INSERT INTO room(id, name, private) values(?,?,?)")
    checkErr(err)
    defer stmt.Close() // release the prepared statement when done

    _, err = stmt.Exec(room.GetId(), room.GetName(), room.GetPrivate())
    checkErr(err)
}

func (repo *RoomRepository) FindRoomByName(name string) models.Room {

    row := repo.Db.QueryRow("SELECT id, name, private FROM room where name = ? LIMIT 1", name)

    var room Room

    if err := row.Scan(&room.Id, &room.Name, &room.Private); err != nil {
        if err == sql.ErrNoRows {
            return nil
        }
        panic(err)
    }

    return &room

}

func checkErr(err error) {
    if err != nil {
        panic(err)
    }
}

The repository file has two methods, one for adding a new room and one for finding a room based on the given name.
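To illustrate why the interface makes swapping implementations easy, here is a sketch of an in-memory repository that satisfies the same RoomRepository interface. The memoryRoom and MemoryRoomRepository names are hypothetical and not part of the original project; the interfaces are repeated so the example is self-contained. Something like this could be handy in tests, where a real SQLite file is not needed.

```go
package main

import "fmt"

// Room and RoomRepository mirror the interfaces from models/room.go.
type Room interface {
	GetId() string
	GetName() string
	GetPrivate() bool
}

type RoomRepository interface {
	AddRoom(room Room)
	FindRoomByName(name string) Room
}

// memoryRoom is a hypothetical, minimal Room implementation.
type memoryRoom struct {
	id, name string
	private  bool
}

func (r *memoryRoom) GetId() string    { return r.id }
func (r *memoryRoom) GetName() string  { return r.name }
func (r *memoryRoom) GetPrivate() bool { return r.private }

// MemoryRoomRepository keeps rooms in a map keyed by name.
// It is not safe for concurrent use; that is left out for brevity.
type MemoryRoomRepository struct {
	rooms map[string]Room
}

func NewMemoryRoomRepository() *MemoryRoomRepository {
	return &MemoryRoomRepository{rooms: make(map[string]Room)}
}

func (repo *MemoryRoomRepository) AddRoom(room Room) {
	repo.rooms[room.GetName()] = room
}

// FindRoomByName returns nil for an unknown room, like the SQLite version.
func (repo *MemoryRoomRepository) FindRoomByName(name string) Room {
	room, ok := repo.rooms[name]
	if !ok {
		return nil
	}
	return room
}

// Compile-time check that the interface is satisfied.
var _ RoomRepository = (*MemoryRoomRepository)(nil)

func main() {
	var repo RoomRepository = NewMemoryRoomRepository()
	repo.AddRoom(&memoryRoom{id: "1", name: "general"})

	fmt.Println(repo.FindRoomByName("general").GetId()) // prints "1"
	fmt.Println(repo.FindRoomByName("missing") == nil)  // prints "true"
}
```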

User Repository

We will do the same thing for the users, add the interfaces and create a repository:

// models/user.go
package models

type User interface {
    GetId() string
    GetName() string
}

type UserRepository interface {
    AddUser(user User)
    RemoveUser(user User)
    FindUserById(ID string) User
    GetAllUsers() []User
}


// repository/userRepository.go
package repository

import (
    "database/sql"
    "log"

    "github.com/jeroendk/chatApplication/models"
)

type User struct {
    Id string `json:"id"`
    Name string `json:"name"`
}

func (user *User) GetId() string {
    return user.Id
}

func (user *User) GetName() string {
    return user.Name
}

type UserRepository struct {
    Db *sql.DB
}

func (repo *UserRepository) AddUser(user models.User) {
    stmt, err := repo.Db.Prepare("INSERT INTO user(id, name) values(?,?)")
    checkErr(err)
    defer stmt.Close() // release the prepared statement when done

    _, err = stmt.Exec(user.GetId(), user.GetName())
    checkErr(err)
}

func (repo *UserRepository) RemoveUser(user models.User) {
    stmt, err := repo.Db.Prepare("DELETE FROM user WHERE id = ?")
    checkErr(err)
    defer stmt.Close() // release the prepared statement when done

    _, err = stmt.Exec(user.GetId())
    checkErr(err)
}

func (repo *UserRepository) FindUserById(ID string) models.User {

    row := repo.Db.QueryRow("SELECT id, name FROM user where id = ? LIMIT 1", ID)

    var user User

    if err := row.Scan(&user.Id, &user.Name); err != nil {
        if err == sql.ErrNoRows {
            return nil
        }
        panic(err)
    }

    return &user

}

func (repo *UserRepository) GetAllUsers() []models.User {

    rows, err := repo.Db.Query("SELECT id, name FROM user")

    if err != nil {
        log.Fatal(err)
    }
    var users []models.User
    defer rows.Close()
    for rows.Next() {
        var user User
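        // Stop on scan errors instead of silently appending empty users
        if err := rows.Scan(&user.Id, &user.Name); err != nil {
            log.Fatal(err)
        }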
        users = append(users, &user)
    }

    return users
}

The user repository has four methods:

  1. AddUser , to add new users to the database.
  2. RemoveUser , to remove a user from the database.
  3. FindUserById , to find one user by a given ID.
  4. GetAllUsers , to retrieve all users from the database.

Updating existing code to use interfaces

Before we can proceed further, we first need to update some existing code to comply with the new interfaces.

Message

// message.go
import (
    ...
    "github.com/jeroendk/chatApplication/models"
)

... 

type Message struct {
    Action string `json:"action"`
    Message string `json:"message"`
    Target *Room `json:"target"`
    Sender models.User `json:"sender"` // Use the models.User interface
}

...

// UnmarshalJSON is a custom unmarshaler that creates a Client instance for Sender
func (message *Message) UnmarshalJSON(data []byte) error {
    type Alias Message
    msg := &struct {
        Sender Client `json:"sender"`
        *Alias
    }{
        Alias: (*Alias)(message),
    }
    if err := json.Unmarshal(data, &msg); err != nil {
        return err
    }
    message.Sender = &msg.Sender
    return nil
}
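
The custom UnmarshalJSON is needed because encoding/json cannot decode a JSON object into an interface-typed field such as models.User: it has no way of knowing which concrete type to create. The self-contained sketch below shows the same alias technique in isolation. Person is a hypothetical stand-in for Client, and decodeMessage is a small helper added only for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type User interface {
	GetId() string
	GetName() string
}

// Person is a hypothetical concrete type standing in for Client.
type Person struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

func (p *Person) GetId() string   { return p.ID }
func (p *Person) GetName() string { return p.Name }

type Message struct {
	Action string `json:"action"`
	Sender User   `json:"sender"`
}

func (m *Message) UnmarshalJSON(data []byte) error {
	// Alias has the same fields as Message but none of its methods,
	// so decoding into it does not call UnmarshalJSON recursively.
	type Alias Message
	aux := &struct {
		// This concrete field shadows Alias.Sender for the "sender" key.
		Sender Person `json:"sender"`
		*Alias
	}{Alias: (*Alias)(m)}

	if err := json.Unmarshal(data, aux); err != nil {
		return err
	}
	m.Sender = &aux.Sender
	return nil
}

// decodeMessage is a helper for the example only.
func decodeMessage(raw string) (*Message, error) {
	var msg Message
	if err := json.Unmarshal([]byte(raw), &msg); err != nil {
		return nil, err
	}
	return &msg, nil
}

func main() {
	msg, err := decodeMessage(`{"action":"user-join","sender":{"id":"1","name":"Ann"}}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Action, msg.Sender.GetName()) // prints "user-join Ann"
}
```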

Client

// client.go
import (
    ...
    "github.com/jeroendk/chatApplication/models"
)

// Change the type sender from Client to the User interface.
func (client *Client) joinRoom(roomName string, sender models.User) {
  ...
}

func (client *Client) notifyRoomJoined(room *Room, sender models.User) {
  ...
}

// Add the GetId method to make Client compatible with the models.User interface
func (client *Client) GetId() string {
    return client.ID.String()
}

Room

// room.go

// Add the GetPrivate method to make Room compatible with the models.Room interface
func (room *Room) GetPrivate() bool {
    return room.Private
}


Step 2: Using the repositories

Currently, the chatServer is responsible for keeping track of the users and rooms. It does so by putting these entities in a map ( clients & rooms ). We will keep doing this, but in addition write both entities to the database.

For starters, add the two repositories as properties of the struct and set them in the NewWebsocketServer function. We add a new property as well, “ users ”, to keep track of all the users. The clients property is dedicated to actual clients with an active WebSocket connection (this is in preparation for the Pub/Sub logic).

// chatServer.go
import (    
    "github.com/jeroendk/chatApplication/models"
)

type WsServer struct {
    ...
    users []models.User
    roomRepository models.RoomRepository
    userRepository models.UserRepository
}

func NewWebsocketServer(roomRepository models.RoomRepository, userRepository models.UserRepository) *WsServer {
    wsServer := &WsServer{
        clients: make(map[*Client]bool),
        register: make(chan *Client),
        unregister: make(chan *Client),
        rooms: make(map[*Room]bool),
        roomRepository: roomRepository,
        userRepository: userRepository,
    }

    // Add users from database to server
    wsServer.users = userRepository.GetAllUsers()

    return wsServer
}

When creating a new instance of the WsServer, all the users are loaded from the database.

The next step is to change the call to NewWebsocketServer in main.go and include the two repositories:

// main.go
...
wsServer := NewWebsocketServer(&repository.RoomRepository{Db: db}, &repository.UserRepository{Db: db})

Using the user repository

Now that we have access to the repositories we can use them inside the chatServer methods. First, we will update the existing methods to use the userRepository. Below are the modified methods; the new code is marked with a comment.

// chatServer.go

func (server *WsServer) registerClient(client *Client) {
    // NEW: Add user to the repo
    server.userRepository.AddUser(client)    

    // Existing actions
    server.notifyClientJoined(client)
    server.listOnlineClients(client)
    server.clients[client] = true

    // NEW: Add user to the user slice
    server.users = append(server.users, client)
}

func (server *WsServer) unregisterClient(client *Client) {
    if _, ok := server.clients[client]; ok {
        delete(server.clients, client)
        server.notifyClientLeft(client)

        // NEW: Remove user from slice
        for i, user := range server.users {
          if user.GetId() == client.GetId() {
            server.users[i] = server.users[len(server.users)-1]
            server.users = server.users[:len(server.users)-1]
            break // stop after the first match
          }
        }

        // NEW: Remove user from repo
        server.userRepository.RemoveUser(client)
    }
}

func (server *WsServer) listOnlineClients(client *Client) {
    // NEW: Use the users slice instead of the client map
    for _, user := range server.users {
      message := &Message{
        Action: UserJoinedAction,
        Sender: user,
      }
      client.send <- message.encode()
    }
}


After adding the above, all online users are saved in the database. When a user disconnects, they are removed from the database.
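
The removal from the users slice uses a "swap with the last element, then shrink" approach. Here is that step isolated as a small sketch; the User struct and removeUserByID function are hypothetical simplifications of the interface-based code above. Note that this technique does not preserve the order of the slice, which is fine for a list of online users.

```go
package main

import "fmt"

// User is a simplified, hypothetical stand-in for models.User.
type User struct{ ID, Name string }

// removeUserByID removes the first user with the given ID by overwriting
// it with the last element and shrinking the slice by one.
// The order of the remaining users is not preserved.
func removeUserByID(users []User, id string) []User {
	for i, u := range users {
		if u.ID == id {
			last := len(users) - 1
			users[i] = users[last]
			return users[:last]
		}
	}
	return users // no match: slice is returned unchanged
}

func main() {
	users := []User{{"1", "Ann"}, {"2", "Bob"}, {"3", "Cid"}}
	users = removeUserByID(users, "1")
	fmt.Println(users) // prints "[{3 Cid} {2 Bob}]"
}
```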

Using the room repository

Next up are the rooms. We don’t need all the rooms when we start the server. Therefore, we only look for a room in the repository when we can’t find it in the local map.

// chatServer.go

func (server *WsServer) findRoomByName(name string) *Room {
    var foundRoom *Room
    for room := range server.rooms {
        if room.GetName() == name {
            foundRoom = room
            break
        }
    }

    // NEW: if there is no room, try to create it from the repo
    if foundRoom == nil {
        // Try to run the room from the repository, if it is found.
        foundRoom = server.runRoomFromRepository(name)
    }

    return foundRoom
}

// NEW: Try to find a room in the repo, if found Run it.
func (server *WsServer) runRoomFromRepository(name string) *Room {
    var room *Room
    dbRoom := server.roomRepository.FindRoomByName(name)
    if dbRoom != nil {
        room = NewRoom(dbRoom.GetName(), dbRoom.GetPrivate())
        room.ID, _ = uuid.Parse(dbRoom.GetId())

        go room.RunRoom()
        server.rooms[room] = true
    }

    return room
}

func (server *WsServer) createRoom(name string, private bool) *Room {
    room := NewRoom(name, private)
    // NEW: Add room to repo
    server.roomRepository.AddRoom(room)

    go room.RunRoom()
    server.rooms[room] = true

    return room
}
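
The lookup order used above (local map first, repository only as a fallback) can be sketched in isolation as follows. The instance type and its fields are hypothetical simplifications of WsServer, with plain maps standing in for the running rooms and the repository.

```go
package main

import "fmt"

// instance is a hypothetical, simplified stand-in for WsServer.
type instance struct {
	running  map[string]bool // rooms currently running on this instance
	stored   map[string]bool // rooms known to the repository (name -> private)
	restored int             // how many rooms were started from the repository
}

// findRoom reports whether the room is (now) running on this instance.
// It checks the local map first and only then falls back to the repository.
func (s *instance) findRoom(name string) bool {
	if s.running[name] {
		return true
	}
	if _, ok := s.stored[name]; ok {
		s.running[name] = true // the real code also starts RunRoom here
		s.restored++
		return true
	}
	return false
}

func main() {
	s := &instance{
		running: map[string]bool{},
		stored:  map[string]bool{"general": false},
	}
	// The second lookup is served from the local map.
	fmt.Println(s.findRoom("general"), s.findRoom("general"), s.restored) // prints "true true 1"
	fmt.Println(s.findRoom("unknown"))                                    // prints "false"
}
```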

That’s it. In the next step we will finally add the Pub/Sub integration.

Step 3: Redis Pub/Sub

Now, with everything in place, we can start to add the publishing and subscribing to Redis Pub/Sub channels.

First, install the Redis package:

go mod init
go get github.com/go-redis/redis/v8

Then make sure you have a Redis container at your disposal. You can create one with docker & docker-compose for example:

# docker-compose.yml
version: '3.5'

services:
  redis:
    image: "redis:alpine"
    ports:
      - "6364:6379"

Then start it with docker-compose up.

With your Redis container up and running, let’s create a connection within our application. For this we will create a new file called redis.go and put it in the config folder, next to our database connection.

// config/redis.go

package config

import "github.com/go-redis/redis/v8"

var Redis *redis.Client

func CreateRedisClient() {
    opt, err := redis.ParseURL("redis://localhost:6364/0")
    if err != nil {
        panic(err)
    }

    // Assign directly to avoid shadowing the redis package name.
    Redis = redis.NewClient(opt)
}
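
The Redis URL above is hard-coded. If your instances run in different environments, one option is to read the URL from the environment and fall back to the local default. The sketch below is only a suggestion: the REDIS_URL variable name and the redisURL helper are assumptions, not part of the original project. The lookup function is passed in so the helper is easy to test.

```go
package main

import (
	"fmt"
	"os"
)

// defaultRedisURL matches the port mapping in the docker-compose.yml above.
const defaultRedisURL = "redis://localhost:6364/0"

// redisURL returns the value of the (hypothetical) REDIS_URL variable,
// or the local default when it is unset or empty. The lookup parameter
// has the same signature as os.LookupEnv.
func redisURL(lookup func(key string) (string, bool)) string {
	if url, ok := lookup("REDIS_URL"); ok && url != "" {
		return url
	}
	return defaultRedisURL
}

func main() {
	// The result could then be handed to redis.ParseURL.
	fmt.Println(redisURL(os.LookupEnv))
}
```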

Then initialize the connection from your main.go

// main.go

func main() {
    ...
    config.CreateRedisClient()
    ...
}

There are a total of 4 different messages we want to send through the Pub/Sub channels.

  • Chat messages
  • User joined notification
  • User left notification
  • Private chat invitation

Chat messages

Sending chat messages inside a room is the job of our room.go. It is actually quite easy to integrate the Pub/Sub channels in this logic.

First, we will add two new methods, for publishing in a channel and subscribing to a channel:

// room.go
package main
import (
    "fmt"
    "log"
    "github.com/jeroendk/chatApplication/config"
    "github.com/google/uuid"
    "context"
)

var ctx = context.Background()

...
func (room *Room) publishRoomMessage(message []byte) {
    err := config.Redis.Publish(ctx, room.GetName(), message).Err()

    if err != nil {
        log.Println(err)
    }
}

func (room *Room) subscribeToRoomMessages() {
    pubsub := config.Redis.Subscribe(ctx, room.GetName())

    ch := pubsub.Channel()

    for msg := range ch {
        room.broadcastToClientsInRoom([]byte(msg.Payload))
    }
}

Then we will change the existing calls to broadcastToClientsInRoom so that they use the new publish method instead. Also, start listening to the Pub/Sub subscription when starting the room.

// room.go 
func (room *Room) RunRoom() {
    // subscribe to pub/sub messages inside a new goroutine
    go room.subscribeToRoomMessages()

    for {
        select {
        ...
        case message := <-room.broadcast:
            room.publishRoomMessage(message.encode())
        }
    }
}

func (room *Room) notifyClientJoined(client *Client) {
    ...
    room.publishRoomMessage(message.encode())
}
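
The important change is that a room no longer writes to its local clients directly. It publishes, and every running copy of the room (including itself) delivers what comes back from the subscription. The sketch below models that flow without Redis: bus is a hypothetical synchronous stand-in for one Redis channel, and roomInstance stands for the same room running on two different servers.

```go
package main

import "fmt"

// bus is a synchronous, in-process stand-in for one Redis channel.
type bus struct{ handlers []func(msg string) }

func (b *bus) subscribe(h func(msg string)) { b.handlers = append(b.handlers, h) }

func (b *bus) publish(msg string) {
	for _, h := range b.handlers {
		h(msg)
	}
}

// roomInstance models one running copy of a room on one server instance.
type roomInstance struct {
	bus     *bus
	inboxes map[string][]string // local client name -> received messages
}

func newRoomInstance(b *bus, clients ...string) *roomInstance {
	r := &roomInstance{bus: b, inboxes: make(map[string][]string)}
	for _, c := range clients {
		r.inboxes[c] = nil
	}
	// Counterpart of subscribeToRoomMessages: deliver whatever the bus sends.
	b.subscribe(r.broadcastToLocalClients)
	return r
}

func (r *roomInstance) broadcastToLocalClients(msg string) {
	for name := range r.inboxes {
		r.inboxes[name] = append(r.inboxes[name], msg)
	}
}

// send is the counterpart of publishRoomMessage: it only publishes and
// never touches the local clients directly.
func (r *roomInstance) send(msg string) { r.bus.publish(msg) }

func main() {
	shared := &bus{}
	a := newRoomInstance(shared, "ann") // room on server A
	b := newRoomInstance(shared, "bob") // same room on server B

	a.send("hi from ann")

	fmt.Println(a.inboxes["ann"], b.inboxes["bob"]) // prints "[hi from ann] [hi from ann]"
}
```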

User joined & left

Next, let’s publish when users join & leave and subscribe to these events inside chatServer.go:

// chatServer.go
package main

import (
    "encoding/json"
    "log"

    "github.com/google/uuid"
    "github.com/jeroendk/chatApplication/config"
    "github.com/jeroendk/chatApplication/models"
)

const PubSubGeneralChannel = "general"

// Publish userJoined message in pub/sub
func (server *WsServer) publishClientJoined(client *Client) {

    message := &Message{
        Action: UserJoinedAction,
        Sender: client,
    }

    if err := config.Redis.Publish(ctx, PubSubGeneralChannel, message.encode()).Err(); err != nil {
        log.Println(err)
    }
}

// Publish userleft message in pub/sub
func (server *WsServer) publishClientLeft(client *Client) {

    message := &Message{
        Action: UserLeftAction,
        Sender: client,
    }

    if err := config.Redis.Publish(ctx, PubSubGeneralChannel, message.encode()).Err(); err != nil {
        log.Println(err)
    }
}

// Listen to pub/sub general channels
func (server *WsServer) listenPubSubChannel() {

    pubsub := config.Redis.Subscribe(ctx, PubSubGeneralChannel)
    ch := pubsub.Channel()
    for msg := range ch {

        var message Message
        if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil {
            log.Printf("Error on unmarshal JSON message %s", err)
            // Skip the malformed message but keep listening.
            continue
        }

        switch message.Action {
        case UserJoinedAction:
            server.handleUserJoined(message)
        case UserLeftAction:
            server.handleUserLeft(message)      
        }
    }
}

func (server *WsServer) handleUserJoined(message Message) {
    // Add the user to the slice
    server.users = append(server.users, message.Sender)
    server.broadcastToClients(message.encode())
}

func (server *WsServer) handleUserLeft(message Message) {
    // Remove the user from the slice
    for i, user := range server.users {
        if user.GetId() == message.Sender.GetId() {
            server.users[i] = server.users[len(server.users)-1]
            server.users = server.users[:len(server.users)-1]
            break // stop after the first match
        }
    }
    server.broadcastToClients(message.encode())
}
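
The listener is essentially a decode-and-dispatch loop over incoming payloads. Here is a Redis-free sketch of that loop which keeps a set of online user IDs. The event type, the action string values, and applyEvents are hypothetical and simplified. This version skips malformed payloads rather than stopping, which is one possible choice for a long-running listener.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical action values for this example only.
const (
	userJoined = "user-join"
	userLeft   = "user-left"
)

// event is a simplified stand-in for Message.
type event struct {
	Action string `json:"action"`
	UserID string `json:"userId"`
}

// applyEvents decodes each payload and dispatches on its action.
// It returns the set of online user IDs and the number of payloads
// that were skipped because they were not valid JSON.
func applyEvents(payloads []string) (online map[string]bool, skipped int) {
	online = make(map[string]bool)
	for _, raw := range payloads {
		var ev event
		if err := json.Unmarshal([]byte(raw), &ev); err != nil {
			skipped++
			continue // one bad message should not end the loop
		}
		switch ev.Action {
		case userJoined:
			online[ev.UserID] = true
		case userLeft:
			delete(online, ev.UserID)
		}
	}
	return online, skipped
}

func main() {
	online, skipped := applyEvents([]string{
		`{"action":"user-join","userId":"1"}`,
		`not json`,
		`{"action":"user-join","userId":"2"}`,
		`{"action":"user-left","userId":"1"}`,
	})
	fmt.Println(len(online), online["2"], skipped) // prints "1 true 1"
}
```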


publishClientJoined & publishClientLeft will replace notifyClientJoined & notifyClientLeft.

Then again start listening to the Channel and make sure the publish methods above are correctly used:

// chatServer.go
func (server *WsServer) Run() {
    go server.listenPubSubChannel()
    ...
}

func (server *WsServer) registerClient(client *Client) {
    // Add user to the repo
    server.userRepository.AddUser(client)

    // Publish user in PubSub
    server.publishClientJoined(client)

    server.listOnlineClients(client)
    server.clients[client] = true
}

func (server *WsServer) unregisterClient(client *Client) {
    if _, ok := server.clients[client]; ok {
        delete(server.clients, client)

        // Remove user from repo
        server.userRepository.RemoveUser(client)

        // Publish user left in PubSub
        server.publishClientLeft(client)
    }
}


Private chat

Almost done. The last piece of the puzzle is to let our users start private chats with each other while they are connected to different servers.

Start by changing the logic of client.go:

// client.go

import (    
    ...
    "github.com/jeroendk/chatApplication/config"
    ...
)

func (client *Client) handleJoinRoomPrivateMessage(message Message) {
    // instead of searching for a client, search for User by the given ID.
    target := client.wsServer.findUserByID(message.Message)
    if target == nil {
        return
    }

    // create a unique room name by combining the two IDs
    roomName := message.Message + client.ID.String()

    // Join room
    joinedRoom := client.joinRoom(roomName, target)

    // Instead of instantaneously joining the target client,
    // let the target client join with an invite request over pub/sub
    if joinedRoom != nil {
        client.inviteTargetUser(target, joinedRoom)
    }
}

// JoinRoom now returns a room or nil
func (client *Client) joinRoom(roomName string, sender models.User) *Room {

    room := client.wsServer.findRoomByName(roomName)
    if room == nil {
        room = client.wsServer.createRoom(roomName, sender != nil)
    }

    // Don't allow to join private rooms through public room message
    if sender == nil && room.Private {
        return nil
    }

    if !client.isInRoom(room) {
        client.rooms[room] = true
        room.register <- client
        client.notifyRoomJoined(room, sender)
    }
    return room
}

// Send out invite message over pub/sub in the general channel.
func (client *Client) inviteTargetUser(target models.User, room *Room) {
    inviteMessage := &Message{
        Action: JoinRoomPrivateAction,
        Message: target.GetId(),
        Target: room,
        Sender: client,
    }

    if err := config.Redis.Publish(ctx, PubSubGeneralChannel, inviteMessage.encode()).Err(); err != nil {
        log.Println(err)
    }
}
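
One thing to be aware of: the room name is built as target ID followed by the initiator's ID, so the name depends on who starts the chat. If both users invite each other, two differently named rooms would be created. A possible variation, which is not part of the original code, is to sort the two IDs first so both directions yield the same name. The privateRoomName helper and the ":" separator are assumptions for this sketch.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// privateRoomName builds a room name that is the same regardless of
// which of the two users initiates the private chat.
func privateRoomName(idA, idB string) string {
	ids := []string{idA, idB}
	sort.Strings(ids)
	return strings.Join(ids, ":")
}

func main() {
	fmt.Println(privateRoomName("user-b", "user-a")) // prints "user-a:user-b"
	fmt.Println(privateRoomName("user-a", "user-b")) // prints "user-a:user-b"
}
```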


So our client is once again able to start a private chat. All we have to do now is to make sure the target client will join as well.

Add the code below to your chatServer.go. The first part adds one extra case in the Switch, to handle private chat invitations.

// chatServer.go
func (server *WsServer) listenPubSubChannel() {
    ...

        switch message.Action {
        ...
        case JoinRoomPrivateAction:
            server.handleUserJoinPrivate(message)
        }
}

func (server *WsServer) handleUserJoinPrivate(message Message) {
    // Find client for given user, if found add the user to the room.
    targetClient := server.findClientByID(message.Message)
    if targetClient != nil {
        targetClient.joinRoom(message.Target.GetName(), message.Sender)
    }
}

// Add the findUserByID method used by client.go
func (server *WsServer) findUserByID(ID string) models.User {
    var foundUser models.User
    for _, client := range server.users {
        if client.GetId() == ID {
            foundUser = client
            break
        }
    }

    return foundUser
}

Result

To test the new set-up, you can start multiple instances of your application on different ports. Make sure your JavaScript WebSocket actually connects to the correct server. You can change the connection string as follows:

serverUrl: "ws://" + location.host + "/ws",

Then:

go run ./ --addr=:8080
go run ./ --addr=:8090

Done! You finished your Pub/Sub chat application in Go. Stay tuned for the last part in this series. There we will make users log-in before they can participate in chatting.

If you want your users to automatically reconnect after a short outage of some sort, check this out.

Feel free to leave a comment when you have suggestions or questions!

The final source code of this part can be found here:

https://github.com/jeroendk/go-vuejs-chat/tree/v3.0
