
Hantsy Bai


Building a Chat application with Angular and Spring Reactive WebSocket: Part 2

In this post, we will use a MongoDB capped collection instead of Reactor's Sinks.StandaloneFluxSink.

If you have some experience with Linux or Unix, you probably know the tail command. It is handy when you want to follow the last n lines of a file while it keeps growing at runtime, for example the access.log file of an Apache HTTPd server.

tail -f -n 1000 access.log

Similarly, MongoDB provides a tailable cursor feature that behaves much like tail -f: when a document is inserted, it is emitted to the subscribers immediately. There is a limitation, though: a tailable cursor only works on capped collections. A capped collection differs from a normal collection in that it has a size limit for the whole collection and, optionally, a maximum number of stored documents; when a limit is reached, the oldest documents are discarded. A document can be inserted into a capped collection just like into a normal one, but it cannot be removed with the delete command.
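
To make the eviction rule concrete, here is a minimal in-memory sketch in Kotlin. It only illustrates the behaviour described above and is not how MongoDB implements capped collections; the CappedBuffer class and its maxDocuments parameter are hypothetical names, and the sketch models only the document-count limit, not the byte-size limit.

```kotlin
// Illustrative model only: a fixed-capacity buffer that drops the oldest entry.
// CappedBuffer and maxDocuments are hypothetical names, not MongoDB or Spring APIs.
class CappedBuffer<T>(private val maxDocuments: Int) {
    init {
        require(maxDocuments > 0) { "maxDocuments must be positive" }
    }

    private val items = ArrayDeque<T>()

    // Append a document; if the buffer is full, evict the oldest one first.
    fun insert(item: T) {
        if (items.size == maxDocuments) {
            items.removeFirst()
        }
        items.addLast(item)
    }

    // Documents in insertion order, oldest first.
    fun snapshot(): List<T> = items.toList()
}

fun main() {
    val capped = CappedBuffer<String>(maxDocuments = 3)
    listOf("m1", "m2", "m3", "m4").forEach(capped::insert)
    println(capped.snapshot()) // prints [m2, m3, m4]
}
```

Note that there is no remove operation here: as in a capped collection, entries only leave the buffer when newer ones push them out.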

Let's do a small refactoring of the code we completed in the last post and migrate it to use a query based on MongoDB tailable cursors as an infinite stream.

First, add Spring Data MongoDB Reactive to the dependencies.

implementation("org.springframework.boot:spring-boot-starter-data-mongodb-reactive")

Add a @Document annotation to the Message data class.

@Document(collection = "messages")
data class Message @JsonCreator constructor(
        @JsonProperty("id") @Id var id: String? = null,
        @JsonProperty("body") var body: String,
        @JsonProperty("sentAt") var sentAt: Instant = Instant.now()
)

We specify a collection name here.

Add a Repository for the Message document.

interface MessageRepository : ReactiveMongoRepository<Message, String> {
    @Tailable
    fun getMessagesBy(): Flux<Message>
}

We add a @Tailable annotation to the getMessagesBy method, which means it uses a tailable cursor to retrieve the Message documents from the messages collection. The returned Flux does not complete on its own; it keeps emitting documents as they are inserted.
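
As a rough sketch of how such a stream could be consumed outside the WebSocket handler, the hypothetical helper below (logNewMessages is not part of the original project) subscribes to the tailable query and prints each document as it arrives. It assumes the Message and MessageRepository types defined in this post and an existing capped messages collection.

```kotlin
import reactor.core.Disposable

// Hypothetical helper, shown for illustration only.
// Message and MessageRepository are the types declared in this post.
fun logNewMessages(messages: MessageRepository): Disposable =
    messages.getMessagesBy()
        // Called once per document delivered by the tailable cursor.
        .doOnNext { println("received: ${it.body}") }
        .subscribe()
```

The returned Disposable can be kept and disposed later to stop following the collection.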

Modify the ChatSocketHandler to the following.

class ChatSocketHandler(val mapper: ObjectMapper, val messages: MessageRepository) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        println("handling WebSocketSession...")
        session.receive()
                .map { it.payloadAsText }
                .map { Message(body = it, sentAt = Instant.now()) }
                .flatMap { this.messages.save(it) }
                .subscribe({ println(it) }, { println(it) })

        return session.send(
                Mono.delay(Duration.ofMillis(1000))
                        .thenMany(this.messages.getMessagesBy())
                        .map { session.textMessage(toJson(it)) }
        ).then()

    }

    fun toJson(message: Message): String = mapper.writeValueAsString(message)

}

When a message arrives from a WebSocket client, the handler saves it into the messages collection. Independently, it streams the documents in the messages collection back to the client through the tailable cursor.
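
The handler above subscribes to the inbound stream by hand, so that subscription is not tied to the Mono returned from handle. One possible alternative, sketched below under the assumption that the same Message and MessageRepository types are available, combines both directions with Mono.zip. The class name ZippedChatSocketHandler is hypothetical, and this is a variation for comparison rather than the code used in this post.

```kotlin
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession
import reactor.core.publisher.Mono
import java.time.Duration
import java.time.Instant

// Sketch of an alternative handler; the class name is hypothetical.
// Message and MessageRepository are the types declared in this post.
class ZippedChatSocketHandler(
    private val mapper: ObjectMapper,
    private val messages: MessageRepository
) : WebSocketHandler {

    override fun handle(session: WebSocketSession): Mono<Void> {
        // Inbound: persist every text frame the client sends.
        val input: Mono<Void> = session.receive()
            .map { it.payloadAsText }
            .map { Message(body = it, sentAt = Instant.now()) }
            .flatMap { messages.save(it) }
            .then()

        // Outbound: stream documents from the tailable cursor to the client.
        val output: Mono<Void> = session.send(
            Mono.delay(Duration.ofMillis(1000))
                .thenMany(messages.getMessagesBy())
                .map { session.textMessage(mapper.writeValueAsString(it)) }
        )

        // Subscribe to both pipelines together; when one terminates,
        // the other is cancelled.
        return Mono.zip(input, output).then()
    }
}
```

With this shape, the framework owns both subscriptions, so a closed session also stops the inbound pipeline and the tailable cursor.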

By default, Spring Data MongoDB does not create a capped collection for the Message document. For more details, see the Infinite stream of tailable cursors section of the Spring Data MongoDB reference.

If the collection already exists, use the convertToCapped command to convert it to a capped collection.

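@Bean
fun runner(template: ReactiveMongoTemplate) = CommandLineRunner {
    println("running CommandLineRunner...")
    // Insert one document first so that the messages collection exists.
    template.insert(Message(body = "test")).then().block()
    // Convert the existing collection to a capped one (size is in bytes).
    template.executeCommand("{\"convertToCapped\": \"messages\", size: 100000}")
            .subscribe(::println)
}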

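Otherwise, if the collection does not exist yet, create it as a capped collection directly. With ReactiveMongoTemplate the call returns a Mono, so nothing happens until it is subscribed to.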

template.createCollection("messages", CollectionOptions.empty().capped().size(100000).maxDocuments(1000))
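
The two cases above could be combined into a single startup step. The following is a minimal sketch, assuming the same ReactiveMongoTemplate and the size values used in this post; the function name ensureCappedMessages is hypothetical.

```kotlin
import org.springframework.data.mongodb.core.CollectionOptions
import org.springframework.data.mongodb.core.ReactiveMongoTemplate
import reactor.core.publisher.Mono

// Hypothetical helper: the function name is made up for this sketch.
fun ensureCappedMessages(template: ReactiveMongoTemplate): Mono<Void> =
    template.collectionExists("messages")
        .flatMap { exists ->
            if (exists) {
                // Existing collection: convert it in place.
                template.executeCommand("{\"convertToCapped\": \"messages\", size: 100000}").then()
            } else {
                // Missing collection: create it as capped from the start.
                template.createCollection(
                    "messages",
                    CollectionOptions.empty().capped().size(100000).maxDocuments(1000)
                ).then()
            }
        }
```

Inside a CommandLineRunner it could be invoked as ensureCappedMessages(template).block(), so that the collection is ready before the first client connects.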

No changes are needed in the client code.

Make sure a MongoDB server is running; the simplest way is to run it in a Docker container.

docker-compose up mongodb

Next, run the server and client applications.

Open a browser and send a message; the new document shows up in the messages collection.

To verify this, run a query in the mongo shell.

> db.messages.find()
{ "_id" : ObjectId("5f130da86bba28157893a9bc"), "body" : "test", "sentAt" : ISODate("2020-07-18T14:56:40.499Z"), "_class" : "com.example.demo.Message" }
{ "_id" : ObjectId("5f130dcd6bba28157893a9bd"), "body" : "test again", "sentAt" : ISODate("2020-07-18T14:57:17.858Z"), "_class" : "com.example.demo.Message" }

Open two browser windows and send some messages.


It works exactly like the previous version.

The complete code for this post can be found here; clone it, follow the steps described in README.md, and try it yourself.
