DEV Community

Hantsy Bai
Hantsy Bai

Posted on

Building a Chat application with Angular and Spring RSocket

In this post, we will use RSocket protocol to reimplement the chat application.

If you have missed the former posts about implementing the chat application, there is a checklist.

RSocket is a binary protocol for use on byte stream transports, such as TCP, WebSocket, RCP etc.

RSocket embraces ReactiveStreams semantics, and Spring provides excellent RSocket support through the existing messaging infrastructure. I have introduced RSocket in my former posts, check here.

In this post, we will use WebSocket as transport protocol which is good for web application. RSocket defines 4 interaction modes, we will use fire-and-forget to send a message to the server side, and request/streams to retrieve messages as an infinite stream from the server.

Firstly let's create the server application. Generate a project skeleton using Spring Initializr.

  • Project type: Gradle
  • Language: Kotlin
  • Spring Boot version :2.4.0M1
  • Project Metadata/Java: 14
  • Dependencies: Reactive Web, RSocket

Hit the Generate button to download the generated archive, and extract it into your local disk.

Make sure you have installed the latest JDK 14 (AdoptOpenJDK is highly recommended), then import the source codes in your IDEs. eg. Intellij IDEA, and start to implement the server side.

We also skip the discussion of Reactor's Sink implementation here.

Create a Message document definition and a Repository for it.

interface MessageRepository : ReactiveMongoRepository<Message, String> {
    @Tailable
    fun getMessagesBy(): Flux<Message>
}

@Document(collection = "messages")
data class Message(@Id var id: String? = null, var body: String, var sentAt: Instant = Instant.now())
Enter fullscreen mode Exit fullscreen mode

Create a @Controller to handle messages.

@Controller
class MessageController(private val messages: MessageRepository) {
    @MessageMapping("send")
    fun hello(p: String) = this.messages.save(Message(body = p, sentAt = Instant.now())).log().then()

    @MessageMapping("messages")
    fun messageStream(): Flux<Message> = this.messages.getMessagesBy().log()
}
Enter fullscreen mode Exit fullscreen mode

The send route accepts a String based message payload and return a Mono<Void>, which will handle messages of the fire-and-forget mode from clients. The messages route accepts a null payload and return a Flux<Message>, which will act as the handler of request-stream mode.

If you are new to the Spring RSocket , you may be confused how @Controller and MessageMapping are mapped to the interaction modes which the original RSocket message handler used. Spring hides the complexity of the RSocket protocol itself , and reuse the existing messaging infrastructure to handle RSocket messages. Remember, compare the incoming payload and outgoing message type with 4 interaction mode definitions in the official RSocket website, you can determine which interaction mode it is mapped to.

Configure RSocket to use websocket transport in the application.properties file.

# a mapping path is defined
spring.rsocket.server.mapping-path=/rsocket
# websocket is chosen as a transport
spring.rsocket.server.transport=websocket
Enter fullscreen mode Exit fullscreen mode

Start a MongoDB service as follows.

docker-compose up mongodb
Enter fullscreen mode Exit fullscreen mode

As described in the former posts, you have to prepare a capped messages collection, check this post for more details.

Run the following command to start the server side application.

./gradlew bootRun
Enter fullscreen mode Exit fullscreen mode

I have written a small integration test to verify if it works.

@SpringBootTest
class RSocketServerApplicationTests {

    @Autowired
    lateinit var rSocketRequester: RSocketRequester;

    @Test
    fun contextLoads() {

        val verifier= rSocketRequester.route("messages")
                .retrieveFlux(Message::class.java)
                .log()
                .`as` { StepVerifier.create(it) }
                .consumeNextWith { it -> assertThat(it.body).isEqualTo("test message") }
                .consumeNextWith { it -> assertThat(it.body).isEqualTo("test message2") }
                .thenCancel()
                .verifyLater()
        rSocketRequester.route("send").data("test message").send().then().block()
        rSocketRequester.route("send").data("test message2").send().then().block()

        verifier.verify(Duration.ofSeconds(5))
    }

    @TestConfiguration
    class TestConfig {

        @Bean
        fun rSocketRequester(builder: RSocketRequester.Builder) = builder.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
                .connectWebSocket(URI.create("ws://localhost:8080/rsocket")).block()
    }

}
Enter fullscreen mode Exit fullscreen mode

In the above codes, use a test specific @TestConfiguration to define a RSocketRequester bean, which is a helper to communicate with the server side.

Let's move to the frontend application.

Create a new Angular project, and add two dependencies: roscket-core, rsocket-websocket-client.

npm install roscket-core rsocket-websocket-client
Enter fullscreen mode Exit fullscreen mode

Fill the following codes in the app.component.ts file. I've spent some time on making this work with my backend, the article RSocket With Spring Boot + JS: Zero to Hero from Domenico Sibilio is very helpful. The rsocket-js project also includes excellent examples.

export class AppComponent implements OnInit, OnDestroy {

  title = 'client';
  message = '';
  messages: any[];
  client: RSocketClient;
  sub = new Subject();

  ngOnInit(): void {
    this.messages = [];

    // Create an instance of a client
    this.client = new RSocketClient({
      serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer
      },
      setup: {
        // ms btw sending keepalive to server
        keepAlive: 60000,
        // ms timeout if no keepalive response
        lifetime: 180000,
        // format of `data`
        dataMimeType: 'application/json',
        // format of `metadata`
        metadataMimeType: 'message/x.rsocket.routing.v0',
      },
      transport: new RSocketWebSocketClient({
        url: 'ws://localhost:8080/rsocket'
      }),
    });

    // Open the connection
    this.client.connect().subscribe({
      onComplete: (socket: RSocket) => {

        // socket provides the rsocket interactions fire/forget, request/response,
        // request/stream, etc as well as methods to close the socket.
        socket
          .requestStream({
            data: null, // null is a must if it does not include a message payload, else the Spring server side will not be matched.
            metadata: String.fromCharCode('messages'.length) + 'messages'
          })
          .subscribe({
            onComplete: () => console.log('complete'),
            onError: error => {
              console.log("Connection has been closed due to:: " + error);
            },
            onNext: payload => {
              console.log(payload);
              this.addMessage(payload.data);
            },
            onSubscribe: subscription => {
              subscription.request(1000000);
            },
          });

        this.sub.subscribe({
          next: (data) => {
            socket.fireAndForget({
              data: data,
              metadata: String.fromCharCode('send'.length) + 'send',
            });
          }
        })
      },
      onError: error => {
        console.log("Connection has been refused due to:: " + error);
      },
      onSubscribe: cancel => {
        /* call cancel() to abort */
      }
    });
  }

  addMessage(newMessage: any) {
    console.log("add message:" + JSON.stringify(newMessage))
    this.messages = [...this.messages, newMessage];
  }

  ngOnDestroy(): void {
    this.sub.unsubscribe();
    if (this.client) {
      this.client.close();
    }
  }

  sendMessage() {
    console.log("sending message:" + this.message);
    this.sub.next(this.message);
    this.message = '';
  }
}

Enter fullscreen mode Exit fullscreen mode

Reuse the template file we've used in the former posts.

<div fxFlex>
    <p *ngFor="let m of messages">
        {{m|json}}
    </p>
</div>
<div>
    <form fxLayout="row baseline" #messageForm="ngForm" (ngSubmit)="sendMessage()">
        <mat-form-field fxFlex>
            <input name="message" fxFill matInput #messageCtrl="ngModel" [(ngModel)]="message" required />
            <mat-error fxLayoutAlign="start" *ngIf="messageCtrl.hasError('required')">
                Message body can not be empty.
            </mat-error>
        </mat-form-field>
        <div>
            <button mat-button mat-icon-button type="submit" [disabled]="messageForm.invalid || messageForm.pending">
                <mat-icon>send</mat-icon>
            </button>
        </div>
    </form>
</div>
Enter fullscreen mode Exit fullscreen mode

Next run the client application.

npm run start
Enter fullscreen mode Exit fullscreen mode

Open two browser windows(or two different browsers), type some messages in each window and experience it.

run

I found a weird issue may be caused by the JSON Serializer encode/decode from the roscket-js project, I described it in rsocket-js issues #93, if you have some idea to overcome this, please comment on this issue.

Get the complete codes from my github.

Top comments (0)