DEV Community

loading...
Cover image for SE This Week: Build a Network App Using Netty and ProtoBuf

SE This Week: Build a Network App Using Netty and ProtoBuf

jiayanguo profile image jguo ・6 min read

What's Netty and ProtoBuf

  • Netty -- an asynchronous event-driven network application framework. The main purpose of Netty is to build high-performance protocol servers based on NIO with separation and loose coupling of the network and business logic components. It might implement a widely known protocol, such as HTTP, or your own specific protocol.

  • ProtoBuf -- Protocol buffers are a flexible, efficient, automated mechanism for serializing structured data – think XML, but smaller, faster, and simpler. You define how you want your data to be structured once, then you can use special generated source code to easily write and read your structured data to and from a variety of data streams and using a variety of languages. You can even update your data structure without breaking deployed programs that are compiled against the "old" format.

ProtoBuf

First, let's define our Protobuf. Protobuf is a schema for communication between client and server. I defined a demo schema that allows sending plain text messages or files to the server.

    syntax = "proto2";
    package org.demo.nettyprotobuf.proto;


    option optimize_for = SPEED;


    enum Type {
      MSG = 0;
      FILE = 1;
    }


    message DemoRequest {
      required Type type = 1;
      optional string requestMsg = 2;
      optional FileMsg file = 3;
    }


    message DemoResponse {
      optional uint32 code = 1;
      optional string responseMsg = 2;
    }


    message FileMsg{
      optional bytes fileBytes = 1;
      optional string filename = 2;
    }  
Enter fullscreen mode Exit fullscreen mode

Server

  • First, we need to create a handler to handle requests from a client. If it is a text message print it out. If it is a file, save it to /tmp folder. And then respond back to the client.
public class DemoProtocolServerHandler extends SimpleChannelInboundHandler<DemoMessages.DemoRequest> {
      private static final String FILE_DIR = "/tmp/";


      @Override
      protected void channelRead0(ChannelHandlerContext ctx, DemoMessages.DemoRequest msg) {
        if (msg.getType() == DemoMessages.Type.MSG) {
          DemoMessages.DemoResponse.Builder builder = DemoMessages.DemoResponse.newBuilder();
          String message = "Accepted from Server, returning response";
          System.out.println(message);
          builder.setResponseMsg(message)
                  .setCode(0);
          ctx.write(builder.build());
        } else if (msg.getType() == DemoMessages.Type.FILE) {


          byte[] bFile = msg.getFile().toByteArray();
          FileOutputStream fileOuputStream = null;
          try {
            fileOuputStream = new FileOutputStream(FILE_DIR + msg.getFile().getFilename());
            fileOuputStream.write(bFile);
          } catch (Exception e) {
            System.out.println(e);
          }finally {
            try {
              if (fileOuputStream != null) {
                fileOuputStream.close();
              }
            } catch (IOException e) {
              System.out.println(e);
            }
          }
          DemoMessages.DemoResponse.Builder builder = DemoMessages.DemoResponse.newBuilder();
          String message = "File saved to: " + FILE_DIR;
          System.out.println(message);
          builder.setResponseMsg(message)
                  .setCode(0);
          ctx.write(builder.build());
        } else {
          DemoMessages.DemoResponse.Builder builder = DemoMessages.DemoResponse.newBuilder();
          String message = "Unsupported message type " + msg.getType();
          System.out.println(message);
          builder.setResponseMsg(message)
                  .setCode(1);
          ctx.write(builder.build());
        }
      }

      @Override
      public void channelReadComplete(ChannelHandlerContext ctx) {
          ctx.flush();
      }


      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
          cause.printStackTrace();
          ctx.close();
      }


    }
Enter fullscreen mode Exit fullscreen mode
  • Let's create a channel to handle incoming requests. When receiving a request, we need to decode it, encode and request handle. And them the channel pipeline, note the order matters here. (Question: why this order? Please take a look my in code comments).
public class DemoServerChannelInitializer extends ChannelInitializer<SocketChannel> {


      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        // Why this order, decoder, encoder, handler?
        // Note: Every IO operation on a Channel in Netty is non-blocking.
        // This means that every operation is returned immediately after the call. 
        // When recieved message uses decoder, and then response needs encoder,
        // then handling the requests uses handler.
        ChannelPipeline p = ch.pipeline();
        p.addLast(new ProtobufVarint32FrameDecoder());
        p.addLast(new ProtobufDecoder(DemoMessages.DemoRequest.getDefaultInstance()));


        p.addLast(new ProtobufVarint32LengthFieldPrepender());
        p.addLast(new ProtobufEncoder());


        p.addLast(new DemoProtocolServerHandler());
      }


    }
Enter fullscreen mode Exit fullscreen mode
  • Create the server. Let's bind the server to port 8080 by default.
public class DemoServer {

      static final int PORT;

      static {
        if (System.getenv("port") == null){
          PORT = 8080;
        } else {
          PORT = Integer.parseInt(System.getenv("port"));
        }
      }


      public static void main(String[] args) throws InterruptedException {

        // Create event loop groups. One for incoming connections handling and 
        // second for handling actual event by workers
        EventLoopGroup serverGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
          ServerBootstrap bootStrap = new ServerBootstrap();
          bootStrap.group(serverGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new DemoServerChannelInitializer());
          // Bind to port 
          bootStrap.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
          serverGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
        }
      }
    }
Enter fullscreen mode Exit fullscreen mode

Test Client

Now, we need to create a test client to test communication between server and client.

  • Let's create a client handler handler first. The handler could send either a file or message to the server, which depends on DemoMessages.Type.
public class DemoMsgClientHandler extends SimpleChannelInboundHandler<DemoMessages.DemoResponse> {


      private Channel channel;
      private DemoMessages.DemoResponse resp;
      private final BlockingQueue<DemoMessages.DemoResponse> resps = new LinkedBlockingQueue<DemoMessages.DemoResponse>();
      public DemoMessages.DemoResponse sendRequest(DemoMessages.Type type) {


          DemoMessages.DemoRequest req = null;
          // send File request
          if (DemoMessages.Type.FILE == type) {
              InputStream inputStream = null;
              try {
                  inputStream = getClass().getResourceAsStream("/components.png");


                  DemoMessages.FileMsg fileMsg = DemoMessages.FileMsg.newBuilder()
                          .setFileBytes(ByteString.readFrom(inputStream))
                          .setFilename("components.png")
                          .build();
                  req = DemoMessages.DemoRequest.newBuilder()
                          .setType(DemoMessages.Type.FILE)
                          .setFile(fileMsg)
                          .build();
                  // Send request
                  channel.writeAndFlush(req);
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  try {
                      if (inputStream != null) {
                          inputStream.close();
                      }
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          } else {
              // send message request.
               req = DemoMessages.DemoRequest.newBuilder()
                      .setType(DemoMessages.Type.MSG)
                      .setRequestMsg("From Client").build();
          }
        // Send request
        channel.writeAndFlush(req);

        // Now wait for response from server
        boolean interrupted = false;
        for (;;) {
            try {
                resp = resps.take();
                break;
            } catch (InterruptedException ignore) {
                interrupted = true;
            }
        }


        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return resp;
      }


      @Override
      public void channelRegistered(ChannelHandlerContext ctx) {
          channel = ctx.channel();
      }

      @Override
      protected void channelRead0(ChannelHandlerContext ctx, DemoMessages.DemoResponse msg)
          throws Exception {
        resps.add(msg);
      }

      @Override
      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
          cause.printStackTrace();
          ctx.close();
      }
    }
Enter fullscreen mode Exit fullscreen mode
  • Let's create a channel to handle sending requests.
public class DemoClientInitializer  extends ChannelInitializer<SocketChannel> {


      @Override
      protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();

        p.addLast(new ProtobufVarint32FrameDecoder());
        p.addLast(new ProtobufDecoder(DemoMessages.DemoResponse.getDefaultInstance()));


        p.addLast(new ProtobufVarint32LengthFieldPrepender());
        p.addLast(new ProtobufEncoder());


        p.addLast(new DemoMsgClientHandler());
      }

    }
Enter fullscreen mode Exit fullscreen mode
  • Now, let's create a [client] to send requests. This client will send 10 text messages and 1 file to the server.
public class DemoClient {

      static final String HOST;
      static final int PORT;


      static {
        if (System.getenv("port") == null){
          PORT = 8080;
        } else {
          PORT = Integer.parseInt(System.getenv("port"));
        }


        if (System.getenv("host") == null){
          HOST = "127.0.0.1";
        } else {
          HOST =  System.getenv("host");
        }
      }


      public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();


        try {
          Bootstrap bootstrap = new Bootstrap();
          bootstrap.group(group)
                   .channel(NioSocketChannel.class)
                   .handler(new DemoClientInitializer());

          // Create connection 
          Channel c = bootstrap.connect(HOST, PORT).sync().channel();
          DemoMsgClientHandler handle = c.pipeline().get(DemoMsgClientHandler.class);


          int i = 0;
          while (i++ < 10) {
            DemoMessages.DemoResponse resp = handle.sendRequest(DemoMessages.Type.MSG);
            System.out.println("Got response msg from Server: " + resp.getResponseMsg());
            Thread.sleep(1000);
          }


          DemoMessages.DemoResponse resp = handle.sendRequest(DemoMessages.Type.FILE);
          System.out.println("Got response msg from Server: " + resp.getResponseMsg());


          c.close();




        } finally {
          group.shutdownGracefully();
        }

      }
    }
Enter fullscreen mode Exit fullscreen mode

Test

Now, it's time to start the server and client.

I uploaded the source code to GitHub. There are instructions on how to test it. Please take a look at the readme. There are two ways to test it. You can use java command to start the server and client. Or, there is an easy way, I created a docker-compose. You can start them by one command.

How to build
`mvn clean install`

How to run.

First Option (use docker compose):

start:

`docker-compose up`

build docker image:

`docker-compose build`

stop:

`docker-compose down`

Second Option (Manually):

Start server:

   `java -jar server/target/server-1.0-SNAPSHOT.jar`

Enter fullscreen mode Exit fullscreen mode

What's more

Netty is really a powerful framework. You can use it to build your own Http Webserver. You might wonder why you want to build your own Http Server. A general-purpose Http Server sometimes does not scale very well or not work very well with your cases. For example, you might want to implement an HTTP server that is optimized for AJAX-based chat application, media streaming, or large file transfer. There are a lot of examples in the Netty source code. Check it out here.

Reference:

https://github.com/jiayanguo/nettyprotobufapp
https://netty.io/
https://netty.io/wiki/user-guide-for-4.x.html
https://www.baeldung.com/netty
https://dzone.com/articles/build-a-simple-netty-application-with-and-without
https://github.com/lohitvijayarenu/netty-protobuf

Discussion (0)

pic
Editor guide