Some time ago I came across a use case where a Spark Structured Streaming job required a join with static data stored in a very large table.
The first approach taken wasn’t really great. Even with small micro-batches, it increased the batch processing time by orders of magnitude.
A (very) simplified example of this case could be a stream of sales events that needs to be merged with additional product information stored in a large products table.
This post is about using mapPartitions to join Spark Structured Streaming data frames with static data.
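The snippets below reference a few domain types without defining them. As a minimal hypothetical sketch (field names and types are assumptions), the data model could look like this:
// Hypothetical data model used in the snippets below;
// field names and types are assumptions
case class Sale(saleId: String, productId: String, category: String, amount: Double)
case class Product(productId: String, category: String, name: String)
case class SaleInfo(sale: Sale, product: Option[Product])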
Approach #1 — Stream-Static Join
The first approach involved a join of the sales events data frame with the static products table.
Unfortunately, the join caused each micro-batch to do a full scan of the product table, resulting in a high batch duration even when the stream had only a single record to process.
The code went like this:
// streamingDS = … Sales stream initialization …

// Read the static product table
val staticDS = spark.read
  .format("parquet")
  .load("/tmp/prods.parquet")
  .as[Product]

// Join the sales stream with the products table
streamingDS
  .joinWith(staticDS,
    streamingDS("productId") === staticDS("productId") &&
      streamingDS("category") === staticDS("category"))
  .map {
    case (sale, product) => new SaleInfo(sale, Some(product))
  }
A small demo application shows the culprit in the DAG:
The partitioning of the static table was ignored, and thus all rows of all partitions (in this case 5) were read.
The full table scan of the product table added more than a minute to the micro-batch duration, even when the batch contained only a single event.
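For context, here is one hypothetical way such a partitioned product table could have been produced (productsDS and the choice of partition column are assumptions):
// Hypothetical: write the products table partitioned by category.
// This matches the full scan observed above: the matching categories
// are only known per micro-batch, so static partition pruning
// cannot help the stream-static join.
productsDS.write
  .format("parquet")
  .partitionBy("category")
  .save("/tmp/prods.parquet")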
Approach #2 — mapPartitions
The second approach was based on a lookup to a key-value store for each sale event via Spark's mapPartitions operation, which lets you transform a data frame/data set one partition at a time: you receive each partition's rows as an iterator and can share per-partition setup, such as a database connection, across them.
Neither Parquet nor Delta tables are suited to individual key lookups, so the prerequisite for this scenario is to have the product information loaded into a key-value store (MongoDB in this example).
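The snippet below references a ProductService that the post doesn't show. Here is a minimal hypothetical sketch of what it might look like, assuming the MongoDB Java sync driver and a products collection indexed by productId (database, collection, and field names are all assumptions):
import com.mongodb.client.model.Filters
import com.mongodb.client.{MongoClients, MongoCollection}
import org.bson.Document

// Hypothetical lookup service; it is created inside mapPartitions,
// so it lives entirely on the executor and needs no serialization
class ProductService(uri: String = "mongodb://localhost:27017") {
  private var collection: MongoCollection[Document] = _

  def connect(): Unit = {
    collection = MongoClients.create(uri)
      .getDatabase("shop")
      .getCollection("products")
  }

  def findProduct(productId: String): Product = {
    // Single-document lookup; assumes the product exists
    // and that productId is indexed
    val doc = collection.find(Filters.`eq`("productId", productId)).first()
    Product(doc.getString("productId"), doc.getString("category"), doc.getString("name"))
  }
}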
The sample code is a bit more complex, but in certain cases it is well worth the effort to keep the batch duration low, especially on small micro-batches.
// streamingDS = … Sales stream initialization …
streamingDS.mapPartitions { partition =>
  // Set up the DB connection once per partition, not once per row
  val dbService = new ProductService()
  dbService.connect()
  // partition is an Iterator; map keeps it lazy and returns
  // the Iterator that mapPartitions expects
  partition.map { sale =>
    // Product lookup and merge
    val product = dbService.findProduct(sale.productId)
    new SaleInfo(sale, Some(product))
  }
}
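One caveat worth noting: partition.map is lazy, so the connection cannot simply be closed right after the map call. One way to release it, sketched here under the assumption that ProductService exposes a close() method (which is an assumption), is a task completion listener:
import org.apache.spark.TaskContext

streamingDS.mapPartitions { partition =>
  val dbService = new ProductService()
  dbService.connect()
  // close() is a hypothetical method; the listener fires only after
  // the task has fully consumed the lazy iterator returned below
  TaskContext.get().addTaskCompletionListener[Unit](_ => dbService.close())
  partition.map(sale => new SaleInfo(sale, Some(dbService.findProduct(sale.productId))))
}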
The new batch duration graph shows that the problem is long gone, and we’re back to a short batch duration.
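For completeness, batch durations like the ones in the graph can be observed by actually starting the enriched stream. A hypothetical sketch, where enrichedDS stands for the mapPartitions result above and the console sink is an assumption:
// Hypothetical: start the enriched stream with a console sink;
// per-batch durations then show up in the Spark UI
val query = enrichedDS.writeStream
  .format("console")
  .outputMode("append")
  .start()
query.awaitTermination()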
Hope you enjoyed reading! Please let me know if you have better approaches to this problem.
Test details: Spark version 3.2.1 running on Ubuntu 20.04 LTS / WSL2.
Test Code: https://github.com/trincaog/spark-mappartitions-test