BigQuery has supported transactions since last year (presented at Google Cloud Next '21): it is now possible to perform mutating operations over one or several tables and then commit or roll back the result atomically, by wrapping the script between BEGIN TRANSACTION; and COMMIT TRANSACTION; (or ROLLBACK TRANSACTION; to discard the changes).
Easy enough! It's all explained in the official documentation.
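As an illustration, here is a minimal sketch of such a single-script transaction, assembled in Python. The helper wrap_in_transaction and the sample statements (including the dataset mydataset and its inventory table) are hypothetical. The resulting string would be submitted as one query job, for instance with client.query(script).result().

```python
def wrap_in_transaction(statements: list[str]) -> str:
    """Join statements into one BigQuery script that runs as a single transaction.

    Hypothetical helper: every statement is placed between BEGIN TRANSACTION;
    and COMMIT TRANSACTION;, so they are committed together or not at all.
    """
    # Normalize each statement so it ends with exactly one semicolon.
    body = "\n".join(s.strip().rstrip(";") + ";" for s in statements)
    return f"BEGIN TRANSACTION;\n{body}\nCOMMIT TRANSACTION;"


if __name__ == "__main__":
    # Hypothetical table in a hypothetical dataset "mydataset".
    script = wrap_in_transaction([
        "DELETE FROM mydataset.inventory WHERE quantity = 0",
        "UPDATE mydataset.inventory SET quantity = quantity - 1 WHERE product = 'dryer'",
    ])
    print(script)
```

The whole transaction lives in one string, which is precisely what becomes a problem once that string grows too large.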
Yet transactions come with a limitation:
A transaction cannot span multiple scripts.
While this is not an issue most of the time, it can become a problem when the scripts enclosed in the transaction grow too complex, use too many query parameters, or exceed any other BigQuery job quota. This can happen, for example, when query scripts are auto-generated from a request payload.
There is a way around it: BigQuery sessions. Let's see how it works.
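When statements are generated from a payload, the natural reaction is to split them into several smaller scripts that each stay within the limits. A minimal sketch, assuming each generated statement is known together with the number of query parameters it uses; the helper batch_statements and its max_params budget are hypothetical (the real limits are listed on BigQuery's quotas and limits page). Each resulting batch is a separate script, which is why a single-script transaction is no longer enough.

```python
from typing import List, Tuple

# (SQL text, number of query parameters it uses)
Statement = Tuple[str, int]


def batch_statements(statements: List[Statement], max_params: int) -> List[List[str]]:
    """Greedily group statements into batches whose parameter count stays <= max_params.

    max_params is a hypothetical budget chosen by the caller, not an official
    BigQuery quota value.
    """
    batches: List[List[str]] = []
    current: List[str] = []
    used = 0
    for sql, n_params in statements:
        if n_params > max_params:
            raise ValueError(f"statement needs {n_params} parameters, budget is {max_params}")
        # Start a new batch when adding this statement would exceed the budget.
        if current and used + n_params > max_params:
            batches.append(current)
            current, used = [], 0
        current.append(sql)
        used += n_params
    if current:
        batches.append(current)
    return batches
```

Each batch can then be joined into one script; running those scripts atomically is the problem the rest of this post solves.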
Sessions are a way to link jobs and persist transient data, like temporary tables, between them.
One common use case for sessions is exactly what we want:
Create multi-statement transactions over multiple queries. Within a session, you can begin a transaction, make changes, and view the temporary result before deciding to commit or rollback. You can do this over several queries in the session. If you do not use a session, a multi-statement transaction needs to be completed in a single query.
The idea is to stack the transaction queries inside the same session, beginning with
BEGIN TRANSACTION;
and ending with
COMMIT TRANSACTION;
In between, you can run as many queries as necessary, and they will all be committed (or rolled back) together, atomically.
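Seen from the client, the session therefore receives a sequence of separate query jobs. A minimal sketch of that sequencing, assuming a hypothetical callable run(sql) that submits one query job inside the session and waits for it (in practice, client.query(...).result() with the session_id connection property). The helper run_stacked_transaction is also hypothetical, and the demo uses print as a stub instead of a real BigQuery call.

```python
from typing import Callable, Iterable, List


def run_stacked_transaction(run: Callable[[str], None], scripts: Iterable[str]) -> List[str]:
    """Send BEGIN, each script, then COMMIT, one job per call to run().

    Returns the statements in the order they were sent.
    """
    executed: List[str] = []
    for sql in ["BEGIN TRANSACTION;", *scripts, "COMMIT TRANSACTION;"]:
        run(sql)  # an exception here stops the loop before COMMIT is sent
        executed.append(sql)
    return executed


if __name__ == "__main__":
    # Stub "run" that only prints each statement; the table name is hypothetical.
    run_stacked_transaction(print, [
        "DELETE FROM mydataset.orders WHERE status = 'cancelled';",
        "UPDATE mydataset.orders SET status = 'archived' WHERE status = 'done';",
    ])
```

Because run is called in order and any exception propagates, a failing query stops the loop before COMMIT TRANSACTION; is sent, so none of the stacked changes get committed.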
A session is closed automatically after 24 hours of inactivity. However, when sessions are mixed with transactions, the targeted table can end up "locked" in the session (for example after a script failure) and stay unusable until the session ends. That's why I recommend forcing the session to end at the end of the script, by invoking the following query:
CALL BQ.ABORT_SESSION();
We are dealing with a session that must be opened and then always closed at the end of the processing: a context manager is the natural fit for this.
```python
"""ContextManager wrapping a BigQuery session."""
from google.cloud import bigquery


class BigquerySession:
    """ContextManager wrapping a BigQuery session."""

    def __init__(self, bqclient: bigquery.Client, bqlocation: str = "EU") -> None:
        """Construct instance."""
        self._bigquery_client = bqclient
        self._location = bqlocation
        self._session_id = None

    def __enter__(self) -> str:
        """Initiate a BigQuery session and return the session_id."""
        job = self._bigquery_client.query(
            "SELECT 1;",  # a trivial query that can't fail
            job_config=bigquery.QueryJobConfig(create_session=True),
            location=self._location,
        )
        job.result()  # wait for job completion before reading its session info
        self._session_id = job.session_info.session_id
        return self._session_id

    def __exit__(self, exc_type, exc_value, traceback):
        """Abort the opened session."""
        if self._session_id:
            # abort the session in any case to have a clean state at the end
            # (sometimes in case of script failure, the table is locked in
            # the session)
            job = self._bigquery_client.query(
                "CALL BQ.ABORT_SESSION();",
                job_config=bigquery.QueryJobConfig(
                    create_session=False,
                    connection_properties=[
                        bigquery.query.ConnectionProperty(
                            key="session_id", value=self._session_id
                        )
                    ],
                ),
                location=self._location,
            )
            job.result()
```
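The same "open, then always close" guarantee can also be written as a generator with contextlib.contextmanager. In this sketch, open_session and close_session are hypothetical callables standing in for the two queries of the BigquerySession class above (SELECT 1; with create_session=True, and CALL BQ.ABORT_SESSION(); with the session_id connection property), which keeps the example runnable without a BigQuery client.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def managed_session(
    open_session: Callable[[], str], close_session: Callable[[str], None]
) -> Iterator[str]:
    """Yield a session id and always close the session afterwards."""
    session_id = open_session()
    try:
        yield session_id
    finally:
        # Runs both on normal exit and when the with-block raises;
        # the exception (if any) is re-raised after closing.
        close_session(session_id)
```

As with the class, if opening the session fails, nothing is yielded and close_session is never called, which mirrors the if self._session_id guard in __exit__.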
It then becomes really easy to use BigquerySession to stack jobs into a single session, and thus to create a multi-statement, multi-script BigQuery transaction:
```python
with BigquerySession(self.bigquery_client, BIGQUERY_LOCATION) as session_id:
    # open transaction
    job = self.bigquery_client.query(
        "BEGIN TRANSACTION;",
        job_config=bigquery.QueryJobConfig(
            create_session=False,
            connection_properties=[
                bigquery.query.ConnectionProperty(
                    key="session_id", value=session_id
                )
            ],
        ),
        location=BIGQUERY_LOCATION,
    )
    job.result()

    # stack queries
    for queryscript in scripts:
        job = self.bigquery_client.query(
            queryscript,
            job_config=bigquery.QueryJobConfig(
                create_session=False,
                connection_properties=[
                    bigquery.query.ConnectionProperty(
                        key="session_id", value=session_id
                    )
                ],
            ),
            location=BIGQUERY_LOCATION,
        )
        job.result()

    # end transaction
    job = self.bigquery_client.query(
        "COMMIT TRANSACTION;",
        job_config=bigquery.QueryJobConfig(
            create_session=False,
            connection_properties=[
                bigquery.query.ConnectionProperty(
                    key="session_id", value=session_id
                )
            ],
        ),
        location=BIGQUERY_LOCATION,
    )
    job.result()
```
Notice how all jobs are run with the same session_id (i.e. within the same session) and in the same location (this is a requirement for sessions).
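Under the hood, these options travel in the jobs.insert REST request. A sketch of the request bodies involved, assuming the documented fields configuration.query.createSession, configuration.query.connectionProperties and jobReference.location; the helper session_query_body is hypothetical, and the session id in the demo is a placeholder.

```python
import json
from typing import Any, Dict, Optional


def session_query_body(
    sql: str, location: str, session_id: Optional[str] = None
) -> Dict[str, Any]:
    """Build a jobs.insert request body for a query tied to a session.

    Without session_id, the job asks BigQuery to create a new session;
    with it, the job joins that existing session.
    """
    # The REST API defaults to legacy SQL, so standard SQL is requested explicitly.
    query: Dict[str, Any] = {"query": sql, "useLegacySql": False}
    if session_id is None:
        query["createSession"] = True
    else:
        query["connectionProperties"] = [{"key": "session_id", "value": session_id}]
    return {
        "configuration": {"query": query},
        # Every job of a session must run in the session's location.
        "jobReference": {"location": location},
    }


if __name__ == "__main__":
    # "PLACEHOLDER_SESSION_ID" stands for the id returned by the first job.
    print(json.dumps(session_query_body("BEGIN TRANSACTION;", "EU", "PLACEHOLDER_SESSION_ID"), indent=2))
```

Building every body through one helper makes it hard to accidentally send a job of the transaction to another location or outside the session.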
Hope this helps!
Photo by Caroline Selfors on Unsplash