This is the second in a series of posts about dlt, given as part of the Data Talks Club's data engineering zoomcamp. The instructor is Adrian Brudaru, and you can find the dlt repository HERE.
Normalizing Data
There are two parts to normalizing data. The first is normalizing without changing the data itself. The second is filtering, where you remove outliers or records that carry no meaning; this changes the data slightly.
The first part includes adding types, renaming columns, flattening nested dictionaries, and turning lists or arrays into child tables. Lists can't be flattened into the parent row because each record may contain a different number of items.
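To make the flattening step concrete, here is a minimal, hypothetical sketch (not dlt's actual implementation) of collapsing nested dictionaries into lowercased column names joined with `__`, the convention visible in dlt's output:

```python
def flatten(record, prefix=""):
    """Flatten nested dicts into a single row; '__' joins the path.
    Lists are left alone -- they become child tables instead."""
    row = {}
    for key, value in record.items():
        name = f"{prefix}{key}".lower()
        if isinstance(value, dict):
            row.update(flatten(value, prefix=f"{name}__"))
        else:
            row[name] = value
    return row

row = flatten({"Trip_Distance": 17.52,
               "time": {"pickup": "2009-06-14 23:23:00"}})
print(row)  # {'trip_distance': 17.52, 'time__pickup': '2009-06-14 23:23:00'}
```

The real normalizer handles many more cases (name collisions, reserved words, type inference), but the naming scheme is the same one you'll see in the tables below.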
JSON doesn't describe data: types are not enforced. If you get one message from the Discord API, you get a string, but if there is more than one, you get a list. JSON is meant to be a transfer mechanism, not a database, and it is not suitable for direct analytical use.
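A quick stdlib illustration of the point (the payloads here are made up for the example): the same field can come back as different Python types, and timestamps arrive as plain strings with nothing marking them as datetimes:

```python
import json

# hypothetical API responses: one message vs. several
single = json.loads('{"message": "hi"}')
several = json.loads('{"message": ["hi", "hello"]}')

print(type(single["message"]))   # <class 'str'>
print(type(several["message"]))  # <class 'list'>

# a timestamp is just a string until something parses and types it
ts = json.loads('{"pickup": "2009-06-14 23:23:00"}')["pickup"]
print(type(ts))  # <class 'str'>
```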
For this example, the instructor modified a version of the taxi database. He added nested dictionaries, nested lists, and a hash id for each record.
data = [
    {
        "vendor_name": "VTS",
        "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        # nested dictionaries could be flattened
        "coordinates": {
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
            "status": "booked"
        },
        "Passenger_Count": 2,
        # nested lists need to be expressed as separate tables
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
    # ... more data
]
dlt automates normalization, removing a huge burden from the data engineer and applying best practices automatically. We'll see how it does this next.
The instructor created a dlt data pipeline and ran it, creating a table in DuckDB.
import dlt

# define the connection to load to.
# We now use duckdb, but you can switch to BigQuery later
pipeline = dlt.pipeline(destination='duckdb',
                        dataset_name='taxi_rides')

# run with merge write disposition.
# This is so scaffolding is created for the next example,
# where we look at merging data
pipeline.run(data,
             table_name="rides",
             write_disposition="merge",
             primary_key="record_hash")
dlt can use schemas as data contracts: you can specify the expected format and reject data that doesn't conform. It types the data, flattens nested structures, renames columns to fit database standards, and processes a stream of events/rows without filling memory. It can load to a variety of databases or file formats.
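The contract idea can be sketched in plain Python (a simplified illustration of the concept, not dlt's actual contract API): declare the expected type for each column and drop rows that don't conform:

```python
# hypothetical contract: column name -> required type
contract = {"vendor_name": str, "Trip_Distance": float, "Passenger_Count": int}

def conforms(row, contract):
    """Accept a row only if every contracted column exists with the right type."""
    return all(isinstance(row.get(col), typ) for col, typ in contract.items())

rows = [
    {"vendor_name": "VTS", "Trip_Distance": 17.52, "Passenger_Count": 2},
    {"vendor_name": "VTS", "Trip_Distance": "17.52", "Passenger_Count": 2},  # wrong type
]
accepted = [r for r in rows if conforms(r, contract)]
print(len(accepted))  # 1
```

In dlt the contract is expressed on the schema rather than per-row like this, but the effect is the same: non-conforming data never reaches the destination.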
Let's look at what we just created:
import duckdb

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))
print("\n\n\n Rides table below: Note the times are properly typed")
rides = conn.sql("SELECT * FROM rides").df()
display(rides)
print("\n\n\n Passengers table")
passengers = conn.sql("SELECT * FROM rides__passengers").df()
display(passengers)
print("\n\n\n Stops table")
stops = conn.sql("SELECT * FROM rides__stops").df()
display(stops)
The output is:
Loaded tables:
┌─────────────────────┐
│ name │
│ varchar │
├─────────────────────┤
│ _dlt_loads │
│ _dlt_pipeline_state │
│ _dlt_version │
│ rides │
│ rides__passengers │
│ rides__stops │
└─────────────────────┘
Rides table below: Note the times are properly typed
record_hash vendor_name time__pickup time__dropoff trip_distance coordinates__start__lon coordinates__start__lat coordinates__end__lon coordinates__end__lat payment__type payment__amt payment__surcharge payment__tip payment__tolls payment__status passenger_count _dlt_load_id _dlt_id
0 b00361a396177a9cb410ff61f20015ad VTS 2009-06-14 19:23:00-04:00 2009-06-14 19:48:00-04:00 17.52 -73.787442 40.641525 -73.980072 40.742963 Credit 20.5 0 9 4.15 booked 2 1708172075.423542 OuSbrdhe7+UTfA
Passengers table
name rating _dlt_root_id _dlt_parent_id _dlt_list_idx _dlt_id
0 John 4.9 OuSbrdhe7+UTfA OuSbrdhe7+UTfA 0 e8Cw1jDEzQ4Mww
1 Jack 3.9 OuSbrdhe7+UTfA OuSbrdhe7+UTfA 1 /h4QT5P61zivrQ
Stops table
lon lat _dlt_root_id _dlt_parent_id _dlt_list_idx _dlt_id
0 -73.6 40.6 OuSbrdhe7+UTfA OuSbrdhe7+UTfA 0 PYqbqAEcmMiESw
1 -73.5 40.5 OuSbrdhe7+UTfA OuSbrdhe7+UTfA 1 aSmvX/UKtYsclQ
You can see that the pickup and dropoff times have been flattened into columns named time__pickup and time__dropoff, and likewise for the other nested fields. Two child tables were created from the lists, and each keeps a relationship to the top-level row it came from: dlt stores the parent's _dlt_id in each child row as _dlt_parent_id, so the child tables can always be connected back to the parent. We could join the tables back into the main data if we wanted a single table.
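That join is an ordinary SQL join on _dlt_parent_id = _dlt_id; sketched here in pure Python using the ids from the output above:

```python
# parent and child rows as dlt loaded them (ids copied from the output above)
rides = [{"_dlt_id": "OuSbrdhe7+UTfA", "vendor_name": "VTS"}]
passengers = [
    {"_dlt_parent_id": "OuSbrdhe7+UTfA", "name": "John", "rating": 4.9},
    {"_dlt_parent_id": "OuSbrdhe7+UTfA", "name": "Jack", "rating": 3.9},
]

# equivalent of:
#   SELECT * FROM rides r
#   JOIN rides__passengers p ON p._dlt_parent_id = r._dlt_id
by_id = {r["_dlt_id"]: r for r in rides}
joined = [{**by_id[p["_dlt_parent_id"]], **p} for p in passengers]
print(len(joined))  # 2
```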