Introduction
This article walks through building an automated ETL (Extract, Transform, Load) pipeline that retrieves daily Bitcoin price data from the Polygon.io API, performs the necessary transformations, and loads the data into a PostgreSQL database. The workflow is orchestrated with Apache Airflow, ensuring reliable daily execution.
This project demonstrates several key data engineering concepts:
- API data extraction
- Data transformation using pandas
- Database integration with PostgreSQL
- Workflow orchestration with Apache Airflow
- Deployment to a cloud environment
System Architecture
The pipeline consists of the following components:
- Data Source: Polygon.io API providing cryptocurrency price data
- ETL Script: Python script that handles extraction, transformation, and loading
- Database: PostgreSQL for data storage
- Orchestration: Apache Airflow for scheduling and monitoring
- Infrastructure: Cloud VM for hosting the pipeline
The system flows in a linear fashion: Airflow triggers the ETL script daily, which extracts the latest BTC prices, transforms the data into a suitable format, and loads it into the PostgreSQL database.
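Before looking at the full scripts, the flow can be summarized as a small sketch. The function names below are illustrative placeholders only, not code from the project:

# Illustrative skeleton of the daily flow; the function names are placeholders,
# not the actual project files shown in the next section.
def extract():
    """Call the Polygon.io open/close endpoint and return the raw JSON."""
    ...

def transform(raw):
    """Shape symbol, open, close and date into a one-row pandas DataFrame."""
    ...

def load(df):
    """Append the DataFrame to the PostgreSQL crypto_prices table."""
    ...

def run_daily_etl():
    # The unit of work that Airflow triggers once per day.
    load(transform(extract()))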
Detailed Implementation
Step 1: Creating the ETL Script
The first component is btc_prices.py, which handles the core ETL functionality:
import requests
import os
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv

# Define the API endpoint: request the previous day's daily open/close bar,
# since the pipeline runs once per day
target_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
url = f'https://api.polygon.io/v1/open-close/crypto/BTC/USD/{target_date}?adjusted=true&apiKey=YOUR_API_KEY'

response = requests.get(url)
if response.status_code == 200:
    data = response.json()
    open_price = data.get('open')
    close_price = data.get('close')
    date = data.get('day')
    symbol = data.get('symbol')
else:
    print(f"Failed to retrieve data: {response.status_code}")
    exit()

# Prepare data for insertion
data_df = {
    'symbol': symbol,
    'open_price': open_price,
    'close_price': close_price,
    'date': date
}
df = pd.DataFrame(data_df, index=[0])
df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')

# Load environment variables
load_dotenv()
dbname = os.getenv('dbname')
user = os.getenv('user')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')

# Create database connection and append the new row
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{dbname}')
df.to_sql("crypto_prices", con=engine, if_exists="append", index=False, schema="dataengineering")
print(f"Successfully loaded crypto data for {df['date'][0]}")
This script:
- Extracts Bitcoin price data from the Polygon.io API
- Transforms and structures the data using pandas
- Loads the data into PostgreSQL
- Uses environment variables for secure database connection management
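Once the script has run, a quick query confirms that the row actually landed in PostgreSQL. The snippet below is a sketch only, reusing the same .env variables as the script:

# Sketch: verify the load by reading the most recent rows back from PostgreSQL.
# Assumes the same .env variables as btc_prices.py.
import os
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

load_dotenv()
engine = create_engine(
    f"postgresql://{os.getenv('user')}:{os.getenv('password')}"
    f"@{os.getenv('host')}:{os.getenv('port')}/{os.getenv('dbname')}"
)

# Show the five most recent rows appended by the pipeline
check = pd.read_sql(
    "SELECT symbol, open_price, close_price, date FROM dataengineering.crypto_prices "
    "ORDER BY date DESC LIMIT 5",
    con=engine,
)
print(check)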
Step 2: Creating the Airflow DAG
Next, btc_dag.py defines the Airflow DAG (Directed Acyclic Graph) that orchestrates the workflow:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

# DAG default arguments
default_args = {
    "owner": "data_engineer",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 31),
    "email_on_failure": False,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=2)
}

with DAG(
    'polygon_btc_data',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,  # do not backfill runs between start_date and today
) as dag:
    # A BashOperator runs each command in its own shell, so activating the
    # virtual environment and running the script must happen in one command.
    run_etl = BashOperator(
        task_id='run_btc_etl',
        bash_command=(
            'source /home/user/crypto_price/venv/bin/activate && '
            'python /home/user/crypto_price/btc_prices.py'
        ),
    )
This DAG:
- Defines a daily execution schedule (with catchup disabled so historical dates are not backfilled)
- Activates the project's virtual environment and executes the ETL script in a single task, because each BashOperator command runs in its own shell and an activation in one task would not carry over to the next
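Before relying on the scheduler, it is worth checking that the DAG file parses cleanly. One way to do this, assuming Airflow is installed and ~/airflow/dags is the configured DAG folder, is:

# Sanity check: confirm the DAG file imports without errors and is registered.
import os
from airflow.models import DagBag

dag_bag = DagBag(dag_folder=os.path.expanduser('~/airflow/dags'), include_examples=False)

print(dag_bag.import_errors)                # expect an empty dict
print('polygon_btc_data' in dag_bag.dags)   # expect True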
Step 3: Setting Up the Environment
- Creating a Virtual Environment:
python -m venv venv
source venv/bin/activate
- Installing Dependencies:
pip install requests pandas sqlalchemy python-dotenv psycopg2-binary apache-airflow
- Setting Up Environment Variables:
echo "dbname=your_database_name" >> .env
echo "user=your_database_user" >> .env
echo "password=your_database_password" >> .env
echo "host=your_database_host" >> .env
echo "port=your_database_port" >> .env
Step 4: Server Deployment
- SSH into the cloud VM:
ssh user@your_server_ip
- Create necessary directories:
mkdir -p ~/crypto_price
mkdir -p ~/airflow/dags
- Transfer scripts to the server:
scp btc_prices.py user@your_server_ip:~/crypto_price/
scp btc_dag.py user@your_server_ip:~/airflow/dags/
Step 5: PostgreSQL Configuration
- Creating Database Schema:
CREATE SCHEMA IF NOT EXISTS dataengineering;

CREATE TABLE IF NOT EXISTS dataengineering.crypto_prices (
    id SERIAL PRIMARY KEY,
    symbol VARCHAR(10) NOT NULL,
    open_price NUMERIC(20, 8) NOT NULL,
    close_price NUMERIC(20, 8) NOT NULL,
    date DATE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
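To confirm from Python that the schema and table are in place before the first Airflow run, a short check along these lines can help; it reuses the same .env connection settings as the ETL script:

# Optional check that the table exists and exposes the expected columns.
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, inspect

load_dotenv()
engine = create_engine(
    f"postgresql://{os.getenv('user')}:{os.getenv('password')}"
    f"@{os.getenv('host')}:{os.getenv('port')}/{os.getenv('dbname')}"
)

inspector = inspect(engine)
columns = [c['name'] for c in inspector.get_columns('crypto_prices', schema='dataengineering')]
print(columns)  # expect: ['id', 'symbol', 'open_price', 'close_price', 'date', 'created_at']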
Conclusion
The architecture follows best practices for data engineering:
- Separation of extraction, transformation, and loading concerns
- Secure credential management
- Basic error handling for failed API requests (one possible hardening is sketched below)
- Automated scheduling
- Cloud-based deployment
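The error handling in the script is intentionally minimal. One possible way to make the API call more resilient, shown here as a sketch rather than part of the project code, is to retry transient failures with a short backoff before giving up:

# Sketch: retry transient API failures before giving up.
# This is an optional hardening idea, not part of the project script.
import time
import requests

def fetch_daily_open_close(url, retries=3, backoff_seconds=5):
    for attempt in range(1, retries + 1):
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            return response.json()
        print(f"Attempt {attempt} failed with status {response.status_code}")
        time.sleep(backoff_seconds * attempt)
    raise RuntimeError(f"Failed to retrieve data after {retries} attempts")

Airflow's own retries (the retries and retry_delay settings in default_args) already cover whole-task failures; an in-script retry like this only smooths over brief network blips.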
The combination of Python, Airflow, and PostgreSQL provides a powerful foundation for financial data analysis, enabling timely insights into cryptocurrency market trends.