DEV Community

Cover image for Getting Started With RSocket Part 2
mydeveloperplanet
mydeveloperplanet

Posted on • Originally published at mydeveloperplanet.com

Getting Started With RSocket Part 2

In this blog, you will continue where you left off after Part 1. You will explore the RSocket communication models Fire-and-Forget, Request-Stream and Channel. For all of these models, you will create the server, client and a unit test.

1. Introduction

In Part 1, you learnt the basics of the RSocket communication protocol. It is advised to read Part 1 first before continuing with Part 2. Remember that RSocket provides 4 communication models:

  • Request-Response (a stream of 1)
  • Fire-and-Forget (no response)
  • Request-Stream (a stream of many)
  • Channel (bi-directional streams)

You covered Request-Response in Part 1, the others will be covered in Part 2.

The source code being used in this post is of course available at GitHub.

2. Fire-and-Forget Model

The Fire-and-Forget model is quite similar to the Request-Response model. The only difference is that you do not expect a response to your request.

2.1 The Server Side

In the RSocketServerController you create a method fireAndForget. Because the request does not return anything, the return type of the method is void. Again, with the annotation @MessageMapping you define the name of the route. Just as with the Request-Response example, the server receives a Notification message. In order to see something happening when the message is received, you just log the Notification message.

@MessageMapping("my-fire-and-forget")
public void fireAndForget(Notification notification) {
    logger.info("Received notification: " + notification);
}
Enter fullscreen mode Exit fullscreen mode

2.2 The Client Side

In the RSocketClientController you create a method fireAndForget. The implementation is identical to the Request-Response example except for the expected return type. Here you use retrieveMono(Void.class) instead of retrieveMono(Notification.class).

@GetMapping("/fire-and-forget")
public Mono<Void> fireAndForget() {
    Notification notification = new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model");
    logger.info("Send notification for my-fire-and-forget: " + notification);
    return rSocketRequester
            .route("my-fire-and-forget")
            .data(notification)
            .retrieveMono(Void.class);
}
Enter fullscreen mode Exit fullscreen mode

Start both the server and the client and invoke the URL:

$ http://localhost:8080/fire-and-forget
Enter fullscreen mode Exit fullscreen mode

As you can see, no response is returned. In the logging of client and server, you can verify the sending and receiving messages.

Client:

Send notification for my-fire-and-forget: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}
Enter fullscreen mode Exit fullscreen mode

Server:

Received notification: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}
Enter fullscreen mode Exit fullscreen mode

2.3 The Test Side

The test is again quite similar to the client code and the Request-Response example. In order to validate whether the Mono does not emit any data, it is sufficient to call verifyComplete. You do not need to call consumeWithNext. If the Mono does emit data, the test should fail. However, replacing the route my-fire-and-forget into my-request-response f.e., does not fail the test. It is unclear why this will not fail the test. If anyone has any suggestions or a solution, please add it into the comments of this blog.

@Test
void testFireAndForget() {
    // Send a fire-and-forget message
    Mono<Void> result = rSocketRequester
            .route("my-fire-and-forget")
            .data(new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model"))
            .retrieveMono(Void.class);

    // Assert that the result is a completed Mono.
    StepVerifier
            .create(result)
            .verifyComplete();
}
Enter fullscreen mode Exit fullscreen mode

3. Request-Stream Model

With the Request-Stream model, you send a request to the server and you will receive a Stream of Notification messages.

3.1 The Server Side

In the RSocketServerController you create a method requestStream. This time, the server will return a Flux of Notification messages. Again, with the annotation @MessageMapping you define the name of the route. In this example, upon receipt of a Notification message, a Flux is returned which emits a new Notification every 3 seconds.

@MessageMapping("my-request-stream")
Flux<Notification> requestStream(Notification notification) {
    logger.info("Received notification for my-request-stream: " + notification);
    return Flux
            .interval(Duration.ofSeconds(3))
            .map(i -> new Notification(notification.getDestination(), notification.getSource(), "In response to: " + notification.getText()));
}
Enter fullscreen mode Exit fullscreen mode

3.2 The Client Side

In the RSocketClientController you create a method requestStream. The implementation is identical to the Request-Response example except for the expected return type. Here you use retrieveFlux(Notification.class) instead of retrieveMono(Notification.class).

@GetMapping("/request-stream")
public ResponseEntity<Flux<Notification>> requestStream() {
    Notification notification = new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model");
    logger.info("Send notification for my-request-stream: " + notification);
    Flux<Notification> notificationFlux = rSocketRequester
            .route("my-request-stream")
            .data(notification)
            .retrieveFlux(Notification.class);
    return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(notificationFlux);
}
Enter fullscreen mode Exit fullscreen mode

Start both the server and the client and invoke the URL:

$ curl http://localhost:8080/request-stream
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}

data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}

data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}

data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
...
Enter fullscreen mode Exit fullscreen mode

As you can see, the server emits every 3 seconds a Notification. In the logging of client and server, you can verify the sending and receiving messages.

Client:

Send notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
Enter fullscreen mode Exit fullscreen mode

Server:

Received notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
Enter fullscreen mode Exit fullscreen mode

3.3 The Test Side

The test is again similar to the client code. During verification, you verify the first message received, then verify whether 5 messages are received and finally verify the last message.

@Test
void testRequestStream() {
    // Send a request message
    Flux<Notification> result = rSocketRequester
            .route("my-request-stream")
            .data(new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model"))
            .retrieveFlux(Notification.class);

    // Verify that the response messages contain the expected data
    StepVerifier
      .create(result)
      .consumeNextWith(notification -> {
         assertThat(notification.getSource()).isEqualTo(SERVER);
         assertThat(notification.getDestination())
                                             .isEqualTo(CLIENT);
         assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
      .expectNextCount(5)
      .consumeNextWith(notification -> {
         assertThat(notification.getSource()).isEqualTo(SERVER);
         assertThat(notification.getDestination())
                                             .isEqualTo(CLIENT);
         assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
      .thenCancel()
      .verify();
}
Enter fullscreen mode Exit fullscreen mode

4. Channel Model

The Channel model is a bit more complicated than the other models you have seen so far. Here you will send a Flux and as a response a Flux will be returned. This provides you the ability to send messages back and forth like within a chat conversation for example.

4.1 The Server Side

In the RSocketServerController you create a method channel. You will upon receipt of a Notification increment a counter and every one second the result of the counter notificationCount will be sent to the client. In order to be able to follow what is happening, you add logging at receiving the Notification and when the result is returned.

@MessageMapping("my-channel")
public Flux<Long> channel(Flux<Notification> notifications) {
    final AtomicLong notificationCount = new AtomicLong(0);
    return notifications
        .doOnNext(notification -> {
            logger.info("Received notification for channel: " + notification);
            notificationCount.incrementAndGet();
         })
        .switchMap(notification -> 
Flux.interval(Duration.ofSeconds(1)).map(new Object() {
                private Function<Long, Long> numberOfMessages(AtomicLong notificationCount) {
                    long count = notificationCount.get();
                    logger.info("Return flux with count: " + count);
                    return i -> count;
                }
         }.numberOfMessages(notificationCount))).log();
}
Enter fullscreen mode Exit fullscreen mode

4.2 The Client Side

In the RSocketClientController you create a method channel. You need to create a Flux. In order to accomplish this, you create 3 Mono Notification items, one with a delay of 0 seconds (notification0), one with a delay of 2 seconds (notification2) and one with a delay of 5 seconds (notification5). You create the Flux notifications with a combination of the Mono‘s you just created. Every time the Flux emits a Notification, you log this in order to be able to follow what is happening. Finally, you send the Flux to the RSocket channel and retrieve the response as a Flux of Long and return it to the caller.

@GetMapping("/channel")
public ResponseEntity<Flux<Long>> channel() {
    Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
    Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
    Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));

    Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2)
            .doOnNext(d -> logger.info("Send notification for my-channel"));

    Flux<Long> numberOfNotifications = this.rSocketRequester
            .route("my-channel")
            .data(notifications)
            .retrieveFlux(Long.class);

    return  ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(numberOfNotifications);
}
Enter fullscreen mode Exit fullscreen mode

Start both the server and the client and invoke the URL:

$ curl http://localhost:8080/channel
data:1
data:1
data:1
data:1
data:3
data:3
data:4
data:4
data:5
data:5
data:6
data:6
...
Enter fullscreen mode Exit fullscreen mode

The result is as expected. First the notification0 is sent, after 5 seconds (notification5) the following Notification is sent together with a notification0, 2 seconds later a notification2, 2 seconds later a new one, and finally 2 seconds later the last one. After the last result, the Flux will keep on transmitting a count of 6. In the logging of client and server, you can verify the sending and receiving messages. This time including the timestamps, the complete logging contains even more information which is left out for brevity purposes. You should take a look at it more closely when running the examples yourself. Important to notice are the onNext log statements which occur each second and correspond with the response the server is sending.

Client:

17:01:19.820 Send notification for my-channel
17:01:24.849 Send notification for my-channel
17:01:24.879 Send notification for my-channel
17:01:26.881 Send notification for my-channel
17:01:28.908 Send notification for my-channel
17:01:30.935 Send notification for my-channel
Enter fullscreen mode Exit fullscreen mode

Server:

17:01:19.945 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:19.947 Return flux with count: 1
17:01:20.949 onNext(1)
17:01:21.947 onNext(1)
17:01:22.947 onNext(1)
17:01:23.947 onNext(1)
17:01:24.881 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.882 Return flux with count: 2
17:01:24.884 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.885 Return flux with count: 3
17:01:25.886 onNext(3)
17:01:26.886 onNext(3)
17:01:26.909 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:26.909 Return flux with count: 4
17:01:27.910 onNext(4)
17:01:28.910 onNext(4)
17:01:28.936 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:28.937 Return flux with count: 5
17:01:29.937 onNext(5)
17:01:30.937 onNext(5)
17:01:30.963 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:30.964 Return flux with count: 6
17:01:31.964 onNext(6)
17:01:32.964 onNext(6)
...
Enter fullscreen mode Exit fullscreen mode

4.3 The Test Side

The test is similar as the client code. You send the messages to the channel and verify the resulting counts. The repeating elements in the test are left out for brevity, the complete test is available at GitHub.

@Test
void testChannel() {
    Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
    Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
    Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));

    Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2);
    // Send a request message
    Flux<Long> result = rSocketRequester
            .route("my-channel")
            .data(notifications)
            .retrieveFlux(Long.class);

    // Verify that the response messages contain the expected data
    StepVerifier
            .create(result)
            .consumeNextWith(count -> {
                assertThat(count).isEqualTo(1);
                })
...
            .consumeNextWith(count -> {
                assertThat(count).isEqualTo(6);
                })
            .thenCancel()
            .verify();
}
Enter fullscreen mode Exit fullscreen mode

5. Conclusion

You have learnt how to create a server, client and a unit test for the RSocket communication models Fire-and-Forget, Request-Streams and Channel. By now, you will have the basic knowledge in order to do some exploring, experimentation yourself.

Top comments (4)

Collapse
 
ramureddymca profile image
ramureddymca

Thanks you for nice tutorial, but how can we close connection in Request-stream scenario ? between client and server ?

Collapse
 
mydeveloperplanet profile image
mydeveloperplanet

Thank you for the comment. It is a good question. I have added a close method now in the RSocketClientController:

@GetMapping("/close")
public void close() {
    rSocketRequester.rsocketClient().dispose();
}
Enter fullscreen mode Exit fullscreen mode

I hope this answers your question.

Collapse
 
ramureddymca profile image
ramureddymca

Thank you so much, the above code is working fine to close connection.

But if I try to invoke again, getting below error for "request-response"

java.util.concurrent.CancellationException: Disposed.

Is it possible to stop stream the data without closing the connection for request- stream scenario?

Thread Thread
 
mydeveloperplanet profile image
mydeveloperplanet

I do not know, I have spent some time searching for a solution, but did not find any. I have found that calling dispose is the way to stop the stream. But I could not find how to reestablish the connection. During my research for the blogs, I had difficult times to find good documentation for rsocket, certainly when trying to solve problems. If you are able to find the answer to your question, I would like to know it also ;-)