In a distributed system, maintaining a stable connection between components is crucial for ensuring reliable communication. This article explores strategies for reconnecting to RabbitMQ, ensuring that your applications can recover from network disruptions or broker restarts without losing valuable data.
Setting Up
Importing Dependencies
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)
We use amqp091-go as the client library for connecting to RabbitMQ.
Preparing Options
type Config struct {
	username string
	password string
	host     string
	port     int
}
type Container struct {
	connection *amqp.Connection
}
type Channel struct {
	name       string
	kind       string
	durable    bool
	autoDelete bool
	internal   bool
	noWait     bool
	args       amqp.Table
}
- Config holds the connection configuration.
- Container holds the connection to RabbitMQ. Each application should have a wrapper like this for holding the connection.
- Channel holds the configuration of an exchange that is declared through a channel.
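Later in this article the connection inside Container is read by one goroutine and replaced by another, so a guarded wrapper may be worth considering. The following is a minimal sketch of that idea, not part of the original code: the names SafeContainer, Set, Get, and Connected are hypothetical, and the small connection interface stands in for *amqp.Connection (which does provide IsClosed) so the example runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// connection is the subset of *amqp.Connection this sketch needs.
// It is an assumption made so the example runs without a broker.
type connection interface {
	IsClosed() bool
}

// SafeContainer (hypothetical) guards the current connection with a mutex
// so that one goroutine can replace it while another reads it.
type SafeContainer struct {
	mu   sync.RWMutex
	conn connection
}

// Set stores a new connection, for example after a successful reconnect.
func (c *SafeContainer) Set(conn connection) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.conn = conn
}

// Get returns the current connection, which is nil before the first Set.
func (c *SafeContainer) Get() connection {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.conn
}

// Connected reports whether a connection is stored and not closed.
func (c *SafeContainer) Connected() bool {
	conn := c.Get()
	return conn != nil && !conn.IsClosed()
}

// fakeConn is a stand-in connection used only for the demonstration.
type fakeConn struct{ closed bool }

func (f fakeConn) IsClosed() bool { return f.closed }

func main() {
	c := &SafeContainer{}
	fmt.Println(c.Connected()) // nothing stored yet
	c.Set(fakeConn{closed: false})
	fmt.Println(c.Connected())
	c.Set(fakeConn{closed: true})
	fmt.Println(c.Connected())
}
```

With a wrapper of this shape, the status check would call Connected and the reconnect loop would call Set, instead of both touching the field directly.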
Connection Creation
// dial creates a connection and declares the configured exchanges.
func dial(config Config, channels []Channel) (*amqp.Connection, error) {
	// create connection
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d", config.username, config.password, config.host, config.port))
	if err != nil {
		return nil, err
	}
	log.Print("rabbitmq connect success")
	// open a channel for the declarations; it is closed when dial returns
	channel, err := conn.Channel()
	if err != nil {
		_ = conn.Close() // do not leak the connection on failure
		return nil, err
	}
	defer channel.Close()
	// register channel
	for _, c := range channels {
		err = channel.ExchangeDeclare(c.name, c.kind, c.durable, c.autoDelete, c.internal, c.noWait, c.args)
		if err != nil {
			_ = conn.Close() // do not leak the connection on failure
			return nil, err
		}
	}
	log.Print("rabbitmq channels register success")
	return conn, nil
}
We create a Go function dial to help establish a connection to RabbitMQ. It takes a Config struct containing the connection details (username, password, host, and port) and a slice of Channel structs, each describing an exchange to declare on the server.
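One detail worth considering when building the connection string: credentials that contain characters such as '@' or '/' can break a URI assembled with plain string formatting. The sketch below shows one way to percent-encode them with the standard library. The helper name buildURI and the sample credentials are hypothetical and not part of the original code.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// buildURI (hypothetical helper) assembles an AMQP URI and percent-encodes
// the credentials, so characters such as '@' or '/' in a password do not
// corrupt the URI.
func buildURI(username, password, host string, port int) string {
	u := url.URL{
		Scheme: "amqp",
		User:   url.UserPassword(username, password),
		Host:   net.JoinHostPort(host, strconv.Itoa(port)),
	}
	return u.String()
}

func main() {
	fmt.Println(buildURI("guest", "guest", "localhost", 5672))
	// prints "amqp://guest:guest@localhost:5672"
	fmt.Println(buildURI("app", "p@ss/word", "localhost", 5672))
	// prints "amqp://app:p%40ss%2Fword@localhost:5672"
}
```

The result of such a helper could be passed to amqp.Dial in place of the fmt.Sprintf call.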
Connection Status
func check(container *Container) {
	for {
		time.Sleep(time.Duration(1) * time.Second)
		log.Printf("rabbitmq connected: %v", !container.connection.IsClosed())
	}
}
We create a Go function check to periodically check and print the connection status, which gives us visibility into the state of the connection.
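The check function above loops forever, which is fine for a demo. In a longer-lived service you may want a status loop that can be stopped. The following sketch shows one possible variant using a ticker and a context. The names watch and demo are hypothetical, and isClosed stands in for container.connection.IsClosed so the example runs without a broker.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// watch (hypothetical) calls report on every tick until ctx is cancelled.
// isClosed stands in for container.connection.IsClosed.
func watch(ctx context.Context, interval time.Duration, isClosed func() bool, report func(connected bool)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			report(!isClosed())
		}
	}
}

// demo runs watch against a fake, always-open connection and cancels it
// after three reports. It returns how many reports were delivered.
func demo() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	reports := 0
	watch(ctx, 10*time.Millisecond,
		func() bool { return false },
		func(connected bool) {
			reports++
			if reports == 3 {
				cancel()
			}
		})
	return reports
}

func main() {
	fmt.Println("reports delivered:", demo())
}
```

In the article's program, the report callback would simply log the status, and the context would be cancelled during shutdown.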
Reconnection Code
Below is the full reconnection code. We will walk through it step by step to understand how reconnection works in Go with RabbitMQ.
func reconnect(container *Container, config Config, channels []Channel) {
	go func() {
		for {
			// block until the current connection is closed
			reason, ok := <-container.connection.NotifyClose(make(chan *amqp.Error))
			if !ok {
				// closed without an error: graceful close, stop watching
				log.Print("rabbitmq connection closed")
				break
			}
			log.Printf("rabbitmq connection closed unexpectedly, reason: %v", reason)
			// retry until a new connection is established
			for {
				time.Sleep(time.Duration(1) * time.Second)
				connection, err := dial(config, channels)
				if err == nil {
					container.connection = connection
					log.Print("rabbitmq reconnect success")
					break
				}
				log.Printf("rabbitmq reconnect failed, err: %v", err)
			}
		}
	}()
}
This function is a reconnect loop for a RabbitMQ client. Because reconnection creates a brand-new connection, the reconnect function needs the same values that were used for the initial connection.
- It starts a goroutine that listens for the closure of the connection, so the caller is not blocked.
- When the connection is closed unexpectedly, it enters a loop to attempt reconnection. When the connection is closed gracefully, the goroutine exits.
- It uses a delay of 1 second (time.Sleep(time.Duration(1) * time.Second)) between reconnection attempts.
- It attempts to reconnect by calling the dial function with the provided configuration and channels.
- If the reconnection is successful, it updates container.connection with the new connection.
- If the reconnection fails, it logs the error and continues the loop.
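The loop above retries at a fixed one-second interval. A common alternative is exponential backoff, which spaces out attempts while the broker stays down. The sketch below illustrates the idea only. The function name backoff and the constants baseDelay and maxDelay are assumptions, not part of the original code.

```go
package main

import (
	"fmt"
	"time"
)

const (
	baseDelay = 1 * time.Second  // delay before the first retry (assumed value)
	maxDelay  = 30 * time.Second // upper bound for any delay (assumed value)
)

// backoff (hypothetical) returns the delay to wait before retry number
// attempt (0-based): baseDelay doubled per attempt, capped at maxDelay.
func backoff(attempt int) time.Duration {
	delay := baseDelay
	for i := 0; i < attempt; i++ {
		delay *= 2
		if delay >= maxDelay {
			return maxDelay
		}
	}
	return delay
}

func main() {
	for attempt := 0; attempt < 7; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt, backoff(attempt))
	}
}
```

To use it, the inner loop would keep an attempt counter, sleep for backoff(attempt) instead of the fixed second, and reset the counter after a successful dial.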
Stitching Together
func main() {
	config := Config{
		username: "guest",
		password: "guest",
		host:     "localhost",
		port:     5672,
	}
	channels := []Channel{
		{
			name:       "reconnection-exchange",
			kind:       "direct",
			durable:    true,
			autoDelete: false,
			internal:   false,
			noWait:     false,
			args:       nil,
		},
	}
	container := &Container{}
	connection, err := dial(config, channels)
	if err != nil {
		log.Fatal(err) // logs the error and exits
	}
	container.connection = connection
	go check(container)
	defer func() {
		if err := container.connection.Close(); err != nil {
			log.Print(err)
		}
		log.Print("rabbitmq connection closed")
	}()
	reconnect(container, config, channels)
	// block until the process is asked to stop
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
	<-quit
}
Now we combine all the code in the main function.
Testing
Running
Before running the code, make sure RabbitMQ is running and accessible. When you run the code, you should see output like the following.
rabbitmq connect success
rabbitmq channels register success
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
...
This output indicates that the connection to RabbitMQ is established and healthy.
Disconnection and Downtime
Once the application is running and connected, you can emulate a disconnection or server outage by manually shutting down the RabbitMQ server. After the server stops, you should see output like the following.
rabbitmq connection closed unexpectedly, reason: Exception (320) Reason: "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
rabbitmq connected: false
rabbitmq connected: false
rabbitmq connected: false
rabbitmq connected: false
rabbitmq reconnect failed, err: dial tcp [::1]:5672: connectex: No connection could be made because the target machine actively refused it.
rabbitmq connected: false
rabbitmq connected: false
rabbitmq connected: false
rabbitmq reconnect failed, err: dial tcp [::1]:5672: connectex: No connection could be made because the target machine actively refused it.
rabbitmq connected: false
...
This output indicates that the connection to RabbitMQ has been lost. You should set up an alert for this event.
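As a starting point for such an alert, one option is to count consecutive failed status checks and fire once a threshold is reached, so a brief blip does not page anyone. The sketch below is only an illustration. The type downtimeTracker, its observe method, and the threshold of 3 are hypothetical, and the alert itself is reduced to a printed line.

```go
package main

import "fmt"

// downtimeTracker (hypothetical) counts consecutive failed status checks.
type downtimeTracker struct {
	threshold int // consecutive failures that trigger an alert (assumed)
	failures  int
}

// observe records one status check and returns true exactly once per
// outage: when the number of consecutive failures reaches threshold.
func (d *downtimeTracker) observe(connected bool) bool {
	if connected {
		d.failures = 0
		return false
	}
	d.failures++
	return d.failures == d.threshold
}

func main() {
	tracker := &downtimeTracker{threshold: 3}
	checks := []bool{true, false, false, false, false, true}
	for i, connected := range checks {
		if tracker.observe(connected) {
			fmt.Printf("check %d: alert, connection down for %d checks\n", i, tracker.threshold)
		}
	}
}
```

In the article's program, observe could be called from the check loop with the value of !container.connection.IsClosed().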
Reconnection
After the disconnection, you can emulate recovery by starting the RabbitMQ server again. Once the server is up and ready and the reconnection succeeds, you should see output like the following.
rabbitmq connect success
rabbitmq channels register success
rabbitmq reconnect success
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
rabbitmq connected: true
Conclusion
In conclusion, this article demonstrated a strategy for handling RabbitMQ reconnections in Go. By running a reconnection loop and monitoring the connection status, the application can recover from network disruptions or broker restarts without a manual restart. This improves the resilience of the system and reduces the risk of losing data to temporary connection issues. Keep in mind that the example restores only the connection itself; channels and consumers opened on the old connection must be recreated on the new one. With the amqp091-go library, these reconnection mechanisms are straightforward to implement.