DEV Community

mustafasajid
mustafasajid

Posted on

A Dynamic Way of Doing ETL with PySpark

Instead of writing a separate ETL job for each table, you can drive the ETL dynamically from metadata stored in a database (MySQL, PostgreSQL, SQL Server) and run it with PySpark. For easier understanding, I have broken the approach into steps.

Step 1

Create two tables in a database named TEST_DWH (I am using SQL Server):
Table etl_metadata keeps the master data of the ETL (source and destination information).

-- Master list of ETL jobs: one row per source -> destination mapping.
CREATE TABLE [dbo].[etl_metadata](
    -- Job identifier; the ETL script joins and looks up jobs by this value,
    -- so it is declared as the primary key to guarantee uniqueness.
    [id] [int] IDENTITY(1,1) NOT NULL,
    -- Kind of source; the script recognises 'CSV' and 'sqlserver'.
    [source_type] [varchar](max) NULL,
    -- File path (CSV) or SQL query (sqlserver) handed to the reader.
    -- varchar(max) replaces the deprecated text type.
    [source_info] [varchar](max) NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL,
    CONSTRAINT [PK_etl_metadata] PRIMARY KEY CLUSTERED ([id] ASC)
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
Enter fullscreen mode Exit fullscreen mode

Table etl_metadata_schedule tracks the progress of the daily ETL.

-- Daily run log: one row per ETL job per schedule date.
CREATE TABLE [dbo].[etl_metadata_schedule](
    -- Matches etl_metadata.id (no IDENTITY here: values are copied in).
    [id] [int] NULL,
    [source_type] [varchar](max) NULL,
    -- varchar(max) replaces the deprecated text type.
    [source_info] [varchar](max) NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL,
    -- 'Pending', 'In Progress', 'completed', or an error message.
    [status] [varchar](max) NULL,
    -- The ETL script's scheduling query selects a status_description column
    -- and appends its result to this table, so the column must exist here.
    [status_description] [varchar](max) NULL,
    [started_at] [datetime] NULL,
    [completed_at] [datetime] NULL,
    [schedule_date] [datetime] NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
Enter fullscreen mode Exit fullscreen mode

Step 2

Now write the ETL in Python using PySpark:

  1. Load the ETL metadata into pandas so the script can loop over the jobs
  2. Use source_type to switch between source readers (in my case there are two: CSV and database)
  3. Write the data to the destination; the destination info is taken from etl_metadata

"""
Created on Thu Mar 17 11:06:28 2022

@author: Administrator
"""

# Initialise the Spark environment first: findspark.init() puts pyspark on
# sys.path, so it has to run before anything is imported from pyspark.
import findspark
findspark.init()
findspark.find()
#print(findspark.find())

# SPARK LIBRARIES
from pyspark.sql import SparkSession
import pyodbc
import pandas as pd




# Build (or reuse) the local Spark session used for every read and write below.
spark = (
    SparkSession.builder
    .appName("Python ETL script for TEST")
    .master("local[*]")
    .config("spark.driver.memory", "8g")
    # The option name must be exact: a key with trailing whitespace does not
    # match "spark.sql.ansi.enabled".
    .config("spark.sql.ansi.enabled", "true")
    # Raw string: "\D" and "\s" are not valid escape sequences in a normal
    # string literal.
    .config("spark.jars", r"C:\Drivers\sqljdbc42.jar")
    .getOrCreate()
)





# Empty placeholders for the per-job metadata fields and the scheduling query;
# each one is reassigned further down before it is read.
source_type = source_info = ''
destination_db = destination_schema = destination_table = ''
etl_type = query_string = ''

## Initializing variables used to build the query

#- timedelta(43)
#today = (date.today())
#print("Today's date:", "select a.*,null status,null status_description ,null started_at,null completed_at,GETDATE() schedule_date from dbo.etl_metadata_schedule_staging where schedule_date = "+"'"+str(today)+"'")

#set variable to be used to connect the database




# Connection settings for the metadata database.
database, user, password = "TEST_DWH", "user", "password"

# Scheduling query: every etl_metadata row, left-joined to today's schedule
# rows, keeping only jobs whose schedule status is not 'completed'.
# CONCAT(ISNULL(b.status,'Pending'), b.status) evaluates to exactly 'Pending'
# only when no schedule row exists for today (b.status is NULL); presumably
# this keeps already-scheduled jobs out of the later "status == 'Pending'"
# filter -- confirm before simplifying it.
query_string = (
    "SELECT a.*,"
    "CONCAT(ISNULL(b.status,'Pending'),b.status) status,"
    "null status_description ,"
    "null started_at,"
    "null completed_at "
    "FROM (SELECT *,getdate()  schedule_date FROM dbo.etl_metadata ) a "
    "LEFT JOIN [dbo].[etl_metadata_schedule] b "
    "ON a.id = b.id and  CAST(b.schedule_date AS date)= CAST(getdate() AS date) "
    "where ISNULL(b.status,'A') != 'completed'"
)

# Run query_string against the metadata database over JDBC and keep the
# result as a Spark DataFrame.
etl_meta_data_staging = (
    spark.read.format("jdbc")
    .options(
        url="jdbc:sqlserver://localhost:1433;databaseName={" + database + "};",
        query=query_string,
        user=user,
        password=password,
        driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    )
    .load()
)


#-------------------CREATE NEW SCHEDULE----------------------------#
# Jobs whose status is 'Pending' are printed and then appended to
# dbo.etl_metadata_schedule; rows with any other status are not written.
pending_jobs = etl_meta_data_staging.filter("status == 'Pending'")
pending_jobs.show()

schedule_writer = pending_jobs.write.format("jdbc").options(
    url="jdbc:sqlserver://localhost:1433;databaseName={" + database + "};",
    dbtable="dbo.etl_metadata_schedule",
    user=user,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)
schedule_writer.mode("append").save()
#-------------------END CREATE NEW SCHEDULE----------------------------#


#--------------SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#
# Plain ODBC connection (separate from Spark) used to update job status rows.
odbc_settings = [
    "Driver={ODBC Driver 17 for SQL Server}",
    "Server=localhost,1433",
    "Database=" + database,
    "UID=" + user,
    "PWD=" + password,
]
conn = pyodbc.connect(";".join(odbc_settings) + ";")
cursor = conn.cursor()
#--------------END SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#

# Bring the job list to the driver and process jobs in ascending id order.
df_etl_meta_data_staging = etl_meta_data_staging.toPandas().sort_values('id')


def _update_schedule_status(etl_id, status, finished=True):
    """Store *status* on today's schedule row(s) for job *etl_id* and commit.

    Sets [completed_at] to the current time when *finished* is true,
    otherwise sets [started_at].

    The statement is parameterized because *status* may carry an exception
    message containing quotes, which would break a concatenated SQL string.
    It is restricted to today's schedule_date so that rows logged for the
    same job on earlier days keep their recorded outcome.
    """
    timestamp_column = "[completed_at]" if finished else "[started_at]"
    cursor.execute(
        "UPDATE [" + database + "].[dbo].[etl_metadata_schedule] "
        "SET [status] = ?, " + timestamp_column + " = CURRENT_TIMESTAMP "
        "WHERE id = ? AND CAST(schedule_date AS date) = CAST(GETDATE() AS date);",
        status,
        int(etl_id),  # pandas yields numpy integers; bind a plain int
    )
    conn.commit()


#---LOOP : read FROM SOURCE (extract) and write to destination.---#
try:
    for job in df_etl_meta_data_staging.to_dict("records"):
        # load meta data into variables
        etl_id = job['id']
        source_type = job['source_type']
        source_info = job['source_info']
        destination_db = job['destination_db']
        destination_schema = job['destination_schema']
        destination_table = job['destination_table']
        etl_type = job['etl_type']

        print("Starting for " + str(etl_id))
        status = 'In Progress'
        _update_schedule_status(etl_id, status, finished=False)

        #---------------EXTRACT---------------#
        try:
            print("Reading via ", source_info)
            if source_type == 'CSV':
                jdbcDF = (
                    spark.read.format("csv")
                    .option("header", "true")
                    .option("quote", "\"")
                    .option("escape", "\"")
                    .load(source_info)
                )
                jdbcDF.show()
            elif source_type == 'sqlserver':
                jdbcDF = (
                    spark.read.format("jdbc")
                    .option("url", "jdbc:sqlserver://localhost:1433;databaseName={" + database + "};")
                    .option("query", source_info)
                    .option("user", user)
                    .option("password", password)
                    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                    .load()
                )
            else:
                # Without this guard jdbcDF would be undefined, or still hold
                # the previous job's data, which would then be written over
                # this job's destination table.
                raise ValueError("unsupported source_type: " + repr(source_type))
        except Exception as e:
            print("some error in reading from source")
            status = 'error reading source , ' + str(e)
            print(status)
            _update_schedule_status(etl_id, status)
            continue

        #---------------LOAD---------------#
        try:
            (
                jdbcDF.write.format("jdbc")
                .option("url", "jdbc:sqlserver://localhost:1433;databaseName={" + destination_db + "};")
                .option("dbtable", destination_schema + "." + destination_table)
                .option("user", user)
                .option("password", password)
                # overwrite + truncate: empty the existing table instead of
                # dropping and recreating it
                .option("truncate", "true")
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .mode("overwrite")
                .save()
            )
        except Exception as e:
            print('some error in writing')
            status = 'error in writing to destination db, ' + str(e)
            _update_schedule_status(etl_id, status)
            continue

        status = 'completed'
        print("Write Successful")
        _update_schedule_status(etl_id, status)
finally:
    # Release the status-tracking connection even if a job aborts the loop.
    cursor.close()
    conn.close()

Enter fullscreen mode Exit fullscreen mode

Latest comments (2)

Collapse
 
wayofprogramming profile image
Way Of Programming

Great contribution with easy documentation.
Thanks
This way could save time by reducing the need of writing ETL for each table

Collapse
 
mustafasajid profile image
mustafasajid

Thanks <3