Spark SQL for taxi trip data

Cris Crawford

Today we learned how to use Spark to execute SQL queries. As an example, we took the revenue query from week 4 and rewrote it as a Spark job that runs SQL.

First, we had to download all the 2020 and 2021 green and yellow taxi trip data and convert it to parquet files on our virtual machine. This caused a problem that took me some time to figure out. I ran out of storage space on my virtual machine, and rather than increasing the disk size, I increased the RAM, which had no effect other than to raise my costs, since RAM is the most expensive part of a virtual machine.

So let's start from the beginning. First we opened a Jupyter notebook, imported Spark, and created a Spark session.

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Next, we used pandas to infer the general schema from one of the files, and we edited the schema in VSCode in order to tell Spark exactly what it's reading. I skipped this step, simply copying what was already in the Jupyter notebook. Here's part of the schema, plus the import statement that we use:

from pyspark.sql import types

green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    # [etc.]
])
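
For reference, here's a minimal sketch of the step I skipped, assuming a sample file at a path like the one below (the exact file name is just an illustration). Creating a Spark dataframe from a small pandas sample makes Spark guess the types; you can print the resulting StructType, paste it into the notebook, and tighten up the types by hand:

import pandas as pd

# read a small sample so pandas stays fast
df_pandas = pd.read_csv('data/raw/green/2020/01/green_tripdata_2020_01.csv.gz', nrows=1000)

# let Spark guess the types, then print the StructType to copy and edit
spark.createDataFrame(df_pandas).schema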

Then we wrote a function to fetch the CSV files from the repository and write them back out as parquet files. I set up a directory structure to hold the files: in my working directory I made a directory called data, and inside it two directories, raw and pq. Both were subdivided by type (green or yellow), year, and month, with one file going into each month directory.
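
The post doesn't show the download code itself, so here's a minimal sketch, assuming the DataTalksClub mirror of the NYC TLC data (the URL and the use of wget are my assumptions; any download method works):

import os

# assumed mirror of the taxi trip data
URL_PREFIX = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download'

def download_data(taxi_type, year):
    for month in range(1, 13):
        url = f'{URL_PREFIX}/{taxi_type}/{taxi_type}_tripdata_{year}-{month:02d}.csv.gz'
        path = f'data/raw/{taxi_type}/{year}/{month:02d}/'
        os.makedirs(path, exist_ok=True)   # create the month directory
        os.system(f'wget {url} -P {path}') # one file per month directory

download_data('green', 2020)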

Then we copied and pasted the following code four times, once for each type and each year (2020 and 2021). (A single parameterized loop would also work; see the sketch after this block.)

year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'data/raw/green/{year}/{month:02d}/'
    output_path = f'data/pq/green/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)
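
Pasting the cell four times works, but the same thing can be done with one more loop. A sketch, assuming a yellow_schema was defined alongside green_schema (the post only shows the green one):

# loop over both taxi types and both years instead of pasting four cells
schemas = {'green': green_schema, 'yellow': yellow_schema}  # yellow_schema is an assumption

for taxi_type, schema in schemas.items():
    for year in [2020, 2021]:
        for month in range(1, 13):
            print(f'processing {taxi_type} data for {year}/{month}')

            input_path = f'data/raw/{taxi_type}/{year}/{month:02d}/'
            output_path = f'data/pq/{taxi_type}/{year}/{month:02d}/'

            df = spark.read \
                .option("header", "true") \
                .schema(schema) \
                .csv(input_path)

            df.repartition(4).write.parquet(output_path)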

This went well for the green data, and for the yellow data in 2021. But for the yellow taxi trips in January and February of 2020 (pre-pandemic), I ran out of space. I asked ChatGPT how to fix this, and it told me to go to Google Cloud, stop the virtual machine, edit it, and choose a new machine type with more memory. This was wrong. I did that, changing from a machine with 16GB of RAM to one with 32GB, and nothing changed. I did it again, this time going from 32GB to 64GB. Nothing! I still ran out of storage. I looked at the FAQ, and I searched the Slack channel for similar problems. There was one, and the solution was to increase the disk size. Well, I had already done that, right? So I gave up for the day.

The next day I tried different things: cleaning up my disk space (there wasn't much else on it), typing all kinds of terminal commands to view disk usage, and so on. Finally I posted my dilemma in the Slack channel. While waiting for an answer, I looked back over the questions on Slack and found the one I had seen before. This time, however, I noticed that the instructions were to increase the disk size, not to change the machine type. I went back and increased the disk size, and it worked! Meanwhile, Bruno Oliveira answered my question; I replied that I had found the answer, and he told me to definitely not change the machine type. So I went back, edited the virtual machine, and set it back to 16GB.

Now I was ready to start the video where we reproduce what we did in week 4. This was pretty simple. First I read the parquet files into Spark dataframes. There was no need to specify a schema now, because the parquet files already had the correct schema. In order to union the two dataframes, I needed to change a few things. First, I had to rename the pickup and dropoff datetime columns, removing the prefix "lpep" from the green taxi data and "tpep" from the yellow taxi data.

df_green = spark.read.parquet('data/pq/green/*/*')

df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
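
The yellow data gets the same treatment (the post shows only the green half; this is the symmetric step described above):

df_yellow = spark.read.parquet('data/pq/yellow/*/*')

df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')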

Next, I made a list of columns common to both types. There are different ways to do this, but I wanted to preserve the order of the columns in the original data.

common_columns = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_columns.append(col)
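
For comparison, one of those other ways: a plain set intersection is shorter, but sets don't preserve order, which is why the loop above iterates over df_green.columns instead:

# shorter, but loses the original column order
common_columns_unordered = set(df_green.columns) & set(df_yellow.columns)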

Then I had to add a column for service_type, green or yellow. This required me to import the Spark functions module and use the F.lit() (for "literal") function. The reason is that withColumn() expects a Column expression rather than a plain Python string, so F.lit() wraps the constant value in one.

from pyspark.sql import functions as F

df_green_sel = df_green \
    .select(common_columns) \
    .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
    .select(common_columns) \
    .withColumn('service_type', F.lit('yellow'))

Now I was ready to union the two datasets. Note that unionAll matches columns by position rather than by name, which is another reason to select the same common_columns in the same order for both dataframes:

df_trips_data = df_green_sel.unionAll(df_yellow_sel)

And now I could count the data and see how much I had.

df_trips_data.groupBy('service_type').count().show()
+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+

These were the same numbers that the instructor had, which means I loaded my data correctly.

Finally, I ran the SQL command from week 4. In order to do this, we first have to register the dataframe as a table. (The video uses registerTempTable, which has been deprecated since Spark 2.0; createOrReplaceTempView is the current equivalent.)

df_trips_data.createOrReplaceTempView('trips_data')

To test this, we ran the groupBy above as a SQL command:

spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY 
    service_type
""").show()

The results were the same as for the dataframe groupBy function.

The final SQL command generates revenue totals by month for each zone and each service type.

df_result = spark.sql("""
SELECT 
    -- Revenue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

df_result.show()
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|      revenue_month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|         250|2020-02-01 00:00:00|       green|  15359.960000000061|               1282.5|                  117.5|                     56.01|           590.3200000000003|                    180.0000000000011|           17598.43999999995|                                11.0|        1.2394957983193278|       4.962810650887575|
|         158|2020-02-01 00:00:00|       green|              124.36|                 8.25|                    0.5|                       0.0|                         2.8|                   0.8999999999999999|                      136.81|                                null|                      null|      11.090000000000002|
|          15|2020-03-01 00:00:00|       green|  1682.2299999999996|                  5.5|                    6.5|                       0.0|                       79.56|                   19.800000000000026|                     1802.44|                                 0.0|                       1.0|       7.910298507462685|
|         229|2020-03-01 00:00:00|       green|   676.3600000000001|                  0.0|                    1.0|                       0.0|          42.839999999999996|                    8.099999999999998|                       728.3|                                 0.0|                       1.0|       7.691111111111111|
|         137|2020-10-01 00:00:00|       green|  3480.8299999999995|                  0.0|                    0.0|                     297.0|          250.92000000000013|                    32.40000000000005|          4061.1499999999996|                                null|                      null|       9.184722222222224|
|          44|2020-10-01 00:00:00|       green|               146.5|                  3.5|                    2.0|                       5.5|                         0.0|                                  1.2|                       158.7|                                 0.0|                       1.0|                 12.0475|
|         152|2020-02-01 00:00:00|       green|            37630.82|              1541.25|                 1398.5|        2836.6600000000035|           538.3700000000003|                    903.8999999999543|           45984.60000000085|                             1336.25|        1.2240055826936498|        2.72640336678537|
|         136|2020-03-01 00:00:00|       green|  12226.930000000018|                 60.5|                   89.5|                     32.16|           485.2000000000003|                   163.20000000000047|          13123.939999999977|                                8.25|        1.1637426900584795|      6.4872972972972995|
|         119|2020-03-01 00:00:00|       green|             11026.2|                100.5|                  108.5|        40.459999999999994|           525.8000000000003|                    156.0000000000002|           12021.45999999997|                                8.25|        1.0924369747899159|       5.552723880597017|
|         153|2020-03-01 00:00:00|       green|  2039.5400000000006|                 15.0|                   30.5|        29.790000000000006|                        33.4|                   34.800000000000026|          2204.7799999999993|                                 0.0|        1.1333333333333333|       4.165042016806725|
|          26|2020-03-01 00:00:00|       green|   9467.539999999999|                 52.5|                   52.0|                       0.0|          184.35000000000005|                   122.99999999999919|           9929.640000000001|                                 0.0|        1.1397849462365592|       5.237601918465228|
|         243|2019-12-01 00:00:00|       green|               13.99|                  0.0|                    0.0|                       1.0|                         0.0|                                  0.3|                       15.29|                                 0.0|                       1.0|                     0.0|
|          28|2020-03-01 00:00:00|       green|   5891.060000000002|                29.25|                   53.0|         80.32000000000001|           195.8400000000001|                    84.29999999999956|           6368.820000000014|                                2.75|        1.2043795620437956|       5.802602739726027|
|          45|2020-03-01 00:00:00|       green|             2403.39|                 11.0|                    5.0|                       0.0|                        64.0|                    21.30000000000003|          2506.6399999999994|                                 0.0|                       1.2|       9.125633802816903|
|         102|2020-03-01 00:00:00|       green|  2874.7399999999984|                 26.5|                   17.5|                     40.22|                      104.04|                    46.19999999999992|           3135.650000000001|                                 5.5|        1.1481481481481481|       4.857922077922078|
|          48|2020-10-01 00:00:00|       green|             1504.43|                  0.0|                    0.0|                     115.5|           75.72999999999999|                   12.600000000000009|          1708.2600000000007|                                null|                      null|       9.061428571428573|
|          20|2020-01-01 00:00:00|       green|            11375.42|                681.0|                  131.0|         90.61999999999999|           479.8600000000003|                   147.29999999999987|          12923.199999999997|                                11.0|        1.2297872340425533|       4.872311926605508|
|         130|2020-03-01 00:00:00|       green|   60166.09999999997|               1436.5|                 1759.0|        4972.6999999999925|           731.1700000000004|                   1234.1999999999043|           70458.47000000314|                               74.25|        1.1157574079202437|      3.8739044205495867|
|          18|2020-03-01 00:00:00|       green|   9502.130000000005|                 69.5|                  100.0|                     78.06|          414.14000000000027|                   136.19999999999945|          10401.979999999987|                                 5.5|        1.1727272727272726|       5.481708860759492|
|         265|2020-10-01 00:00:00|       green|   5726.509999999998|                16.25|                   33.0|        329.21999999999997|           213.6800000000001|                    38.69999999999999|           6359.310000000006|                                2.75|         1.178082191780822|       12.57108527131784|
+------------+-------------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
only showing top 20 rows

Writing this result out produces a ton of tiny parquet files, one per partition, so the last thing we did was coalesce it down to a single file:

df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')
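
As a quick sanity check (not in the video, just a habit of mine), the report can be read back to confirm the write:

# read the coalesced report back and inspect it
df_report = spark.read.parquet('data/report/revenue/')
df_report.printSchema()
df_report.count()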
