DEV Community

Salad Lam
Salad Lam

Posted on

Reactor Netty: HTTP server example

This example is to demonstrate

  • build server which serve HTTP request
  • read/write HTTP header information
  • share ByteBuf instance between HTTP connection

Following library is used

  • netty 4.1.107.Final
  • Project Reactor 3.6.3
  • Reactor Netty 1.1.16

Code

package example;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Scanner;

public class HttpServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(HttpServer.class);

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);

        // (1)
        ByteBuf buf1 = Unpooled.buffer(8);
        buf1.writeCharSequence("Hello ", Charset.defaultCharset());
        ByteBuf buf2 = Unpooled.buffer(8);
        buf2.writeCharSequence("World!", Charset.defaultCharset());

        DisposableServer server =
                HttpServer.create()
                        .host("localhost").port(8080)
                        .wiretap(true)
                        .doOnChannelInit((observer, channel, remoteAddress) -> {
                            // (2)
                            LOGGER.info("pipeline={}", channel.pipeline());
                        })
                        .route(routes ->
                                routes.get("/", (request, response) -> {    // (3)
                                    // (4)
                                    LOGGER.info("Header: host={}", request.requestHeaders().get("Host"));

                                    // (5)
                                    response.addHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN);
                                    // (6)
                                    return response.send(
                                            Flux.defer( // (7)
                                                    () -> Flux.just(buf1.duplicate(), buf2.duplicate()) // (8)
                                                            .delaySequence(Duration.ofSeconds(1))   // (9)
                                                            .doOnNext(ByteBuf::retain)  // (10)
                                            )
                                    );
                                })
                        )
                        .bindNow();

        // (11)
        server.onDispose().subscribe(
                n -> {
                },
                e -> {
                },
                () -> { // (12)
                    buf1.release();
                    buf2.release();
                    LOGGER.info("Shutdown successfully");
                }
        );

        // (13)
        LOGGER.info("*** Double enter to shutdown ***");
        scanner.nextLine();
        server.dispose();
    }

}
Enter fullscreen mode Exit fullscreen mode

Explanation

(1): create io.netty.buffer.ByteBuf instances and write contents into it.

Data should be written into ByteBuf before reading. And can read once only.

There are reference counting facilities in ByteBuf (with io.netty.util.ReferenceCounted interface). When an instance is created, refCnt() is 1. retain(int) is for setting times this instance is referenced. After using this, calling release() to decrease the refCnt. When refCnt becomes 0, the system knows that this instance is not used anymore and it will be released.

(2): doOnChannelInit() accepts reactor.netty.ChannelPipelineConfigurer, for configuring the channel pipeline while initializing the channel.

The pipeline contains

pipeline=DefaultChannelPipeline{(reactor.left.loggingHandler = reactor.netty.transport.logging.ReactorNettyLoggingHandler), (TransportConfig$TransportChannelInitializer#0 = reactor.netty.transport.TransportConfig$TransportChannelInitializer), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
Enter fullscreen mode Exit fullscreen mode

reactor.netty.transport.TransportConfig$TransportChannelInitializer is the child class of io.netty.channel.ChannelInitializer. It is for setting up pipelines in the "registered" phase of the Channel and will be removed by itself after setup.

io.netty.handler.codec.http.HttpServerCodec and reactor.netty.http.server.HttpTrafficHandler instances is added and as usual, reactor.netty.channel.ChannelOperationsHandler must at the end of the pipeline.

(3): reactor.netty.http.server.HttpServerRoutes#get() set up a handler when HTTP GET requests on a defined path. request is reactor.netty.http.server.HttpServerRequest and response is reactor.netty.http.server.HttpServerResponse. Both of them are reactor.netty.http.server.HttpServerOperations instances.

The return instance is Publisher>, and only the termination signal of it means to close Channel connection.

The actual function processing HTTP GET requests is reactor.netty.http.server.HttpServer.HttpServerHandle#onStateChange.

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void onStateChange(Connection connection, State newState) {
    if (newState == HttpServerState.REQUEST_RECEIVED) {
        try {
            if (log.isDebugEnabled()) {
                log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);
            }
            HttpServerOperations ops = (HttpServerOperations) connection;
            Publisher<Void> publisher = handler.apply(ops, ops);    // <- HERE handler is called
            Mono<Void> mono = Mono.deferContextual(ctx -> {
                ops.currentContext = Context.of(ctx);
                return Mono.fromDirect(publisher);
            });
            if (ops.mapHandle != null) {
                mono = ops.mapHandle.apply(mono, connection);
            }
            mono.subscribe(ops.disposeSubscriber());    // <- subscribe the termination signal
        }
        catch (Throwable t) {
            log.error(format(connection.channel(), ""), t);
            //"FutureReturnValueIgnored" this is deliberate
            connection.channel()
                      .close();
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

(4): reading HTTP request header.

(5): setting HTTP response header.

(6): define response's content publisher.

(7): to instruct the following Flux.just() should instantiate objects in subscription phase.

(8): to send two ByteBuf out. Please note that duplicate() is called on each ByteBuf. A wrapper class io.netty.buffer.DuplicatedByteBuf and it copies readerIndex and writerIndex so that the readerIndex of original ByteBuf will not be changed when read operation is on this "copied" ByteBuf.

If not include this statement, no content in HTTP response after serving several requests.

(9): add 1 second delay between sending two ByteBuf

(10): to increase the refCnt of ByteBuf by 1. This will be decreased when content of ByteBuf is sent out in the lower layer. If not include this statement, the following exception is thrown after serving several requests.

15:48:01.665 [reactor-http-nio-6] ERROR r.n.http.server.HttpServerOperations -- [335257bc-1, L:/127.0.0.1:8080 - R:/127.0.0.1:61531] Error finishing response. Closing connection
io.netty.util.IllegalReferenceCountException: refCnt: 0
Enter fullscreen mode Exit fullscreen mode

(11): to define handler after server shutdown.

(12): to release two ByteBuf.

(13): main thread will be blocked on scanner.nextLine(), and return after pressing 2 times of Enter on console.

Top comments (0)