DEV Community

Salad Lam
Salad Lam

Posted on

Reactor Netty: UDP DNS client example

This example is to demonstrate

  • send and receive DNS UDP packets
  • add netty's built-in ChannelHandler
  • using Reactor Netty's interface to build send and receive action
  • terminate connection

Code of netty is here and using following library

  • netty 4.1.107.Final
  • Project Reactor 3.6.3
  • Reactor Netty 1.1.16

Code

package example;

import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.dns.*;
import io.netty.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.net.InetSocketAddress;

public class DnsUdpClient {

    private static final String SERVER_HOST = "8.8.8.8";
    private static final int SERVER_PORT = 53;
    private static final Logger LOGGER = LoggerFactory.getLogger(DnsUdpClient.class);

    // (1)
    private static void handleQueryResp(DatagramDnsResponse msg) {
        if (msg.count(DnsSection.QUESTION) > 0) {
            DnsQuestion question = msg.recordAt(DnsSection.QUESTION, 0);
            LOGGER.info("name: {}", question.name());
        }
        for (int i = 0, count = msg.count(DnsSection.ANSWER); i < count; i++) {
            DnsRecord r = msg.recordAt(DnsSection.ANSWER, i);
            if (r.type() == DnsRecordType.A) {
                //just print the IP after query
                DnsRawRecord raw = (DnsRawRecord) r;
                LOGGER.info("{}", NetUtil.bytesToIpAddress(ByteBufUtil.getBytes(raw.content())));
            }
        }
    }

    public static void main(String[] args) {
        UdpClient client = UdpClient.create()
                .host(SERVER_HOST).port(SERVER_PORT)    // (2)
                .wiretap(true)  // (3)
                /*
                // (4)
                .doOnChannelInit((observer, channel, remoteAddress) -> {
                    Connection c = Connection.from(channel);
                    c.addHandlerLast(new DatagramDnsQueryEncoder());
                    c.addHandlerLast(new DatagramDnsResponseDecoder());
                    LOGGER.info("pipeline={}", channel.pipeline());
                });
                */
                // (5)
                .doOnConnected(c -> {
                    c.addHandlerLast(new DatagramDnsQueryEncoder());
                    c.addHandlerLast(new DatagramDnsResponseDecoder());
                    LOGGER.info("pipeline={}", c.channel().pipeline());
                });

        Connection conn = client.connectNow();  // (6)

        // (7)
        conn.inbound().receiveObject()
                .doOnNext(obj -> {
                    DatagramDnsResponse r = (DatagramDnsResponse) obj;
                    LOGGER.info("response={}", obj);
                    handleQueryResp(r);
                })
                .doOnError(err -> LOGGER.error(String.valueOf(err)))
                .subscribe();

        // (8)
        conn.outbound()
                .sendObject(new DatagramDnsQuery(null, new InetSocketAddress(SERVER_HOST, SERVER_PORT), 0x42b7)
                        .setRecord(
                                DnsSection.QUESTION,
                                new DefaultDnsQuestion("www.google.com", DnsRecordType.A)
                        ))
                .then().subscribe();

        conn.outbound()
                .sendObject(new DatagramDnsQuery(null, new InetSocketAddress(SERVER_HOST, SERVER_PORT), 0x42b8)
                        .setRecord(
                                DnsSection.QUESTION,
                                new DefaultDnsQuestion("projectreactor.io", DnsRecordType.A)
                        ))
                .then().subscribe();

        // (9)
        conn.onReadIdle(5000, () -> {
            LOGGER.error("Request time out");
            conn.disposeNow();
        });

        // (10)
        conn.onDispose().block();
    }

}
Enter fullscreen mode Exit fullscreen mode

Explanation

(1): for decoding DNS reply

(2): destination IP and port number is necessary when create netty's io.netty.channel.Channel

(3): a debug ChannelHandler which prints all events related to Channel in DEBUG level will be added into the start of Channel's pipeline

(4): doOnChannelInit() accepts reactor.netty.ChannelPipelineConfigurer, for configuring the channel pipeline while initializing the channel.

This ChannelPipelineConfigurer is called by reactor.netty.transport.TransportConfig.TransportChannelInitializer#initChannel

@Override
protected void initChannel(Channel channel) {
    ChannelPipeline pipeline = channel.pipeline();

    if (config.metricsRecorder != null) {
        //...
    }

    if (config.loggingHandler != null) {
        pipeline.addFirst(NettyPipeline.LoggingHandler, config.loggingHandler);
    }

    ChannelOperations.addReactiveBridge(channel, config.channelOperationsProvider(), connectionObserver);

    config.defaultOnChannelInit()
          .then(config.doOnChannelInit)
          .onChannelInit(connectionObserver, channel, remoteAddress);   // <- HERE

    pipeline.remove(this);

    if (log.isDebugEnabled()) {
        log.debug(format(channel, "Initialized pipeline {}"), pipeline.toString());
    }
}
Enter fullscreen mode Exit fullscreen mode

To obtain reactor.netty.Connection instance from Netty's Channel instance, the following static method can be used.

Connection c = Connection.from(channel);
Enter fullscreen mode Exit fullscreen mode

At the beginning of function executing, pipeline contains

pipeline=DefaultChannelPipeline{(reactor.left.loggingHandler = reactor.netty.transport.logging.ReactorNettyLoggingHandler), (TransportConfig$TransportChannelInitializer#0 = reactor.netty.transport.TransportConfig$TransportChannelInitializer), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}

Enter fullscreen mode Exit fullscreen mode

At the end of function executing, pipeline contains

pipeline=DefaultChannelPipeline{(reactor.left.loggingHandler = reactor.netty.transport.logging.ReactorNettyLoggingHandler), (TransportConfig$TransportChannelInitializer#0 = reactor.netty.transport.TransportConfig$TransportChannelInitializer), (DatagramDnsQueryEncoder = io.netty.handler.codec.dns.DatagramDnsQueryEncoder), (DatagramDnsResponseDecoder = io.netty.handler.codec.dns.DatagramDnsResponseDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
Enter fullscreen mode Exit fullscreen mode

reactor.netty.channel.ChannelOperationsHandler is the bridge connect netty and Reactor Netty library. It must at the end of pipeline.

DatagramDnsQueryEncoder and DatagramDnsResponseDecoder are netty's build-in ChannelHandler, which response for encoding/decoding raw ByteBuf from/to DNS instances.

(5): doOnConnected() is another way for configuring the channel pipeline. It do the job exactly the same as (4), but it run on Channel's channelConnected event.

(6): block, perform actual establish connection (but there is no need for establish connection of UDP traffic), and return when connection is setup.

(7): define the action on packet recevied. inbound().receiveObject() returns Flux. Remember to append subscribe() at the end.

(8): there is 2 lines, one for one request sent. then() will returns a Mono which will have complete signal when packet is sent. Also it must be subscribed.

(9): terminate connection if do not receive packet in 5 seconds

(10): get a Mono which will have complete signal when connection is shutdown successfully. block() is blocking the main thread. Main thread must end at last.

Top comments (0)