Jader Lima

Running PySpark jobs on Google Cloud Dataproc

This blog focuses on data processing tools and techniques, with a particular emphasis on big data tools in cloud environments such as GCP, AWS, and Azure. The posts are hands-on, demonstrating different ways to use these tools and techniques. Feel free to use the material as you wish and reach out with any suggestions.

About This Post

In this post, we will process data using batch processing, a technique in which a job runs to completion over a bounded set of data and then waits until it is triggered again, either manually or by a scheduling tool. The data will be stored in cloud storage following the medallion architecture, also known as the multi-hop architecture. This architecture typically consists of three layers: bronze, silver, and gold, and is widely used for building data lakes and data lakehouses.

In the medallion architecture:

  • Bronze Layer: The raw data layer, where data is stored in a format suitable for big data processing, such as Parquet, Delta, or ORC.
  • Silver Layer: The layer where data is cleaned, organized, and deduplicated, but without significant transformations.
  • Gold Layer: The layer designed for consumption, where significant business transformations and aggregations take place.

The terms Raw, Trusted, and Refined are equivalent to bronze, silver, and gold, respectively.

The Dataproc service will handle reading, processing, cleaning, and writing the data in the different layers of the data lake.

Apache Spark

Apache Spark is an open-source framework for large-scale data processing. It provides a unified programming interface for cluster computing and enables efficient parallel data processing. Spark supports multiple programming languages, including Python, Scala, and Java, and is widely used for data analysis, machine learning, and real-time data processing.
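
As a quick, hands-on illustration of that programming interface, here is a minimal, self-contained PySpark example (hypothetical data, unrelated to this project's script):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Start a Spark session; locally this runs in a single process,
# on a cluster the same code is executed in parallel across workers
spark = SparkSession.builder.appName("spark_api_example").getOrCreate()

# Build a small DataFrame and apply a lazy transformation;
# Spark only executes it when an action such as show() is called
df = spark.createDataFrame(
    [("BR", 214), ("US", 331), ("JP", 125)],
    ["country_code", "population_millions"],
)
df.filter(F.col("population_millions") > 200).show()

spark.stop()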

Google Dataproc

Google Dataproc is a managed service from Google Cloud that simplifies the processing of large datasets using frameworks like Apache Spark and Apache Hadoop. It streamlines the creation, management, and scaling of data processing clusters, allowing you to focus on data analysis rather than infrastructure.

Cloud Storage

Google Cloud Storage is a highly scalable and durable object storage service from Google Cloud. It allows you to store and access large volumes of unstructured data, such as media files, backups, and large datasets. Cloud Storage offers various storage options to meet different performance and cost needs.

Architecture Diagram

Architecture Diagram

Environment Variable Configuration

The commands below can be executed in Google Cloud Shell or from a personal machine with the gcloud CLI configured.

To list existing projects, execute the command below:

gcloud projects list

To list available regions, execute the command below:

gcloud compute regions list

To list available zones, execute the command below:

gcloud compute zones list

The variables below are used to create the necessary buckets. Three buckets will be created: one to serve as the data lake, another to store the PySpark scripts and auxiliary JARs, and a third to store Dataproc cluster staging data. For a simple test of the Dataproc service, the configurations below are sufficient and work with a free trial account on GCP.

######## STORAGE NAMES AND GENERAL PARAMETERS ##########
PROJECT_ID=<YOUR_GCP_PROJECT_ID>
REGION=<YOUR_GCP_REGION>
ZONE=<YOUR_GCP_ZONE>
GCP_BUCKET_DATALAKE=<YOUR_DATALAKE_STORAGE_NAME>
GCP_BUCKET_BIGDATA_FILES=<YOUR_STORAGE_FILE_NAME>
GCP_BUCKET_DATAPROC=<YOUR_STORAGE_DATAPROC_NAME>

###### DATAPROC ENV #################
DATAPROC_CLUSTER_NAME=<YOUR_DATAPROC_CLUSTER_NAME>
DATAPROC_WORKER_TYPE=n2-standard-2
DATAPROC_MASTER_TYPE=n2-standard-2
DATAPROC_NUM_WORKERS=2
DATAPROC_IMAGE_VERSION=2.1-debian11
DATAPROC_WORKER_NUM_LOCAL_SSD=1
DATAPROC_MASTER_NUM_LOCAL_SSD=1
DATAPROC_MASTER_BOOT_DISK_SIZE=32   
DATAPROC_WORKER_DISK_SIZE=32
DATAPROC_MASTER_BOOT_DISK_TYPE=pd-balanced
DATAPROC_WORKER_BOOT_DISK_TYPE=pd-balanced
DATAPROC_COMPONENTS=JUPYTER

#########
GCP_STORAGE_PREFIX=gs://
BRONZE_DATALAKE_FILES=bronze
TRANSIENT_DATALAKE_FILES=transient
BUCKET_DATALAKE_FOLDER=transient
BUCKET_BIGDATA_JAR_FOLDER=jars
BUCKET_BIGDATA_PYSPARK_FOLDER=scripts
DATAPROC_APP_NAME=ingestion_countries_csv_to_delta 
JAR_LIB1=delta-core_2.12-2.3.0.jar
JAR_LIB2=delta-storage-2.3.0.jar 
APP_NAME='countries_ingestion_csv_to_delta'
SUBJECT=departments
FILE=countries

Creating the Services that Make Up the Solution

  1. Storage

Create the storage buckets with the commands below:
gcloud storage buckets create gs://$GCP_BUCKET_BIGDATA_FILES --default-storage-class=nearline --location=$REGION
gcloud storage buckets create gs://$GCP_BUCKET_DATALAKE --default-storage-class=nearline --location=$REGION
gcloud storage buckets create gs://$GCP_BUCKET_DATAPROC --default-storage-class=nearline --location=$REGION

The result should look like the image below:
storage created

Uploading Solution Files

After creating the Cloud Storage buckets, we need to upload the CSV files that will be processed by Dataproc, the libraries used by the PySpark script, and the PySpark script itself.

Before starting the file upload, it's important to understand the project repository structure, which includes folders for the Data Lake files, the Python code, and the libraries.

repo structure

Data Lake Storage

There are various ways to upload the files. We'll use the easiest one: select the bucket created for the Data Lake files, click "Upload Folder," and select the "transient" folder. The "transient" folder is available in the application repository; just download it to your local machine.

The result should look like the image below:

datalake

datalake2

Big Data Files Storage

Now it's necessary to upload the PySpark script containing the application's processing logic and the required libraries to save the data in Delta format. Upload the "jars" and "scripts" folders.

The result should look like the image below:

big data files

  2. Dataproc Cluster

When choosing between a single-node cluster and a multi-node cluster, consider the following:

  • Single-Node Cluster: Ideal for proof-of-concept projects and more cost-effective, but with limited computational power.
  • Multi-Node Cluster: Offers greater computational power, but comes at a higher cost.

For this experiment, you can choose either option based on your needs and resources.

Single-Node Cluster

gcloud dataproc clusters create $DATAPROC_CLUSTER_NAME \
--enable-component-gateway --bucket $GCP_BUCKET_DATAPROC \
--region $REGION --zone $ZONE --master-machine-type $DATAPROC_MASTER_TYPE \
--master-boot-disk-type $DATAPROC_MASTER_BOOT_DISK_TYPE --master-boot-disk-size $DATAPROC_MASTER_BOOT_DISK_SIZE \
--num-master-local-ssds $DATAPROC_MASTER_NUM_LOCAL_SSD --image-version $DATAPROC_IMAGE_VERSION --single-node \
--optional-components $DATAPROC_COMPONENTS --project $PROJECT_ID

Multi-Node Cluster

gcloud dataproc clusters create $DATAPROC_CLUSTER_NAME \
--enable-component-gateway --bucket $GCP_BUCKET_DATAPROC \
--region $REGION --zone $ZONE --master-machine-type $DATAPROC_MASTER_TYPE \
--master-boot-disk-type $DATAPROC_MASTER_BOOT_DISK_TYPE --master-boot-disk-size $DATAPROC_MASTER_BOOT_DISK_SIZE \
--num-master-local-ssds $DATAPROC_MASTER_NUM_LOCAL_SSD --num-workers $DATAPROC_NUM_WORKERS --worker-machine-type $DATAPROC_WORKER_TYPE \
--worker-boot-disk-type $DATAPROC_WORKER_BOOT_DISK_TYPE --worker-boot-disk-size $DATAPROC_WORKER_DISK_SIZE \
--num-worker-local-ssds $DATAPROC_WORKER_NUM_LOCAL_SSD --image-version $DATAPROC_IMAGE_VERSION \
--optional-components $DATAPROC_COMPONENTS --project $PROJECT_ID

Dataproc graphical interface:

dataproc web interface

Dataproc details:

dataproc web interface details

Dataproc web interfaces:
dataproc web interface items

Dataproc Jupyter web interface:

dataproc jupyter

To list existing Dataproc clusters in a specific region, execute:

gcloud dataproc clusters list --region=$REGION

Running a PySpark Job with Spark Submit

Spark Submit

Besides the notebook interface, we can use an existing cluster by submitting a PySpark script with spark-submit.

Create new variables for job execution:

PYSPARK_SCRIPT_PATH=$GCP_STORAGE_PREFIX$GCP_BUCKET_BIGDATA_FILES/$BUCKET_BIGDATA_PYSPARK_FOLDER/$PYSPARK_INGESTION_SCRIPT
JARS_PATH=$GCP_STORAGE_PREFIX$GCP_BUCKET_BIGDATA_FILES/$BUCKET_BIGDATA_JAR_FOLDER/$JAR_LIB1
JARS_PATH=$JARS_PATH,$GCP_STORAGE_PREFIX$GCP_BUCKET_BIGDATA_FILES/$BUCKET_BIGDATA_JAR_FOLDER/$JAR_LIB2
TRANSIENT=$GCP_STORAGE_PREFIX$GCP_BUCKET_DATALAKE/$BUCKET_DATALAKE_FOLDER/$SUBJECT/$FILE
BRONZE=$GCP_STORAGE_PREFIX$GCP_BUCKET_DATALAKE/$BRONZE_DATALAKE_FILES/$SUBJECT

To verify the contents of the variables, use the echo command:

echo $PYSPARK_SCRIPT_PATH
echo $JARS_PATH
echo $TRANSIENT
echo $BRONZE

About the PySpark Script

The PySpark script is divided into two steps:

  • Receiving the parameters sent by the spark-submit command. These parameters are:
    --app_name - PySpark application name
    --bucket_transient - URI of the GCS transient bucket
    --bucket_bronze - URI of the GCS bronze bucket
  • Calling the main method

step1
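
A minimal sketch of this first step, assuming the script parses its arguments with argparse (function and argument names are illustrative; the real implementation is the one shown in the screenshot above):

import argparse

def parse_arguments():
    # Parse the parameters passed after "--" in the gcloud dataproc jobs submit pyspark command
    parser = argparse.ArgumentParser(description="Ingest countries CSV files into the bronze layer as Delta")
    parser.add_argument("--app_name", required=True, help="PySpark application name")
    parser.add_argument("--bucket_transient", required=True, help="URI of the GCS transient data")
    parser.add_argument("--bucket_bronze", required=True, help="URI of the GCS bronze data")
    return parser.parse_args()

if __name__ == "__main__":
    args = parse_arguments()
    # app_name, bucket_transient and bucket_bronze are then handed to the main method,
    # sketched in the next step
    print(vars(args))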

  • Main method:
    • Calls the method that creates the Spark session
    • Calls the method that reads the data from the transient layer in storage
    • Calls the method that writes the data to the bronze layer in storage

pyspark_functions
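
A minimal sketch of that flow, assuming the Delta Lake JARs are passed to the job via --jars (function names and options are illustrative, not the exact implementation):

from pyspark.sql import SparkSession

def create_spark_session(app_name):
    # Enable Delta Lake support; the Delta classes come from the JARs passed via --jars
    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
    )

def read_transient(spark, bucket_transient):
    # Read the raw CSV files uploaded to the transient folder of the data lake
    return spark.read.option("header", "true").option("inferSchema", "true").csv(bucket_transient)

def write_bronze(df, bucket_bronze):
    # Persist the data in Delta format in the bronze layer
    df.write.format("delta").mode("overwrite").save(bucket_bronze)

def main(app_name, bucket_transient, bucket_bronze):
    spark = create_spark_session(app_name)
    df = read_transient(spark, bucket_transient)
    write_bronze(df, bucket_bronze)
    spark.stop()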

Execute Spark Submit with the command below:

gcloud dataproc jobs submit pyspark \
--project $PROJECT_ID --region $REGION \
--cluster $DATAPROC_CLUSTER_NAME \
--jars $JARS_PATH \
$PYSPARK_SCRIPT_PATH \
-- --app_name=$DATAPROC_APP_NAME --bucket_transient=$TRANSIENT \
--bucket_bronze=$BRONZE

To verify the Dataproc job execution, select the Dataproc cluster and click on the job to see the results and logs.

dataproc_execution

In the data lake storage, a new folder was created for the bronze layer data. As your data lake grows, more and more folders will be created.

bronze_folder

bronze_folder_detail
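
To double-check the result, you can read the bronze data back with a few lines of PySpark, for example from the cluster's Jupyter interface (a small sketch; it assumes an existing Spark session configured with the same Delta Lake JARs, and the path placeholder matches the URI stored in the $BRONZE variable):

# Assumes an existing SparkSession named "spark" with the Delta Lake JARs on the classpath
bronze_path = "gs://<YOUR_DATALAKE_STORAGE_NAME>/bronze/departments"  # same value as $BRONZE
df_bronze = spark.read.format("delta").load(bronze_path)
df_bronze.show(10)
print(f"Rows in the bronze layer: {df_bronze.count()}")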

Removing the Created Services

To avoid unexpected costs, remove the created services and resources after use.

To delete the created buckets along with their contents, execute:

gcloud storage rm --recursive $GCP_STORAGE_PREFIX$GCP_BUCKET_DATALAKE
gcloud storage rm --recursive $GCP_STORAGE_PREFIX$GCP_BUCKET_BIGDATA_FILES
gcloud storage rm --recursive $GCP_STORAGE_PREFIX$GCP_BUCKET_DATAPROC

To delete the Dataproc cluster created in the experiment, execute:

gcloud dataproc clusters delete $DATAPROC_CLUSTER_NAME --region=$REGION

After deletion, the command below should not list any existing clusters:

gcloud dataproc clusters list --region=$REGION

Conclusion

The need to process data with PySpark arises in many scenarios, including both batch and streaming processing. Many cloud providers now offer managed clusters, which take most of the infrastructure burden off your hands.

In this post, we demonstrated how to create a managed cluster on Google Cloud Platform and how to use Cloud Storage as a distributed file system. Other clouds offer similar services, such as AWS EMR with S3, or Azure HDInsight with Azure Data Lake Storage Gen2.

