DEV Community

Cover image for Golang and Event-Driven Architecture
Tamimi
Tamimi

Posted on

Golang and Event-Driven Architecture

Solace Gopher Solly

On March 31 2022, I gave a talk at Conf42:Golang about using Go in an event-driven architecture entitled "Gopher in an Event Driven Playground". You can check out the talk here or read along to know more about event-driven systems and how different messaging protocols are used in Go!

What's all the hype about?

For all of us Go enthusiasts out there, we truly understand the beauty of using Go in applications and microservices because of its lightweight, high performance, and elegant syntax, to name just a few (let the debate start! 🔥).

So imagine using your favourite programming language (yes, that is Go) with your favourite architecture (yes, that event-driven architecture) - where do you start? Carry on and read further to explore.

Hold on, what is an event-driven architecture (EDA)?

Glad you asked! There are loads of resources online that talk about EDA. At its core, an event-driven architecture involves asynchronous communication between applications via publishing and subscribing to events over an event broker using a messaging protocol.

Some examples of messaging protocols include open standard and open source protocols such as MQTT, AMQP, and JMS.

To delve more into event-driven architecture and explore different use-cases, check out this page on What is Event-Driven Architecture.

I also wrote a blog post talking about how I built an event-driven NodeJS app on real-time COVID-19 data streams, highlighting the advantages of event-driven architecture, if you want to explore it further:

fmt.Println("Show me the CODE!")

Alright, we did a high level overview on what EDA is but let's get to business with some hands-on code. Let me show you how to use:

  1. The Paho MQTT library for Golang
  2. The PubSub+ Messaging API for Go

Note that all the source code can be found in this open source Github repo:

Conf42 Golang 2022: Gopher in an Event-Driven Playground

Talk

Talk found on YouTube here https://www.youtube.com/watch?v=XBkBpFYAHiw&feature=youtu.be

Source Code

This repo contains source code to showcase how to use the

  • Paho MQTT library to publish and subscribe to
  • Solace PubSub+ Messaging API for Go to publish messages

How to run

  • go run solace_publisher.go
  • go run paho_mqtt.go

Note: Update the hostname, username, password, and vpn name to your broker's details

Extra Resources




Hands-on 👩‍💻🧑‍💻

Let's start with creating a new directory and initializing go mod. In a new terminal window, execute the following commands:

mkdir funWithEDA && cd "$_"
go mod init GoEDA
Enter fullscreen mode Exit fullscreen mode

MQTT

As per definition: "MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth"

The Eclipse Paho project provides open source, mainly client side, implementations of MQTT in a variety of programming languages. For today's fun coding session, we will be using the Eclipse Paho MQTT Go client.

Install the Paho MQTT Go library

go get github.com/eclipse/paho.mqtt.golang
Enter fullscreen mode Exit fullscreen mode

Create a new file and open it with your favourite IDE. I named my file go_mqtt.go.

Initialize the file and import the necessary libraries

package main

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)
Enter fullscreen mode Exit fullscreen mode

In your main() function, start with

  1. Creating a new MQTT client and configure it
  2. Add callback functions for
    1. Received Messages
    2. Successful broker connection
    3. Lost broker connection
  3. Subscribe to a topic
  4. Publish on a topic
func main() {
    var broker = "public.messaging.solace.cloud"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    opts.SetUsername("conf42")
    opts.SetPassword("public")
    opts.SetDefaultPublishHandler(messageHandler)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    sub(client)
    publish(client)

    client.Disconnect(250)
}
Enter fullscreen mode Exit fullscreen mode

We will need to define the callback functions as follows

var messageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    options := client.OptionsReader()
    fmt.Println("Connected to: ", options.Servers())
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connect lost: %v", err)
}
Enter fullscreen mode Exit fullscreen mode

It is important to note that these functions will be triggered "on" particular actions. So for example, the messageHandler function will be triggered whenever the MQTT client receives a message via mqtt.MessageHandler.

And finally, define your publish and subscribe functions as follows:

func publish(client mqtt.Client) {
    num := 10
    for i := 0; i < num; i++ {
        text := fmt.Sprintf("Message %d", i)
        token := client.Publish("conf42/go", 0, false, text)
        token.Wait()
        time.Sleep(time.Second)
    }
}

func sub(client mqtt.Client) {
    topic := "conf42/#"
    token := client.Subscribe(topic, 1, nil)
    token.Wait()
    fmt.Printf("Subscribed to topic: %s\n", topic)
}
Enter fullscreen mode Exit fullscreen mode

And that's it! Run the application and observe the results

go run go_mqtt.go
Enter fullscreen mode Exit fullscreen mode

Solace PubSub+ Messaging API for Go

Now that you are an expert on messaging concepts with Go, let's take it up a notch and delve into a more advanced messaging API! We'll be using the Solace PubSub+ Messaging API for Go.

Install the Solace Native Go API

go get solace.dev/go/messaging
Enter fullscreen mode Exit fullscreen mode

Create a new file and open it with your favourite IDE. I named my file solace_publisher.go.

Import the necessary packages

package main

import (
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"

    "solace.dev/go/messaging"
    "solace.dev/go/messaging/pkg/solace/config"
    "solace.dev/go/messaging/pkg/solace/resource"
)
Enter fullscreen mode Exit fullscreen mode

Define the configuration parameters to connect to the Solace PubSub+ Broker

// Configuration parameters
    brokerConfig := config.ServicePropertyMap{
        config.TransportLayerPropertyHost:                "tcp://public.messaging.solace.cloud",
        config.ServicePropertyVPNName:                    "public",
        config.AuthenticationPropertySchemeBasicUserName: "conf42",
        config.AuthenticationPropertySchemeBasicPassword: "public",
    }
Enter fullscreen mode Exit fullscreen mode

Initialize a messaging service and connect to it

messagingService, err := messaging.NewMessagingServiceBuilder().FromConfigurationProvider(brokerConfig).Build()

if err != nil {
    panic(err)
}

// Connect to the messaging serice
if err := messagingService.Connect(); err != nil {
    panic(err)
}
Enter fullscreen mode Exit fullscreen mode

Build a Direct Message Publisher and start it

//  Build a Direct Message Publisher
directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
if builderErr != nil {
    panic(builderErr)
}

// Start the publisher
startErr := directPublisher.Start()
if startErr != nil {
    panic(startErr)
}
Enter fullscreen mode Exit fullscreen mode

Publish messages in a loop

msgSeqNum := 0

//  Prepare outbound message payload and body
messageBody := "Hello from Conf42"
messageBuilder := messagingService.MessageBuilder().
    WithProperty("application", "samples").
    WithProperty("language", "go")

// Run forever until an interrupt signal is received
go func() {
    for directPublisher.IsReady() {
        msgSeqNum++
        message, err := messageBuilder.BuildWithStringPayload(messageBody + " --> " + strconv.Itoa(msgSeqNum))
        if err != nil {
            panic(err)
        }
        topic := resource.TopicOf("conf42/solace/go/" + strconv.Itoa(msgSeqNum))

        // Publish on dynamic topic with dynamic body
        publishErr := directPublisher.Publish(message, topic)
        if publishErr != nil {
            panic(publishErr)
        }

        fmt.Println("Published message on topic: ", topic.GetName())
        time.Sleep(1 * time.Second)
    }
}()
Enter fullscreen mode Exit fullscreen mode

This is the final application.

package main

import (
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"

    "solace.dev/go/messaging"
    "solace.dev/go/messaging/pkg/solace/config"
    "solace.dev/go/messaging/pkg/solace/resource"
)

func main() {
    // Configuration parameters
    brokerConfig := config.ServicePropertyMap{
        config.TransportLayerPropertyHost:                "tcp://public.messaging.solace.cloud",
        config.ServicePropertyVPNName:                    "public",
        config.AuthenticationPropertySchemeBasicUserName: "conf42",
        config.AuthenticationPropertySchemeBasicPassword: "public",
    }

    messagingService, err := messaging.NewMessagingServiceBuilder().FromConfigurationProvider(brokerConfig).Build()

    if err != nil {
        panic(err)
    }

    // Connect to the messaging serice
    if err := messagingService.Connect(); err != nil {
        panic(err)
    }

    //  Build a Direct Message Publisher
    directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
    if builderErr != nil {
        panic(builderErr)
    }

    // Start the publisher
    startErr := directPublisher.Start()
    if startErr != nil {
        panic(startErr)
    }

    msgSeqNum := 0

    //  Prepare outbound message payload and body
    messageBody := "Hello from Conf42"
    messageBuilder := messagingService.MessageBuilder().
        WithProperty("application", "samples").
        WithProperty("language", "go")

    // Run forever until an interrupt signal is received
    go func() {
        for directPublisher.IsReady() {
            msgSeqNum++
            message, err := messageBuilder.BuildWithStringPayload(messageBody + " --> " + strconv.Itoa(msgSeqNum))
            if err != nil {
                panic(err)
            }
            topic := resource.TopicOf("conf42/solace/go/" + strconv.Itoa(msgSeqNum))

            // Publish on dynamic topic with dynamic body
            publishErr := directPublisher.Publish(message, topic)
            if publishErr != nil {
                panic(publishErr)
            }

            fmt.Println("Published message on topic: ", topic.GetName())
            time.Sleep(1 * time.Second)
        }
    }()

    // Handle OS interrupts
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)

    // Block until an OS interrupt signal is received.
    <-c

    // Terminate the Direct Publisher
    directPublisher.Terminate(1 * time.Second)
    fmt.Println("\nDirect Publisher Terminated? ", directPublisher.IsTerminated())
    // Disconnect the Message Service
    messagingService.Disconnect()
    fmt.Println("Messaging Service Disconnected? ", !messagingService.IsConnected())

}
Enter fullscreen mode Exit fullscreen mode

And that's it! Run the publisher as follows

go run solace_publisher.go
Enter fullscreen mode Exit fullscreen mode

Note: You can use the Solace PubSub+ TryMe tab to connect to the broker and subscribe to any topic you want. Subscribe to topic conf42/solace/>

Bonus! You can run a subscriber application in another terminal, subscribe to conf42/solace/>, and observe the results. You can find more about this on the SolaceSample github org.

Challenge! Run a publisher and a subscriber in the same application! 🤯

Solace PubSub+ Messaging API for Go

This repo hosts sample code to showcase how the Solace PubSub+ Go API could be used. You can find:

  1. /patterns --> runnable code showcasing different message exchange patters with the PubSub+ Go API.
  2. /howtos --> code snippets showcasing how to use different features of the API. All howtos are named how_to_*.go with some sampler files under sub-folders.

Environment Setup

  1. Install the latest supported version of Go from https://go.dev/doc/install. Currently, the samples are run and tested against Go v1.17.
  2. Install the Solace PubSub+ Messaging API for Go into the root of this directory. This is done by either
    1. run go get solace.dev/go/messaging
    2. Downloading the API archive from the Solace Community
    3. Clone the source code into the root of this repo

Run Patterns

  1. [Skip if existing] Initialize the directory with Go modules go mod init SolaceSamples.com/PubSub+Go.
  2. [For local development] Modify the go.mod file…

Note on protocol interoperability

Using the Solace PubSub+ Event Broker, you can leverage the protocol translation and interoperability features. Even though the Go API was used to connect to the broker, you can use other Solace APIs and/or open standard protocols to connect to the broker and still have the microservices send and receive messages to each other.

Solace Protocol translation diagram

Why use Solace native APIs??

You might be wondering, "Why should I use Solace native APIs as opposed to an open-standard open-source messaging protocol like MQTT?" Glad you asked! Check out this blog that talks about the advanced features you can get access to through using Solace's API?

Use Solace Messaging APIs for Advanced Event Mesh Features and the Best Performance - Solace

Using Solace’s native messaging APIs gives you access to the full range of PubSub+ Event Broker features and the best performance.

favicon solace.com

This is all cool. How can I know more 👀

In summary, here are some quick links

If you want to see my colleagues and I during a live streaming session talking about EDA, the Solace PubSub+ Messaging API for Go, and coding (on LIVE television!), check out this event 👀

What about you?

I am curious to hear from the go community!

  • What is it about Golang that you enjoy?
  • Have you ever used Golang in an event-driven application?
  • What messaging protocols and/or open standards have you used?
  • What message brokers do you use?

Open to all sort of discussions, comments, and questions!

PeaceOut Gif

And P.S. You can check out the other talks from Conf42 at conf42.com/golang2022

Top comments (1)

Collapse
 
nitinpuranik profile image
Nitin Puranik

Hi Tamimi,

Thanks for the helpful article. I tried running your first MQTT example and ran into 'panic: bad user name or password'. Your github for that repo says - Note: Update the hostname, username, password, and vpn name to your broker's details.

For someone that's an absolute beginner and is just trying out some sample code, they wouldn't know what to do here. I don't know what my hostname, username, password and vpn name of my broker should be.

A simple example that completely works out of the box would be really helpful here without login and password issues getting in the way, otherwise all your hardwork in explaining the concept and sample source code might not be of any use to a beginner.

Thank you :)
Image description