In this blog, you will continue where you left off after Part 1. You will explore the RSocket communication models Fire-and-Forget, Request-Stream and Channel. For all of these models, you will create the server, client and a unit test.
1. Introduction
In Part 1, you learnt the basics of the RSocket communication protocol. It is advised to read Part 1 first before continuing with Part 2. Remember that RSocket provides 4 communication models:
- Request-Response (a stream of 1)
- Fire-and-Forget (no response)
- Request-Stream (a stream of many)
- Channel (bi-directional streams)
You covered Request-Response in Part 1, the others will be covered in Part 2.
The source code being used in this post is of course available at GitHub.
2. Fire-and-Forget Model
The Fire-and-Forget model is quite similar to the Request-Response model. The only difference is that you do not expect a response to your request.
2.1 The Server Side
In the RSocketServerController
you create a method fireAndForget
. Because the request does not return anything, the return type of the method is void
. Again, with the annotation @MessageMapping
you define the name of the route. Just as with the Request-Response example, the server receives a Notification
message. In order to see something happening when the message is received, you just log the Notification
message.
@MessageMapping("my-fire-and-forget")
public void fireAndForget(Notification notification) {
logger.info("Received notification: " + notification);
}
2.2 The Client Side
In the RSocketClientController
you create a method fireAndForget
. The implementation is identical to the Request-Response example except for the expected return type. Here you use retrieveMono(Void.class)
instead of retrieveMono(Notification.class)
.
@GetMapping("/fire-and-forget")
public Mono<Void> fireAndForget() {
Notification notification = new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model");
logger.info("Send notification for my-fire-and-forget: " + notification);
return rSocketRequester
.route("my-fire-and-forget")
.data(notification)
.retrieveMono(Void.class);
}
Start both the server and the client and invoke the URL:
$ http://localhost:8080/fire-and-forget
As you can see, no response is returned. In the logging of client and server, you can verify the sending and receiving messages.
Client:
Send notification for my-fire-and-forget: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}
Server:
Received notification: Notification{source='Client', destination='Server', text='Test the Fire-And-Forget interaction model'}
2.3 The Test Side
The test is again quite similar to the client code and the Request-Response example. In order to validate whether the Mono
does not emit any data, it is sufficient to call verifyComplete
. You do not need to call consumeWithNext
. If the Mono
does emit data, the test should fail. However, replacing the route my-fire-and-forget
into my-request-response
f.e., does not fail the test. It is unclear why this will not fail the test. If anyone has any suggestions or a solution, please add it into the comments of this blog.
@Test
void testFireAndForget() {
// Send a fire-and-forget message
Mono<Void> result = rSocketRequester
.route("my-fire-and-forget")
.data(new Notification(CLIENT, SERVER, "Test the Fire-And-Forget interaction model"))
.retrieveMono(Void.class);
// Assert that the result is a completed Mono.
StepVerifier
.create(result)
.verifyComplete();
}
3. Request-Stream Model
With the Request-Stream model, you send a request to the server and you will receive a Stream of Notification messages.
3.1 The Server Side
In the RSocketServerController
you create a method requestStream
. This time, the server will return a Flux
of Notification
messages. Again, with the annotation @MessageMapping
you define the name of the route. In this example, upon receipt of a Notification
message, a Flux
is returned which emits a new Notification
every 3 seconds.
@MessageMapping("my-request-stream")
Flux<Notification> requestStream(Notification notification) {
logger.info("Received notification for my-request-stream: " + notification);
return Flux
.interval(Duration.ofSeconds(3))
.map(i -> new Notification(notification.getDestination(), notification.getSource(), "In response to: " + notification.getText()));
}
3.2 The Client Side
In the RSocketClientController
you create a method requestStream
. The implementation is identical to the Request-Response example except for the expected return type. Here you use retrieveFlux(Notification.class)
instead of retrieveMono(Notification.class)
.
@GetMapping("/request-stream")
public ResponseEntity<Flux<Notification>> requestStream() {
Notification notification = new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model");
logger.info("Send notification for my-request-stream: " + notification);
Flux<Notification> notificationFlux = rSocketRequester
.route("my-request-stream")
.data(notification)
.retrieveFlux(Notification.class);
return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(notificationFlux);
}
Start both the server and the client and invoke the URL:
$ curl http://localhost:8080/request-stream
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
data:{"source":"Server","destination":"Client","text":"In response to: Test the Request-Stream interaction model"}
...
As you can see, the server emits every 3 seconds a Notification. In the logging of client and server, you can verify the sending and receiving messages.
Client:
Send notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
Server:
Received notification for my-request-stream: Notification{source='Client', destination='Server', text='Test the Request-Stream interaction model'}
3.3 The Test Side
The test is again similar to the client code. During verification, you verify the first message received, then verify whether 5 messages are received and finally verify the last message.
@Test
void testRequestStream() {
// Send a request message
Flux<Notification> result = rSocketRequester
.route("my-request-stream")
.data(new Notification(CLIENT, SERVER, "Test the Request-Stream interaction model"))
.retrieveFlux(Notification.class);
// Verify that the response messages contain the expected data
StepVerifier
.create(result)
.consumeNextWith(notification -> {
assertThat(notification.getSource()).isEqualTo(SERVER);
assertThat(notification.getDestination())
.isEqualTo(CLIENT);
assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
.expectNextCount(5)
.consumeNextWith(notification -> {
assertThat(notification.getSource()).isEqualTo(SERVER);
assertThat(notification.getDestination())
.isEqualTo(CLIENT);
assertThat(notification.getText()).isEqualTo("In response to: Test the Request-Stream interaction model");})
.thenCancel()
.verify();
}
4. Channel Model
The Channel model is a bit more complicated than the other models you have seen so far. Here you will send a Flux
and as a response a Flux
will be returned. This provides you the ability to send messages back and forth like within a chat conversation for example.
4.1 The Server Side
In the RSocketServerController
you create a method channel
. You will upon receipt of a Notification
increment a counter and every one second the result of the counter notificationCount
will be sent to the client. In order to be able to follow what is happening, you add logging at receiving the Notification
and when the result is returned.
@MessageMapping("my-channel")
public Flux<Long> channel(Flux<Notification> notifications) {
final AtomicLong notificationCount = new AtomicLong(0);
return notifications
.doOnNext(notification -> {
logger.info("Received notification for channel: " + notification);
notificationCount.incrementAndGet();
})
.switchMap(notification ->
Flux.interval(Duration.ofSeconds(1)).map(new Object() {
private Function<Long, Long> numberOfMessages(AtomicLong notificationCount) {
long count = notificationCount.get();
logger.info("Return flux with count: " + count);
return i -> count;
}
}.numberOfMessages(notificationCount))).log();
}
4.2 The Client Side
In the RSocketClientController
you create a method channel
. You need to create a Flux
. In order to accomplish this, you create 3 Mono
Notification
items, one with a delay of 0 seconds (notification0
), one with a delay of 2 seconds (notification2
) and one with a delay of 5 seconds (notification5
). You create the Flux
notifications
with a combination of the Mono
‘s you just created. Every time the Flux
emits a Notification, you log this in order to be able to follow what is happening. Finally, you send the Flux
to the RSocket channel and retrieve the response as a Flux
of Long
and return it to the caller.
@GetMapping("/channel")
public ResponseEntity<Flux<Long>> channel() {
Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));
Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2)
.doOnNext(d -> logger.info("Send notification for my-channel"));
Flux<Long> numberOfNotifications = this.rSocketRequester
.route("my-channel")
.data(notifications)
.retrieveFlux(Long.class);
return ResponseEntity.ok().contentType(MediaType.TEXT_EVENT_STREAM).body(numberOfNotifications);
}
Start both the server and the client and invoke the URL:
$ curl http://localhost:8080/channel
data:1
data:1
data:1
data:1
data:3
data:3
data:4
data:4
data:5
data:5
data:6
data:6
...
The result is as expected. First the notification0
is sent, after 5 seconds (notification5
) the following Notification
is sent together with a notification0, 2 seconds later a notification2
, 2 seconds later a new one, and finally 2 seconds later the last one. After the last result, the Flux
will keep on transmitting a count of 6. In the logging of client and server, you can verify the sending and receiving messages. This time including the timestamps, the complete logging contains even more information which is left out for brevity purposes. You should take a look at it more closely when running the examples yourself. Important to notice are the onNext
log statements which occur each second and correspond with the response the server is sending.
Client:
17:01:19.820 Send notification for my-channel
17:01:24.849 Send notification for my-channel
17:01:24.879 Send notification for my-channel
17:01:26.881 Send notification for my-channel
17:01:28.908 Send notification for my-channel
17:01:30.935 Send notification for my-channel
Server:
17:01:19.945 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:19.947 Return flux with count: 1
17:01:20.949 onNext(1)
17:01:21.947 onNext(1)
17:01:22.947 onNext(1)
17:01:23.947 onNext(1)
17:01:24.881 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.882 Return flux with count: 2
17:01:24.884 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:24.885 Return flux with count: 3
17:01:25.886 onNext(3)
17:01:26.886 onNext(3)
17:01:26.909 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:26.909 Return flux with count: 4
17:01:27.910 onNext(4)
17:01:28.910 onNext(4)
17:01:28.936 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:28.937 Return flux with count: 5
17:01:29.937 onNext(5)
17:01:30.937 onNext(5)
17:01:30.963 Received notification for channel: Notification{source='Client', destination='Server', text='Test the Channel interaction model'}
17:01:30.964 Return flux with count: 6
17:01:31.964 onNext(6)
17:01:32.964 onNext(6)
...
4.3 The Test Side
The test is similar as the client code. You send the messages to the channel and verify the resulting counts. The repeating elements in the test are left out for brevity, the complete test is available at GitHub.
@Test
void testChannel() {
Mono<Notification> notification0 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model"));
Mono<Notification> notification2 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(2));
Mono<Notification> notification5 = Mono.just(new Notification(CLIENT, SERVER, "Test the Channel interaction model")).delayElement(Duration.ofSeconds(5));
Flux<Notification> notifications = Flux.concat(notification0, notification5, notification0, notification2, notification2, notification2);
// Send a request message
Flux<Long> result = rSocketRequester
.route("my-channel")
.data(notifications)
.retrieveFlux(Long.class);
// Verify that the response messages contain the expected data
StepVerifier
.create(result)
.consumeNextWith(count -> {
assertThat(count).isEqualTo(1);
})
...
.consumeNextWith(count -> {
assertThat(count).isEqualTo(6);
})
.thenCancel()
.verify();
}
5. Conclusion
You have learnt how to create a server, client and a unit test for the RSocket communication models Fire-and-Forget, Request-Streams and Channel. By now, you will have the basic knowledge in order to do some exploring, experimentation yourself.
Top comments (4)
Thanks you for nice tutorial, but how can we close connection in Request-stream scenario ? between client and server ?
Thank you for the comment. It is a good question. I have added a close method now in the
RSocketClientController
:I hope this answers your question.
Thank you so much, the above code is working fine to close connection.
But if I try to invoke again, getting below error for "request-response"
java.util.concurrent.CancellationException: Disposed.
Is it possible to stop stream the data without closing the connection for request- stream scenario?
I do not know, I have spent some time searching for a solution, but did not find any. I have found that calling dispose is the way to stop the stream. But I could not find how to reestablish the connection. During my research for the blogs, I had difficult times to find good documentation for rsocket, certainly when trying to solve problems. If you are able to find the answer to your question, I would like to know it also ;-)