
Gonçalo Trincão Cunha

Posted on • Originally published at Medium

Speeding up Stream-Static Joins on Apache Spark

Some time ago I came across a use case where a Spark Structured Streaming job required a join with static data located in a very large table.

The first approach taken didn't work well: even with small micro-batches, it increased the batch processing time by orders of magnitude.

A (very) simplified example of this case could be a stream of sales events that needs to be merged with additional product information located on a large table of products.

This post is about using mapPartitions to join Spark Structured Streaming data frames with static data.

Approach #1 — Stream-Static Join

The first approach involved a join of the sales events data frame with the static products table.

Stream-static Join

Image by Author

Unfortunately, the join caused each micro-batch to do a full scan of the product table, resulting in a high batch duration even if the stream had a single record to process.

join performance

Image by Author

The code went like this:

// streamingDS = … Sales stream initialization …
// Read static product table (source name is illustrative)
val staticDS = spark.read.table("products").as[Product]
// Join of sales stream with products table
val joinedDS = streamingDS
  .joinWith(staticDS, streamingDS("productId") === staticDS("productId"))
  .map { case (sale, product) => new SaleInfo(sale, Some(product)) }

Using a small demo application, the DAG shows the culprit:

The partitioning of the static table was ignored, so all rows of all partitions (in this case 5) were read.
The full table scan of the product table added more than a minute to the micro-batch duration, even when the batch contained a single event.
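One way to spot this in your own job (a sketch; `joinedDS` stands for the stream-static join dataset from the earlier snippet) is to print each micro-batch's physical plan from foreachBatch and check the static table's scan node for the number of files read versus pruned:

```scala
// Print the micro-batch plan; the static-table scan node shows
// how many files/partitions were actually read.
joinedDS.writeStream
  .foreachBatch { (batch: org.apache.spark.sql.Dataset[SaleInfo], batchId: Long) =>
    batch.explain("formatted") // "formatted" mode is available since Spark 3.0
  }
  .start()
```

The same per-batch plans are also visible in the SQL tab of the Spark UI.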

join DAG

Image by Author

Approach #2 — mapPartitions

The second approach was based on a lookup to a key-value store for each sale event via Spark's mapPartitions operation, which transforms a data frame/dataset one partition at a time: the function receives an iterator of rows, so expensive setup (such as a database connection) can be done once per partition and shared across all its rows.

mapPartitions approach

Image by Author

Neither Parquet nor Delta tables are suitable for individual key lookups, so the prerequisite for this scenario is to have the product information loaded into a key-value store (MongoDB in this example).
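As a sketch of that prerequisite, the products table could be copied into MongoDB with the MongoDB Spark connector (option names follow connector 10.x; the URI, database, and collection names are placeholders, not from the original test):

```scala
// One-off batch load of the static product table into MongoDB.
spark.read.table("products")  // illustrative source
  .write
  .format("mongodb") // older connector versions use "mongo"
  .option("connection.uri", "mongodb://localhost:27017") // placeholder URI
  .option("database", "shop")       // placeholder database name
  .option("collection", "products") // placeholder collection name
  .mode("overwrite")
  .save()
```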

The sample code is a bit more complex, but in certain cases it's well worth the effort to keep the batch duration low, especially with small micro-batches.

// streamingDS = … Sales stream initialization …
streamingDS.mapPartitions { partition =>
  // Set up one DB connection per partition (not per row)
  val dbService = new ProductService()
  dbService.connect()
  partition.map { sale =>
    // Product lookup and merge
    val product = dbService.findProduct(sale.productId)
    new SaleInfo(sale, Some(product))
  }
}
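For completeness, a minimal sketch of how the result of the mapPartitions transformation could be wired to a sink (`enrichedDS` stands for the resulting Dataset[SaleInfo]; the console sink and trigger interval are illustrative choices, not from the original test):

```scala
import org.apache.spark.sql.streaming.Trigger

// Demo sink wiring for the enriched stream; a real job would write to a table.
enrichedDS.writeStream
  .format("console")
  .option("truncate", "false")
  .trigger(Trigger.ProcessingTime("10 seconds")) // small micro-batches, as in the test
  .start()
  .awaitTermination()
```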

The new batch duration graph shows that the problem is long gone, and we’re back to a short batch duration.

mapPartitions performance

Image by Author

Hope you enjoyed reading! Please let me know if you have better approaches to this problem.

Test details: Spark version 3.2.1 running on Ubuntu 20.04 LTS / WSL2.

Test Code:

Photo by Marc Sendra Martorell on Unsplash
