DEV Community

Waldemar Quevedo
Waldemar Quevedo

Posted on

Using Custom Dialers in the NATS Go Client

The Go client from NATS by default has support for reconnection logic, so that whenever it disconnects from a server it will retry for a number of times before giving up and closing the connection.

// Retry up to 10 reconnect attempts after a disconnect before giving up
// and closing the connection.
nc, err := nats.Connect("demo.nats.io", nats.MaxReconnects(10))
// ...
Enter fullscreen mode Exit fullscreen mode

Besides limiting the maximum number of reconnect attempts,
the client has a number of knobs to customize how it should reconnect,
such as how long to back off between connection attempts, whether to
disable reconnecting altogether, or whether to reconnect without
buffering enabled.

It also supports event callbacks for tracing whenever the client
disconnects, reconnects, or gives up and closes the connection to the
server.

// Wait 2 seconds between reconnect attempts and log every connection
// lifecycle event through the callback options.
nc, err := nats.Connect("demo.nats.io",
        nats.ReconnectWait(2 * time.Second),
        // Invoked after the client has reconnected to a server.
        nats.ReconnectHandler(func(c *nats.Conn) {
            log.Println("Reconnected to", c.ConnectedUrl())
        }),
        // Invoked when the client loses its connection to the server.
        nats.DisconnectHandler(func(c *nats.Conn) {
            log.Println("Disconnected from NATS")
        }),
        // Invoked when the client gives up and closes the connection.
        nats.ClosedHandler(func(c *nats.Conn) {
            log.Println("Closed connection to NATS")
        }),
)
// ...
Enter fullscreen mode Exit fullscreen mode

On the other hand, there aren't as many knobs for the initial
connection attempt done by the client. Also, if the initial connect to
a NATS server fails, then nats.Connect will just return an error and
abort.

    // The initial connect is not retried: if no server can be reached at
    // this address, nats.Connect returns an error right away.
    nc, err = nats.Connect("127.0.0.1:4223")
    if err != nil {
        // nats: no servers available for connection
        log.Fatal(err)
    }
Enter fullscreen mode Exit fullscreen mode

Instead of exposing a number of options to manage how to dial NATS as
part of the client, it is possible to define a custom dialer and pass it
via nats.SetCustomDialer. It replaces the default dialer, so you can
customize how to dial without having to make further internal changes
to the library.

// customDialer is a placeholder for a user-defined dialer that is handed
// to nats.SetCustomDialer; it carries no state of its own.
type customDialer struct{}

// Dial implements the dialer accepted by nats.SetCustomDialer.
//
// Custom connection logic (retries, timeouts, tracing, ...) goes here.
// This skeleton simply falls back to a plain net.Dial so that it compiles
// and works as-is.
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
    // ...
    return net.Dial(network, address)
}

// main plugs customDialer into the NATS client via nats.SetCustomDialer
// and connects through it, exiting if the connection cannot be made.
func main() {
    cd := &customDialer{}
    opts := []nats.Option{
        nats.SetCustomDialer(cd),
        // ...
    }
    nc, err := nats.Connect("demo.nats.io", opts...)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()
    // ...
}
Enter fullscreen mode Exit fullscreen mode

This usage of Go interfaces makes things very flexible, as the client
can now add custom logic as part of the dialer implementation and
everything will work transparently.

For example, let's say that you want the client to dial with
DialContext, so that connecting to NATS can be canceled altogether
through the context package or bounded by a deadline. One could then
define a Dialer implementation as follows:

package main

import (
    "context"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/nats-io/go-nats"
)

// customDialer dials NATS servers with retries whose overall duration is
// bounded by a timeout derived from a parent context.
type customDialer struct {
    // ctx is the parent context; Dial derives its per-call timeout from
    // it, so canceling ctx also aborts an in-progress Dial.
    ctx context.Context
    // nc is stored but never read by the Dial methods shown here.
    nc *nats.Conn
    // connectTimeout bounds one whole Dial call across all retries;
    // connectTimeWait is the pause between failed attempts.
    connectTimeout, connectTimeWait time.Duration
}

// Dial implements the dialer accepted by nats.SetCustomDialer.
//
// It keeps trying to open a connection to address, pausing
// cd.connectTimeWait between failed attempts. It stops when an attempt
// succeeds, when cd.connectTimeout has elapsed, or when the parent context
// cd.ctx is canceled. In the latter two cases it returns the context's
// error (context.DeadlineExceeded or context.Canceled).
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
    ctx, cancel := context.WithTimeout(cd.ctx, cd.connectTimeout)
    defer cancel()

    var d net.Dialer
    for {
        if err := ctx.Err(); err != nil {
            return nil, err
        }
        log.Println("Attempting to connect to", address)
        conn, err := d.DialContext(ctx, network, address)
        if err == nil {
            log.Println("Connected to NATS successfully")
            return conn, nil
        }
        log.Println("Connection attempt failed:", err)

        // Pause before the next attempt, but stop as soon as ctx is done
        // instead of sleeping past a cancellation or the deadline.
        wait := time.NewTimer(cd.connectTimeWait)
        select {
        case <-ctx.Done():
            wait.Stop()
            return nil, ctx.Err()
        case <-wait.C:
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

With the dialer implementation above, the NATS client will keep retrying
to connect to the NATS server until it succeeds, the connect timeout
expires, or the parent context is canceled:

func main() {
    // Parent context cancels connecting/reconnecting altogether.
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    var err error
    var nc *nats.Conn
    cd := &customDialer{
        ctx:             ctx,
        nc:              nc,
        connectTimeout:  5 * time.Second,
        connectTimeWait: 1 * time.Second,
    }
    opts := []nats.Option{
        nats.SetCustomDialer(cd),
        nats.ReconnectWait(2 * time.Second),
        nats.ReconnectHandler(func(c *nats.Conn) {
            log.Println("Reconnected to", c.ConnectedUrl())
        }),
        nats.DisconnectHandler(func(c *nats.Conn) {
            log.Println("Disconnected from NATS")
        }),
        nats.ClosedHandler(func(c *nats.Conn) {
            log.Println("NATS connection is closed.")
        }),
        nats.MaxReconnects(2),
    }
    go func() {
        nc, err = nats.Connect("127.0.0.1:4222", opts...)
    }()

    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGINT)
    go func() {
        for {
            select {
            case sig := <-c:
                switch sig {
                case syscall.SIGINT:
                    log.Println("Shutting down...")
                    cancel()
                    return
                }
            }
        }
    }()

WaitForEstablishedConnection:
    for {
        if err != nil {
            log.Fatal(err)
        }
        select {
        case <-ctx.Done():
            break WaitForEstablishedConnection
        default:
        }
        if nc == nil || !nc.IsConnected() {
            log.Println("Connection not ready yet...")
            time.Sleep(500 * time.Millisecond)
            continue
        }
        break WaitForEstablishedConnection
    }
    if ctx.Err() != nil {
        log.Fatal(ctx.Err())
    }

    // Run until receiving signal
    for range time.NewTicker(1 * time.Second).C {
        select {
        case <-ctx.Done():
            // Disconnect and flush pending messages
            if err := nc.Drain(); err != nil {
                log.Println(err)
            }
            log.Println("Closed connection to NATS")
            return
        default:
        }

        if nc.IsClosed() {
            break
        }
        if err := nc.Publish("hello", []byte("world")); err != nil {
            log.Println(err)
            continue
        }
        log.Println("Published message")
    }
}
Enter fullscreen mode Exit fullscreen mode

Using DialContext as in the example above also makes it possible to
hook into some of the tracing features of the standard library, for
example to log when a connection attempt starts or when a TCP
connection has been established.

In the example below, an httptrace.ClientTrace from the net/http/httptrace
package is used to activate some of the built-in tracing
instrumentation that is part of the standard library via context and
DialContext.

// customDialer is a NATS dialer that traces its connection attempts and
// retries them within a timeout derived from a parent context.
type customDialer struct {
    ctx context.Context // parent context; canceling it aborts Dial
    nc  *nats.Conn      // kept for reference; not read by Dial here

    // Overall time budget for one Dial call, and pause between attempts.
    connectTimeout, connectTimeWait time.Duration
}

// Dial implements the dialer accepted by nats.SetCustomDialer with
// connection tracing enabled.
//
// An httptrace.ClientTrace attached to the dial context logs when each
// TCP connection attempt starts and how it ends. Failed attempts are
// retried after cd.connectTimeWait. Dial stops when an attempt succeeds,
// when cd.connectTimeout elapses, or when cd.ctx is canceled; in the
// latter two cases it returns the context's error.
func (cd *customDialer) Dial(network, address string) (net.Conn, error) {
    var d net.Dialer
    trace := &httptrace.ClientTrace{
        ConnectStart: func(network, addr string) {
            log.Println("Connecting to server at", addr)
        },
        ConnectDone: func(network, addr string, err error) {
            if err != nil {
                log.Println("Connection attempt failed:", err)
                return
            }
            log.Println("Established TCP connection")
        },
    }
    ctx, cancel := context.WithTimeout(httptrace.WithClientTrace(cd.ctx, trace), cd.connectTimeout)
    defer cancel()

    for {
        if err := ctx.Err(); err != nil {
            return nil, err
        }
        conn, err := d.DialContext(ctx, network, address)
        if err == nil {
            log.Println("Connected to NATS successfully")
            return conn, nil
        }

        // Pause before retrying, but stop as soon as ctx is done instead
        // of sleeping past a cancellation or the deadline.
        wait := time.NewTimer(cd.connectTimeWait)
        select {
        case <-ctx.Done():
            wait.Stop()
            return nil, ctx.Err()
        case <-wait.C:
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Result:

go run custom-connect.go
2018/09/05 09:02:17 Connection not ready yet...
2018/09/05 09:02:17 Attempting to connect to server at 127.0.0.1:4222
2018/09/05 09:02:17 Established TCP connection
2018/09/05 09:02:17 Connected to NATS successfully
2018/09/05 09:02:19 Published message
2018/09/05 09:02:20 Published message
2018/09/05 09:02:21 Published message
2018/09/05 09:02:22 Published message
2018/09/05 09:02:23 Published message
^C2018/09/05 09:02:23 Shutting down...
Enter fullscreen mode Exit fullscreen mode

You can find the full example here. Interested in learning more? Join the NATS Slack channel, or find more examples of advanced NATS usage in my book Practical NATS.

Top comments (0)