In the third part of this tutorial series, we will add Redis Pub/Sub to our existing chat application (built in the previous parts). With Redis Pub/Sub, we can scale our application by running multiple instances at the same time.
Preconditions
To follow along you should have completed part 1 and part 2 or grab the source from here.
What is Redis Pub/Sub?
Redis Pub/Sub is the Redis implementation of the publish–subscribe pattern. This is a so-called “messaging pattern”, where senders of messages (publishers) don’t send their messages directly to receivers (subscribers) but publish their messages in a “channel”. Subscribers choose to subscribe to specific channels and will receive the messages published there.
When we run multiple instances of the same application we can leverage these Pub/Sub channels to not only notify clients connected to the same instance but notify all clients connected to any instance.
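To make the pattern concrete, here is a minimal in-memory sketch of publish/subscribe in Go. It is not how Redis is implemented; the Broker type and its methods are hypothetical names used only for illustration. It shows the two properties we rely on later: every current subscriber of a channel receives a published message, and a message published before anyone subscribed is not delivered afterwards.

```go
package main

import "sync"

// Broker is a hypothetical in-memory stand-in for a Pub/Sub server.
type Broker struct {
	mu   sync.Mutex
	subs map[string][]chan string // channel name -> subscriber inboxes
}

// NewBroker returns an empty broker.
func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan string)}
}

// Subscribe registers a new subscriber on the named channel and returns its inbox.
func (b *Broker) Subscribe(channel string) <-chan string {
	inbox := make(chan string, 16) // buffered so Publish does not block in this sketch
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subs[channel] = append(b.subs[channel], inbox)
	return inbox
}

// Publish hands the message to every current subscriber of the channel and
// returns how many of them received it. Nothing is stored for later subscribers.
func (b *Broker) Publish(channel, message string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	delivered := 0
	for _, inbox := range b.subs[channel] {
		select {
		case inbox <- message:
			delivered++
		default:
			// Inbox is full: this sketch simply drops the message.
		}
	}
	return delivered
}
```

With two subscribers on "room-1", Publish("room-1", "hello") returns 2 and both inboxes receive the message, while a subscriber on "room-2" receives nothing.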
Diagram of pub/sub subscriptions.
In our application every chat message is sent through a room, so each room can publish and subscribe on its own channel. For each running room there will be one Pub/Sub channel (illustrated by the Room channels in the diagram above).
We would like to have a list of all the online users on each server as well, to be able to start a private chat for example. For this, we will use a “general” channel, where the WsServer can publish and subscribe. Ok, let’s start coding!
Step 1: Adding a persistence layer
Because Pub/Sub won’t play back missed messages, we need some sort of persistence. If we scale our application while the service is already running, the new instance needs a way to get all the existing data (rooms and users).
For this we will add a database. In this post we keep it simple and use an SQLite database; depending on your use case you may want a different database engine. To make this swap easy we will use the Repository Pattern.
Install the needed package with:
go get github.com/mattn/go-sqlite3
// config/database.go
package config
import (
"database/sql"
"log"
_ "github.com/mattn/go-sqlite3"
)
func InitDB() *sql.DB {
db, err := sql.Open("sqlite3", "./chatdb.db")
if err != nil {
log.Fatal(err)
}
sqlStmt := `
CREATE TABLE IF NOT EXISTS room (
id VARCHAR(255) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
private TINYINT NULL
);
`
_, err = db.Exec(sqlStmt)
if err != nil {
log.Fatalf("%q: %s\n", err, sqlStmt)
}
sqlStmt = `
CREATE TABLE IF NOT EXISTS user (
id VARCHAR(255) NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL
);
`
_, err = db.Exec(sqlStmt)
if err != nil {
log.Fatalf("%q: %s\n", err, sqlStmt)
}
return db
}
// main.go
..
import (
...
"github.com/jeroendk/chatApplication/config"
"github.com/jeroendk/chatApplication/repository"
)
func main() {
...
db := config.InitDB()
defer db.Close()
}
The code above will initialize the database when starting the Go application.
Room Repository
Next, we will add two repository files, first the roomRepository. To be able to use the room model in all our packages, we will create an interface for it in the models package. We add an interface for our roomRepository as well, this makes swapping out the implementation easier.
// models/room.go
package models
type Room interface {
GetId() string
GetName() string
GetPrivate() bool
}
type RoomRepository interface {
AddRoom(room Room)
FindRoomByName(name string) Room
}
// repository/roomRepository.go
package repository
import (
"database/sql"
"github.com/jeroendk/chatApplication/models"
)
type Room struct {
Id string
Name string
Private bool
}
func (room *Room) GetId() string {
return room.Id
}
func (room *Room) GetName() string {
return room.Name
}
func (room *Room) GetPrivate() bool {
return room.Private
}
type RoomRepository struct {
Db *sql.DB
}
func (repo *RoomRepository) AddRoom(room models.Room) {
stmt, err := repo.Db.Prepare("INSERT INTO room(id, name, private) values(?,?,?)")
checkErr(err)
// Release the prepared statement when we are done with it.
defer stmt.Close()
_, err = stmt.Exec(room.GetId(), room.GetName(), room.GetPrivate())
checkErr(err)
}
func (repo *RoomRepository) FindRoomByName(name string) models.Room {
row := repo.Db.QueryRow("SELECT id, name, private FROM room where name = ? LIMIT 1", name)
var room Room
if err := row.Scan(&room.Id, &room.Name, &room.Private); err != nil {
if err == sql.ErrNoRows {
return nil
}
panic(err)
}
return &room
}
func checkErr(err error) {
if err != nil {
panic(err)
}
}
The repository file has two methods, one for adding a new room and one for finding a room based on the given name.
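Because the rest of the application only depends on the interface, the SQLite implementation can be replaced without touching the chat server. As a rough illustration, here is an in-memory implementation of the same two methods, which could be handy in tests. The interfaces are repeated locally to keep the sketch self-contained; memoryRoom, MemoryRoomRepository and NewMemoryRoomRepository are hypothetical names that are not part of the tutorial code.

```go
package main

import "sync"

// Room mirrors the models.Room interface from this tutorial.
type Room interface {
	GetId() string
	GetName() string
	GetPrivate() bool
}

// RoomRepository mirrors the models.RoomRepository interface.
type RoomRepository interface {
	AddRoom(room Room)
	FindRoomByName(name string) Room
}

// memoryRoom is a minimal Room used only in this sketch.
type memoryRoom struct {
	id, name string
	private  bool
}

func (r *memoryRoom) GetId() string    { return r.id }
func (r *memoryRoom) GetName() string  { return r.name }
func (r *memoryRoom) GetPrivate() bool { return r.private }

// MemoryRoomRepository keeps rooms in a map instead of a database.
type MemoryRoomRepository struct {
	mu    sync.RWMutex
	rooms map[string]Room // keyed by room name
}

func NewMemoryRoomRepository() *MemoryRoomRepository {
	return &MemoryRoomRepository{rooms: make(map[string]Room)}
}

// AddRoom stores the room under its name.
func (repo *MemoryRoomRepository) AddRoom(room Room) {
	repo.mu.Lock()
	defer repo.mu.Unlock()
	repo.rooms[room.GetName()] = room
}

// FindRoomByName returns nil when no room with that name exists,
// matching the behaviour of the SQLite repository above.
func (repo *MemoryRoomRepository) FindRoomByName(name string) Room {
	repo.mu.RLock()
	defer repo.mu.RUnlock()
	return repo.rooms[name]
}

// Compile-time check that the in-memory type satisfies the interface.
var _ RoomRepository = (*MemoryRoomRepository)(nil)
```

Any code that accepts a RoomRepository can be handed either implementation.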
User Repository
We will do the same thing for the users, add the interfaces and create a repository:
// models/user.go
package models
type User interface {
GetId() string
GetName() string
}
type UserRepository interface {
AddUser(user User)
RemoveUser(user User)
FindUserById(ID string) User
GetAllUsers() []User
}
// repository/userRepository.go
package repository
import (
"database/sql"
"log"
"github.com/jeroendk/chatApplication/models"
)
type User struct {
Id string `json:"id"`
Name string `json:"name"`
}
func (user *User) GetId() string {
return user.Id
}
func (user *User) GetName() string {
return user.Name
}
type UserRepository struct {
Db *sql.DB
}
func (repo *UserRepository) AddUser(user models.User) {
stmt, err := repo.Db.Prepare("INSERT INTO user(id, name) values(?,?)")
checkErr(err)
// Release the prepared statement when we are done with it.
defer stmt.Close()
_, err = stmt.Exec(user.GetId(), user.GetName())
checkErr(err)
}
func (repo *UserRepository) RemoveUser(user models.User) {
stmt, err := repo.Db.Prepare("DELETE FROM user WHERE id = ?")
checkErr(err)
// Release the prepared statement when we are done with it.
defer stmt.Close()
_, err = stmt.Exec(user.GetId())
checkErr(err)
}
func (repo *UserRepository) FindUserById(ID string) models.User {
row := repo.Db.QueryRow("SELECT id, name FROM user where id = ? LIMIT 1", ID)
var user User
if err := row.Scan(&user.Id, &user.Name); err != nil {
if err == sql.ErrNoRows {
return nil
}
panic(err)
}
return &user
}
func (repo *UserRepository) GetAllUsers() []models.User {
rows, err := repo.Db.Query("SELECT id, name FROM user")
if err != nil {
log.Fatal(err)
}
var users []models.User
defer rows.Close()
for rows.Next() {
var user User
// Do not silently ignore scan errors.
if err := rows.Scan(&user.Id, &user.Name); err != nil {
log.Fatal(err)
}
users = append(users, &user)
}
return users
}
The user repository has four methods:
- AddUser , to add new users to the database.
- RemoveUser , to remove a user from the database.
- FindUserById , to find one user by a given ID.
- GetAllUsers , to retrieve all users from the database.
Updating existing code to use interfaces
Before we can proceed further, we first need to update some existing code to comply with the new interfaces.
Message
// message.go
import (
...
"github.com/jeroendk/chatApplication/models"
)
...
type Message struct {
Action string `json:"action"`
Message string `json:"message"`
Target *Room `json:"target"`
Sender models.User `json:"sender"` // Use model.User interface
}
...
// UnmarshalJSON is a custom unmarshaler that creates a Client instance for Sender
func (message *Message) UnmarshalJSON(data []byte) error {
type Alias Message
msg := &struct {
Sender Client `json:"sender"`
*Alias
}{
Alias: (*Alias)(message),
}
if err := json.Unmarshal(data, &msg); err != nil {
return err
}
message.Sender = &msg.Sender
return nil
}
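The custom unmarshaler is needed because encoding/json cannot decode a JSON object into an interface field such as models.User: it has no way to know which concrete type to create. The standalone sketch below isolates the same alias trick. Envelope, concreteUser and decodeEnvelope are hypothetical stand-ins for Message, Client and the surrounding code.

```go
package main

import "encoding/json"

// User mirrors the models.User interface.
type User interface {
	GetId() string
	GetName() string
}

// concreteUser is the concrete type we want the decoder to create.
type concreteUser struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

func (u *concreteUser) GetId() string   { return u.ID }
func (u *concreteUser) GetName() string { return u.Name }

// Envelope has an interface-typed field, like Message.Sender.
type Envelope struct {
	Action string `json:"action"`
	Sender User   `json:"sender"`
}

// UnmarshalJSON decodes "sender" into a concrete type first and then
// assigns it to the interface field.
func (e *Envelope) UnmarshalJSON(data []byte) error {
	// Alias has the same fields as Envelope but none of its methods,
	// so decoding into it does not call UnmarshalJSON recursively.
	type Alias Envelope
	aux := &struct {
		// This shallower field wins over Alias.Sender for the "sender" key.
		Sender concreteUser `json:"sender"`
		*Alias
	}{
		Alias: (*Alias)(e),
	}
	if err := json.Unmarshal(data, aux); err != nil {
		return err
	}
	e.Sender = &aux.Sender
	return nil
}

// decodeEnvelope is a small helper to exercise the custom unmarshaler.
func decodeEnvelope(data []byte) (*Envelope, error) {
	var e Envelope
	if err := json.Unmarshal(data, &e); err != nil {
		return nil, err
	}
	return &e, nil
}
```

All other fields, such as Action, are filled in through the embedded alias pointer, so only the interface field needs special handling.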
Client
// client.go
import (
...
"github.com/jeroendk/chatApplication/models"
)
// Change the type sender from Client to the User interface.
func (client *Client) joinRoom(roomName string, sender models.User) {
...
}
func (client *Client) notifyRoomJoined(room *Room, sender models.User) {
...
}
// Add the GetId method to make Client compatible with model.User interface
func (client *Client) GetId() string {
return client.ID.String()
}
Room
// room.go
// Add the GetPrivate method to make Room compatible with model.Room interface
func (room *Room) GetPrivate() bool {
return room.Private
}
Step 2: Using the repositories
Currently, the chatServer is responsible for keeping track of the users and rooms. It does so by putting these entities in a map ( clients & rooms ). We will keep doing this, but in addition write both entities to the database.
To start, add the two repositories as properties of the struct and set them in the NewWebsocketServer method. We also add a new property, “ users ”, to keep track of all the users. The clients property is dedicated to actual clients with an active WebSocket connection (this is in preparation for the Pub/Sub logic).
// chatServer.go
import (
"github.com/jeroendk/chatApplication/models"
)
type WsServer struct {
...
users []models.User
roomRepository models.RoomRepository
userRepository models.UserRepository
}
func NewWebsocketServer(roomRepository models.RoomRepository, userRepository models.UserRepository) *WsServer {
wsServer := &WsServer{
clients: make(map[*Client]bool),
register: make(chan *Client),
unregister: make(chan *Client),
rooms: make(map[*Room]bool),
roomRepository: roomRepository,
userRepository: userRepository,
}
// Add users from database to server
wsServer.users = userRepository.GetAllUsers()
return wsServer
}
When creating a new instance of the WsServer, all the users are loaded from the database.
The next step is to change the call to NewWebsocketServer in main.go and include the two repositories:
// main.go
...
wsServer := NewWebsocketServer(&repository.RoomRepository{Db: db}, &repository.UserRepository{Db: db})
Using the user repository
Now that we have access to the repositories we can use them inside the chatServer methods. First, we will update all existing methods to use the userRepository. Below are the modified methods; the new code is marked with a comment.
// chatServer.go
func (server *WsServer) registerClient(client *Client) {
// NEW: Add user to the repo
server.userRepository.AddUser(client)
// Existing actions
server.notifyClientJoined(client)
server.listOnlineClients(client)
server.clients[client] = true
// NEW: Add user to the user slice
server.users = append(server.users, client)
}
func (server *WsServer) unregisterClient(client *Client) {
if _, ok := server.clients[client]; ok {
delete(server.clients, client)
server.notifyClientLeft(client)
// NEW: Remove user from slice
for i, user := range server.users {
if user.GetId() == client.GetId() {
server.users[i] = server.users[len(server.users)-1]
server.users = server.users[:len(server.users)-1]
// Stop iterating once the slice has been modified.
break
}
}
// NEW: Remove user from repo
server.userRepository.RemoveUser(client)
}
}
func (server *WsServer) listOnlineClients(client *Client) {
// NEW: Use the users slice instead of the client map
for _, user := range server.users {
message := &Message{
Action: UserJoinedAction,
Sender: user,
}
client.send <- message.encode()
}
}
After adding the above, all the online users should be saved in the database. When a user disconnects, they are removed from the database.
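The removal loop in unregisterClient uses the "swap with the last element, then shrink" idiom, which removes an element without shifting the rest of the slice. Here is the idiom on its own as a small sketch; removeUserByID and simpleUser are hypothetical helpers, and User is reduced to the one method that matters here.

```go
package main

// User is a reduced version of the models.User interface.
type User interface {
	GetId() string
}

// simpleUser is a minimal User used only in this sketch.
type simpleUser struct{ id string }

func (u simpleUser) GetId() string { return u.id }

// removeUserByID removes the first user with the given ID.
// It does not preserve the order of the remaining users.
func removeUserByID(users []User, id string) []User {
	for i, user := range users {
		if user.GetId() == id {
			last := len(users) - 1
			users[i] = users[last] // move the last element into the gap
			users[last] = nil      // drop the stale reference
			return users[:last]    // return right away: the slice changed
		}
	}
	return users // not found: nothing to remove
}
```

Returning as soon as the element is removed avoids iterating over a slice that has just been shortened.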
Using the room repository
Next up are the rooms. We don’t need all the rooms when we start the server. Therefore we only look for a room in the repository when we can’t find it in the local map.
// chatServer.go
func (server *WsServer) findRoomByName(name string) *Room {
var foundRoom *Room
for room := range server.rooms {
if room.GetName() == name {
foundRoom = room
break
}
}
// NEW: if there is no room, try to create it from the repo
if foundRoom == nil {
// Try to run the room from the repository, if it is found.
foundRoom = server.runRoomFromRepository(name)
}
return foundRoom
}
// NEW: Try to find a room in the repo, if found Run it.
func (server *WsServer) runRoomFromRepository(name string) *Room {
var room *Room
dbRoom := server.roomRepository.FindRoomByName(name)
if dbRoom != nil {
room = NewRoom(dbRoom.GetName(), dbRoom.GetPrivate())
room.ID, _ = uuid.Parse(dbRoom.GetId())
go room.RunRoom()
server.rooms[room] = true
}
return room
}
func (server *WsServer) createRoom(name string, private bool) *Room {
room := NewRoom(name, private)
// NEW: Add room to repo
server.roomRepository.AddRoom(room)
go room.RunRoom()
server.rooms[room] = true
return room
}
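The lookup order above (local map first, repository second, remember the result locally) can be reduced to the following sketch. It is a simplification: the running rooms are keyed by name instead of being stored in a map[*Room]bool, and RoomRecord, RoomStore, mapStore and Server are hypothetical types rather than the tutorial's own.

```go
package main

// RoomRecord is a simplified room used only in this sketch.
type RoomRecord struct {
	ID, Name string
	Private  bool
}

// RoomStore is the persistent source of rooms (the repository).
type RoomStore interface {
	FindRoomByName(name string) *RoomRecord
}

// mapStore is a trivial RoomStore backed by a map.
type mapStore map[string]*RoomRecord

func (s mapStore) FindRoomByName(name string) *RoomRecord { return s[name] }

// Server keeps the rooms that are running on this instance.
type Server struct {
	running map[string]*RoomRecord
	store   RoomStore
	started int // how many rooms were started from the store
}

// findRoomByName checks the local map first and only falls back to the
// store on a miss. A room found in the store is remembered locally.
func (s *Server) findRoomByName(name string) *RoomRecord {
	if room, ok := s.running[name]; ok {
		return room
	}
	room := s.store.FindRoomByName(name)
	if room == nil {
		return nil // unknown everywhere: the caller may create it
	}
	s.running[name] = room
	s.started++
	return room
}
```

A second lookup for the same name is served from the local map, so the store is consulted at most once per room on each instance.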
That’s it, in the next step we will finally add the Pub/Sub integration.
Step 3: Redis Pub/Sub
Now, with everything in place, we can start to add the publishing and subscribing to Redis Pub/Sub channels.
First, install the Redis package:
go mod init
go get github.com/go-redis/redis/v8
Then make sure you have a Redis container at your disposal. You can create one with docker & docker-compose for example:
# docker-compose.yml
version: '3.5'
services:
redis:
image: "redis:alpine"
ports:
- "6364:6379"
Then start it with docker-compose up.
With your Redis container up and running, let’s create a connection within our application. For this we will create a new file called redis.go and put it in the config folder next to our database connection.
// config/redis.go
package config
import "github.com/go-redis/redis/v8"
var Redis *redis.Client
func CreateRedisClient() {
opt, err := redis.ParseURL("redis://localhost:6364/0")
if err != nil {
panic(err)
}
// Assign directly to the package-level client to avoid shadowing the redis package name.
Redis = redis.NewClient(opt)
}
Then initialize the connection from your main.go
// main.go
func main() {
...
config.CreateRedisClient()
...
}
There are a total of 4 different messages we want to send through the Pub/Sub channels.
- Chat messages
- User joined notification
- User left notification
- Private chat invitation
Chat messages
Sending chat messages inside a room is the job of our room.go. It is actually quite easy to integrate the Pub/Sub channels in this logic.
First, we will add two new methods, for publishing in a channel and subscribing to a channel:
// room.go
package main
import (
"fmt"
"log"
"github.com/jeroendk/chatApplication/config"
"github.com/google/uuid"
"context"
)
var ctx = context.Background()
...
func (room *Room) publishRoomMessage(message []byte) {
err := config.Redis.Publish(ctx, room.GetName(), message).Err()
if err != nil {
log.Println(err)
}
}
func (room *Room) subscribeToRoomMessages() {
pubsub := config.Redis.Subscribe(ctx, room.GetName())
ch := pubsub.Channel()
for msg := range ch {
room.broadcastToClientsInRoom([]byte(msg.Payload))
}
}
Then we will change the existing calls to broadcastToClientsInRoom so that they use the new publish method instead. Also, start listening to the Pub/Sub subscription when starting the room.
// room.go
func (room *Room) RunRoom() {
// subscribe to pub/sub messages inside a new goroutine
go room.subscribeToRoomMessages()
for {
select {
...
case message := <-room.broadcast:
room.publishRoomMessage(message.encode())
}
}
}
func (room *Room) notifyClientJoined(client *Client) {
...
room.publishRoomMessage(message.encode())
}
User joined & left
Next, let’s publish when users join and leave, and subscribe to these events inside chatServer.go:
// chatServer.go
package main
import (
"encoding/json"
"log"
"github.com/google/uuid"
"github.com/jeroendk/chatApplication/config"
"github.com/jeroendk/chatApplication/models"
)
const PubSubGeneralChannel = "general"
// Publish userJoined message in pub/sub
func (server *WsServer) publishClientJoined(client *Client) {
message := &Message{
Action: UserJoinedAction,
Sender: client,
}
if err := config.Redis.Publish(ctx, PubSubGeneralChannel, message.encode()).Err(); err != nil {
log.Println(err)
}
}
// Publish userleft message in pub/sub
func (server *WsServer) publishClientLeft(client *Client) {
message := &Message{
Action: UserLeftAction,
Sender: client,
}
if err := config.Redis.Publish(ctx, PubSubGeneralChannel, message.encode()).Err(); err != nil {
log.Println(err)
}
}
// Listen to pub/sub general channels
func (server *WsServer) listenPubSubChannel() {
pubsub := config.Redis.Subscribe(ctx, PubSubGeneralChannel)
ch := pubsub.Channel()
for msg := range ch {
var message Message
if err := json.Unmarshal([]byte(msg.Payload), &message); err != nil {
log.Printf("Error on unmarshal JSON message %s", err)
continue // skip the malformed message instead of stopping the listener
}
switch message.Action {
case UserJoinedAction:
server.handleUserJoined(message)
case UserLeftAction:
server.handleUserLeft(message)
}
}
}
func (server *WsServer) handleUserJoined(message Message) {
// Add the user to the slice
server.users = append(server.users, message.Sender)
server.broadcastToClients(message.encode())
}
func (server *WsServer) handleUserLeft(message Message) {
// Remove the user from the slice
for i, user := range server.users {
if user.GetId() == message.Sender.GetId() {
server.users[i] = server.users[len(server.users)-1]
server.users = server.users[:len(server.users)-1]
// Stop iterating once the slice has been modified.
break
}
}
server.broadcastToClients(message.encode())
}
publishClientJoined & publishClientLeft will replace notifyClientJoined & notifyClientLeft.
Then again start listening to the Channel and make sure the publish methods above are correctly used:
// chatServer.go
func (server *WsServer) Run() {
go server.listenPubSubChannel()
...
}
func (server *WsServer) registerClient(client *Client) {
// Add user to the repo
server.userRepository.AddUser(client)
// Publish user in PubSub
server.publishClientJoined(client)
server.listOnlineClients(client)
server.clients[client] = true
}
func (server *WsServer) unregisterClient(client *Client) {
if _, ok := server.clients[client]; ok {
delete(server.clients, client)
// Remove user from repo
server.userRepository.RemoveUser(client)
// Publish user left in PubSub
server.publishClientLeft(client)
}
}
Private chat
Almost done, the last piece of the puzzle is to let our users start private chats with each other while they are connected with different servers.
Start by changing the logic of client.go:
// client.go
import (
...
"github.com/jeroendk/chatApplication/config"
...
)
func (client *Client) handleJoinRoomPrivateMessage(message Message) {
// instead of searching for a client, search for User by the given ID.
target := client.wsServer.findUserByID(message.Message)
if target == nil {
return
}
// create unique room name combined to the two IDs
roomName := message.Message + client.ID.String()
// Join room
joinedRoom := client.joinRoom(roomName, target)
// Instead of instantly joining the target client,
// let the target client join through an invite request over pub/sub
if joinedRoom != nil {
client.inviteTargetUser(target, joinedRoom)
}
}
// JoinRoom now returns a room or nil
func (client *Client) joinRoom(roomName string, sender models.User) *Room {
room := client.wsServer.findRoomByName(roomName)
if room == nil {
room = client.wsServer.createRoom(roomName, sender != nil)
}
// Don't allow to join private rooms through public room message
if sender == nil && room.Private {
return nil
}
if !client.isInRoom(room) {
client.rooms[room] = true
room.register <- client
client.notifyRoomJoined(room, sender)
}
return room
}
// Send out invite message over pub/sub in the general channel.
func (client *Client) inviteTargetUser(target models.User, room *Room) {
inviteMessage := &Message{
Action: JoinRoomPrivateAction,
Message: target.GetId(),
Target: room,
Sender: client,
}
if err := config.Redis.Publish(ctx, PubSubGeneralChannel, inviteMessage.encode()).Err(); err != nil {
log.Println(err)
}
}
So our client is once again able to start a private chat. All we have to do now is to make sure the target client will join as well.
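One thing to be aware of: the room name in handleJoinRoomPrivateMessage is built as target ID followed by sender ID, so it depends on who sends the invitation. If both users invite each other, two different rooms are created. If that matters for your use case, one possible approach is to order the two IDs before joining them. The helper below is a hypothetical sketch of that idea and is not part of the tutorial code; the ":" separator is an arbitrary choice.

```go
package main

// privateRoomName returns the same name regardless of argument order
// by always putting the lexicographically smaller ID first.
func privateRoomName(idA, idB string) string {
	if idA > idB {
		idA, idB = idB, idA
	}
	return idA + ":" + idB
}
```

With this helper, privateRoomName(message.Message, client.ID.String()) yields the same room name for both participants.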
Add the code below to your chatServer.go. The first part adds one extra case in the Switch, to handle private chat invitations.
// chatServer.go
func (server *WsServer) listenPubSubChannel() {
...
switch message.Action {
...
case JoinRoomPrivateAction:
server.handleUserJoinPrivate(message)
}
}
func (server *WsServer) handleUserJoinPrivate(message Message) {
// Find client for given user, if found add the user to the room.
targetClient := server.findClientByID(message.Message)
if targetClient != nil {
targetClient.joinRoom(message.Target.GetName(), message.Sender)
}
}
// Add the findUserByID method used by client.go
func (server *WsServer) findUserByID(ID string) models.User {
var foundUser models.User
for _, client := range server.users {
if client.GetId() == ID {
foundUser = client
break
}
}
return foundUser
}
Result
To test the new set-up, you can start multiple instances of your application on different ports. Make sure your JavaScript WebSocket actually connects to the correct server. You can change the connection string as follows:
serverUrl: "ws://" + location.host + "/ws",
Then:
go run ./ --addr=:8080
go run ./ --addr=:8090
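The commands above assume the application reads its listen address from an addr flag. If yours does not, a minimal sketch using the standard flag package could look like the following; parseAddr and the default of :8080 are assumptions made for illustration.

```go
package main

import (
	"flag"
	"io"
)

// parseAddr reads the listen address from command-line arguments,
// falling back to ":8080" when the flag is not given.
func parseAddr(args []string) (string, error) {
	fs := flag.NewFlagSet("chat", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage output quiet in this sketch
	addr := fs.String("addr", ":8080", "HTTP service address")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return *addr, nil
}
```

In main you would call parseAddr(os.Args[1:]) and pass the result to http.ListenAndServe. The flag package accepts both -addr and --addr.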
Done! You finished your Pub/Sub chat application in Go. Stay tuned for the last part in this series. There we will make users log-in before they can participate in chatting.
If you want your users to automatically reconnect after a short outage of some sort, check this out.
Feel free to leave a comment when you have suggestions or questions!
The final source code of this part can be found here:
https://github.com/jeroendk/go-vuejs-chat/tree/v3.0