A data warehouse is the single source of truth where data extracted from multiple sources is either loaded first and then transformed (ELT) or transformed first as per the business requirements and then loaded (ETL).
The primary use cases for a data warehouse are data science and BI.
BigQuery is GCP's serverless data warehouse. In BigQuery, storage and compute are separated and can scale independently.
Designing and building a data warehouse starts with business requirements: what problems are we trying to solve, and how can we best solve them? For an organization moving its on-prem data to the cloud, the initial goal might be to run the OLTP workload on Cloud SQL and then build data pipelines to bring the data into BigQuery for analytics and machine learning. Visualizations and dashboards can be built with Looker connected to BigQuery.
End users are never given access to the raw tables in BigQuery. Say we have Orders, Products, and Customers data in BigQuery. We may first build denormalized tables satisfying the various business requirements, then create views (or materialized views) on top of these denormalized tables that expose a subset of the data, and grant access to those views so that users can answer business questions by querying them. The process of loading the tables into BigQuery and then creating denormalized tables, views, or materialized views falls under an Extract-Load-Transform (ELT) data pipeline.
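A minimal sketch of that pattern with the Python client (the dataset, table, and column names here are placeholders for illustration, not objects created elsewhere in this post):
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
# Placeholder names: a denormalized orders table and a reporting view on top of it.
view_id = "{}.sales_ds.orders_summary_view".format(client.project)
view = bigquery.Table(view_id)
# The view exposes only a subset of the denormalized table's columns.
view.view_query = """
SELECT order_id, customer_name, product_name, order_total
FROM `{}.sales_ds.orders_denormalized`
""".format(client.project)
view = client.create_table(view)  # Make an API request.
print("Created view {}".format(view.full_table_id))
End users are then granted access to the view rather than to the underlying tables.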
In BigQuery, tables are created under datasets. Datasets are the containers used to organize tables and views and to control access to them.
A dataset is created in a specific GCP project and in a specified geographic location.
- Creating the dataset that will be the container for the table.
# with bq command
bq --location=US mk --description "Dataset to store diabetes table" demodataset
Creating the BigQuery dataset with Python:
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
dataset_id = "{}.demods".format(client.project)
# Construct a full Dataset object to send to the API.
dataset = bigquery.Dataset(dataset_id)
# Specify the geographic location where the dataset should reside.
dataset.location = "US"
dataset = client.create_dataset(dataset, timeout=30) # Make an API request.
print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
- Copying the extracted CSV file to the GCS bucket. We have created a GCS bucket and uploaded diabetes.csv to it. GCS acts as the staging area.
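A minimal sketch of this upload step with the google-cloud-storage client, assuming diabetes.csv sits in the current working directory:
from google.cloud import storage
# Construct a Cloud Storage client object.
storage_client = storage.Client()
bucket = storage_client.bucket("demo_bq_bucket9282")
blob = bucket.blob("diabetes.csv")
# Upload the local file into the staging bucket.
blob.upload_from_filename("diabetes.csv")
print("Uploaded diabetes.csv to gs://demo_bq_bucket9282/diabetes.csv")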
- Creating the BigQuery table from this extracted data
# The tblschema.json file is created to store the table schema
[
{
"mode": "NULLABLE",
"name": "PatientID",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "Pregnancies",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "PlasmaGlucose",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "DiastolicBloodPressure",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "TricepsThickness",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "SerumInsulin",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "BMI",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "DiabetesPedigree",
"type": "FLOAT"
},
{
"mode": "NULLABLE",
"name": "Age",
"type": "INTEGER"
},
{
"mode": "NULLABLE",
"name": "Diabetic",
"type": "INTEGER"
}
]
# An empty table is created with this schema in demodataset
bq mk --table --description "Table containing diabetes data" --label organization:dev demodataset.mytable tblschema.json
Loading the CSV file into this empty table:
# skip_leading_rows=1 for ignoring the header
bq load --source_format=CSV --skip_leading_rows=1 demodataset.mytable gs://demo_bq_bucket9282/diabetes.csv
Creating the empty table and loading the CSV data with Python:
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
table_id = "<<project>>.demods.demotbl"
schema = [
    bigquery.SchemaField("PatientID", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Pregnancies", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("PlasmaGlucose", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("DiastolicBloodPressure", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("TricepsThickness", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("SerumInsulin", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("BMI", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("DiabetesPedigree", "FLOAT", mode="NULLABLE"),
    bigquery.SchemaField("Age", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("Diabetic", "INTEGER", mode="NULLABLE"),
]
table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table) # Make an API request.
print(
"Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)
# Loading the CSV data from GCS into the table
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
table_id = "<<project>>.demods.demotbl"
job_config = bigquery.LoadJobConfig(
skip_leading_rows=1,
source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://demo_bq_bucket9282/diabetes.csv"
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
) # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id) # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))
As part of the data migration from on-prem to BigQuery we build the ETL/ELT data pipeline. First the initial (historical) data load is completed, then incremental loads are scheduled daily or weekly based on the business needs.
BigQuery has three writeDisposition options: WRITE_APPEND, WRITE_EMPTY, and WRITE_TRUNCATE. writeDisposition determines how data gets written to the destination table.
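For example, a daily incremental load would typically use WRITE_APPEND, while a full refresh would use WRITE_TRUNCATE and a one-time initial load could use WRITE_EMPTY. A minimal sketch reusing the table and bucket from above:
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
table_id = "<<project>>.demods.demotbl"
# WRITE_APPEND adds new rows to the existing table; WRITE_TRUNCATE would
# overwrite it, and WRITE_EMPTY would fail if the table already contains data.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
uri = "gs://demo_bq_bucket9282/diabetes.csv"
load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()  # Waits for the job to complete.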