DEV Community

Cover image for Normalizing data with dlt
Cris Crawford
Cris Crawford

Posted on

Normalizing data with dlt

This is the second in a series of posts about dlt. It was given as part of the Data Talks Club's data engineering zoomcamp. The instructor is Adrian Brudaru, and you can see the dlt repository HERE.

Normalizing Data

There are two parts to normalizing data. The first is normalizing without changing data. There is also filtering data, where you remove outliers or data that has no meaning. This changes the data slightly.

The first method includes adding types, renaming columns, flattening nested dictionaries, and turning lists or arrays into child tables. Lists or arrays can't be flattened, because they contain different amounts of data.

json doesn't describe data. Types are not enforced. If you get one message from the Discord api, you get a string, but if there is more than one, you get a list. json is meant to be a transfer mechanism, not a database. It's not suitable for direct analytical usage.

For this example, the instructor modified a version of the taxi database. He added nested dictionaries, nested lists, and a hash id for each record.

data = [
    {
        "vendor_name": "VTS",
    "record_hash": "b00361a396177a9cb410ff61f20015ad",
        "time": {
            "pickup": "2009-06-14 23:23:00",
            "dropoff": "2009-06-14 23:48:00"
        },
        "Trip_Distance": 17.52,
        # nested dictionaries could be flattened
        "coordinates": {
            "start": {
                "lon": -73.787442,
                "lat": 40.641525
            },
            "end": {
                "lon": -73.980072,
                "lat": 40.742963
            }
        },
        "Rate_Code": None,
        "store_and_forward": None,
        "Payment": {
            "type": "Credit",
            "amt": 20.5,
            "surcharge": 0,
            "mta_tax": None,
            "tip": 9,
            "tolls": 4.15,
        "status": "booked"
        },
        "Passenger_Count": 2,
        # nested lists need to be expressed as separate tables
        "passengers": [
            {"name": "John", "rating": 4.9},
            {"name": "Jack", "rating": 3.9}
        ],
        "Stops": [
            {"lon": -73.6, "lat": 40.6},
            {"lon": -73.5, "lat": 40.5}
        ]
    },
    # ... more data
]
Enter fullscreen mode Exit fullscreen mode

dlt automates normalization. We'll see how it does this next. This removes a huge burden from the data engineer and automatically normalizes the data using best practices.

The instructor created a dlt data pipeline and ran it, creating a table in DuckDB.

# define the connection to load to.
# We now use duckdb, but you can switch to Bigquery later

pipeline = dlt.pipeline(destination='duckdb',
                dataset_name='taxi_rides')

# run with merge write disposition.
# This is so scaffolding is created for the next example,
# where we look at merging data

pipeline.run(data, 
                table_name="rides",         
                write_disposition="merge",
                primary_key="record_hash")
Enter fullscreen mode Exit fullscreen mode

dlt can use schemas as data contracts. You can specify the format, and reject data that doesn't have that format. It will type the data, flatten structures, rename columns to fit database standards, and process a stream of events/rows without filling memory. It will load to a variety of databases or file formats.

Let's look at what we just created:

conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")

# let's see the tables
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
print('Loaded tables: ')
display(conn.sql("show tables"))


print("\n\n\n Rides table below: Note the times are properly typed")
rides = conn.sql("SELECT * FROM rides").df()
display(rides)

print("\n\n\n Passengers table")
passengers = conn.sql("SELECT * FROM rides__passengers").df()
display(passengers)
print("\n\n\n Stops table")
stops = conn.sql("SELECT * FROM rides__stops").df()
display(stops)

Enter fullscreen mode Exit fullscreen mode

The output is:

Loaded tables: 

┌─────────────────────┐
│        name         │
│       varchar       │
├─────────────────────┤
│ _dlt_loads          │
│ _dlt_pipeline_state │
│ _dlt_version        │
│ rides               │
│ rides__passengers   │
│ rides__stops        │
└─────────────────────┘




 Rides table below: Note the times are properly typed

    record_hash     vendor_name     time__pickup    time__dropoff   trip_distance   coordinates__start__lon     coordinates__start__lat     coordinates__end__lon   coordinates__end__lat   payment__type   payment__amt    payment__surcharge  payment__tip    payment__tolls  payment__status     passenger_count     _dlt_load_id    _dlt_id
0   b00361a396177a9cb410ff61f20015ad    VTS     2009-06-14 19:23:00-04:00   2009-06-14 19:48:00-04:00   17.52   -73.787442  40.641525   -73.980072  40.742963   Credit  20.5    0   9   4.15    booked  2   1708172075.423542   OuSbrdhe7+UTfA




 Passengers table

    name    rating  _dlt_root_id    _dlt_parent_id  _dlt_list_idx   _dlt_id
0   John    4.9     OuSbrdhe7+UTfA  OuSbrdhe7+UTfA  0   e8Cw1jDEzQ4Mww
1   Jack    3.9     OuSbrdhe7+UTfA  OuSbrdhe7+UTfA  1   /h4QT5P61zivrQ




 Stops table

    lon     lat     _dlt_root_id    _dlt_parent_id  _dlt_list_idx   _dlt_id
0   -73.6   40.6    OuSbrdhe7+UTfA  OuSbrdhe7+UTfA  0   PYqbqAEcmMiESw
1   -73.5   40.5    OuSbrdhe7+UTfA  OuSbrdhe7+UTfA  1   aSmvX/UKtYsclQ
Enter fullscreen mode Exit fullscreen mode

You can see that the fields for pickup and dropoff time have been converted to columns with the names time_pickup and time_dropoff, and likewise for other fields. Two tables were created from the lists. There is a relationship between these two tables and the top level data that they came from. dlt stores the parent id in the table, so the two tables will have that field to connect back to the parent. We could join the tables back into the main data if we want to have a single table.

Top comments (0)