I've been working with the Vert.x framework for more than 4 years, but I never stop being excited by how simple, lightweight and elegant it is (especially the event loop thread model). In this blog post I will tell you how we implemented the `PeriodicallyExpiringHashMap` data structure in less than 100 lines of code. But first, let me give you a bit of context about why we need it.
## Problem
SIP3 is a very advanced VoIP monitoring and troubleshooting platform. To provide detailed information about call quality, we need to:

- Aggregate RTP packets into RTP streams in real time
- Periodically walk through all the RTP streams and terminate the ones that haven't been updated for a certain period of time

Let's set aside the telecom specifics and take a look at a simplified code example:
```kotlin
class RtpStreamHandler : AbstractVerticle() {

    var expirationDelay: Long = 1000
    var aggregationTimeout: Long = 30000

    private val rtpStreams = mutableMapOf<String, RtpStream>()

    override fun start() {
        vertx.setPeriodic(expirationDelay) {
            val now = System.currentTimeMillis()
            rtpStreams.filterValues { rtpStream -> rtpStream.updatedAt + aggregationTimeout < now }
                .forEach { (rtpStreamId, rtpStream) ->
                    terminateRtpStream(rtpStream)
                    rtpStreams.remove(rtpStreamId)
                }
        }

        vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
            val rtpPacket = event.body()
            handleRtpPacket(rtpPacket)
        }
    }

    fun handleRtpPacket(rtpPacket: RtpPacket) {
        val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
        rtpStream.addPacket(rtpPacket)
    }

    fun terminateRtpStream(rtpStream: RtpStream) {
        vertx.eventBus().localSend("on_rtp_stream", rtpStream)
    }
}
```
Now let's imagine that we constantly have 30K active RTP streams. Also, every second we terminate approximately a thousand old streams but gain a thousand new ones. In these circumstances our code doesn't look very efficient, and we certainly need a better solution.
## Solution
As you can see from the first code snippet, once an RTP stream is updated, it won't be terminated for at least the next `aggregationTimeout`. This means we can simply not bother about it for some time.
And this is the key idea behind the SIP3 `PeriodicallyExpiringHashMap` implementation:
```kotlin
class PeriodicallyExpiringHashMap<K, V> private constructor(
    vertx: Vertx,
    private val delay: Long,
    private val period: Int,
    private val expireAt: (K, V) -> Long,
    private val onExpire: (K, V) -> Unit
) {

    private val objects = mutableMapOf<K, V>()

    private val expiringSlots = (0 until period).map { mutableMapOf<K, V>() }.toList()
    private var expiringSlotIdx = 0

    init {
        vertx.setPeriodic(delay) {
            terminateExpiringSlot()

            expiringSlotIdx += 1
            if (expiringSlotIdx >= period) {
                expiringSlotIdx = 0
            }
        }
    }

    fun getOrPut(key: K, defaultValue: () -> V): V {
        return objects.getOrPut(key) {
            defaultValue.invoke().also { expiringSlots[expiringSlotIdx][key] = it }
        }
    }

    private fun terminateExpiringSlot() {
        val now = System.currentTimeMillis()

        expiringSlots[expiringSlotIdx].apply {
            forEach { (k, v) ->
                val expireAt = expireAt(k, v)
                when {
                    expireAt <= now -> {
                        objects.remove(k)?.let { onExpire(k, it) }
                    }
                    else -> {
                        var shift = ((expireAt - now) / delay).toInt() + 1
                        if (shift >= period) {
                            shift = period - 1
                        }
                        val nextExpiringSlotIdx = (expiringSlotIdx + shift) % period
                        expiringSlots[nextExpiringSlotIdx][k] = v
                    }
                }
            }

            clear()
        }
    }

    data class Builder<K, V>(
        var delay: Long = 1000,
        var period: Int = 60,
        var expireAt: (K, V) -> Long = { _: K, _: V -> Long.MAX_VALUE },
        var onExpire: (K, V) -> Unit = { _: K, _: V -> }
    ) {

        fun delay(delay: Long) = apply { this.delay = delay }
        fun period(period: Int) = apply { this.period = period }
        fun expireAt(expireAt: (K, V) -> Long) = apply { this.expireAt = expireAt }
        fun onExpire(onExpire: (K, V) -> Unit) = apply { this.onExpire = onExpire }

        fun build(vertx: Vertx) = PeriodicallyExpiringHashMap(vertx, delay, period, expireAt, onExpire)
    }
}
```
Here are the benefits of this data structure:

- Now we just have a bunch of time slots. Instead of walking through all the objects in the map every `expirationDelay`, we can walk through a single slot. So, instead of checking 30K objects every second, we check only 1K.
- We don't need to create a copy of the original map every time we decide to walk through it. In the previous example this was also an issue, because `rtpStreams.filterValues` creates a copy of the original map.
- The last and most important: our implementation stays consistent within a particular verticle context. That means you can simply extend it and implement the rest of the methods (including tricky ones, like `size()`).
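To make the slot mechanics concrete, here is a standalone sketch (no Vert.x; the function name `nextSlotIdx` is illustrative, not part of the SIP3 API) of the arithmetic `terminateExpiringSlot` uses to re-park a not-yet-expired entry:

```kotlin
// Given the scan delay, the number of slots and the entry's expiration
// timestamp, compute which slot a still-alive entry should be parked in,
// so that it isn't inspected again until roughly its expiration time.
fun nextSlotIdx(currentIdx: Int, delay: Long, period: Int, expireAt: Long, now: Long): Int {
    // How many scan ticks from now the entry is due to expire (rounded up).
    var shift = ((expireAt - now) / delay).toInt() + 1
    if (shift >= period) {
        // Clamp: never schedule further than one full rotation ahead.
        shift = period - 1
    }
    return (currentIdx + shift) % period
}

fun main() {
    // delay = 1000 ms, 60 slots: an entry expiring 30 s from now
    // is parked 31 slots ahead of the current one.
    println(nextSlotIdx(0, 1000, 60, 30_000, 0))   // 31
    // An entry expiring far in the future is clamped to period - 1 slots ahead.
    println(nextSlotIdx(10, 1000, 60, 600_000, 0)) // (10 + 59) % 60 = 9
}
```

Note the clamping: a far-future entry simply gets re-examined (and re-parked) once per full rotation, so correctness never depends on `period` being large enough.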
## Conclusions
Finally, let's see how our verticle looks with the new `PeriodicallyExpiringHashMap` data structure:
```kotlin
class RtpStreamHandler : AbstractVerticle() {

    var expirationDelay: Long = 1000
    var aggregationTimeout: Long = 30000

    private lateinit var rtpStreams: PeriodicallyExpiringHashMap<String, RtpStream>

    override fun start() {
        rtpStreams = PeriodicallyExpiringHashMap.Builder<String, RtpStream>()
            .delay(expirationDelay)
            .period((aggregationTimeout / expirationDelay).toInt())
            .expireAt { _, rtpStream -> rtpStream.updatedAt + aggregationTimeout }
            .onExpire { _, rtpStream -> terminateRtpStream(rtpStream) }
            .build(vertx)

        vertx.eventBus().localConsumer<RtpPacket>("on_rtp_packet") { event ->
            val rtpPacket = event.body()
            handleRtpPacket(rtpPacket)
        }
    }

    fun handleRtpPacket(rtpPacket: RtpPacket) {
        val rtpStream = rtpStreams.getOrPut(rtpPacket.rtpStreamId) { RtpStream() }
        rtpStream.addPacket(rtpPacket)
    }

    fun terminateRtpStream(rtpStream: RtpStream) {
        vertx.eventBus().localSend("on_rtp_stream", rtpStream)
    }
}
```
And here are the load test results (purple is our new implementation):
The test results look great, and the code looks clean and simple thanks to the Vert.x event loop thread model.
👨💻 Happy coding,
Your SIP3 team.
## Comments
Folks, try `LinkedHashMap` - all your entries will be iterated in the order of insertion, which coincides with the order of expiration.
This means you only need to iterate and remove expired entries until you reach the first non-expired one, which makes checking for expired entries amortized constant. And it also simplifies the code.
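Under the commenter's assumption (entries expire strictly in insertion order), the idea could be sketched like this; `expireHead` is a hypothetical helper name, not part of SIP3 or the JDK:

```kotlin
// Walk a LinkedHashMap in insertion order and evict expired entries,
// stopping at the first entry that is still alive. If expiration order
// matches insertion order, everything after that entry is younger too.
fun <K, V> expireHead(
    map: LinkedHashMap<K, V>,
    now: Long,
    expireAt: (K, V) -> Long,
    onExpire: (K, V) -> Unit
) {
    val iter = map.entries.iterator()
    while (iter.hasNext()) {
        val (k, v) = iter.next()
        if (expireAt(k, v) > now) break // all remaining entries are younger
        iter.remove()
        onExpire(k, v)
    }
}
```

Each entry is visited at most once beyond its eviction, which is where the amortized-constant cost comes from.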
Thank you for a good hint. Unfortunately, it won't work in our case because our entries are not ordered. Also, there are more complicated scenarios I haven't shown in this post.
For example, when we aggregate SIP transactions, the expiration time differs depending on the transaction's state: github.com/sip3io/sip3-salto-ce/bl...
That's why we use a method called `touch` to update the entry's expiration slot. And it's not really possible to implement with `LinkedHashMap` only: github.com/sip3io/sip3-commons/blo...