AWS Forecast is a fully managed AWS service for time-series analysis. It can select from multiple time-series prediction models to find the best one for your particular data sets. Amazon Forecast automatically examines the historical data provided (including any additional features that can impact the forecast), identifies what is meaningful, and produces a forecasting model capable of making highly accurate predictions.
This blog illustrates the use of the AWS Forecast service using the Manning dataset. This is one of the FbProphet library's example datasets: a time series of the (log-transformed) Wikipedia page hits for Peyton Manning, taken over an 8-year period. For further background on the dataset and library, please refer to the research paper listed in the References.
American football begins in September and ends in early January every year, with playoffs scheduled thereafter. Games tend to be televised on Sunday and Monday nights. By using historical data of Wikipedia page hits for a popular player like Peyton Manning, we can model the weekly and yearly seasonality of the page hits and use that as an indication of football hype. For example, we may expect to see a lot more activity during the playoff season and the Super Bowl.
We will perform all the steps in the workflow below in Python, including processing data, importing data into AWS Forecast, training the model and generating forecasts.
The modules and functions referenced in the code blocks in this exercise can be accessed from my GitHub repository. These contain helper functions for importing data into S3, creating an AWS Forecast dataset and importing data into it from S3, training a predictor and then forecasting with the model, all using the AWS SDK for Python (Boto3). The code snippets and outputs used in the next few sections can also be accessed in the notebook.
Data Processing
The dataset is first filtered to only include the most recent historical data (from 2015 onward, roughly one year) and then reformatted to have the columns (timestamp, target_value, item_id) and values expected by the AWS Forecast API. The dataset is then saved in CSV format.
Assign the local path to the directory containing the dataset to the variable DIR_PATH.
from pathlib import Path
import re
import dask.dataframe as dd  # assumed: `dd` is used below but was never imported
DIR_PATH = "/e/projects/aws_forecast/dataset/"
def read_raw_csv(path):
    # Read the Manning CSV found in the given directory.
    p = Path(path)
    for x in p.iterdir():
        if x.suffix == ".csv":
            m = re.search(r"(?<=_)\w+", x.name)
            assert m.group(0) == "manning"
            basepath = str(x)
            print(f"Reading data from {basepath}")
            df = dd.read_csv(basepath)
            return df, basepath
def parse_dt_to_year(df):
    # Add a helper column holding the year of each observation.
    df["year"] = dd.to_datetime(df["ds"], format="%Y-%m-%d").dt.year
    return df
def filter_df_by_year(df, year=2015):
    # Keep rows from `year` onward and drop the helper column.
    df_cleaned = df.loc[df["year"] >= year, ["ds", "y"]].reset_index(drop=True)
    return df_cleaned
def reformat_for_aws_forecast(df):
    # Rename to the attribute names used in the Forecast schema.
    df_renamed = df.rename(columns={"ds": "timestamp", "y": "target_value"})
    df_renamed["item_id"] = "1"
    return df_renamed
def save_data_for_s3(df, basepath, filename):
    filepath = Path(basepath).parents[0].joinpath("data", filename)
    print(filepath)
    p = filepath.parents[0]
    destination_path = str(filepath)
    p.mkdir(parents=True, exist_ok=True)
    # With dask, single_file=True is needed to write one CSV rather than one file per partition.
    df.to_csv(destination_path, index=False, single_file=True)
    return destination_path
df, basepath = read_raw_csv(DIR_PATH)
df_parsed = parse_dt_to_year(df)
df_filtered = filter_df_by_year(df_parsed)
df_renamed = reformat_for_aws_forecast(df_filtered)
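To make the reshaping step concrete, here is a minimal standard-library sketch of the same filter-and-rename logic applied to a few made-up rows. The function name `to_forecast_rows` and the sample values are hypothetical and only illustrate the target layout; the helpers above remain the actual pipeline.

```python
import csv
import io
from datetime import datetime

# Made-up sample in the raw (ds, y) layout; the values are illustrative only.
RAW_CSV = """ds,y
2014-12-30,8.10
2014-12-31,8.25
2015-01-01,8.90
2015-01-02,9.15
"""


def to_forecast_rows(raw_csv, year=2015, item_id="1"):
    """Keep rows dated `year` or later and rename the columns for AWS Forecast."""
    rows = []
    for record in csv.DictReader(io.StringIO(raw_csv)):
        if datetime.strptime(record["ds"], "%Y-%m-%d").year >= year:
            rows.append(
                {
                    "timestamp": record["ds"],
                    "target_value": float(record["y"]),
                    "item_id": item_id,
                }
            )
    return rows


rows = to_forecast_rows(RAW_CSV)
for row in rows:
    print(row)
```

Every row gets the same `item_id` because the dataset contains a single time series.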
The functions in the code block below create an S3 bucket and call the S3 PutObject API to add the CSV file to the created bucket. You must have the s3:CreateBucket and s3:PutObject permissions in your IAM policy. If you intend to delete the bucket and all its objects at the end of this tutorial, you will need the s3:DeleteBucket and s3:DeleteObject permissions as well.
import boto3
from pathlib import Path
s3_client = boto3.client("s3", region_name="us-east-1")
def create_bucket(bucket_name):
    try:
        response = s3_client.create_bucket(Bucket=bucket_name)
        print(response)
    except s3_client.exceptions.BucketAlreadyExists:
        print(f"bucket {bucket_name} already exists")
def put_object_in_s3_bucket(bucket_name, filepath):
    # Create the bucket if needed, then upload the CSV under its file name.
    create_bucket(bucket_name)
    filename = Path(filepath).name
    s3_client.upload_file(filepath, bucket_name, filename)
The time series dataset imported into S3 has the following profile. This will now be imported into AWS Forecast.
The first step is to create an AWS Forecast dataset group and dataset as described in the AWS docs. Here we only use the target time series dataset type. The dataset group must include a target time series dataset, which contains the item identifier (item_id), the timestamp and the target value, as well as any dimensions. Related time series and item metadata are optional. When pre-processing the data before uploading to S3, an item_id column was created and set to an arbitrary value, as all the observations belong to the same item (i.e. Manning's Wikipedia hits).
Before doing this, we need to grant the IAM user permission to access AWS Forecast and all its resources and supported operations. We can attach the AmazonForecastFullAccess AWS managed policy to the IAM user (see the AWS Managed (Predefined) Policies for Amazon Forecast section in the AWS docs). Alternatively, if you want to add a custom policy to the user, you can use something similar to the JSON policy below, which is also shown as example 2 in the following [AWS docs](https://docs.aws.amazon.com/forecast/latest/dg/security_iam_id-based-policy-examples.html).
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "forecast:*"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "iam:PassRole"
      ],
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "iam:PassedToService": "forecast.amazonaws.com"
        }
      }
    }
  ]
}
We will now create a Boto client object to create and manage the AWS Forecast resources. The dataset and dataset group resources can then be created using the snippet below, after setting the data frequency to daily and defining the schema.
import boto3
forecast = boto3.client("forecast")
DATASET_FREQUENCY = "D"
ts_schema = {
    "Attributes": [
        {
            "AttributeName": "timestamp",
            "AttributeType": "timestamp"
        },
        {
            "AttributeName": "target_value",
            "AttributeType": "float"
        },
        {
            "AttributeName": "item_id",
            "AttributeType": "string"
        }
    ]
}
PROJECT = 'manning_ts'
DATA_VERSION = 1
def create_dataset(dataset_name, freq, schema):
    response = forecast.create_dataset(
        Domain="CUSTOM",
        DatasetType="TARGET_TIME_SERIES",
        DatasetName=dataset_name,
        DataFrequency=freq,
        Schema=schema,
    )
    dataset_arn = response["DatasetArn"]
    print(forecast.describe_dataset(DatasetArn=dataset_arn))
    return dataset_arn
def create_dataset_group_with_dataset(dataset_name, dataset_arn):
    dataset_arns = [dataset_arn]
    try:
        create_dataset_group_response = forecast.create_dataset_group(
            Domain="CUSTOM", DatasetGroupName=dataset_name, DatasetArns=dataset_arns
        )
        dataset_group_arn = create_dataset_group_response["DatasetGroupArn"]
        return dataset_group_arn
    except forecast.exceptions.ResourceAlreadyExistsException:
        print("Dataset group already exists")
dataset_name = f"{PROJECT}_{DATA_VERSION}"
dataset_arn = create_dataset(dataset_name, DATASET_FREQUENCY, ts_schema)
dataset_group_arn = create_dataset_group_with_dataset(dataset_name, dataset_arn)
This will create the dataset and add it to the dataset group. Each dataset created needs to be associated with a domain, which can be predefined for a specific use case (e.g. retail, web traffic forecasting, workforce planning, etc.) or a custom domain if the use case does not fall into one of the predefined categories. In this case, we select the CUSTOM domain. For a list of available domains, please refer to the AWS documentation.
In addition, we are only interested in a single time series, so we use TARGET_TIME_SERIES for the DatasetType, which requires the item_id, timestamp and target_value attributes to be defined in the schema.
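Amazon Forecast expects the columns in the data file to appear in the same order as the attributes in the schema, so it can be worth checking the CSV before starting an import. The following is a small sketch of such a check; the helper name `header_matches_schema` and the sample rows are hypothetical and not part of the repository.

```python
import csv
import io

# Same attribute names and order as the ts_schema defined above.
SCHEMA_ATTRIBUTES = ["timestamp", "target_value", "item_id"]


def header_matches_schema(csv_text, schema_attributes):
    """Return True when the CSV header lists the schema attributes in order."""
    header = next(csv.reader(io.StringIO(csv_text)))
    return [name.strip() for name in header] == list(schema_attributes)


# Made-up rows: the first sample follows the schema order, the second does not.
good = "timestamp,target_value,item_id\n2015-01-01,8.9,1\n"
bad = "item_id,timestamp,target_value\n1,2015-01-01,8.9\n"

print(header_matches_schema(good, SCHEMA_ATTRIBUTES))
print(header_matches_schema(bad, SCHEMA_ATTRIBUTES))
```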
Next, we will create the import job. Before that, we need to create a new role for AWS Forecast to assume, which gives AWS Forecast permission to access S3 on your behalf. The policy will be similar to the one below. Amend the ARNs to match the name of your own bucket if it is something other than manning_dataset (the import code below uses aws-forecast-demo-examples).
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::manning_dataset",
        "arn:aws:s3:::manning_dataset/*"
      ]
    }
  ]
}
The trust relationship is shown below. It can be further restricted to specific source accounts and to a specific AWS Forecast ARN if needed. Please refer to the following doc to do this from the console or the AWS CLI.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "forecast.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
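The role can also be created from Python rather than the console. The sketch below builds the two policy documents shown above and passes them to the IAM `create_role` and `put_role_policy` operations. The function names, the role name and the inline policy name are hypothetical, and the IAM client is passed in as an argument so the policy builders can be inspected without touching AWS.

```python
import json


def build_trust_policy():
    """Trust relationship that lets Amazon Forecast assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"Service": "forecast.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def build_s3_access_policy(bucket_name):
    """S3 permissions for the given bucket and every object inside it."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:Get*", "s3:List*", "s3:PutObject"],
                "Resource": [
                    f"arn:aws:s3:::{bucket_name}",
                    f"arn:aws:s3:::{bucket_name}/*",
                ],
            }
        ],
    }


def create_forecast_role(iam_client, role_name, bucket_name):
    """Create the role, attach the inline S3 policy and return the role ARN."""
    response = iam_client.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(build_trust_policy()),
    )
    iam_client.put_role_policy(
        RoleName=role_name,
        PolicyName=f"{role_name}-s3-access",  # hypothetical policy name
        PolicyDocument=json.dumps(build_s3_access_policy(bucket_name)),
    )
    return response["Role"]["Arn"]


print(json.dumps(build_s3_access_policy("aws-forecast-demo-examples"), indent=2))
```

With real credentials, this could be called as `create_forecast_role(boto3.client("iam"), "forecast-s3-role", bucket_name)`, where the role name is only an example; the returned ARN is the value needed as `role_arn` when creating the import job.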
We then create an import job to import the time series dataset from S3 into the AWS Forecast dataset so that it is ready for training.
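bucket_name = 'aws-forecast-demo-examples'
key = "manning_ts_2015.csv"
# Placeholder: replace with the ARN of the IAM role created above.
role_arn = "arn:aws:iam::<account-id>:role/<role-name>"
# Illustrative job name; any name that is unique in the account works.
import_job_name = f"{PROJECT}_{DATA_VERSION}_import"
# The timestamps are dates only (yyyy-MM-dd), matching the daily frequency.
timestamp_format = "yyyy-MM-dd"
def create_import_job(
    bucket_name, key, dataset_arn, role_arn, import_job_name, timestamp_format
):
    ts_s3_data_path = "s3://" + bucket_name + "/" + key
    print(f"S3 URI for your data file = {ts_s3_data_path}")
    ts_dataset_import_job_response = forecast.create_dataset_import_job(
        DatasetImportJobName=import_job_name,
        DatasetArn=dataset_arn,
        DataSource={"S3Config": {"Path": ts_s3_data_path, "RoleArn": role_arn}},
        TimestampFormat=timestamp_format,
    )
    return ts_dataset_import_job_response
ts_dataset_import_job_response = create_import_job(
    bucket_name, key, dataset_arn, role_arn, import_job_name, timestamp_format
)
dataset_import_job_arn = ts_dataset_import_job_response['DatasetImportJobArn']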
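The `TimestampFormat` passed to the import job has to agree with the data frequency: date-only timestamps (`yyyy-MM-dd`) are used for yearly, monthly, weekly and daily data, while intraday frequencies need the time component (`yyyy-MM-dd HH:mm:ss`). As a rough sketch, the choice can be derived from the frequency code; the helper names `timestamp_format_for` and `s3_uri` are hypothetical.

```python
DATE_ONLY_FREQUENCIES = {"Y", "M", "W", "D"}
INTRADAY_FREQUENCIES = {"H", "30min", "15min", "10min", "5min", "1min"}


def timestamp_format_for(frequency):
    """Pick a timestamp format that matches a Forecast data frequency code."""
    if frequency in DATE_ONLY_FREQUENCIES:
        return "yyyy-MM-dd"
    if frequency in INTRADAY_FREQUENCIES:
        return "yyyy-MM-dd HH:mm:ss"
    raise ValueError(f"Unsupported data frequency: {frequency}")


def s3_uri(bucket_name, key):
    """Build the S3 URI that the import job reads from."""
    return f"s3://{bucket_name}/{key}"


print(timestamp_format_for("D"))
print(s3_uri("aws-forecast-demo-examples", "manning_ts_2015.csv"))
```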
After creating the import job, we can check the job status programmatically before progressing to the training step, as the import can take a while to complete. The check_job_status() function in the code block below calls the DescribeDatasetImportJob operation to poll for the job status. A successful response contains a Status element, which can have one of the following states, as described in the docs:
- ACTIVE
- CREATE_PENDING, CREATE_IN_PROGRESS, CREATE_FAILED
- CREATE_STOPPING, CREATE_STOPPED
- DELETE_PENDING, DELETE_IN_PROGRESS, DELETE_FAILED
We can create a script like the one in the code block below, which polls for the job status and waits around 60 seconds between calls whenever the previous response returned a status other than ACTIVE. Once the response status is ACTIVE, the loop exits and the final status is printed.
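import time
def check_job_status(arn, wait_seconds=60):
    # Poll DescribeDatasetImportJob until the import job becomes ACTIVE.
    job_status = forecast.describe_dataset_import_job(DatasetImportJobArn=arn)[
        "Status"
    ]
    while job_status != "ACTIVE":
        # Stop instead of looping forever if the job has failed.
        if job_status.endswith("FAILED"):
            raise RuntimeError(f"Data import job ended with status {job_status}")
        time.sleep(wait_seconds)
        job_status = forecast.describe_dataset_import_job(DatasetImportJobArn=arn)[
            "Status"
        ]
    print(f"Data Import job complete with job status {job_status}")
    return job_status
check_job_status(dataset_import_job_arn)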
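The same polling pattern applies to predictors and forecasts, which also move through the CREATE_* states before becoming ACTIVE. One possible generalisation, sketched below, takes the status lookup as a callable and adds a timeout and failure handling. The name `wait_until_active` and its parameters are hypothetical; the demo feeds it a canned sequence of statuses instead of calling AWS.

```python
import time


def wait_until_active(get_status, poll_seconds=60, timeout_seconds=3600, sleep=time.sleep):
    """Poll get_status() until it returns ACTIVE, fails, or the timeout elapses."""
    waited = 0
    while True:
        status = get_status()
        if status == "ACTIVE":
            return status
        if status.endswith("_FAILED") or status.endswith("_STOPPED"):
            raise RuntimeError(f"Job ended with status {status}")
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Demo with canned statuses and a no-op sleep, so nothing actually waits.
statuses = iter(["CREATE_PENDING", "CREATE_IN_PROGRESS", "ACTIVE"])
result = wait_until_active(lambda: next(statuses), sleep=lambda seconds: None)
print(result)
```

Against the real service, the callable could be `lambda: forecast.describe_dataset_import_job(DatasetImportJobArn=dataset_import_job_arn)["Status"]`.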
Model Training
In this section, we will create a predictor (an Amazon Forecast model) that is trained using the target time series. Predictors can be used to generate forecasts based on the imported time-series data.
When training predictors, Amazon Forecast requires the dataset group, forecast frequency and forecast horizon as inputs. These are therefore passed into the custom function for creating the predictor, with the following settings:
- Dataset group: as defined previously.
- Forecast frequency: the granularity of the forecasts, which in this case is daily.
- Forecast horizon: the number of time steps being forecast, here set to 35 days.
In the code block below, we have created a function train_aws_forecast_model that runs AutoML by default (auto_ml=True), i.e. it applies an optimal combination of algorithms to each time series in the dataset. However, if the auto_ml parameter is set to False, it runs manual selection based on the algorithm parameter. This defaults to Non-Parametric Time Series (NPTS) but can be overridden.
FORECAST_LENGTH = 35
DATASET_FREQUENCY = "D"
PROJECT = 'manning_ts'
DATA_VERSION = 1
predictor_name = f"{PROJECT}_{DATA_VERSION}_automl"
def train_aws_forecast_model(
    predictor_name,
    forecast_length,
    dataset_frequency,
    dataset_group_arn,
    auto_ml=True,
    explain=False,
    algorithm="NPTS",
    backtest_windows=1,
    holidays_code="US",
):
    if auto_ml:
        # AutoPredictor: Forecast picks the best combination of algorithms.
        create_predictor_response = forecast.create_auto_predictor(
            PredictorName=predictor_name,
            ForecastHorizon=forecast_length,
            ForecastFrequency=dataset_frequency,
            ExplainPredictor=explain,
            DataConfig={
                "DatasetGroupArn": dataset_group_arn,
                "AttributeConfigs": [
                    {
                        "AttributeName": "target_value",
                        "Transformations": {
                            "aggregation": "sum",
                            "middlefill": "zero",
                            "backfill": "zero",
                        },
                    },
                ],
                "AdditionalDatasets": [
                    {
                        "Name": "holiday",
                        "Configuration": {"CountryCode": [holidays_code]},
                    }
                ],
            },
        )
    else:
        # Legacy predictor: train the single algorithm named by `algorithm`.
        create_predictor_response = forecast.create_predictor(
            PredictorName=predictor_name,
            ForecastHorizon=forecast_length,
            AlgorithmArn=f"arn:aws:forecast:::algorithm/{algorithm}",
            EvaluationParameters={"NumberOfBacktestWindows": backtest_windows},
            InputDataConfig={
                "DatasetGroupArn": dataset_group_arn,
                "SupplementaryFeatures": [{"Name": "holiday", "Value": holidays_code}],
            },
            FeaturizationConfig={"ForecastFrequency": dataset_frequency},
        )
    predictor_arn = create_predictor_response["PredictorArn"]
    return create_predictor_response, predictor_arn
create_predictor_response, predictor_arn = train_aws_forecast_model(
    predictor_name, FORECAST_LENGTH, DATASET_FREQUENCY, dataset_group_arn
)
Note: The code block above calls the legacy CreatePredictor API for manual selection. We could also upgrade to AutoPredictor to create predictors, as suggested in the AWS docs. This has two advantages: the Forecast Explainability and predictor retraining features are only available for predictors created with AutoPredictor, and AutoPredictor is the default and preferred method for creating a predictor with Amazon Forecast because its predictors are more accurate. It applies the optimal combination of algorithms to each time series in the dataset, as described in the docs. If the script is run without changing any parameters, it will create predictors using AutoPredictor.
Backtest Results
Amazon Forecast uses backtesting to compute metrics for evaluating predictors. Some of these are summarised from the AWS docs in the table below, along with the common use cases for each one.
Metric | Definition | When to use |
---|---|---|
RMSE | square root of the average of squared errors. It is sensitive to large deviations (outliers) between the actual demand and forecasted values. | Useful in cases when you want to penalise outliers where a few large incorrect predictions from a model on some items can be very costly to the business. For sparse datasets where demand for items in historical data is low, it would be better to use WAPE or wQL instead as RMSE will not account for scale of total demand |
wQL | measures accuracy of a model at a specified quantile. An extension of this is the Average wQL metric which is the mean of wQL values for all quantiles (forecast types) selected during predictor creation. | wQL is particularly useful when there are different costs for underpredicting and overpredicting. By setting the weight of the wQL function, you can automatically incorporate differing penalties for underpredicting and overpredicting. The Average wQL can be used for evaluating forecasts at multiple quantiles together. |
MASE | divides the average error by a scaling factor that depends on the seasonality value, which is selected based on the forecast frequency. MASE is a scale-free metric, so its values can be used to meaningfully compare forecast error across different datasets regardless of the scale of total demand. | ideal for datasets that are cyclical in nature or have seasonal properties, e.g. forecasting for products that are in high demand in summer compared to winter |
MAPE | absolute percentage error between the forecasted and actual values, averaged over all time points. The normalization in the MAPE allows this metric to be compared across datasets with different scales. | useful for datasets where forecasting errors need to be emphasised equally on all items regardless of demand. It also penalises under-forecasting and over-forecasting equally, so it is a useful metric when the difference in cost between under-forecasting and over-forecasting is negligible |
WAPE | sum of the absolute error normalized by the total demand. A high total demand results in a low WAPE and vice versa. The weighting allows this metric to be compared across datasets with different scales. | useful in evaluating datasets that contain a mix of items with large and small demand. A retailer may want to prioritise forecasting errors for standard items with high sales over special edition items which are sold infrequently. WAPE would be a good choice in such a case. For sparse datasets where a large proportion of products are sold infrequently (i.e. demand is 0 for most of the historical data), WAPE would be a better choice than RMSE as it accounts for the total scale of demand. |
The metrics are provided for each backtest window specified. For multiple backtest windows, the metrics are averaged across all the windows. The user can adjust the backtest window length (testing set) and the number of backtests (from 1 to 5) when training a predictor, as described in the docs. However, the backtest window length must be at least as large as the prediction window or forecast horizon (this is the default setting if not overridden by the user). It also cannot exceed half the length of the entire time series. The metrics are computed from the forecasted values and the observed values during backtesting. Missing values that are filled in the dataset using one of the AWS Forecast supported methods (see the Missing value section) are not used when computing the metrics, as they are not classed as observed values.
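To make the definitions in the table more tangible, the sketch below computes each metric for one small backtest window in plain Python. The sample numbers are made up, the function names are hypothetical, and the formulas follow the definitions summarised above rather than Amazon Forecast's internal implementation (for example, the seasonality used for MASE is passed in explicitly here).

```python
import math


def rmse(actual, predicted):
    """Square root of the mean squared error."""
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))


def wape(actual, predicted):
    """Sum of absolute errors normalised by total absolute demand."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / sum(abs(a) for a in actual)


def mape(actual, predicted):
    """Mean absolute percentage error; assumes no actual value is zero."""
    return sum(abs(a - p) / abs(a) for a, p in zip(actual, predicted)) / len(actual)


def mase(actual, predicted, history, season_length=1):
    """Mean absolute error scaled by the seasonal naive error on the history."""
    mae = sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)
    naive_errors = [
        abs(history[t] - history[t - season_length])
        for t in range(season_length, len(history))
    ]
    return mae / (sum(naive_errors) / len(naive_errors))


def weighted_quantile_loss(actual, quantile_forecast, tau):
    """wQL at quantile tau: under- and over-prediction are weighted differently."""
    loss = sum(
        tau * max(a - q, 0) + (1 - tau) * max(q - a, 0)
        for a, q in zip(actual, quantile_forecast)
    )
    return 2 * loss / sum(abs(a) for a in actual)


# Made-up training history, observed values and forecasts for one window.
history = [8, 9, 11, 10, 12]
actual = [10, 12, 8, 10]
predicted = [9, 12, 10, 11]

print("RMSE", round(rmse(actual, predicted), 4))
print("WAPE", round(wape(actual, predicted), 4))
print("MAPE", round(mape(actual, predicted), 4))
print("MASE", round(mase(actual, predicted, history), 4))
print("wQL[0.9]", round(weighted_quantile_loss(actual, predicted, 0.9), 4))
```

With these definitions, the weighted quantile loss at the median (tau = 0.5) reduces to the WAPE, which is a handy sanity check.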
We can generate the backtesting metrics by calling the function with the predictor arn value, which calls the get_accuracy_metrics method from the forecast api.
def evaluate_backtesting_metrics(predictor_arn):
    error_metrics = forecast.get_accuracy_metrics(PredictorArn=predictor_arn)
    print(error_metrics["PredictorEvaluationResults"])
    return error_metrics
error_metrics = evaluate_backtesting_metrics(predictor_arn)
We can also create a function to plot the backtest results for all the metrics, using the pandas.DataFrame.plot method (https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.plot.html).
import numpy as np
import pandas as pd
import json
def plot_backtest_metrics(error_metrics):
    parsed_json = {
        "Algorithm": [],
        "WQuantLosses": [],
        "WAPE": [],
        "RMSE": [],
        "MASE": [],
        "MAPE": [],
        "AvgWQuantLoss": [],
    }
    # error_metrics is the full GetAccuracyMetrics response; loop over one entry per algorithm.
    for v in error_metrics["PredictorEvaluationResults"]:
        algo = v["AlgorithmArn"].split("/")[-1]
        # Only the first test window of each algorithm is used.
        weighted_quantile_losses = v["TestWindows"][0]["Metrics"][
            "WeightedQuantileLosses"
        ]
        wape = v["TestWindows"][0]["Metrics"]["ErrorMetrics"][0]["WAPE"]
        rmse = v["TestWindows"][0]["Metrics"]["ErrorMetrics"][0]["RMSE"]
        mase = v["TestWindows"][0]["Metrics"]["ErrorMetrics"][0]["MASE"]
        mape = v["TestWindows"][0]["Metrics"]["ErrorMetrics"][0]["MAPE"]
        avg_weighted_quantile_losses = v["TestWindows"][0]["Metrics"][
            "AverageWeightedQuantileLoss"
        ]
        parsed_json["Algorithm"].append(algo)
        parsed_json["WQuantLosses"].append(json.dumps(weighted_quantile_losses))
        parsed_json["WAPE"].append(np.round(wape, 4))
        parsed_json["RMSE"].append(np.round(rmse, 4))
        parsed_json["MASE"].append(np.round(mase, 4))
        parsed_json["MAPE"].append(np.round(mape, 4))
        parsed_json["AvgWQuantLoss"].append(np.round(avg_weighted_quantile_losses, 4))
    # One row per metric, one column per algorithm.
    df = (
        pd.DataFrame(parsed_json)
        .set_index("Algorithm")
        .T.rename_axis("Metric", axis=0)
        .rename_axis(None, axis=1)
        .reset_index()
    )
    # Skip the first row (WQuantLosses), which holds JSON strings rather than numbers.
    df.iloc[1::, :].plot(x="Metric", kind="bar", figsize=(15, 8), legend=True)
    return df
plot_backtest_metrics(error_metrics)
Looking at the results, it seems that Non-Parametric Time Series (NPTS) is the winning algorithm, followed by DeepAR+, so AWS Forecast will use the NPTS model for serving forecasts. We can also see that the MASE metric better highlights the difference in performance between the various algorithms, as it is better suited to this dataset given the cyclical/seasonal properties of the data.
Forecast and Query
Now we have a trained model so we can create a forecast. This includes predictions for every item (item_id) in the dataset group that was used to train the predictor.
import boto3
forecast = boto3.client("forecast")
PROJECT = 'manning_ts'
DATA_VERSION = 1
def create_forecast(forecast_name, predictor_arn):
    create_forecast_response = forecast.create_forecast(
        ForecastName=forecast_name, PredictorArn=predictor_arn
    )
    forecast_arn = create_forecast_response["ForecastArn"]
    print(forecast_arn)
    return forecast_arn
forecast_name = f"{PROJECT}_{DATA_VERSION}_automl_forecast"
forecast_arn = create_forecast(forecast_name, predictor_arn)
Once this is done, we can query the forecast by passing a filter (key-value pair), where the key is one of the schema attribute names and the value is a valid value for that attribute. This returns the forecast for only those items that satisfy the criteria (see docs). In this case, we query the forecast for our single item using the item_id dimension. We use the forecast_arn value returned from the previous code block.
import boto3
import pandas as pd
forecastquery = boto3.client(service_name="forecastquery")
def run_forecast_query(forecast_arn, filters):
    forecast_response = forecastquery.query_forecast(
        ForecastArn=forecast_arn, Filters=filters
    )
    return forecast_response["Forecast"]["Predictions"]
def create_forecast_plot(forecast_response):
    # Collect the p10, p50 and p90 quantile forecasts into one DataFrame.
    ts = {}
    timestamp = [k["Timestamp"] for k in forecast_response["p10"]]
    p10 = [k["Value"] for k in forecast_response["p10"]]
    p50 = [k["Value"] for k in forecast_response["p50"]]
    p90 = [k["Value"] for k in forecast_response["p90"]]
    ts["timestamp"] = timestamp
    ts["p10"] = p10
    ts["p50"] = p50
    ts["p90"] = p90
    df = pd.DataFrame(ts)
    df.plot(x="timestamp", figsize=(15, 8))
    return df
filters = {"item_id": "1"}
forecast_response = run_forecast_query(forecast_arn, filters)
create_forecast_plot(forecast_response)
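For reference, the predictions returned by the query are keyed by quantile, each holding a list of timestamp/value pairs. The sketch below flattens a hand-written stand-in for that structure into one row per day and adds the width of the p10 to p90 interval. The sample values and the helper name `predictions_to_rows` are made up for illustration and are not real forecast output.

```python
# Hand-written stand-in for the "Predictions" part of a query_forecast response.
sample_predictions = {
    "p10": [
        {"Timestamp": "2016-01-21T00:00:00", "Value": 7.4},
        {"Timestamp": "2016-01-22T00:00:00", "Value": 7.3},
    ],
    "p50": [
        {"Timestamp": "2016-01-21T00:00:00", "Value": 8.0},
        {"Timestamp": "2016-01-22T00:00:00", "Value": 7.9},
    ],
    "p90": [
        {"Timestamp": "2016-01-21T00:00:00", "Value": 8.7},
        {"Timestamp": "2016-01-22T00:00:00", "Value": 8.8},
    ],
}


def predictions_to_rows(predictions):
    """Flatten the per-quantile lists into one row per timestamp."""
    rows = []
    for p10, p50, p90 in zip(predictions["p10"], predictions["p50"], predictions["p90"]):
        rows.append(
            {
                "timestamp": p10["Timestamp"],
                "p10": p10["Value"],
                "p50": p50["Value"],
                "p90": p90["Value"],
                # Width of the 80% prediction interval for that day.
                "interval_width": round(p90["Value"] - p10["Value"], 4),
            }
        )
    return rows


for row in predictions_to_rows(sample_predictions):
    print(row)
```

A wider interval indicates more uncertainty in the forecast for that day.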
We can also further analyse the results stored in S3 directly from Athena, or create external tables in Redshift Spectrum which reference a database in the Glue Data Catalog. This allows simple queries to be run without having to load the data into Redshift tables, and Athena can query the same data as it also has access to the Glue Data Catalog. When mapping data stored in an S3 bucket as external tables, the path to the S3 file needs to be passed as the location in the create external schema query. A detailed explanation of this is out of scope for this blog; for further reference, please consult this AWS blog and the AWS documentation.
Terminating Resources
Finally, we can tear down all the AWS Forecast resources (predictor, forecast and dataset group) from the console or programmatically via Boto or the AWS CLI. The code block below shows an example of deleting the forecast resource named FORECAST_NAME; the predictor resource, named PREDICTOR_NAME, can be deleted with a similar process.
- First, we use the ListForecasts action to return a list of the forecasts created.
- We check whether any forecast exists in the list.
- If it does, we check whether the ForecastName property has the value FORECAST_NAME.
- If the previous check is true, we fetch the corresponding ARN from the ForecastArn property and call the DeleteForecast action.
Note: Since the forecast is a child resource generated from the parent predictor resource, it needs to be deleted first. Otherwise, an error will be thrown due to the dependency.
forecast_name = 'FORECAST_NAME'
forecast_list = forecast.list_forecasts()["Forecasts"]
if not forecast_list:
    print("No forecast job currently exists")
else:
    for i in forecast_list:
        if i["ForecastName"] == forecast_name:
            forecast_arn = i["ForecastArn"]
            print(f"Deleting {forecast_arn}")
            forecast.delete_forecast(ForecastArn=forecast_arn)
Alternatively, we could also delete an entire resource tree in one operation using the delete_resource_tree operation. This will delete all the associated parent and child resources.
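As a rough sketch of that approach, the helper below looks up a dataset group by name and passes its ARN to `delete_resource_tree`, which removes the group together with its child resources such as predictors and forecasts. The function names are hypothetical, the Forecast client is passed in as an argument, and only the first page of `list_dataset_groups` results is inspected, which is an assumption that holds for small accounts.

```python
def find_dataset_group_arn(forecast_client, dataset_group_name):
    """Return the ARN of the dataset group with the given name, or None."""
    for group in forecast_client.list_dataset_groups()["DatasetGroups"]:
        if group["DatasetGroupName"] == dataset_group_name:
            return group["DatasetGroupArn"]
    return None


def delete_dataset_group_tree(forecast_client, dataset_group_name):
    """Delete a dataset group and all of its child resources in one call."""
    arn = find_dataset_group_arn(forecast_client, dataset_group_name)
    if arn is None:
        print(f"No dataset group named {dataset_group_name} found")
        return None
    print(f"Deleting resource tree for {arn}")
    forecast_client.delete_resource_tree(ResourceArn=arn)
    return arn
```

With the client created earlier, this could be called as `delete_dataset_group_tree(forecast, "manning_ts_1")`. The deletion is asynchronous, and the operation only removes Amazon Forecast resources; the files in S3 are left untouched.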
If you want to delete the bucket as well, you can follow these steps. You will need to have the appropriate permissions added to the policy attached to your IAM user.
References
- Importing datasets https://docs.aws.amazon.com/forecast/latest/dg/howitworks-datasets-groups.html
- Training Predictors https://docs.aws.amazon.com/forecast/latest/dg/howitworks-predictor.html
- Predictor metrics and backtesting https://docs.aws.amazon.com/forecast/latest/dg/metrics.html
- Handling missing values https://docs.aws.amazon.com/forecast/latest/dg/howitworks-missing-values.html
- Generating and Querying forecasts https://docs.aws.amazon.com/forecast/latest/dg/howitworks-forecast.html
- Query S3 data from Athena https://aws.amazon.com/blogs/big-data/analyzing-data-in-s3-using-amazon-athena/
- Create External Tables Redshift Spectrum https://aws.amazon.com/premiumsupport/knowledge-center/redshift-spectrum-external-table/
- Facebook Prophet paper using Manning dataset https://peerj.com/preprints/3190/