James Wachuka

Building a Weather Data Pipeline with PySpark, Prefect, and Google Cloud

In this article, I'll walk you through how to build a data pipeline that fetches weather data for multiple cities, processes the data using PySpark, and stores the output in Google Cloud.

We'll use PySpark for distributed data processing, Prefect for workflow management, and Google Cloud Storage and BigQuery for data storage and analysis. The code is available on GitHub.
[Figure: Looker dashboard]

The pipeline fetches current weather data for multiple cities from the OpenWeatherMap API using the requests library. PySpark then filters and aggregates the data, and the result is written as a CSV file to Google Cloud Storage. Finally, the CSV is loaded into a BigQuery table for further analysis.

[Figure: pipeline]

Data Pipeline

from pyspark.sql import SparkSession
from google.cloud import storage
from google.cloud import bigquery
import requests
import json
from google.cloud.exceptions import NotFound
import random


# Define the OpenWeatherMap API key and base URL
api_key = ""
base_url = "https://api.openweathermap.org/data/2.5/weather"

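cities_100 = ['New York', 'London', 'Paris', 'Tokyo', 'Sydney', 'Moscow', 'Beijing', 'Rio de Janeiro', 'Mumbai', 'Cairo', 'Rome', 'Berlin', 'Toronto', 'Lagos', 'Bangkok', 'Melbourne', 'Johannesburg']

# random.sample raises ValueError when asked for more items than the list holds,
# so cap the sample size at the length of the list
cities = random.sample(cities_100, min(25, len(cities_100)))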

# Initialize a Spark session
spark = SparkSession.builder.appName("WeatherData").getOrCreate()

# Define a function to fetch weather data for a city and return a Spark dataframe
def fetch_weather_data(city):
    # Send a request to the OpenWeatherMap API for the city's weather data
    params = {"q": city, "appid": api_key, "units": "metric"}
    response = requests.get(base_url, params=params)
    data = response.json()

    # Extract the relevant weather data from the API response
    temp = data["main"]["temp"]
    humidity = data["main"]["humidity"]
    wind_speed = data["wind"]["speed"]

    # Create a Spark dataframe with the weather data for the city
    df = spark.createDataFrame([(city, temp, humidity, wind_speed)],
                               ["City", "Temperature", "Humidity", "WindSpeed"])
    return df

# Use the fetch_weather_data function to fetch weather data for all cities and merge them into a single dataframe
weather_data = None
for city in cities:
    city_weather_data = fetch_weather_data(city)
    if weather_data is None:
        weather_data = city_weather_data
    else:
        weather_data = weather_data.union(city_weather_data)

# Perform some basic processing and transformation on the weather data using PySpark
weather_data = weather_data.filter("Temperature > 10") \
                           .groupBy("City") \
                           .agg({"Humidity": "avg", "WindSpeed": "max"}) \
                           .withColumnRenamed("avg(Humidity)", "AverageHumidity") \
                           .withColumnRenamed("max(WindSpeed)", "MaxWindSpeed")

# Show the final processed and transformed weather data
weather_data.show()

# Write the weather data as a CSV file to a Google Cloud Storage bucket
bucket_name = "weather_app_dez"
file_name = "weather_data.csv"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
blob.upload_from_string(weather_data.toPandas().to_csv(index=False), content_type="text/csv")

# Create a new BigQuery table and load the data from the CSV file
table_name = "dez-dtc-23-384116.weather_app.weather_data"
bigquery_client = bigquery.Client()
table = bigquery.Table(table_name)
schema = [bigquery.SchemaField("City", "STRING"),
          bigquery.SchemaField("AverageHumidity", "FLOAT"),
          bigquery.SchemaField("MaxWindSpeed", "FLOAT")]
table.schema = schema
# Pass the explicit schema to the load job so column types do not depend on autodetection
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.CSV, skip_leading_rows=1, schema=schema)
job = bigquery_client.load_table_from_uri(f"gs://{bucket_name}/{file_name}", table, job_config=job_config)
job.result()
print(f"Loaded {job.output_rows} rows into BigQuery table {table_name}")
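
The extraction step in fetch_weather_data indexes straight into data["main"] and data["wind"], which assumes every response is a successful one. As a minimal sketch, assuming the API answers a bad key or an unknown city with a JSON body that lacks those fields, the extraction could be pulled into a helper that returns None instead of raising. The name extract_weather_row and both sample payloads are hypothetical, made up for illustration.

```python
def extract_weather_row(city, data):
    """Return (city, temp, humidity, wind_speed), or None if fields are missing.

    `data` is the decoded JSON body of a current-weather response.
    """
    try:
        return (
            city,
            float(data["main"]["temp"]),
            float(data["main"]["humidity"]),
            float(data["wind"]["speed"]),
        )
    except (KeyError, TypeError, ValueError):
        # Missing or malformed fields: signal "skip this city" to the caller
        return None


# Hypothetical payloads, shaped like the fields the pipeline reads
ok_payload = {"main": {"temp": 18.5, "humidity": 60}, "wind": {"speed": 4}}
error_payload = {"cod": 401, "message": "Invalid API key"}

rows = [
    row
    for row in (
        extract_weather_row("Paris", ok_payload),
        extract_weather_row("Nowhere", error_payload),
    )
    if row is not None
]
print(rows)
```

Casting every reading to float should also keep the column types consistent when the rows are later combined into one dataframe, since the API can return whole numbers for some readings.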


Adding Prefect Functionality

from google.cloud import storage, bigquery
import requests
import json
from google.cloud.exceptions import NotFound
import random
from prefect import task, Flow
from pyspark.sql import SparkSession


# Define the OpenWeatherMap API key and base URL
api_key = ""
base_url = "https://api.openweathermap.org/data/2.5/weather"

spark = SparkSession.builder.appName("WeatherData").getOrCreate()

# Define the cities list
cities_100 = ['New York', 'London', 'Paris', 'Tokyo', 'Sydney', 'Moscow', 'Beijing', 'Rio de Janeiro', 'Mumbai', 'Cairo']
cities = random.sample(cities_100, 5)

# Define a task that fetches weather data for each city and returns a single Spark dataframe
@task
def fetch_weather_data(cities):
    weather_data = None
    for city in cities:
        # Send a request to the OpenWeatherMap API for the city's weather data
        params = {"q": city, "appid": api_key, "units": "metric"}
        response = requests.get(base_url, params=params)
        data = response.json()

        # Extract the relevant weather data from the API response
        temp = data["main"]["temp"]
        humidity = data["main"]["humidity"]
        wind_speed = data["wind"]["speed"]

        # Create a Spark dataframe with the weather data for the city
        city_weather_data = spark.createDataFrame([(city, temp, humidity, wind_speed)],
                                ["City", "Temperature", "Humidity", "WindSpeed"])
        if weather_data is None:
            weather_data = city_weather_data
        else:
            weather_data = weather_data.union(city_weather_data)
    return weather_data


'''
# Use the fetch_weather_data function to fetch weather data for all cities and merge them into a single dataframe
@task
def merge_weather_data(cities):
    weather_data = None
    for city in cities:
        city_weather_data = fetch_weather_data(city)
        if weather_data is None:
            weather_data = city_weather_data
        else:
            weather_data = weather_data.union(city_weather_data)
    return weather_data
'''
# Perform some basic processing and transformation on the weather data using PySpark
@task
def process_weather_data(weather_data):
    processed_data = weather_data.filter("Temperature > 10") \
                           .groupBy("City") \
                           .agg({"Humidity": "avg", "WindSpeed": "max"}) \
                           .withColumnRenamed("avg(Humidity)", "AverageHumidity") \
                           .withColumnRenamed("max(WindSpeed)", "MaxWindSpeed")
    return processed_data

# Write the weather data as a CSV file to a Google Cloud Storage bucket
@task
def write_weather_data_to_gcs(weather_data):
    bucket_name = "weather_app_dez"
    file_name = "weather_data.csv"
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(file_name)
    blob.upload_from_string(weather_data.toPandas().to_csv(index=False), content_type="text/csv")
    return f"gs://{bucket_name}/{file_name}"

# Create a new BigQuery table and load the data from the CSV file
@task
def write_weather_data_to_bigquery(uri):
    table_name = "dez-dtc-23-384116.weather_app.weather_data"
    bigquery_client = bigquery.Client()
    table = bigquery.Table(table_name)
    schema = [bigquery.SchemaField("City", "STRING"),
              bigquery.SchemaField("AverageHumidity", "FLOAT"),
              bigquery.SchemaField("MaxWindSpeed", "FLOAT")]
    table.schema = schema
    # Pass the explicit schema to the load job so column types do not depend on autodetection
    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.CSV, skip_leading_rows=1, schema=schema)
    job = bigquery_client.load_table_from_uri(uri, table, job_config=job_config)
    job.result()
    print(f"Loaded {job.output_rows} rows into BigQuery table {table_name}")

# Build the flow. The Flow context manager and flow.run() are Prefect 1.x APIs.
with Flow("Weather Data Pipeline") as flow:
    # `cities` is the random sample drawn at the top of the script
    weather_data = fetch_weather_data(cities)
    processed_data = process_weather_data(weather_data)
    uri = write_weather_data_to_gcs(processed_data)
    write_weather_data_to_bigquery(uri)

flow.run()
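
To make the process_weather_data step concrete, here is a plain-Python sketch of the same logic: drop rows that are not above 10 °C, group by city, then take the average humidity and the maximum wind speed. The rows are invented sample values and the function name summarize is hypothetical; the pipeline itself does this with PySpark.

```python
from collections import defaultdict


def summarize(rows, min_temp=10):
    """Mimic filter("Temperature > 10").groupBy("City").agg(avg, max) in plain Python."""
    grouped = defaultdict(list)
    for city, temp, humidity, wind_speed in rows:
        if temp > min_temp:  # same predicate as the Spark filter
            grouped[city].append((humidity, wind_speed))

    summary = {}
    for city, readings in grouped.items():
        humidities = [h for h, _ in readings]
        wind_speeds = [w for _, w in readings]
        summary[city] = {
            "AverageHumidity": sum(humidities) / len(humidities),
            "MaxWindSpeed": max(wind_speeds),
        }
    return summary


# Invented sample rows: (City, Temperature, Humidity, WindSpeed)
sample_rows = [
    ("Cairo", 30.0, 20.0, 5.0),
    ("Cairo", 28.0, 30.0, 7.0),
    ("Moscow", 2.0, 80.0, 3.0),  # filtered out: not above 10
]
result = summarize(sample_rows)
print(result)
```

Because each run fetches one reading per sampled city, every group in the pipeline as written holds a single row, so the average and the maximum simply equal that city's reading. The aggregation becomes meaningful once several readings per city are collected.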

To run the pipeline, you'll need a Google Cloud account (with billing enabled or on the free tier) and an API key for the OpenWeatherMap API.

Configuration

Create a new Google Cloud Storage bucket to store the output CSV file, and a new BigQuery dataset to hold the output table. Update the bucket_name and table_name variables in the write_weather_data_to_gcs and write_weather_data_to_bigquery tasks, respectively, with the names of the bucket and table you want to use. Set the api_key variable at the top of the script to your OpenWeatherMap API key.
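
Rather than editing the values in the script, one option is to read them from environment variables, with the current values as defaults. This is a sketch only: the variable names WEATHER_BUCKET, WEATHER_TABLE, and OPENWEATHER_API_KEY and the load_settings helper are hypothetical, not something the original code defines.

```python
import os


def load_settings(env=None):
    """Collect pipeline settings from environment variables (names are hypothetical)."""
    env = os.environ if env is None else env
    settings = {
        # Defaults mirror the values hard-coded in the tasks
        "bucket_name": env.get("WEATHER_BUCKET", "weather_app_dez"),
        "table_name": env.get("WEATHER_TABLE", "dez-dtc-23-384116.weather_app.weather_data"),
        "api_key": env.get("OPENWEATHER_API_KEY", ""),
    }
    # A fully qualified BigQuery table ID has three dot-separated parts
    if len(settings["table_name"].split(".")) != 3:
        raise ValueError("table_name must look like project.dataset.table")
    return settings


# Passing a plain dict instead of os.environ keeps the example reproducible
settings = load_settings({"WEATHER_BUCKET": "my-bucket"})
print(settings["bucket_name"], settings["table_name"])
```

Keeping the API key out of the source file this way also makes it harder to commit it to a repository by accident.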

Running the Pipeline

To run the pipeline, follow these steps:

1. Open a terminal window and navigate to the project directory.
2. Build the Docker image using the following command:
   docker build -t weather-data-pipeline .
3. Run the Docker container using the following command:
   docker run --rm -it -v $(pwd):/app -e GOOGLE_APPLICATION_CREDENTIALS=/app/your-credentials.json weather-data-pipeline

Note: Replace your-credentials.json with the name of your Google Cloud Platform service account key file.

The pipeline will run and write the output data to the Google Cloud Storage bucket and BigQuery table you specified in the configuration step.

Troubleshooting

If you encounter any issues while running the pipeline, please check the following:

- Ensure that the Google Cloud Platform credentials you specified are valid and have the appropriate permissions to access GCS and BigQuery.
- Ensure that the bucket and table names you specified in the configuration step are correct.
- Check the logs for any error messages that might indicate the cause of the issue.
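
The credentials check can be partly automated before the pipeline starts. The sketch below, with a hypothetical helper name check_credentials, verifies that GOOGLE_APPLICATION_CREDENTIALS is set, that it points at a readable JSON file, and that the file declares itself a service account key. It does not confirm that the account has the right permissions on GCS or BigQuery.

```python
import json
import os


def check_credentials(env=None):
    """Return a list of problems found with the credentials file (empty if none)."""
    env = os.environ if env is None else env
    path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return ["GOOGLE_APPLICATION_CREDENTIALS is not set"]
    if not os.path.isfile(path):
        return [f"credentials file not found: {path}"]
    try:
        with open(path, encoding="utf-8") as handle:
            key = json.load(handle)
    except (OSError, ValueError) as exc:
        # json.JSONDecodeError is a subclass of ValueError
        return [f"could not read credentials file: {exc}"]
    # Service account key files carry a "type" field with this value
    if not isinstance(key, dict) or key.get("type") != "service_account":
        return ["credentials file is not a service account key"]
    return []


for problem in check_credentials():
    print("Credential problem:", problem)
```

Calling this at the top of the script lets the run stop with a readable message instead of failing partway through, after the API calls and Spark processing have already happened.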

In conclusion, this project shows how PySpark, Prefect, and Google Cloud fit together in a single data pipeline: the pipeline collects weather data for multiple cities, aggregates it, and loads it into BigQuery. From there, you can analyze the data and use it for applications such as predictive modeling and weather forecasting.
