DEV Community

Cover image for GroupBy and Join in Spark
Cris Crawford
Cris Crawford

Posted on

GroupBy and Join in Spark

In the next three videos for the Data Engineering Zoomcamp, we learned about Spark internals and the SQL commands GROUP BY and JOIN.

Spark uses partitioned data. Each executor takes one partition and works on it until it's done, then takes another partition, until all the partitions are done being processed. In the case of a GROUP BY, the first pass can only group whatever is in one partition. The executors do this and store the results in temporary intermediate files. That means that if you are grouping by day, for example, there may be all the days in each parquet file. Each executor groups all the days in one partition, but there are many such groups across all the partitions.

So the output of this first step is a set of temporary files, each belonging to one partition, that has the grouped data in it. The next step is called "reshuffling", where the data from all partitions is combined for the final result.

Here's an example of a GROUP BY command, where we're grouping by the hour and the zone, counting the number of records and adding the revenue for that day and zone:

df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', lpep_pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    lpep_pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2
""")
Enter fullscreen mode Exit fullscreen mode

Here's a visualization from Spark, showing the two stages of the GROUP BY command.

Spark visualization of GROUP BY

JOIN works similarly when the files being joined are large and approximately the same size. We made another set of files from yellow taxi data, similar to the above, and then joined them with the Spark command df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=['hour', 'zone'], how='outer'). This is an outer join, meaning that if the first dataframe doesn't have any data for the hour and zone, but the second one does, we'll keep the record and put Null for the first dataframe. We'll do the same if the second dataframe doesn't have any data for the hour and zone, but the first dataframe does.

Here's the visualization from the Spark UI:

Spark visualization of complex JOIN

When one file is considerably smaller than the other, Spark can accomplish the join in one pass. It copies the smaller file into each executor along with the partition and does the join completely within the executor.

Here we're joining the zone and borough data from the zones file, which only has 250 or so records.

df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)
Enter fullscreen mode Exit fullscreen mode

And here's the Spark UI visualization:

Spark visualization of simple JOIN

Top comments (0)