DEV Community

Cover image for How to use ZeroMQ Pub/Sub Pattern in Golang
Francisco Mendes
Francisco Mendes

Posted on

How to use ZeroMQ Pub/Sub Pattern in Golang

Overview

Pub/Sub is a pattern where the publisher is not programmed to send a message (payload) to a specific receiver. These messages are sent by publishers to specific channels, and receivers can subscribe to one or more channels to consume those same messages.

Imagine that you have a monolithic backend, however you want to add a new feature to that backend, such as sending emails. Instead of this backend being responsible for sending the emails, you can make it a publisher that sends the emails to a channel to be consumed by another backend (receiver) that will be responsible for sending the emails (like newsletters).

Today's example

The implementation of this process is quite simple, and that's why in today's example I decided to create a simple Api so that it will receive the body of our request and will send it to a specific channel to be consumed by a receiver and log it.

Let's code

As you may have already understood, we are going to have two backends. One of the backends we will call a server, which will be our message sender. The other backend will be the worker, which will be our small microservice.

First and foremost, let's install our dependencies:

go get github.com/gofiber/fiber/v2
go get github.com/pebbe/zmq4
Enter fullscreen mode Exit fullscreen mode

Now let's create a simple API:

// @/server/server.go
package main

import "github.com/gofiber/fiber/v2"

func main() {
    app := fiber.New()

    app.Get("/", func(c *fiber.Ctx) error {
        return c.SendString("Hello there 👋")
    })

    app.Listen(":3000")
}
Enter fullscreen mode Exit fullscreen mode

We can start by working on our struct of the data coming from the request body, according to the example it should look like this:

type Input struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}
Enter fullscreen mode Exit fullscreen mode

Now we can import zmq4 into our project and let's create our client.

// @/server/server.go
package main

import (
    "github.com/gofiber/fiber/v2"
    zmq "github.com/pebbe/zmq4"
)

type Input struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    app := fiber.New()
    zctx, _ := zmq.NewContext()

    // ...
}
Enter fullscreen mode Exit fullscreen mode

Then we will create our ZeroMQ socket of the Publisher type and we will accept connections through an address defined by us.

// @/server/server.go
package main

import (
    "encoding/json"

    "github.com/gofiber/fiber/v2"
    zmq "github.com/pebbe/zmq4"
)

type Input struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    app := fiber.New()
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PUB)
    s.Bind("tcp://*:5757")

    // ...
}
Enter fullscreen mode Exit fullscreen mode

Then, in our endpoint, we will change the http verb from Get to Post and then we will parse the data coming from the request body using the c.BodyParser() function.

// @/server/server.go
package main

import (
    "github.com/gofiber/fiber/v2"
    zmq "github.com/pebbe/zmq4"
)

type Input struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    app := fiber.New()
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PUB)
    s.Bind("tcp://*:5757")

    app.Post("/", func(c *fiber.Ctx) error {
        input := new(Input)
        if err := c.BodyParser(input); err != nil {
            panic(err)
        }

        // ...
    })

    app.Listen(":3000")
}
Enter fullscreen mode Exit fullscreen mode

Now let's convert the data from the http request into the buffer.

// @/server/server.go
package main

import (
    "encoding/json"

    "github.com/gofiber/fiber/v2"
    zmq "github.com/pebbe/zmq4"
)

type Input struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    app := fiber.New()
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PUB)
    s.Bind("tcp://*:5757")

    app.Post("/", func(c *fiber.Ctx) error {
        input := new(Input)
        if err := c.BodyParser(input); err != nil {
            panic(err)
        }

        if buffer, err := json.Marshal(input); err != nil {
            panic(err)
        } else {
            // ...
        }

        // ...
    })

    app.Listen(":3000")
}
Enter fullscreen mode Exit fullscreen mode

If there was no error during buffer conversion, we will first define the channel where we want to publish the messages. This way:

// @/server/server.go
package main

import (
    "encoding/json"

    "github.com/gofiber/fiber/v2"
    zmq "github.com/pebbe/zmq4"
)

type Input struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    app := fiber.New()
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PUB)
    s.Bind("tcp://*:5757")

    app.Post("/", func(c *fiber.Ctx) error {
        input := new(Input)
        if err := c.BodyParser(input); err != nil {
            panic(err)
        }

        if buffer, err := json.Marshal(input); err != nil {
            panic(err)
        } else {
            s.Send("dev.to", zmq.SNDMORE)
            // ...
        }

        // ...
    })

    app.Listen(":3000")
}
Enter fullscreen mode Exit fullscreen mode

Afterwards we will send the data in the message, but before we send it we have to convert our buffer into a string.

// @/server/server.go
package main

import (
    "encoding/json"

    "github.com/gofiber/fiber/v2"
    zmq "github.com/pebbe/zmq4"
)

type Input struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    app := fiber.New()
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PUB)
    s.Bind("tcp://*:5757")

    app.Post("/", func(c *fiber.Ctx) error {
        input := new(Input)
        if err := c.BodyParser(input); err != nil {
            panic(err)
        }

        if buffer, err := json.Marshal(input); err != nil {
            panic(err)
        } else {
            s.Send("dev.to", zmq.SNDMORE)
            s.Send(string(buffer), 0)
        }

        // ...
    })

    app.Listen(":3000")
}
Enter fullscreen mode Exit fullscreen mode

Finally, just send a message to the user in the body of the response.

// @/server/server.go
package main

import (
    "encoding/json"

    "github.com/gofiber/fiber/v2"
    zmq "github.com/pebbe/zmq4"
)

type Input struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    app := fiber.New()
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.PUB)
    s.Bind("tcp://*:5757")

    app.Post("/", func(c *fiber.Ctx) error {
        input := new(Input)
        if err := c.BodyParser(input); err != nil {
            panic(err)
        }

        if buffer, err := json.Marshal(input); err != nil {
            panic(err)
        } else {
            s.Send("dev.to", zmq.SNDMORE)
            s.Send(string(buffer), 0)
        }

        return c.SendString("Sent to the subscriber/worker.")
    })

    app.Listen(":3000")
}
Enter fullscreen mode Exit fullscreen mode

With this we have our server finished and now we can start working on our worker. Let's import zmq4 into our project and let's create our client.

// @/worker/worker.go
package main

import (
    zmq "github.com/pebbe/zmq4"
)

func main() {
    zctx, _ := zmq.NewContext()

    // ...
}
Enter fullscreen mode Exit fullscreen mode

Then we will create our ZeroMQ socket of the Subscriber type and we will accept connections through the address that we defined before.

And with an instance of our client created and the connection established, we can subscribe to our channel to receive messages from it.

// @/worker/worker.go
package main

import (
    zmq "github.com/pebbe/zmq4"
)

func main() {
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.SUB)
    s.Connect("tcp://localhost:5757")
    s.SetSubscribe("dev.to")

    // ...
}
Enter fullscreen mode Exit fullscreen mode

Now let's create a struct with the fields exactly the same as the ones we're going to receive in the message.

type Message struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}
Enter fullscreen mode Exit fullscreen mode

Next, let's create a for loop so we can log each of the messages that are published in the specific channel. First we will receive from our socket the name of the channel where the message is coming from.

// @/worker/worker.go
package main

import (
    zmq "github.com/pebbe/zmq4"
)

type Message struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.SUB)
    s.Connect("tcp://localhost:5757")
    s.SetSubscribe("dev.to")

    for {
        address, err := s.Recv(0)
        if err != nil {
            panic(err)
        }

        // ...
    }
}
Enter fullscreen mode Exit fullscreen mode

Next, we will fetch the respective message and if there is no error when receiving it, we will convert the message contents back to JSON.

// @/worker/worker.go
package main

import (
    "encoding/json"

    zmq "github.com/pebbe/zmq4"
)

type Message struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.SUB)
    s.Connect("tcp://localhost:5757")
    s.SetSubscribe("dev.to")

    for {
        address, err := s.Recv(0)
        if err != nil {
            panic(err)
        }

        if msg, err := s.Recv(0); err != nil {
            panic(err)
        } else {
            contents := Message{}
            if err := json.Unmarshal([]byte(msg), &contents); err != nil {
                panic(err)
            }
            // ...
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Finally, let's log the channel the message comes from and let's do the same with the message.

// @/worker/worker.go
package main

import (
    "encoding/json"
    "fmt"

    zmq "github.com/pebbe/zmq4"
)

type Message struct {
    Title   string `json:"title"`
    Content string `json:"content"`
}

func main() {
    zctx, _ := zmq.NewContext()

    s, _ := zctx.NewSocket(zmq.SUB)
    s.Connect("tcp://localhost:5757")
    s.SetSubscribe("dev.to")

    for {
        address, err := s.Recv(0)
        if err != nil {
            panic(err)
        }

        if msg, err := s.Recv(0); err != nil {
            panic(err)
        } else {
            contents := Message{}
            if err := json.Unmarshal([]byte(msg), &contents); err != nil {
                panic(err)
            }
            fmt.Println("Received message from " + address + " channel.")
            fmt.Printf("%+v\n", contents)
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Now when testing our Api with a tool similar to Postman, you can send a json object in the request body with the same properties that are defined in our struct Input.

testing the project

Then you should have something similar to this on your terminal:

terminal logs

Conclusion

As always, I hope you found it interesting. If you noticed any errors in this article, please mention them in the comments. 🧑🏻‍💻

Hope you have a great day! 🤪

Discussion (0)