
Oleg Agafonov for SIP3


Scaling a Vert.x application for session-dependent data processing

I'm working on a VoIP monitoring and troubleshooting platform called SIP3.

Under the hood, SIP3 is an ETL engine powered by the incredible Vert.x framework.

A simple ETL pipeline consists of input data, a processor, and output data, as shown in the picture below:

Simple ETL

Let's assume that our DataProcessingVerticle looks like this:

class DataProcessingVerticle : AbstractVerticle() {

    override fun start() {
        vertx.eventBus().consumer<JsonObject>("DataProcessingVerticle") {
            event -> handle(event.body())
        }
    }

    private fun handle(inputData: JsonObject) {
        val outputData = doTransformation(inputData)
        vertx.eventBus().send("ConsoleOutputVerticle", outputData)
    }
}

Given that doTransformation() is a very fast method, we may be happy handling our data on a single server core.

But what if we have tons of data and one core is not enough?

Our input data is a stream of events. Let's assume that every event is independent of the others. In this case doTransformation() is a function of the event alone, and scaling will look like this:

Simple Scaling

We just need to deploy multiple instances of DataProcessingVerticle, according to the number of event-loop threads:

    val deploymentOptions = deploymentOptionsOf(
            instances = threads
    )
    vertx.deployVerticle(DataProcessingVerticle::class.java, deploymentOptions)

As you can see it's as easy as a simple ETL :)

But what if some of our events depend on each other?

In the case of SIP3, the input data is SIP messages and the output is SIP sessions. Each SIP session consists of a bunch of SIP messages with the same Call-ID. So, how do we scale it?

The solution is a little more complicated. But, like everything written in Vert.x, it is simple enough:

Advanced Scaling

We will use a LoadBalancerVerticle to route each SIP message to a dedicated instance of DataProcessingVerticle, chosen by the message's Call-ID:

class LoadBalancerVerticle : AbstractVerticle() {

    var threads: Int = 1

    override fun start() {
        config().getInteger("instances")?.let {
            threads = it
        }

        vertx.eventBus().consumer<JsonObject>("LoadBalancerVerticle") {
            event -> handle(event.body())
        }
    }

    private fun handle(inputData: JsonObject) {
        val id = inputData.getString("call_id")
        val index = abs(id.hashCode() % threads)
        vertx.eventBus().send("DataProcessingVerticle_$index", inputData)
    }
}
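To make the routing rule easier to see in isolation, here is a minimal plain-Kotlin sketch of the index calculation from handle(). The helper name indexFor and the sample Call-ID values are made up for illustration; they are not part of SIP3 or Vert.x.

```kotlin
import kotlin.math.abs

// Hypothetical helper mirroring the expression used in LoadBalancerVerticle.handle().
fun indexFor(callId: String, instances: Int): Int {
    require(instances > 0) { "instances must be positive" }
    // The remainder lies strictly between -instances and instances,
    // so taking abs() of it always yields a value in 0 until instances.
    return abs(callId.hashCode() % instances)
}

fun main() {
    val instances = 4
    // Made-up Call-ID values, for illustration only.
    val callIds = listOf("a84b4c76e66710", "f81d4fae7dec", "a84b4c76e66710")
    for (callId in callIds) {
        println("$callId -> DataProcessingVerticle_${indexFor(callId, instances)}")
    }
}
```

Because the index depends only on the Call-ID and the instance count, every message of a session lands on the same address. Note that the order matters: taking abs() of the remainder is safe, while taking the remainder of abs(hashCode()) could go negative for a hash equal to Int.MIN_VALUE.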

In this case each DataProcessingVerticle has to be started with a unique address. We can guarantee this by using a local counter from Vert.x shared data:

class DataProcessingVerticle : AbstractVerticle() {

    override fun start() {
        GlobalScope.launch(vertx.dispatcher()) {
            // The counter is looked up by name, so the name must be a String.
            val index = vertx.sharedData().getLocalCounterAwait("DataProcessingVerticle")
            val address = "DataProcessingVerticle_${index.getAndIncrementAwait()}"
            vertx.eventBus().consumer<JsonObject>(address) {
                event -> handle(event.body())
            }
        }
    }

    private fun handle(inputData: JsonObject) {
        val outputData = doTransformation(inputData)
        vertx.eventBus().send("ConsoleOutputVerticle", outputData)
    }
}
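The interplay between the counter and the balancer can be sketched without Vert.x at all. The simulation below is only an illustration under stated assumptions: an AtomicLong stands in for the shared local counter, a map of lists stands in for the event bus, and the function name simulate and the sample Call-IDs are hypothetical.

```kotlin
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.abs

// Plain-Kotlin stand-in for the event bus: address -> messages delivered there.
// No Vert.x is involved; all names in this sketch are illustrative.
fun simulate(instances: Int, callIds: List<String>): Map<String, List<String>> {
    val counter = AtomicLong() // plays the role of the shared local counter
    // Each "verticle instance" claims a unique address, as in start() above.
    val inboxes = LinkedHashMap<String, MutableList<String>>()
    repeat(instances) {
        inboxes["DataProcessingVerticle_${counter.getAndIncrement()}"] = mutableListOf()
    }
    // The "load balancer" routes every message by the hash of its Call-ID.
    for (callId in callIds) {
        val index = abs(callId.hashCode() % instances)
        inboxes.getValue("DataProcessingVerticle_$index").add(callId)
    }
    return inboxes
}

fun main() {
    val messages = listOf("call-a", "call-b", "call-a", "call-c", "call-b", "call-a")
    simulate(3, messages).forEach { (address, inbox) -> println("$address <- $inbox") }
}
```

The sketch also makes one requirement visible: the number of deployed DataProcessingVerticle instances should match the "instances" value that LoadBalancerVerticle reads from its config. Otherwise some computed indices would point at addresses with no consumer.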

That's all I wanted to show you today. Check out our project to learn more about the advanced techniques we use with the Vert.x framework. If you have any questions, just leave me a message :)

Cheers...
