Dr. Malte Polley

Integration of Apache Iceberg in S3, Glue, Athena, Matillion, and Snowflake – Part 2

TL;DR

In this article, I'll show how I integrated Apache Iceberg with AWS Glue, S3, Athena, Matillion, and Snowflake. We start by collecting data via an API and storing it in S3 as a Parquet file, then use a Glue crawler and Athena to update the data in an Iceberg table. Matillion orchestrates the entire process, while Snowflake queries the data via an "External Volume." An important step was replacing non-ASCII characters in the column names to avoid compatibility issues. In the end, the data can be easily queried in Snowflake. The infrastructure part can be found here.

Working Backwards: The Matillion Orchestration Pipeline

In this case, let's start from the end to see what I implemented regarding Apache Iceberg, Matillion, Snowflake, and AWS. This is the final orchestration pipeline:

Matillion Pipeline

As you can see, there isn't much low-code here, even though that is usually one of Matillion's big selling points. Instead, we use the Python and Plain SQL components to drive our process.

What's Happening Here?

From Part One, you already know that S3, Glue, and Athena are our AWS powerhouses. The first Python component ("Gather Data") collects data from an API and stores it in S3 as a Parquet file.

# Snippet from the "Gather Data" component (requires pandas and s3fs)
df = pd.DataFrame.from_dict(response_dict["data"])
df["Upload Date"] = file_date
# Replace non-ASCII characters in the column names (see the helper further below)
df.columns = [replace_no_ascii(unicode_string=col) for col in df.columns]
df.to_parquet(
    f"s3://{s3_bucket}/{directory}/{year}/{file_date}.snappy.parquet",
    engine="auto",
    compression="snappy",
    # Temporary AWS credentials are handed to s3fs via storage_options
    storage_options={
        "key": access_key,
        "secret": secret_access_key,
        "token": session_token,
    },
)
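The response_dict above comes from the API call that precedes this snippet, which is not shown in the original component. A minimal sketch of that part could look like the following; the endpoint, the authentication header, and api_token are placeholders, not the real API:

# Hypothetical sketch of the API call that produces response_dict.
# Endpoint and bearer token are placeholders only.
import requests

response = requests.get(
    "https://api.example.com/v1/records",              # placeholder endpoint
    headers={"Authorization": f"Bearer {api_token}"},  # placeholder auth
    timeout=30,
)
response.raise_for_status()
response_dict = response.json()  # expected to contain a "data" key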

The second Python component starts the Glue crawler run and the Athena INSERT process. The crawler keeps the landing-stage table in our Glue database up to date, while Athena maintains the actual Iceberg table. Here are some code snippets to give you an idea:

# Needs module-level imports: boto3, logging, time, and ClientError from botocore.exceptions
def update_glue_database(self, credentials: dict):
    access_key = credentials["Credentials"]["AccessKeyId"]
    secret_access_key = credentials["Credentials"]["SecretAccessKey"]
    session_token = credentials["Credentials"]["SessionToken"]
    glue_client = boto3.client(
        "glue",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_access_key,
        aws_session_token=session_token,
    )
    try:
        glue_client.start_crawler(Name=self.crawler_name)
        logging.info("Started Glue crawler run")
        finished = False
        while not finished:
            time.sleep(30)
            response = glue_client.get_crawler(Name=self.crawler_name)
            status = response["Crawler"]["State"]
            # The result of the last crawl lives in LastCrawl.Status, not in LastCrawl itself
            last_run = response["Crawler"].get("LastCrawl", {}).get("Status")
            if status == "READY" and last_run in ["FAILED", "CANCELLED"]:
                raise ValueError("Glue crawler run failed")
            if status == "READY":
                finished = True
                logging.info("Finished Glue crawler run")
    except ClientError as e:
        logging.exception(e)
        raise

It's important to wait for the Glue crawler run to finish. Otherwise, the subsequent Athena INSERT wouldn't see the newly crawled data and couldn't maintain the Iceberg table properly. Here's the code snippet for the Athena process:

def maintain_iceberg_with_athena(self, credentials: dict, database: str, s3_input_prefix: str) -> None:
    base_table_name = s3_input_prefix.replace("-", "_")
    access_key = credentials["Credentials"]["AccessKeyId"]
    secret_access_key = credentials["Credentials"]["SecretAccessKey"]
    session_token = credentials["Credentials"]["SessionToken"]
    # Append only the rows of the latest upload from the crawled landing table to the Iceberg table
    query = f"""INSERT INTO "{base_table_name}_iceberg" SELECT * FROM "{base_table_name}" WHERE "upload date" = (SELECT MAX("Upload Date") FROM "{base_table_name}");"""
    logging.info("Running update on Iceberg table")
    logging.info(query)
    athena_client = boto3.client(
        "athena",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_access_key,
        aws_session_token=session_token,
    )
    try:
        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={"Database": database, "Catalog": "AwsDataCatalog"},
            ResultConfiguration={"OutputLocation": "s3://aws-athena-query-results-673540176684-eu-central-1/"},
            WorkGroup="primary",
        )
        query_execution_id = response["QueryExecutionId"]
        # Poll until the query either succeeds or fails
        finished = False
        while not finished:
            response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
            status = response["QueryExecution"]["Status"]["State"]
            if status == "SUCCEEDED":
                finished = True
                logging.info("Finished Iceberg table update")
            elif status in ["FAILED", "CANCELLED"]:
                raise ValueError(f"Updating Iceberg table with query {query} failed")
            else:
                time.sleep(3)
    except (ClientError, KeyError) as e:
        logging.exception(e)
        raise

As mentioned in the first part, we need to crawl the data in our S3 landing stage via Glue. The Athena process then writes the data into a new stage, which is why I decided to clean up the landing stage once the Athena INSERT statement has succeeded. With Matillion, we can use environment variables to store the names of the files written to S3 and later use these variables to delete them again (a sketch of how the keys can be handed over follows after the code). The deletion is handled by the following script block:

def delete_parquet_files_in_s3(self, credentials: dict, parquet_files: list[str]) -> None:
    access_key = credentials["Credentials"]["AccessKeyId"]
    secret_access_key = credentials["Credentials"]["SecretAccessKey"]
    session_token = credentials["Credentials"]["SessionToken"]

    s3_client = boto3.client(
        "s3",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_access_key,
        aws_session_token=session_token,
    )

    for file_key in parquet_files:
        try:
            s3_client.delete_object(Bucket=self.s3_bucket_name, Key=file_key)
            logging.info(f"File {file_key} deleted from S3 bucket {self.s3_bucket_name}.")
        except ClientError as e:
            logging.exception(e)
            raise
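How the file names end up in those variables is not shown in the original pipeline. As a rough sketch, assuming a Matillion ETL Python Script component where a context object with updateVariable is available, the upload step could record the written keys roughly like this; the variable name parquet_files_written is a placeholder, and directory, year, and file_date are the values from the upload snippet above:

# Rough sketch: remember which Parquet files were written so a later
# component can delete them again. "parquet_files_written" is a
# hypothetical Matillion job variable name.
written_keys = []

key = f"{directory}/{year}/{file_date}.snappy.parquet"
written_keys.append(key)

# Matillion job variables hold strings, so the keys are joined into one value
context.updateVariable("parquet_files_written", ",".join(written_keys))

The delete component would then split that variable back into a list before calling delete_parquet_files_in_s3.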

The final SQL component uses this SQL statement:

ALTER ICEBERG TABLE ICEBERG_TABLE REFRESH;

This command is necessary, although the Snowflake docs do not explicitly mention this manual refresh step.
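In my pipeline, this statement runs in Matillion's SQL component, but the same refresh could also be triggered programmatically. Here is a minimal sketch using the snowflake-connector-python package; all connection parameters are placeholders and not part of the original setup:

# Minimal sketch: triggering the Iceberg metadata refresh from Python.
# All connection parameters are placeholders.
import snowflake.connector

conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<user>",
    password="<password>",
    warehouse="<warehouse>",
    database="<database>",
    schema="<schema>",
)
try:
    conn.cursor().execute("ALTER ICEBERG TABLE ICEBERG_TABLE REFRESH")
finally:
    conn.close()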

The "Easy" Part on the Snowflake Side

The infrastructure on the Snowflake side consists of a Warehouse, databases, a user, and its role. The data is integrated via an "External Volume":

An external volume is an account-level Snowflake object that stores an identity and access management (IAM) entity for your external cloud storage. Snowflake uses the external volume to securely connect to your cloud storage to access table data and metadata.

As the quote suggests, if you are familiar with "External Stages" and "Storage Integrations", the concept will feel very familiar. You can find the Snowflake docs here. There is also an excellent tutorial available here.

CREATE OR REPLACE EXTERNAL VOLUME iceberg_external_volume
   STORAGE_LOCATIONS =
      (
         (
            NAME = 'my-s3-us-west-2'
            STORAGE_PROVIDER = 'S3'
            STORAGE_BASE_URL = 's3://<my_bucket>/'
            STORAGE_AWS_ROLE_ARN = '<arn:aws:iam::123456789012:role/myrole>'
            STORAGE_AWS_EXTERNAL_ID = 'iceberg_table_external_id'
         )
      );

If you are using Glue as the catalog, a corresponding catalog integration also needs to be created in Snowflake.

CREATE or REPLACE CATALOG INTEGRATION GLUE_DATABASE_ICEBERG
  CATALOG_SOURCE=GLUE
  CATALOG_NAMESPACE='glue-database-iceberg'
  TABLE_FORMAT=ICEBERG
  GLUE_AWS_ROLE_ARN='arn:aws:iam::012345678910:role/glue-iceberg-role'
  GLUE_CATALOG_ID='012345678910'
  ENABLED=TRUE;

Finally, we need to create the Iceberg table itself.

CREATE OR REPLACE ICEBERG TABLE ICEBERG_TABLE
  EXTERNAL_VOLUME='S3_EXT_REXX_VOLUME'
  CATALOG='GLUE_DATABASE_ICEBERG'
  CATALOG_TABLE_NAME='GLUE_DATABASE_ICEBERG_TABLE';

Afterwards, the table can be queried in Snowflake even though the data and metadata live in Glue and S3. But maybe you noticed the quotation marks in the heading of this section. You might encounter this situation:

Processing aborted due to error 300010:3639435319; incident 2552819.

After contacting support, I realized that the API I queried used non-ASCII characters in its field names, which I had turned into column names. As of August 2024, such column names are not supported by Snowflake, and the error message does not help identify the issue by itself. That's why I integrated this function into my S3 upload script:

def replace_no_ascii(unicode_string: str) -> str:
    # Replace every non-ASCII character with "%" so the resulting
    # column names are accepted by Athena and Snowflake
    return "".join(char if char.isascii() else "%" for char in unicode_string)

This function replaces non-ASCII characters in the column names with a placeholder. After reloading the data with that function in place, querying my Snowflake table worked like a charm.
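For illustration, applied to a made-up column name containing an umlaut, the helper yields:

# Made-up column name, just for illustration
replace_no_ascii(unicode_string="Geburtsjahr (Jahrgänge)")
# -> 'Geburtsjahr (Jahrg%nge)'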

Closing Words

It took a while to get a clear picture of how to leverage data from an API as an Iceberg table. Although the process was tedious, this is what I love about my job. You can now take the fast lane. Cheers! 😊
