Rahul Dubey

Using DAG to deal with Zip file for Snowflake

In this article, we are going to cover how to deal with Zip files when loading data into Snowflake using Airflow on GCP Composer. It's a typical data pipeline, but it can still be tricky if you are a beginner or have never dealt with ETL before.

Before going further, we assume that you have the following in place:

  • Google Cloud Platform account
  • Snowflake account
  • GCP Composer environment with a basic Snowflake integration set up
  • A code editor
  • Python installed

Case Study
Suppose you are working for a renowned organization that has recently shifted its data platform from BigQuery to Snowflake. Now all of your organization's data is housed in Snowflake, and all BI/DataOps happens exclusively there.

One fine morning, you are assigned a task to build a data pipeline for attribution analytics. All the attribution data is dropped into a GCS bucket as Zip files by the organization's partner. I know what you are thinking: you can just write a 'COPY INTO' statement with a file format that sets the compression. But that won't work here. Snowflake's file format COMPRESSION option does not accept 'ZIP', and the 'DEFLATE' type refers to plain deflate-compressed streams, not ZIP archives, so it cannot read them either.

What to do?
You can utilize GCP's capabilities to orchestrate and automate the data loading. But first, you have to ensure there is a storage integration object created in Snowflake for the GCS bucket. After that, you create an external stage pointing to the path in the GCS bucket so files can be loaded directly.
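If the integration and stage do not exist yet, the one-time DDL looks roughly like the sketch below. It is shown as query strings in the same style as the DAG code later in this article, and every object name ('GCS_INT', the stage, the bucket) is a placeholder for your own environment. Keep in mind that after CREATE STORAGE INTEGRATION you still have to grant the service account returned by DESC STORAGE INTEGRATION read access on the bucket.

# One-time Snowflake setup (run once, e.g. from a Snowflake worksheet).
# All object names below are placeholders.
CREATE_INTEGRATION_QUERY = '''CREATE STORAGE INTEGRATION GCS_INT
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<BUCKET_NAME>/')'''

CREATE_STAGE_QUERY = '''CREATE STAGE <DATABASE_NAME>.<SCHEMA_NAME>.<STAGE_NAME>
  URL = 'gcs://<BUCKET_NAME>/'
  STORAGE_INTEGRATION = GCS_INT'''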

Once the above is taken care of, you can implement a DAG script for GCP Composer. GCP Composer is a managed Apache Airflow service that lets you deploy Airflow quickly on top of Kubernetes.

Airflow to the rescue
Apache Airflow is an elegant task scheduling service that allows data operations to be handled in an effective and efficient manner. It provides an intuitive web UI for managing task workflows. You can also create parallel workflows without the headache of writing your own application to deal with multiprocessing, threads, and concurrency.

Enough with the introductions, let's start coding.

First, open your code editor and create a Python file. Name it DAG_Sflk_loader.py.

After the above step, import all the necessary packages.

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
# On Airflow 2.x / newer Composer images, the equivalent imports are:
# from airflow.operators.python import PythonOperator
# from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from google.cloud import storage
import zipfile
import io

To declare a DAG script, you instantiate the DAG object from the airflow package like this:

default_args = {
    'owner': 'ORGANIZATION',
    'start_date': datetime(2023, 2, 19),
    'email': ['username@email.com'],
    'email_on_failure': True,
    'email_on_retry': False
}

dag = DAG(
    'SFLK_ZIP_LOAD',
    description='This DAG loads ZIP files into Snowflake',
    max_active_runs=1,
    catchup=False,
    default_args=default_args
)

In the above code snippet, we first define the arguments to be passed to the DAG object, such as 'owner', 'start_date', 'email', and 'email_on_failure'. After this, we instantiate the DAG object for the data pipeline; it will also be used as a context manager at the end of the script to wire the tasks together.

Alright, now it is time to start defining custom tasks in Python and Snowflake SQL. For this, we use operators. Operators are individual task units of various kinds: Snowflake SQL, Python, Bash commands, gsutil, etc. For our discussion, we will only use the Python and Snowflake operators.

We will distribute our data pipeline in the following way:

TRUNCATE_TABLE_TASK --> UNZIP_FILES_IN_GCS_TASK --> LOAD_FILES_TO_TABLE_TASK

TRUNCATE TASK
Before loading the data into Snowflake, we will first clear the target table. This is an incremental load, so we won't use TRUNCATE TABLE <TABLE_NAME> directly. We will just delete the data for CURRENT_DATE if it is present.

TRUNC_QUERY = '''DELETE FROM <DATABASE_NAME.SCHEMA_NAME.TABLE_NAME> WHERE <DATE_FIELD> = CURRENT_DATE'''

trunc_task = SnowflakeOperator(
               task_id='TRUNCATE_TASK',
               sql=[TRUNC_QUERY],
               snowflake_conn_id='<connection_id>',
               database='<DATABASE_NAME>',
               schema='<SCHEMA_NAME>',
               warehouse = '<DATAWAREHOUSE_NAME>',
               role = '<ROLE_NAME>',
               dag=dag) 

UNZIPPING TASK

For unzipping files in the GCS bucket, we will use three libraries:

  • zipfile
  • io
  • google-cloud-storage

Here, we define this task as a Python callable, which is then used by the PythonOperator.

def unzip_file_in_gcs(**context):
    # Bucket and zip object to read (placeholders)
    bucket_name = '<BUCKET_NAME>'
    file_name = '<FILE_NAME>.zip'

    # Connect to the GCS bucket
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)

    # Download the zip file into memory
    zip_file_content = blob.download_as_string()

    # Unzip the archive into the Composer environment's local data folder
    zip_file = zipfile.ZipFile(io.BytesIO(zip_file_content))
    zip_file.extractall(path='/home/airflow/gcs/data/temp/')

    # Upload the extracted CSV back to the GCS bucket
    with open('/home/airflow/gcs/data/temp/<FILE_NAME>.csv', 'rb') as f:
        file_content = f.read()
        new_blob = bucket.blob('<FILE_NAME>.csv')
        new_blob.upload_from_string(file_content)

unzip_task = PythonOperator(
        task_id="UNZIP_TASK",
        python_callable=unzip_file_in_gcs,
        provide_context=True,
        dag=dag
    )
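The callable above assumes the archive contains a single, known CSV. If your partner drops archives with several files, a variant like the sketch below (same imports as at the top of the script; the 'unzipped/' prefix is just an assumed naming convention) iterates over the archive in memory and re-uploads every member without touching the local disk:

def unzip_all_files_in_gcs(**context):
    bucket_name = '<BUCKET_NAME>'
    file_name = '<FILE_NAME>.zip'

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # Download the whole archive into memory
    zip_bytes = bucket.blob(file_name).download_as_string()

    # Re-upload every member of the archive as its own object under an 'unzipped/' prefix
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zip_file:
        for member in zip_file.namelist():
            if member.endswith('/'):
                continue  # skip directory entries
            bucket.blob('unzipped/' + member).upload_from_string(zip_file.read(member))

If you go this route, the COPY INTO statement in the next task would point at the 'unzipped/' path instead of a single CSV.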

LOADING TASK
Once unzipping is done, you can use a COPY INTO <TABLE_NAME> statement to load the data into the Snowflake table.

Here is the task definition:

LOAD_QUERY = '''COPY INTO <DATABASE_NAME>.<SCHEMA_NAME>.<TABLE_NAME> FROM @<DATABASE_NAME>.<SCHEMA_NAME>.<STAGE_NAME>/<FILE_NAME>.csv
file_format = (format_name = <DATABASE_NAME>.<SCHEMA_NAME>.FF_CSV)'''

load_task = SnowflakeOperator(
        task_id='LOAD_TASK',
        sql=[LOAD_QUERY],
        snowflake_conn_id='<connection_id>',
        database='<DATABASE_NAME>',
        schema='<SCHEMA_NAME>',
        warehouse = '<DATAWAREHOUSE_NAME>',
        role = '<ROLE_NAME>',
        dag=dag) 
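The FF_CSV file format referenced in the COPY statement is assumed to already exist in your schema. If it does not, a minimal definition might look like the sketch below (adjust the delimiter, header handling, and compression to your files). You could even run it as part of the load task by passing sql=[FILE_FORMAT_QUERY, LOAD_QUERY].

FILE_FORMAT_QUERY = '''CREATE FILE FORMAT IF NOT EXISTS <DATABASE_NAME>.<SCHEMA_NAME>.FF_CSV
  TYPE = CSV
  FIELD_DELIMITER = ','
  SKIP_HEADER = 1
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
  COMPRESSION = NONE'''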

Providing Task Flow
At last, you have to bring all the tasks together in one line. Use this code snippet at the end:

with dag:
  trunc_task >> unzip_task >> load_task

Upload and Run the DAG script
Now upload the DAG script you just created into the dags/ folder of the GCS bucket attached to your Composer environment. The Apache Airflow web UI will automatically pick up the new DAG after a few minutes, and it will start running.
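You can drag and drop the file in the Cloud Console, or copy it programmatically. A small sketch using the same google-cloud-storage client as before is shown here; '<COMPOSER_BUCKET_NAME>' is a placeholder for whatever bucket Composer created for your environment:

from google.cloud import storage

# Upload the DAG file into the dags/ folder of the Composer environment bucket
client = storage.Client()
bucket = client.bucket('<COMPOSER_BUCKET_NAME>')
bucket.blob('dags/DAG_Sflk_loader.py').upload_from_filename('DAG_Sflk_loader.py')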

Conclusion
In this article, we learnt how to use Apache Airflow to load Zip files stored in a GCS bucket into a Snowflake table. We also went through the creation and deployment of a DAG in GCP Composer. In future posts, we will explore other integration methods in Apache Airflow.

Till then, Goodbye!!
