DEV Community

Rubens Barbosa
Rubens Barbosa

Posted on

ETL with Spark on Azure Databricks and Azure Data Warehouse (Part 2)

Hey y'all, this is a continuation of the previous article. We already have data on Azure Data Lake Storage. Now, we will integrate it with Apache Spark on Azure Databricks to perform a small transformation on top of the JSON, and send the data to Azure SQL Data Warehouse. I'll try to be the most hands on as possible. Let's get started!

What is Apache Spark?

Apache Spark is a framework for processing large-scale data, i.e., Big Data distributed across clusters. It is used for executing data engineering, data science, and machine learning.

Overview

The main abstraction Spark provides is a Resilient Distributed Dataset RDD which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. Thus, Spark running multiple processes concurrently in parallel that don't interfere each other. RDD can be created from text files, SQL databases, NoSQL databases, HDFS, Cloud Storage and so on. The processing of RDD is done entirely in memory.

At a high level, in a Spark cluster you will have a driver node and then several worker nodes. The driver node is running the main program which has all of the transformations that you want to do with your data and then get sent out to the worker nodes who then operate a task and return that result do the driver node. This is the core engine of Spark on top of that there are several library modules that allow developers to easily interact with the core engine. These libraries include: Spark SQL, Spark Streaming, MLLib, GraphX.

Databricks

Databricks is a company that was founded by the creators of Apache Spark with the intention to make Apache Spark much easier to use it. Databricks develops a web-based platform for working with Spark. that provides automated cluster management and IPython style notebooks.

The Databricks workspace is the cloud based environment in which you use Databricks and includes the user interface, integrated storage, security settings, job scheduling, and most importantly notebooks.

Cluster

As mentioned before Spark is all about cluster. That's why, we'll first create a cluster on Databricks. After launch Azure Databricks workspace go to compute, then create cluster.

Create Cluster on Azure Databricks

Once we have our cluster running we can create a notebook and start coding, which I'll use PySpark. We will start a creation of ETL with PySpark on Azure Databricks. In the load phase we will write data on Azure SQL Data Warehouse. So, we must already have our Data Warehouse deployed and get the connection string.

Creation of ETL with PySpark on Azure Databricks Notebook

Let's have a look.

Extract

First of all, we need extract the JSON file from Azure Data Lake Storage and read it into DataFrame. After that, we will ingest data into PySpark DataFrame.

# Title: ETL Spark: extract from Azure Data Lake Storage, and load to Azure SQL Data Warehouse
# Language: PySpark
# Author: Rubens Santos Barbosa

# config the session using spark object and set the key from our azure data lake storage account
spark.conf.set(
  "fs.azure.account.key.YOUR_AZURE_DATA_LAKE_STORAGE_ACCOUNT_NAME.dfs.core.windows.net",
  "YOUR_AZURE_DATA_LAKE_ACCOUNT_KEY"
)

# abfss://AZURE_DL_CONTAINER_NAME@AZURE_DL_STORAGE_ACCOUNT_NAME.dfs.core.windows.net/DIRECTORY_CLIENT
dbutils.fs.ls("abfss://az-covid-data@engdatalake.dfs.core.windows.net/directory-covid19")

# path JSON file on azure data lake storage 
covid_data_json = "abfss://az-covid-data@engdatalake.dfs.core.windows.net/directory-covid19/covid-2022-4-21.json"

# read JSON file into DataFrame
df = spark.read.option("multiline","true").json(covid_data_json)

# PySpark print schema
df.printSchema()
Enter fullscreen mode Exit fullscreen mode

PySpark on Azure Databricks

We might wanna see some content from our DataFrame.

# showing first 5 rows
df.head(5)
Enter fullscreen mode Exit fullscreen mode

Firts rows

Transform

We've just done the data extraction. Now, we will do a little transformation. Let's analyze if there is missing data in our columns on PySpark Dataframe.

# missing values in a specific column of pySpark dataframe
df.filter(df['bairroPaciente'].isNull()).count()

# count null value in every column
for col in df.columns:
  print(col, "\t", "with null values: ", df.filter(df[col].isNull()).count())

df.count()
Enter fullscreen mode Exit fullscreen mode

missing data

We noticed that our PySpark DataFrame there are 82 rows, and there some columns with 81 null values. So, let's drop these columns.

# columns in pyspark dataframe to drop
columns_to_drop = ['classificacaoEstadoSivep', 'comorbidadeAsmaSivep', 'comorbidadeDiabetesSivep', 'comorbidadeHematologiaSivep', 'comorbidadeImunodeficienciaSivep', 'comorbidadeNeurologiaSivep', 'comorbidadeObesidadeSivep', 'comorbidadePneumopatiaSivep', 'comorbidadePuerperaSivep', 'comorbidadeRenalSivep', 'comorbidadeSindromeDownSivep', 'dataEntradaUtisSvep', 'dataEvolucaoCasoSivep', 'dataInternacaoSivep', 'dataResultadoExame', 'dataSolicitacaoExame', 'evolucaoCaso', 'idSivep', 'paisPaciente', 'cnesNotificacaoEsus', 'comorbidadeCardiovascularSivep', 'dataColetaExame', 'resultadoFinalExame', 'tipoTesteExame']

# delete columns in pyspark dataframe
df = df.drop(*columns_to_drop)
Enter fullscreen mode Exit fullscreen mode

delete columns

Let's display our data.

df.describe()
display(df)
Enter fullscreen mode Exit fullscreen mode

display data

As you can see above the columns dataInicioSintomas and dataNotificacao are in timestamp format, I will transform it to date format in our PySpark DataFrame.

from pyspark.sql.functions import to_date
# timestamp to date
df = df.withColumn("dataInicioSintomas", to_date(df['dataInicioSintomas']))
df = df.withColumn("dataNotificacao", to_date(df['dataNotificacao']))

display(df)
Enter fullscreen mode Exit fullscreen mode

timestamp2date

Load

We've done the data transformation. We will load these data into Azure SQL Data Warehouse.

# removing repeated rows
distinctDF = df.distinct()
print("Distinct count: "+str(distinctDF.count()))
display(distinctDF)

# Load PySpark DataFrame to Azure SQL Data Warehouse
db_table = "dbo.COVID"
sql_password = "YOUR_PASSWORD"
jdbc_url = "jdbc:sqlserver://cosmos-database.database.windows.net:1433;database=cosmos-pool;user=rubnsbarbosa@cosmos-database;password=" + sql_password + ";encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"

distinctDF.write.jdbc(url=jdbc_url, table=db_table, mode="append")
Enter fullscreen mode Exit fullscreen mode

load

We've finished the ETL with PySpark on Azure Databricks.

Azure SQL Data Warehouse

Before loading PySpark DataFrame into Azure SQL Data Warehouse, we must have created our table in our SQL DW. So, we must enter the query editor and create. You can see the query I created below.

CREATE TABLE dbo.COVID (
    bairroPaciente VARCHAR(254),
    codigoMunicipioPaciente VARCHAR(254),
    codigoPaciente VARCHAR(254),
    dataInicioSintomas VARCHAR(50),
    dataNotificacao VARCHAR(50),
    estadoPaciente VARCHAR(10),
    idadePaciente VARCHAR(10),
    municipioNotificacaoEsus VARCHAR(60),
    municipioPaciente VARCHAR(100),
    profissionalSaude VARCHAR(60),
    racaCorPaciente VARCHAR(60),
    sexoPaciente VARCHAR(60)
);
Enter fullscreen mode Exit fullscreen mode

It might happen some firewall issues when you try to load the data, you just need go into Firewalls and Virtual Networks [inside of SQL DW] and save the Client IP address. Finally, let's see the data into our Azure SQL Data Warehouse.

dw

Conclusion

We have the second and last part of our project completed. It was created an ETL using Spark on Azure Databricks Cluster. In the extraction phase we got data from Azure Data Lake Storage, we performed a basic transformation, and the data was loaded into Azure SQL Data Warehouse as proposed.

Latest comments (0)