I'm working on a VoIP monitoring and troubleshooting platform called SIP3.
Under the hood, SIP3 is an ETL engine powered by the incredible Vert.x framework.
A simple ETL pipeline consists of input data, a processor, and output data, as shown in the picture below:
Let's assume that our DataProcessingVerticle looks like this:
class DataProcessingVerticle : AbstractVerticle() {

    override fun start() {
        vertx.eventBus().consumer<JsonObject>("DataProcessingVerticle") { event ->
            handle(event.body())
        }
    }

    private fun handle(inputData: JsonObject) {
        val outputData = doTransformation(inputData)
        vertx.eventBus().send("ConsoleOutputVerticle", outputData)
    }
}
Given that doTransformation() is a very fast method, we may be happy handling our data on just one of the server's cores.
But what if we have tons of data and one core is not enough?
Our input data is a stream of events. Let's assume that the events are independent of each other. In this case doTransformation() is a function of the event itself, and scaling will look like this:
We just need to deploy multiple instances of DataProcessingVerticle, one per event-loop thread:
val deploymentOptions = deploymentOptionsOf(
    instances = threads
)
vertx.deployVerticle(DataProcessingVerticle::class.java, deploymentOptions)
As you can see, it's as easy as a simple ETL :)
But what if some of our events depend on each other?
In the case of SIP3, we have SIP messages as input data and SIP sessions as output. Each SIP session consists of a bunch of SIP messages with the same Call-ID. So, how do we scale it?
The solution is a little more complicated. But, like everything written in Vert.x, it is simple enough:
We will use a LoadBalancerVerticle to route every SIP message with a given Call-ID to the same dedicated instance of DataProcessingVerticle:
class LoadBalancerVerticle : AbstractVerticle() {

    var threads: Int = 1

    override fun start() {
        config().getInteger("instances")?.let {
            threads = it
        }
        vertx.eventBus().consumer<JsonObject>("LoadBalancerVerticle") { event ->
            handle(event.body())
        }
    }

    private fun handle(inputData: JsonObject) {
        val id = inputData.getString("call_id")
        val index = abs(id.hashCode() % threads)
        vertx.eventBus().send("DataProcessingVerticle_$index", inputData)
    }
}
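The routing rule in handle() can also be tried out on its own, without Vert.x. Below is a minimal sketch: partitionFor is a hypothetical helper name that repeats the same arithmetic, and the Call-ID values are made up for illustration. It shows that a repeated Call-ID always resolves to the same address and that the index stays between 0 and threads - 1.

```kotlin
import kotlin.math.abs

// Hypothetical helper: the same arithmetic as LoadBalancerVerticle.handle().
// The remainder lies strictly between -threads and threads, so abs() maps it into 0 until threads.
fun partitionFor(callId: String, threads: Int): Int = abs(callId.hashCode() % threads)

fun main() {
    val threads = 4
    // Made-up Call-ID values; the first and the last one are identical on purpose.
    val callIds = listOf("a84b4c76e66710", "f81d4fae-7dec", "3848276298220188511", "a84b4c76e66710")
    for (callId in callIds) {
        println("$callId -> DataProcessingVerticle_${partitionFor(callId, threads)}")
    }
}
```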
In this case each DataProcessingVerticle has to be started with a unique address. We can guarantee this by using a local counter from Vert.x shared data:
class DataProcessingVerticle : AbstractVerticle() {

    override fun start() {
        GlobalScope.launch(vertx.dispatcher()) {
            // The counter is looked up by name, so the name is passed as a string
            val index = vertx.sharedData().getLocalCounterAwait("DataProcessingVerticle")
            val address = "DataProcessingVerticle_${index.getAndIncrementAwait()}"
            vertx.eventBus().consumer<JsonObject>(address) { event ->
                handle(event.body())
            }
        }
    }

    private fun handle(inputData: JsonObject) {
        val outputData = doTransformation(inputData)
        vertx.eventBus().send("ConsoleOutputVerticle", outputData)
    }
}
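To tie the two verticles together, both need to agree on the number of instances. Here is a rough sketch of how the deployment might look. It assumes the DataProcessingVerticle and LoadBalancerVerticle classes from this post are on the classpath and that the callback-style deployVerticle overload of Vert.x 3.x/4.x is available. deployPipeline is a hypothetical function name, not something taken from the SIP3 code base.

```kotlin
import io.vertx.core.Vertx
import io.vertx.core.json.JsonObject
import io.vertx.kotlin.core.deploymentOptionsOf

// Hypothetical wiring function; DataProcessingVerticle and LoadBalancerVerticle
// are the classes shown in this post.
fun deployPipeline(vertx: Vertx, threads: Int) {
    // One processing instance per thread; the local counter gives them indexes 0 until threads
    val processorOptions = deploymentOptionsOf(instances = threads)
    // The balancer reads the same number from its config under the "instances" key
    val balancerOptions = deploymentOptionsOf(config = JsonObject().put("instances", threads))

    // Deploy the processors first, and the balancer only once that has succeeded
    vertx.deployVerticle(DataProcessingVerticle::class.java, processorOptions) { result ->
        if (result.succeeded()) {
            vertx.deployVerticle(LoadBalancerVerticle::class.java, balancerOptions)
        } else {
            result.cause().printStackTrace()
        }
    }
}
```

Because each DataProcessingVerticle registers its consumer inside a coroutine, the consumers may appear slightly after the deployment callback fires, so it is probably wise not to feed the balancer immediately after startup.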
That's all I wanted to show you today. Check out our project to learn more about the advanced techniques we use to work with the Vert.x framework. If you have any questions, just leave me a message :)
Cheers...