Eric Katumo for LuxDevHQ

Building an Automated Bitcoin Price ETL Pipeline with Airflow and PostgreSQL

Introduction

This article details the creation of an automated ETL (Extract, Transform, Load) pipeline that retrieves daily Bitcoin price data from the Polygon.io API, applies the necessary transformations, and loads the results into a PostgreSQL database. The workflow is orchestrated with Apache Airflow, which ensures reliable daily execution.

This project demonstrates several key data engineering concepts:

  • API data extraction
  • Data transformation using pandas
  • Database integration with PostgreSQL
  • Workflow orchestration with Apache Airflow
  • Deployment to a cloud environment

System Architecture

The pipeline consists of the following components:

  1. Data Source: Polygon.io API providing cryptocurrency price data
  2. ETL Script: Python script that handles extraction, transformation, and loading
  3. Database: PostgreSQL for data storage
  4. Orchestration: Apache Airflow for scheduling and monitoring
  5. Infrastructure: Cloud VM for hosting the pipeline

The system flows in a linear fashion: Airflow triggers the ETL script daily, which extracts the latest BTC prices, transforms the data into a suitable format, and loads it into the PostgreSQL database.
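
One possible layout of the files on the server, based on the paths used in the deployment step below. Where the virtual environment and .env file live is a choice; here they are assumed to sit next to the script, and the paths in the DAG's bash_command should point to wherever they actually end up:

~/crypto_price/
    btc_prices.py      # ETL script (Step 1)
    .env               # database credentials (Step 3)
    venv/              # Python virtual environment (Step 3)
~/airflow/dags/
    btc_dag.py         # Airflow DAG definition (Step 2)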

Detailed Implementation

Step 1: Creating the ETL Script

The first component is btc_prices.py, which handles the core ETL functionality:

import requests
import os
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv

# Define API endpoint (replace YOUR_API_KEY with your Polygon.io API key)
url = 'https://api.polygon.io/v1/open-close/crypto/BTC/USD/2025-03-31?adjusted=true&apiKey=YOUR_API_KEY'
response = requests.get(url)

if response.status_code == 200:
    data = response.json()
    open_price = data.get('open')
    close_price = data.get('close')
    date = data.get('day')
    symbol = data.get('symbol')
else:
    print(f"Failed to retrieve data: {response.status_code}")
    raise SystemExit(1)

# Prepare data for insertion
data_df = {
    'symbol': symbol,
    'open_price': open_price,
    'close_price': close_price,
    'date': date
}
df = pd.DataFrame(data_df, index=[0])
df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')

# Load environment variables
load_dotenv()
dbname = os.getenv('dbname')
user = os.getenv('user')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')

# Create database connection
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{dbname}')

df.to_sql("crypto_prices", con=engine, if_exists="append", index=False, schema="dataengineering")
print(f"Successfully loaded crypto data for {df['date'][0]}")

This script:

  • Extracts Bitcoin price data from the Polygon.io API (for a fixed date here; a sketch for making the date dynamic follows after this list)
  • Transforms and structures the data using pandas
  • Loads the data into PostgreSQL
  • Uses environment variables for secure database connection management
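
The request above targets a fixed date (2025-03-31). If each daily run should instead pull the most recently completed day, the extraction step could be adjusted along the following lines. This is a minimal sketch; it assumes the API key is also moved into the .env file under a hypothetical POLYGON_API_KEY variable rather than being hardcoded in the URL.

import os
from datetime import datetime, timedelta

import requests
from dotenv import load_dotenv

load_dotenv()

# Assumption: the Polygon.io key is stored in .env as POLYGON_API_KEY
api_key = os.getenv('POLYGON_API_KEY')

# Request yesterday's date so each daily run asks for a completed day
query_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

url = (
    f'https://api.polygon.io/v1/open-close/crypto/BTC/USD/{query_date}'
    f'?adjusted=true&apiKey={api_key}'
)
response = requests.get(url)

The rest of the script (parsing the response, building the DataFrame, and loading into PostgreSQL) stays the same.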

Step 2: Creating the Airflow DAG

Next, btc_dag.py defines the Airflow DAG (Directed Acyclic Graph) that orchestrates the workflow:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

# DAG default arguments
default_args = {
    "owner": "data_engineer",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 31),
    "email_on_failure": False,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=2)
}

with DAG(
    'polygon_btc_data',
    default_args=default_args,
    schedule_interval='@daily',
) as dag:

    activate_venv = BashOperator(
        task_id='activate_virtual_env',
        bash_command='source /home/user/project/venv/bin/activate',
    )

    execute_file = BashOperator(
        task_id='execute_python_file',
        bash_command='python /home/user/project/btc_prices.py',
    )

    activate_venv >> execute_file

This DAG:

  • Defines the execution schedule
  • Activates the virtual environment
  • Executes the ETL script (see the note after this list about running it inside the virtual environment)
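
One caveat: each BashOperator runs its bash_command in its own shell, so sourcing the virtual environment in the first task does not affect the second task. A common workaround is to activate and execute in a single command, or to invoke the virtual environment's Python interpreter directly. Here is a minimal sketch of the single-task variant, reusing the placeholder paths from the DAG above (they should match wherever the script is actually deployed, e.g. ~/crypto_price from Step 4):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "owner": "data_engineer",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 31),
    "retries": 2,
    "retry_delay": timedelta(minutes=2),
}

with DAG(
    'polygon_btc_data',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,  # assumption: do not backfill runs for past dates
) as dag:

    # Activating and executing in one command keeps both in the same shell,
    # so the script runs with the virtual environment's interpreter and packages.
    run_etl = BashOperator(
        task_id='run_btc_etl',
        bash_command=(
            'source /home/user/project/venv/bin/activate && '
            'python /home/user/project/btc_prices.py'
        ),
    )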

Step 3: Setting Up the Environment

  1. Creating a Virtual Environment:
   python -m venv venv
   source venv/bin/activate
  2. Installing Dependencies:
   pip install requests pandas sqlalchemy python-dotenv psycopg2-binary apache-airflow
  3. Setting Up Environment Variables (a quick verification sketch follows below):
   echo "dbname=your_database_name" >> .env
   echo "user=your_database_user" >> .env
   echo "password=your_database_password" >> .env
   echo "host=your_database_host" >> .env
   echo "port=your_database_port" >> .env
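
Before wiring anything into Airflow, it can be worth confirming that the .env file is picked up and that the password will not break the connection URL (special characters such as '@' or ':' need escaping). A minimal sketch, assuming the variable names created above:

import os
from urllib.parse import quote_plus

from dotenv import load_dotenv

load_dotenv()

# The same variable names the ETL script expects
required = ['dbname', 'user', 'password', 'host', 'port']
values = {name: os.getenv(name) for name in required}

missing = [name for name, value in values.items() if not value]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")

# quote_plus escapes characters in the password that would break the URL
url = (
    f"postgresql://{values['user']}:{quote_plus(values['password'])}"
    f"@{values['host']}:{values['port']}/{values['dbname']}"
)
print("Connection URL built successfully")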

Step 4: Server Deployment

  1. SSH into the cloud VM:
   ssh user@your_server_ip
  2. Create necessary directories:
   mkdir -p ~/crypto_price
   mkdir -p ~/airflow/dags
  3. Transfer scripts to the server:
   scp btc_prices.py user@your_server_ip:~/crypto_price/
   scp btc_dag.py user@your_server_ip:~/airflow/dags/

Step 5: PostgreSQL Configuration

  1. Creating Database Schema (a read-back check follows after the SQL):
   CREATE SCHEMA IF NOT EXISTS dataengineering;

   CREATE TABLE IF NOT EXISTS dataengineering.crypto_prices (
       id SERIAL PRIMARY KEY,
       symbol VARCHAR(10) NOT NULL,
       open_price NUMERIC(20, 8) NOT NULL,
       close_price NUMERIC(20, 8) NOT NULL,
       date DATE NOT NULL,
       created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
   );
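
After the DAG has run at least once, the load can be verified by reading the table back; a minimal sketch, assuming the same .env credentials used by the ETL script:

import os

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

load_dotenv()
engine = create_engine(
    f"postgresql://{os.getenv('user')}:{os.getenv('password')}"
    f"@{os.getenv('host')}:{os.getenv('port')}/{os.getenv('dbname')}"
)

# Pull the latest rows to confirm the daily load landed
df = pd.read_sql(
    "SELECT symbol, open_price, close_price, date, created_at "
    "FROM dataengineering.crypto_prices "
    "ORDER BY date DESC LIMIT 5",
    con=engine,
)
print(df)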

Conclusion

The architecture follows best practices for data engineering:

  • Separation of extraction, transformation, and loading concerns
  • Secure credential management
  • Error handling for failed API requests, with automatic retries in Airflow
  • Automated scheduling
  • Cloud-based deployment

The combination of Python, Airflow, and PostgreSQL provides a powerful foundation for financial data analysis, enabling timely insights into cryptocurrency market trends.

Github
