TL;DR
In this article, I'll show how I integrated Apache Iceberg with AWS Glue, S3, Athena, Matillion, and Snowflake. We start by collecting data via an API and storing it in S3 as a Parquet file, then use a Glue crawler and Athena to update the data in an Iceberg table. Matillion orchestrates the entire process, while Snowflake queries the data via an "External Volume." An important step was removing non-ASCII characters from the column names to avoid compatibility issues. In the end, the data can be queried easily in Snowflake. The infrastructure part can be found here.
Working Backwards: The Matillion Orchestration Pipeline
In this case, let's start from the end to see what I implemented regarding Apache Iceberg, Matillion, Snowflake, and AWS. This is the final orchestration pipeline:
As you can see, there isn't much low-code here. Being able to drop down to code when you need to is actually one of Matillion's big benefits: instead of the low-code components, we use the Python and Plain SQL components to drive our process.
What's Happening Here?
From Part One, you already know that S3, Glue, and Athena are our AWS powerhouses. The first Python component ("Gather Data") collects data from an API and stores it in S3 as a Parquet file.
# snippet from the "Gather Data" Python component
import pandas as pd

# Build a DataFrame from the API response and tag it with the upload date
df = pd.DataFrame.from_dict(response_dict["data"])
df["Upload Date"] = file_date

# Strip non-ASCII characters from the column names (more on that below)
df.columns = [replace_no_ascii(unicode_string=col) for col in df.columns]

# Write the data to S3 as a snappy-compressed Parquet file (uses s3fs under the hood)
df.to_parquet(
    f"s3://{s3_bucket}/{directory}/{year}/{file_date}.snappy.parquet",
    engine="auto",
    compression="snappy",
    storage_options={
        "key": access_key,
        "secret": secret_access_key,
        "token": sessions_token,
    },
)
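The snippet above assumes that response_dict has already been fetched from the API. The exact API is not the point of this article, but a minimal sketch of that step, with a placeholder endpoint and token, could look like this:

import os

import requests

# Placeholder endpoint and token; swap in the real API details
api_token = os.environ.get("API_TOKEN", "<your-token>")
response = requests.get(
    "https://api.example.com/v1/data",
    headers={"Authorization": f"Bearer {api_token}"},
    timeout=30,
)
response.raise_for_status()
response_dict = response.json()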
The second Python component starts the Glue crawler run and the Athena INSERT process. The crawler keeps the Glue table for the freshly landed Parquet files up to date, while Athena then inserts that data into the Iceberg table in the same Glue database. Here are some code snippets to give you an idea:
import logging
import time

import boto3
from botocore.exceptions import ClientError


def update_glue_database(self, credentials: dict) -> None:
    # Temporary credentials as returned by STS assume_role
    access_key = credentials["Credentials"]["AccessKeyId"]
    secret_access_key = credentials["Credentials"]["SecretAccessKey"]
    session_token = credentials["Credentials"]["SessionToken"]
    glue_client = boto3.client(
        "glue",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_access_key,
        aws_session_token=session_token,
    )
    try:
        glue_client.start_crawler(Name=self.crawler_name)
        logging.info("Started Glue crawler run")
        # Poll until the crawler is back in the READY state
        finished = False
        while not finished:
            response = glue_client.get_crawler(Name=self.crawler_name)
            status = response["Crawler"]["State"]
            last_run = response["Crawler"].get("LastCrawl", {}).get("Status")
            if status == "READY":
                finished = True
                logging.info("Finished Glue crawler run")
                # The crawler returns to READY even after a failed run,
                # so check the status of the last crawl explicitly
                if last_run in ["FAILED", "CANCELLED"]:
                    raise ValueError("Glue crawler run failed")
            else:
                time.sleep(30)
    except ClientError as e:
        logging.exception(e)
        raise
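The credentials dictionary passed into this method (and the ones below) has the shape returned by boto3's STS assume_role call. A minimal sketch of obtaining it, with a placeholder role ARN and session name:

import boto3

sts_client = boto3.client("sts")
# Placeholder role ARN and session name; the response has the
# {"Credentials": {"AccessKeyId", "SecretAccessKey", "SessionToken", ...}}
# shape that the methods in this article expect
credentials = sts_client.assume_role(
    RoleArn="arn:aws:iam::123456789012:role/my-pipeline-role",
    RoleSessionName="matillion-iceberg-pipeline",
)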
It's important to build in a wait condition for the Glue crawler run. Otherwise, our Athena INSERT process would start against stale metadata and wouldn't maintain the Iceberg table properly. Here's the code snippet for the Athena process:
def maintain_iceberg_with_athena(
    self, credentials: dict, database: str, s3_input_prefix: str
) -> None:
    # The Glue table name is derived from the S3 prefix of the landing stage
    base_table_name = s3_input_prefix.replace("-", "_")
    access_key = credentials["Credentials"]["AccessKeyId"]
    secret_access_key = credentials["Credentials"]["SecretAccessKey"]
    session_token = credentials["Credentials"]["SessionToken"]
    # Insert only the rows of the latest upload into the Iceberg table
    query = f"""INSERT INTO "{base_table_name}_iceberg" SELECT * FROM "{base_table_name}" WHERE "upload date" = (SELECT MAX("Upload Date") FROM "{base_table_name}");"""
    logging.info("Running update on Iceberg table")
    logging.info(query)
    athena_client = boto3.client(
        "athena",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_access_key,
        aws_session_token=session_token,
    )
    try:
        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={"Database": database, "Catalog": "AwsDataCatalog"},
            ResultConfiguration={"OutputLocation": "s3://aws-athena-query-results-673540176684-eu-central-1/"},
            WorkGroup="primary",
        )
        query_execution_id = response["QueryExecutionId"]
        # Poll the query execution until it reaches a terminal state
        finished = False
        while not finished:
            response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
            status = response["QueryExecution"]["Status"]["State"]
            if status == "SUCCEEDED":
                finished = True
                logging.info("Finished Iceberg table update")
            elif status in ["FAILED", "CANCELLED"]:
                raise ValueError(f"Updating Iceberg table with query {query} failed")
            else:
                time.sleep(3)
    except (ClientError, KeyError) as e:
        logging.exception(e)
        raise
As mentioned in the first part, we crawl the data in our S3 landing stage via Glue, and the Athena INSERT then writes it into the Iceberg table's own storage location. That's why I decided to clean up the landing stage after the Athena INSERT statement succeeds. With Matillion, we can use environment variables to store the names of the files written to S3 and then use those variables to delete the files again. This is done by the following script block:
def delete_parquet_files_in_s3(self, credentials: dict, parquet_files: list[str]) -> None:
    access_key = credentials["Credentials"]["AccessKeyId"]
    secret_access_key = credentials["Credentials"]["SecretAccessKey"]
    session_token = credentials["Credentials"]["SessionToken"]
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_access_key,
        aws_session_token=session_token,
    )
    # Remove each landed Parquet file once it has been merged into the Iceberg table
    for file_key in parquet_files:
        try:
            s3_client.delete_object(Bucket=self.s3_bucket_name, Key=file_key)
            logging.info(f"File {file_key} deleted from S3 bucket {self.s3_bucket_name}.")
        except ClientError as e:
            logging.exception(e)
            raise
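How the list of file keys ends up in that variable depends on your setup. A minimal sketch of populating it from the "Gather Data" component, assuming Matillion's Python Script component with its context.updateVariable API; the variable name file_keys is hypothetical:

# Inside the "Gather Data" Python Script component; the context object is
# injected by Matillion, and the variable "file_keys" is hypothetical
file_key = f"{directory}/{year}/{file_date}.snappy.parquet"
context.updateVariable("file_keys", file_key)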
The final SQL component uses this SQL statement:
ALTER ICEBERG TABLE ICEBERG_TABLE REFRESH;
This command is necessary, even though the Snowflake docs do not really mention this manual refresh step.
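In my pipeline this refresh runs in Matillion's Plain SQL component, but you could just as well issue it from Python. A minimal sketch using the Snowflake Python connector, with placeholder connection parameters:

import snowflake.connector

# Placeholder connection parameters; use your own account, credentials, and context
conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<user>",
    password="<password>",
    warehouse="<warehouse>",
    database="<database>",
    schema="<schema>",
)
try:
    # Pick up the latest Iceberg metadata written via Glue/Athena
    conn.cursor().execute("ALTER ICEBERG TABLE ICEBERG_TABLE REFRESH")
finally:
    conn.close()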
The "Easy" Part on the Snowflake Side
The infrastructure on the Snowflake side consists of a Warehouse, databases, a user, and its role. The data is integrated via an "External Volume":
An external volume is an account-level Snowflake object that stores an identity and access management (IAM) entity for your external cloud storage. Snowflake uses the external volume to securely connect to your cloud storage to access table data and metadata.
If you are familiar with external stages and storage integrations, this will feel very comparable. You can find the Snowflake docs here. There is also an excellent tutorial available here.
CREATE OR REPLACE EXTERNAL VOLUME iceberg_external_volume
  STORAGE_LOCATIONS =
    (
      (
        NAME = 'my-s3-us-west-2'
        STORAGE_PROVIDER = 'S3'
        STORAGE_BASE_URL = 's3://<my_bucket>/'
        STORAGE_AWS_ROLE_ARN = '<arn:aws:iam::123456789012:role/myrole>'
        STORAGE_AWS_EXTERNAL_ID = 'iceberg_table_external_id'
      )
    );
If you are using Glue as the catalog, you also need to create a catalog integration for it:
CREATE OR REPLACE CATALOG INTEGRATION GLUE_DATABASE_ICEBERG
  CATALOG_SOURCE = GLUE
  CATALOG_NAMESPACE = 'glue-database-iceberg'
  TABLE_FORMAT = ICEBERG
  GLUE_AWS_ROLE_ARN = 'arn:aws:iam::012345678910:role/glue-iceberg-role'
  GLUE_CATALOG_ID = '012345678910'
  ENABLED = TRUE;
Finally, we create the Iceberg table itself:
CREATE OR REPLACE ICEBERG TABLE ICEBERG_TABLE
  EXTERNAL_VOLUME = 'S3_EXT_REXX_VOLUME'
  CATALOG = 'GLUE_DATABASE_ICEBERG'
  CATALOG_TABLE_NAME = 'GLUE_DATABASE_ICEBERG_TABLE';
Afterwards, the table can be queried in Snowflake, even though the data comes from Glue and S3. But maybe you noticed the quotation marks in the header of this section. You might encounter this situation:
Processing aborted due to error 300010:3639435319; incident 2552819.
After contacting support, I realized that the API I queried used non-ASCII characters in its field names, which I had turned into column names. As of August 2024, Snowflake does not support this, and the error message alone does not help you identify the issue. That's why I integrated this function into my S3 upload script:
def replace_no_ascii(unicode_string: str) -> str:
    # Replace every character that cannot be encoded as ASCII with a placeholder
    for char in unicode_string:
        try:
            char.encode("ascii")
        except UnicodeEncodeError:
            unicode_string = unicode_string.replace(char, "%")
    return unicode_string
This function replaces non-ASCII characters in column names with a placeholder. After reloading the data with this function in place, I could query my Snowflake table like a charm.
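For example, a hypothetical column name containing an umlaut would be rewritten like this:

# Hypothetical column name; the umlaut is replaced with "%"
print(replace_no_ascii(unicode_string="Auslösung"))  # -> Ausl%sung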
Closing Words
It took a while to get a clear picture of how to expose data from an API as an Iceberg table. The process was tedious, but that's what I love about my job. You can now take the fast lane. Cheers! 😊