Gonçalo Trincão Cunha

Originally published at Medium

Speeding up Stream-Static Joins on Apache Spark

Some time ago I came across a use case where a Spark Structured Streaming job required a join with static data located in a very large table.

The first approach taken wasn’t really great. Even with small micro-batches, it increased the batch processing time by orders of magnitude.

A (very) simplified example of this case could be a stream of sales events that needs to be merged with additional product information stored in a large product table.

This post is about using mapPartitions to join Spark Structured Streaming data frames with static data.

Approach #1 — Stream-Static Join

The first approach involved a join of the sales events data frame with the static products table.

Stream-static Join

Image by Author.

Unfortunately, the join caused each micro-batch to do a full scan of the product table, resulting in a high batch duration even when the stream had only a single record to process.

join performance

Image by Author.

The code went like this:

// streamingDS = … Sales stream initialization …
// Read static product table
val staticDS = spark.read
  .format("parquet")
  .load("/tmp/prods.parquet").as[Product]
// Join of sales stream with products table
streamingDS
  .joinWith(staticDS, 
    streamingDS("productId")===staticDS("productId") &&
    streamingDS("category")===staticDS("category"))
  .map{ 
    case (sale,product) => new SaleInfo(sale, Some(product))
  }
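
For context, the snippets assume domain classes roughly like the ones below. The field lists are hypothetical and only meant to make the examples self-contained; the real schema lives in the author's test code.

// Hypothetical case classes matching the identifiers used in the snippets;
// the exact fields in the author's test code may differ.
case class Product(productId: Long, category: String, name: String, price: Double)
case class Sale(saleId: Long, productId: Long, category: String, quantity: Int)
case class SaleInfo(sale: Sale, product: Option[Product])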

In a small demo application, the DAG shows the culprit:

The partitioning of the static table was ignored, so all rows of all partitions (5 in this case) were read.
The full table scan of the product table added more than a minute to the micro-batch duration, even when the batch contained only one event.

join DAG

Image by Author.
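
For reference (this is not part of the original post), one way to confirm this yourself is to print the physical plan of every micro-batch from foreachBatch; with the stream-static join, a full FileScan over the product table shows up in each batch. The name joinedDS below is assumed to be the joined stream built in the previous snippet.

import org.apache.spark.sql.Dataset

// Hedged sketch: inspect the plan of each micro-batch of the assumed
// "joinedDS" stream-static join from the snippet above.
def inspectBatch(batch: Dataset[SaleInfo], batchId: Long): Unit = {
  // Prints the physical plan of this micro-batch; with the stream-static
  // join it contains a full FileScan of /tmp/prods.parquet every time.
  batch.explain()
}

joinedDS.writeStream
  .foreachBatch(inspectBatch _)
  .start()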

Approach #2 — mapPartitions

The second approach was based on a lookup to a key-value store for each sale event, using Spark's mapPartitions operation, which lets you transform a Dataset one partition at a time and reuse expensive resources (such as database connections) across all rows of a partition.

mapPartitions approach

Image by Author.

Neither Parquet nor Delta tables are suitable for individual key lookups, so the prerequisite for this scenario is to have the product information loaded into a key-value store (MongoDB in this example).
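
As a rough illustration (not from the post), a one-off batch job along these lines could copy the Parquet product table into MongoDB using the MongoDB Spark connector. The format name and option keys depend on the connector version, and the URI, database and collection names below are placeholders.

// Sketch of a one-off load of the static product table into MongoDB.
// Assumes the MongoDB Spark connector 10.x is on the classpath; option keys
// differ in older connector versions, and the names below are placeholders.
spark.read
  .format("parquet")
  .load("/tmp/prods.parquet")
  .write
  .format("mongodb")
  .option("connection.uri", "mongodb://localhost:27017")
  .option("database", "shop")
  .option("collection", "products")
  .mode("overwrite")
  .save()
// An index on productId in MongoDB keeps the per-event lookups fast.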

The sample code is a bit more complex, but in certain cases it is well worth the effort to keep the batch duration low, especially with small micro-batches.

// streamingDS = … Sales stream initialization …
streamingDS.mapPartitions(partition => {
  // setup DB connection
  val dbService = new ProductService()
  dbService.connect()

  partition.map(sale => {
    // Product lookup and merge
    val product = dbService.findProduct(sale.productId)
    new SaleInfo(sale, Some(product))
  })  // partition.map already returns the Iterator that mapPartitions expects
})
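
The post does not show ProductService; as a sketch (an assumption, not the author's implementation), it could be a thin wrapper around the MongoDB Java driver doing one keyed lookup per call:

import com.mongodb.client.{MongoClient, MongoClients, MongoCollection}
import org.bson.Document

// Hypothetical ProductService used by the mapPartitions snippet above.
// URI, database and collection names are placeholders; missing-key handling
// is omitted for brevity.
class ProductService {
  private var client: MongoClient = _
  private var products: MongoCollection[Document] = _

  def connect(): Unit = {
    client = MongoClients.create("mongodb://localhost:27017")
    products = client.getDatabase("shop").getCollection("products")
  }

  def findProduct(productId: Long): Product = {
    val doc = products.find(new Document("productId", productId)).first()
    Product(doc.getLong("productId"), doc.getString("category"),
      doc.getString("name"), doc.getDouble("price"))
  }

  def close(): Unit = if (client != null) client.close()
}

In a real job you would also want to close the connection once the partition's iterator is exhausted, and decide how to handle products that are missing from the store.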

The new batch duration graph shows that the problem is gone and we're back to short batch durations.

mapPartitions performance

Image by Author.

Hope you enjoyed reading! Please let me know if you have better approaches to this problem.

Test details: Spark version 3.2.1 running on Ubuntu 20.04 LTS / WSL2.

Test Code: https://github.com/trincaog/spark-mappartitions-test

Photo by Marc Sendra Martorell on Unsplash
