DEV Community

loading...

Using Custom Dialers in the NATS Go Client

wallyqs profile image Waldemar Quevedo ・5 min read

The Go client from NATS by default has support for reconnection logic, so that whenever it disconnects from a server it will retry for a number of times before giving up and closing the connection.

// Connect to the demo server, limiting the client to 10 reconnect attempts.
nc, err := nats.Connect("demo.nats.io", nats.MaxReconnects(10))
// ...

Besides limiting the maximum number of reconnect attempts,
the client has a number of knobs to customize how it should reconnect,
such as backing off in between connection attempts, or whether to
disable reconnecting altogether without buffering
enabled for example.

Furthermore, it also supports event callbacks to be able to trace
whenever the client disconnects, reconnects or gives up and closes the
connection
to the server.

// Connect with a custom reconnect wait and callbacks that log each
// connection state change.
nc, err := nats.Connect("demo.nats.io",
        // Wait 2 seconds in between reconnect attempts.
        nats.ReconnectWait(2 * time.Second),
        // Called when the client reconnects to a server.
        nats.ReconnectHandler(func(c *nats.Conn) {
            log.Println("Reconnected to", c.ConnectedUrl())
        }),
        // Called when the client disconnects from the server.
        nats.DisconnectHandler(func(c *nats.Conn) {
            log.Println("Disconnected from NATS")
        }),
        // Called when the client gives up and closes the connection.
        nats.ClosedHandler(func(c *nats.Conn) {
            log.Println("Closed connection to NATS")
        }),
)
// ...

On the other hand, there aren't as many knobs for the initial
connection attempt done by the client. Also, if the initial connect to
a NATS server fails, then nats.Connect will just return an error and
abort.

    // The initial connect is not retried: if it fails, nats.Connect returns
    // an error right away.
    nc, err = nats.Connect("127.0.0.1:4223")
    if err != nil {
        // nats: no servers available for connection
        log.Fatal(err)
    }

Instead of exposing a number of options to manage how to dial NATS
as part of the client, it is possible to define a CustomDialer that
can be used to replace the default dialer in order to customize how to
dial without having to make further internal changes to the library.

// customDialer is an empty placeholder type that carries the Dial method
// used to replace the client's default dialer.
type customDialer struct {
}

// Dial receives the network and address to connect to and returns the
// established connection, or an error. The connection logic itself is
// elided in this snippet.
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
    // ...
    return conn, nil
}

// main shows how a custom dialer is handed to the client: it is wrapped in
// a connect option and passed to nats.Connect along with any other options.
func main() {
    cd := &customDialer{}
    opts := []nats.Option{
        // Replace the default dialer with the custom one.
        nats.SetCustomDialer(cd),
        // ...
    }
    nc, err = nats.Connect("demo.nats.io", opts...)
    // ...
}

This usage of Go interfaces makes things very flexible, as the
client can now add custom logic as part of the dialer implementation and
everything will work transparently.

For example, let's say that you want to make the client use the
context package to use DialContext and be able to cancel
connecting to NATS altogether with a deadline, one could then
define a Dialer implementation as follows:

package main

import (
    "context"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/nats-io/go-nats"
)

// customDialer holds the settings used by its Dial method to retry
// connecting until a deadline is reached.
type customDialer struct {
    // ctx is the parent context; Dial derives its per-call timeout context
    // from it, so cancelling ctx aborts connecting.
    ctx             context.Context
    // nc is not referenced by Dial in this example.
    nc              *nats.Conn
    // connectTimeout bounds a whole Dial call, including all of its retries.
    connectTimeout  time.Duration
    // connectTimeWait is how long Dial pauses after a failed attempt.
    connectTimeWait time.Duration
}

// Dial keeps trying to establish a connection to address until it succeeds
// or the dialer's context is done.
//
// A context derived from cd.ctx with a timeout of cd.connectTimeout bounds
// the whole call, retries included. After each failed attempt Dial waits
// cd.connectTimeWait before trying again; that wait is abandoned as soon as
// the context is cancelled or times out, so callers are never blocked past
// the deadline.
//
// It returns the established connection, or the context's error
// (context.Canceled or context.DeadlineExceeded) when giving up.
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
    ctx, cancel := context.WithTimeout(cd.ctx, cd.connectTimeout)
    defer cancel()

    d := &net.Dialer{}
    for {
        log.Println("Attempting to connect to", address)
        if err := ctx.Err(); err != nil {
            return nil, err
        }

        conn, err := d.DialContext(ctx, network, address)
        if err == nil {
            log.Println("Connected to NATS successfully")
            return conn, nil
        }

        // Back off before the next attempt, but stop waiting immediately
        // if the context ends in the meantime.
        backoff := time.NewTimer(cd.connectTimeWait)
        select {
        case <-ctx.Done():
            backoff.Stop()
            return nil, ctx.Err()
        case <-backoff.C:
        }
    }
}

With the dialer implementation above, the NATS client will retry a
number of times to connect to the NATS server until the context is no
longer valid:

func main() {
    // Parent context cancels connecting/reconnecting altogether.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var err error
    var nc *nats.Conn
    cd := &customDialer{
        ctx:             ctx,
        nc:              nc,
        connectTimeout:  5 * time.Second,
        connectTimeWait: 1 * time.Second,
    }
    opts := []nats.Option{
        nats.SetCustomDialer(cd),
        nats.ReconnectWait(2 * time.Second),
        nats.ReconnectHandler(func(c *nats.Conn) {
            log.Println("Reconnected to", c.ConnectedUrl())
        }),
        nats.DisconnectHandler(func(c *nats.Conn) {
            log.Println("Disconnected from NATS")
        }),
        nats.ClosedHandler(func(c *nats.Conn) {
            log.Println("NATS connection is closed.")
        }),
        nats.MaxReconnects(2),
    }
    go func() {
        nc, err = nats.Connect("127.0.0.1:4222", opts...)
    }()

    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGINT)
    go func() {
        for {
            select {
            case sig := <-c:
                switch sig {
                case syscall.SIGINT:
                    log.Println("Shutting down...")
                    cancel()
                    return
                }
            }
        }
    }()

WaitForEstablishedConnection:
    for {
        if err != nil {
            log.Fatal(err)
        }
        select {
        case <-ctx.Done():
            break WaitForEstablishedConnection
        default:
        }
        if nc == nil || !nc.IsConnected() {
            log.Println("Connection not ready yet...")
            time.Sleep(500 * time.Millisecond)
            continue
        }
        break WaitForEstablishedConnection
    }
    if ctx.Err() != nil {
        log.Fatal(ctx.Err())
    }

    // Run until receiving signal
    for range time.NewTicker(1 * time.Second).C {
        select {
        case <-ctx.Done():
            // Disconnect and flush pending messages
            if err := nc.Drain(); err != nil {
                log.Println(err)
            }
            log.Println("Closed connection to NATS")
            return
        default:
        }

        if nc.IsClosed() {
            break
        }
        if err := nc.Publish("hello", []byte("world")); err != nil {
            log.Println(err)
            continue
        }
        log.Println("Published message")
    }
}

Using DialContext as in the example above also opens the door to
hooking into some of the tracing features that are part of the standard
library, for example to check when a connection attempt happens or when
a TCP connection is established.

In the example below, the httptrace.ClientTrace type from the
net/http/httptrace package is used to activate some of the built-in
tracing instrumentation that is part of the standard library via
context and DialContext.

// customDialer holds the settings used by its Dial method to retry
// connecting, with connection tracing, until a deadline is reached.
type customDialer struct {
    // ctx is the parent context; Dial attaches the trace hooks and a
    // timeout to a context derived from it.
    ctx             context.Context
    // nc is not referenced by Dial in this example.
    nc              *nats.Conn
    // connectTimeout bounds a whole Dial call, including all of its retries.
    connectTimeout  time.Duration
    // connectTimeWait is how long Dial pauses after a failed attempt.
    connectTimeWait time.Duration
}

// Dial keeps trying to establish a connection to address until it succeeds
// or the dialer's context is done, logging connection events through
// httptrace hooks.
//
// The hooks are attached to a context derived from cd.ctx, which is then
// given a timeout of cd.connectTimeout that bounds the whole call, retries
// included. After each failed attempt Dial waits cd.connectTimeWait before
// trying again; that wait is abandoned as soon as the context is cancelled
// or times out.
//
// It returns the established connection, or the context's error
// (context.Canceled or context.DeadlineExceeded) when giving up.
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
    trace := &httptrace.ClientTrace{
        ConnectStart: func(network, addr string) {
            log.Println("Connecting to server at", addr)
        },
        ConnectDone: func(network, addr string, err error) {
            if err == nil {
                log.Println("Established TCP connection")
            }
        },
    }
    ctx, cancel := context.WithTimeout(httptrace.WithClientTrace(cd.ctx, trace), cd.connectTimeout)
    defer cancel()

    d := &net.Dialer{}
    for {
        if err := ctx.Err(); err != nil {
            return nil, err
        }

        conn, err := d.DialContext(ctx, network, address)
        if err == nil {
            log.Println("Connected to NATS successfully")
            return conn, nil
        }

        // Back off before the next attempt, but stop waiting immediately
        // if the context ends in the meantime.
        backoff := time.NewTimer(cd.connectTimeWait)
        select {
        case <-ctx.Done():
            backoff.Stop()
            return nil, ctx.Err()
        case <-backoff.C:
        }
    }
}

Result:

go run custom-connect.go
2018/09/05 09:02:17 Connection not ready yet...
2018/09/05 09:02:17 Attempting to connect to server at 127.0.0.1:4222
2018/09/05 09:02:17 Established TCP connection
2018/09/05 09:02:17 Connected to NATS successfully
2018/09/05 09:02:19 Published message
2018/09/05 09:02:20 Published message
2018/09/05 09:02:21 Published message
2018/09/05 09:02:22 Published message
2018/09/05 09:02:23 Published message
^C2018/09/05 09:02:23 Shutting down...

You can find the full example here. Interested in learning more? Join the NATS Slack channel, or you can also find more examples of advanced NATS usage in my book Practical NATS.

Discussion

pic
Editor guide