DEV Community

chanon-mike
chanon-mike

Posted on

Using Python and ORM (SQLAlchemy) with Google BigQuery

This is a tutorial for using SQLAlchemy with BigQuery. Normally, you can use Python client for BigQuery for connecting it. However, you can use SQLAlchemy for connecting it to BigQuery too.

Check the official document here!

GitHub logo googleapis / python-bigquery-sqlalchemy

SQLAlchemy dialect for BigQuery

SQLAlchemy Dialect for BigQuery

GA pypi versions

SQLALchemy Dialects

Quick Start

In order to use this library, you first need to go through the following steps:

  1. Select or create a Cloud Platform project.
  2. [Optional] Enable billing for your project.
  3. Enable the BigQuery Storage API.
  4. Setup Authentication.

Note

This library is only compatible with SQLAlchemy versions < 2.0.0

Installation

Install this library in a virtualenv using pip. virtualenv is a tool to create isolated Python environments. The basic problem it addresses is one of dependencies and versions, and indirectly permissions.

With virtualenv, it's possible to install this library without needing system install permissions, and without clashing with the installed system dependencies.

Supported Python Versions

Python >= 3.8

Unsupported Python Versions

Python <= 3.7.

Mac/Linux

pip install virtualenv
virtualenv <your-env>
source <your-env>/bin/activate
<your-env>/bin/pip install sqlalchemy-bigquery
Enter fullscreen mode Exit fullscreen mode

Windows

pip install virtualenv
virtualenv <your-env>
<your-env>\Scripts\activate
<your-env>\Scripts\pip.exe install sqlalchemy-bigquery
Enter fullscreen mode Exit fullscreen mode

Installations when processing large datasets

If you want to have overview of BigQuery, try reading this blog!

How to query your data in BigQuery | Google Cloud Blog

Learn how to query datasets in BigQuery using SQL, save and share queries, and create views and materialized views.

favicon cloud.google.com

Installation

pip install sqlalchemy-bigquery
Enter fullscreen mode Exit fullscreen mode

Make a connection

from sqlalchemy import MetaData
from sqlalchemy import create_engine

engine = create_engine(
    'bigquery://',
    credentials_info=credentials,
)
metadata = MetaData()
Enter fullscreen mode Exit fullscreen mode

You can pass credentials info by passing the path or passing the json directly

# Passing with json
credentials = {
    'type': 'xxxx'
    'project_id': 'xxxx'
    # and other more
}
engine = create_engine('bigquery://', credentials_info=credentials)

# Passing with path
engine = create_engine('bigquery://', credentials_path='/path/to/keyfile.json')
Enter fullscreen mode Exit fullscreen mode

You can set default project name and dataset when instantiate the engine

engine = create_engine('bigquery://project_name/dataset_name', credentials_info=credentials)
Enter fullscreen mode Exit fullscreen mode

I recommend you to register the dialect first before having a connection, as I got an error

NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:bigquery

from sqlalchemy.dialects import registry

registry.register('bigquery', 'sqlalchemy_bigquery', 'BigQueryDialect')
Enter fullscreen mode Exit fullscreen mode

Query

Here, I will be using SQLAlchemy syntax for querying without writing SQL.

First, you need to get the table.

from sqlalchemy import Table

def get_table(project_name: str, dataset_name: str, table_name: str) -> Table:
        table = Table(f'{project_name}.{dataset_name}.{table_name}', metadata, autoload_with=engine)
        return table
Enter fullscreen mode Exit fullscreen mode

Then, use that table to query. I create the select method to be able to use that in many query.

from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
from sqlalchemy.sql import Selectable

def select(stmt: Selectable) -> list[dict]:
        with Session(engine) as session:
            results: list[Row] = session.execute(stmt).fetchall()
            return [r._asdict() for r in results]

Enter fullscreen mode Exit fullscreen mode

Then, put that all together

from sqlalchemy.sql import select

def select_something() -> list[dict]:
        table = get_table('project_name', 'dataset', 'table')
        stmt = (
            select(
                table.c.id.label('ID'),
                table.c.name.label('Name'),
                table.c.categoryID.label('CategoryID'),
                table.c.sortID.label('SortID'),
            )
            .filter(
                table.c.categoryID == 1,
            )
            .order_by(table.c.id)
        )
        return select(stmt)
Enter fullscreen mode Exit fullscreen mode

Done!

Summary

orm.py

from sqlalchemy import MetaData
from sqlalchemy import Table
from sqlalchemy import create_engine
from sqlalchemy.dialects import registry
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
from sqlalchemy.sql import Selectable


class SQLAlchemyBigQueryDataSource:
    def __init__(self, credentials: dict) -> None:
        registry.register('bigquery', 'sqlalchemy_bigquery', 'BigQueryDialect')
        self.engine = create_engine(
            'bigquery://',
            credentials_info=credentials,
        )
        self.metadata = MetaData()

    def get_table(self, project_name: str, dataset_name: str, table_name: str) -> Table:
        table = Table(f'{project_name}.{dataset_name}.{table_name}', self.metadata, autoload_with=self.engine)
        return table

    def select(self, stmt: Selectable) -> list[dict]:
        with Session(self.engine) as session:
            results: list[Row] = session.execute(stmt).fetchall()
            return [r._asdict() for r in results]
Enter fullscreen mode Exit fullscreen mode

table_name.py

from sqlalchemy import Table
from sqlalchemy.sql import select

from orm import SQLAlchemyBigQueryDataSource


class SomethingBigQueryDataSource:
    def __init__(self, data_source: SQLAlchemyBigQueryDataSource) -> None:
        self.data_source = data_source

    def select_something(self) -> list[dict]:
        table = get_table('project_name', 'dataset', 'table')
        stmt = (
            select(
                table.c.id.label('ID'),
                table.c.name.label('Name'),
                table.c.categoryID.label('CategoryID'),
                table.c.sortID.label('SortID'),
            )
            .filter(
                table.c.categoryID == 1,
            )
            .order_by(table.c.id)
        )
        return select(stmt)
Enter fullscreen mode Exit fullscreen mode

Top comments (0)