DEV Community

Using Athena Views As A Source In Glue

Whilst working with AWS Glue recently, I noticed that I was unable to use a view created in Athena as the source for an ETL job in the same way that I could use a cataloged table.

The error I received was this:

An error occurred while calling o73.getCatalogSource. No classification or connection in mydatabase.v_my_view

Rather than try to recreate the view in a new PySpark job, I used the Athena JDBC driver as a custom JAR in a Glue job so that I could query the view I wanted to use.

This post contains my notes on how that works.

Drivers

Create or reuse an existing S3 bucket to store the Athena JDBC drivers JAR file. The JAR files are available to download from AWS. I used the latest version which at the time of writing was JDBC Driver with AWS SDK AthenaJDBC42_2.0.27.1000.jar (compatible with JDBC 4.2 and requires JDK 8.0 or later).

IAM

The Glue job needs not only the Glue service privileges but also IAM privileges to access the S3 buckets and the Athena service.

For Athena, this statement grants Glue full permissions:

{
    "Sid": "VisualEditor1",
    "Effect": "Allow",
    "Action": [
        "logs:CreateLogStream",
        "logs:AssociateKmsKey",
        "athena:*",
        "logs:CreateLogGroup",
        "logs:PutLogEvents"
    ],
    "Resource": [
        "arn:aws:athena:*:youraccount:workgroup/*",
        "arn:aws:athena:*:youraccount:datacatalog/*",
        "arn:aws:logs:*:*:/aws-glue/*"
    ]
}
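The job also needs S3 permissions covering the driver JAR, the temporary query output location, and the Parquet destination. A minimal sketch, reusing the placeholder bucket names from later in this post (scope it down further for production):

```json
{
    "Sid": "VisualEditor2",
    "Effect": "Allow",
    "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:ListBucket"
    ],
    "Resource": [
        "arn:aws:s3:::yours3bucket",
        "arn:aws:s3:::yours3bucket/*",
        "arn:aws:s3:::yourotherS3Bucket",
        "arn:aws:s3:::yourotherS3Bucket/*"
    ]
}
```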

Create Glue ETL Job

My use case for the Glue job was to query the view I had and save the results in Parquet format to speed up future queries against the same data.

The following code allows you to query an Athena view as a source for a data frame.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

athena_view_dataframe = (
    glueContext.read.format("jdbc")
    .option("driver", "com.simba.athena.jdbc.Driver")
    .option("AwsCredentialsProviderClass","com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")
    .option("url", "jdbc:awsathena://athena.eu-west-1.amazonaws.com:443")
    .option("dbtable", "AwsDataCatalog.yourathenadatabase.yourathenaview")
    .option("S3OutputLocation","s3://yours3bucket/temp")
    .load()
    )

athena_view_dataframe.printSchema()

The key options to be aware of in this code snippet are these:

.option("driver", "com.simba.athena.jdbc.Driver")

We are telling Glue which class within the JDBC driver to use.

.option("AwsCredentialsProviderClass","com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")

This uses the IAM role assigned to the Glue job to authenticate to Athena. You can use other authentication methods, such as an AWS access key or federated authentication, but I think the IAM role makes the most sense for an ETL job that will most likely run on a schedule or an event.

.option("url", "jdbc:awsathena://athena.eu-west-1.amazonaws.com:443")

I am using Athena in Ireland (eu-west-1); if you are using a different region, update this accordingly.
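If you parameterise the region, the connection URL follows a predictable pattern. A small sketch (the helper function is mine, not part of the driver):

```python
def athena_jdbc_url(region: str) -> str:
    """Build the Athena JDBC connection URL for a given AWS region."""
    # The Athena JDBC endpoint pattern is athena.<region>.amazonaws.com on port 443
    return f"jdbc:awsathena://athena.{region}.amazonaws.com:443"

print(athena_jdbc_url("eu-west-1"))
# → jdbc:awsathena://athena.eu-west-1.amazonaws.com:443
```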

.option("dbtable", "AwsDataCatalog.yourathenadatabase.yourathenaview")

The fully qualified name of the view in your Athena catalog, in the format 'AwsDataCatalog.Database.View'. For example, take this query run in Athena:

SELECT * FROM "AwsDataCatalog"."vehicles"."v_electric_cars";

You would set the dbtable option to this:

.option("dbtable", "AwsDataCatalog.vehicles.v_electric_cars")

The last option tells Glue which S3 location to use as temporary storage for the data returned from Athena.

.option("S3OutputLocation","s3://yours3bucket/temp")

At this point you can test that it works. When running the job, you need to tell Glue the location of the Athena JDBC driver JAR file that was uploaded to S3.

If you are working in the AWS Glue console, the parameter to set can be found under Job details --> Advanced properties --> Dependent JARs path.

The parameter needs to be set to the full S3 path and filename of the JAR file, for example s3://yours3bucket/jdbc-drivers/AthenaJDBC42_2.0.27.1000.jar

Setting this in the console ensures that the correct argument is passed into the Glue job.

--extra-jars s3://yours3bucket/jdbc-drivers/AthenaJDBC42_2.0.27.1000.jar
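If you create the job programmatically (for example with boto3 or CloudFormation) rather than through the console, the same value goes into the job's default arguments map under the --extra-jars key. A hypothetical sketch with placeholder names:

```python
# Hypothetical default arguments for a Glue job; Glue reads the
# --extra-jars key to locate dependent JAR files such as the Athena
# JDBC driver. The bucket and path are placeholders.
jar_path = "s3://yours3bucket/jdbc-drivers/AthenaJDBC42_2.0.27.1000.jar"

default_arguments = {
    "--extra-jars": jar_path,
}

print(default_arguments["--extra-jars"])
```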

The final code, including the conversion to Parquet format, looked like this:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

athena_view_dataframe = (
    glueContext.read.format("jdbc")
    .option("driver", "com.simba.athena.jdbc.Driver")
    .option("AwsCredentialsProviderClass","com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider")
    .option("url", "jdbc:awsathena://athena.eu-west-1.amazonaws.com:443")
    .option("dbtable", "AwsDataCatalog.vehicles.v_electric_cars")
    .option("S3OutputLocation","s3://yours3bucket/temp")
    .load()
    )

athena_view_dataframe.printSchema()

athena_view_datasource = DynamicFrame.fromDF(athena_view_dataframe, glueContext, "athena_view_source")

pq_output = glueContext.write_dynamic_frame.from_options(
    frame=athena_view_datasource,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://yourotherS3Bucket/",
        "partitionKeys": [],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="ParquetConversion",
)

job.commit()

