This tutorial shows how to use SQLAlchemy with BigQuery. The usual way to connect to BigQuery from Python is the official Python client, but you can also connect through SQLAlchemy.
Check the official documentation here!
googleapis / python-bigquery-sqlalchemy
SQLAlchemy dialect for BigQuery
Quick Start
In order to use this library, you first need to go through the following steps:
- Select or create a Cloud Platform project.
- [Optional] Enable billing for your project.
- Enable the BigQuery Storage API.
- Setup Authentication.
Installation
Install this library in a virtualenv using pip. virtualenv is a tool to create isolated Python environments. The basic problem it addresses is one of dependencies and versions, and indirectly permissions.
With virtualenv, it's possible to install this library without needing system install permissions, and without clashing with the installed system dependencies.
Supported Python Versions
Python >= 3.8
Unsupported Python Versions
Python <= 3.7.
Mac/Linux
pip install virtualenv
virtualenv <your-env>
source <your-env>/bin/activate
<your-env>/bin/pip install sqlalchemy-bigquery
Windows
pip install virtualenv
virtualenv <your-env>
<your-env>\Scripts\activate
<your-env>\Scripts\pip.exe install sqlalchemy-bigquery
Installations when processing large datasets
When handling large datasets, you may see speed increases by also…
If you want an overview of BigQuery, try reading this blog post!
Installation
pip install sqlalchemy-bigquery
Make a connection
from sqlalchemy import MetaData
from sqlalchemy import create_engine
# `credentials` is a dict holding the service-account key (defined in the next snippet)
engine = create_engine(
    'bigquery://',
    credentials_info=credentials,
)
metadata = MetaData()
You can pass the credentials either as the JSON content directly or as a path to the key file.
# Passing the JSON content as a dict
credentials = {
    'type': 'xxxx',
    'project_id': 'xxxx',
    # and the remaining fields of the key file
}
engine = create_engine('bigquery://', credentials_info=credentials)
# Passing the path to the key file
engine = create_engine('bigquery://', credentials_path='/path/to/keyfile.json')
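If the key file lives on disk but you still want to pass a dict, a small loader can read it and fail early when it looks incomplete. This is only a sketch: `load_credentials` and `REQUIRED_KEYS` are hypothetical names, and the list of fields checked is an assumption about what is worth validating, not something the library requires.

```python
import json
from pathlib import Path

# Assumption: these are the fields this sketch chooses to check.
# A real service-account key file contains more fields than these.
REQUIRED_KEYS = ("type", "project_id", "private_key", "client_email")


def load_credentials(path: str) -> dict:
    """Read a service-account key file and return its content as a dict."""
    info = json.loads(Path(path).read_text(encoding="utf-8"))
    missing = [key for key in REQUIRED_KEYS if key not in info]
    if missing:
        raise ValueError(f"key file is missing fields: {', '.join(missing)}")
    return info
```

The returned dict can then be passed as `credentials_info` to `create_engine`, exactly like the hand-written `credentials` dict.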
You can set a default project and dataset when instantiating the engine.
engine = create_engine('bigquery://project_name/dataset_name', credentials_info=credentials)
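When the project and dataset come from configuration, it can help to build the URL in one place. The helper below is a hypothetical convenience, not part of sqlalchemy-bigquery. It only covers the URL shapes used in this post (no default, project only, project plus dataset).

```python
from typing import Optional


def bigquery_url(project: Optional[str] = None, dataset: Optional[str] = None) -> str:
    """Build a 'bigquery://' URL with an optional default project and dataset."""
    # Assumption of this sketch: a dataset is only accepted together with a project.
    if dataset and not project:
        raise ValueError("this sketch expects a project whenever a dataset is given")
    parts = [part for part in (project, dataset) if part]
    return "bigquery://" + "/".join(parts)


print(bigquery_url("project_name", "dataset_name"))
```

The resulting string is what you would pass as the first argument to `create_engine`.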
I recommend registering the dialect before creating a connection, because without it I got this error:
NoSuchModuleError: Can't load plugin: sqlalchemy.dialects:bigquery
from sqlalchemy.dialects import registry
registry.register('bigquery', 'sqlalchemy_bigquery', 'BigQueryDialect')
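One possible cause of that error is that the `sqlalchemy-bigquery` package is missing from the active environment, in which case registering the dialect will not help. The check below is a small diagnostic sketch that uses only the standard library. `dialect_installed` is a hypothetical name, and whether this was the cause in my case is an assumption.

```python
import importlib.util


def dialect_installed(module_name: str = "sqlalchemy_bigquery") -> bool:
    """Return True if the module can be found in the current environment."""
    return importlib.util.find_spec(module_name) is not None


if not dialect_installed():
    print("sqlalchemy_bigquery was not found; try: pip install sqlalchemy-bigquery")
```

If the check passes and the error persists, explicit registration as shown above is the next thing to try.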
Query
Here, I will build queries with SQLAlchemy's expression syntax instead of writing raw SQL.
First, you need to get the table.
from sqlalchemy import Table
def get_table(project_name: str, dataset_name: str, table_name: str) -> Table:
    # Reflect the table definition from BigQuery using its fully qualified name
    table = Table(f'{project_name}.{dataset_name}.{table_name}', metadata, autoload_with=engine)
    return table
Then use that table in a query. I created a select function so that it can be reused across many queries.
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
from sqlalchemy.sql import Selectable
def select(stmt: Selectable) -> list[dict]:
    with Session(engine) as session:
        results: list[Row] = session.execute(stmt).fetchall()
        # Convert each Row into a plain dict keyed by column label
        return [r._asdict() for r in results]
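SQLAlchemy `Row` objects behave like named tuples, which is why `_asdict()` is available on them. To see the conversion without a BigQuery connection, the sketch below uses `collections.namedtuple` as a stand-in. `FakeRow`, `rows_to_dicts`, and the sample values are made up for illustration.

```python
from collections import namedtuple

# Stand-in for sqlalchemy.engine.Row, used only so the example runs offline.
FakeRow = namedtuple("FakeRow", ["ID", "Name"])


def rows_to_dicts(rows) -> list:
    """Turn named-tuple-like rows into plain dicts keyed by column label."""
    return [dict(row._asdict()) for row in rows]


rows = [FakeRow(1, "apple"), FakeRow(2, "banana")]
records = rows_to_dicts(rows)
print(records)
```

The keys of each dict are the column labels, so labelling columns in the query controls the shape of the returned records.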
Then put it all together:
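# Import SQLAlchemy's select under an alias so it does not shadow the select() helper defined above
from sqlalchemy.sql import select as sa_select
def select_something() -> list[dict]:
    table = get_table('project_name', 'dataset', 'table')
    stmt = (
        sa_select(
            table.c.id.label('ID'),
            table.c.name.label('Name'),
            table.c.categoryID.label('CategoryID'),
            table.c.sortID.label('SortID'),
        )
        .where(
            table.c.categoryID == 1,
        )
        .order_by(table.c.id)
    )
    # Run the statement through the select() helper and return a list of dicts
    return select(stmt)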
Done!
Summary
orm.py
from sqlalchemy import MetaData
from sqlalchemy import Table
from sqlalchemy import create_engine
from sqlalchemy.dialects import registry
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
from sqlalchemy.sql import Selectable
class SQLAlchemyBigQueryDataSource:
    def __init__(self, credentials: dict) -> None:
        # Register the dialect first to avoid NoSuchModuleError
        registry.register('bigquery', 'sqlalchemy_bigquery', 'BigQueryDialect')
        self.engine = create_engine(
            'bigquery://',
            credentials_info=credentials,
        )
        self.metadata = MetaData()
    def get_table(self, project_name: str, dataset_name: str, table_name: str) -> Table:
        table = Table(f'{project_name}.{dataset_name}.{table_name}', self.metadata, autoload_with=self.engine)
        return table
    def select(self, stmt: Selectable) -> list[dict]:
        with Session(self.engine) as session:
            results: list[Row] = session.execute(stmt).fetchall()
            return [r._asdict() for r in results]
table_name.py
from sqlalchemy.sql import select
from orm import SQLAlchemyBigQueryDataSource
class SomethingBigQueryDataSource:
    def __init__(self, data_source: SQLAlchemyBigQueryDataSource) -> None:
        self.data_source = data_source
    def select_something(self) -> list[dict]:
        # get_table and select live on the injected data source, not at module level
        table = self.data_source.get_table('project_name', 'dataset', 'table')
        stmt = (
            select(
                table.c.id.label('ID'),
                table.c.name.label('Name'),
                table.c.categoryID.label('CategoryID'),
                table.c.sortID.label('SortID'),
            )
            .where(
                table.c.categoryID == 1,
            )
            .order_by(table.c.id)
        )
        # Execute through the data source; the module-level select only builds statements
        return self.data_source.select(stmt)