
Darth Espressius

Automating Data Validation using Kedro Hooks

30 Second Intro to Kedro

Kedro is a fairly un-opinionated Python framework for running data pipelines. At a high level, Kedro is a DAG solver consisting of a series of discrete steps abstracted as Nodes, connected by datasets abstracted as Catalog entries. Nodes are grouped into higher-order constructs called Pipelines, and the order in which nodes run is determined by the data dependencies between each node's inputs and outputs.

Hooks

Hooks, according to the Kedro documentation, allow you to extend the behaviour of Kedro's main execution in an easy and consistent manner. A Hook is built from a specification and an implementation. The general structure of a project created by running kedro new is shown below.
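For orientation, a typical layout generated by kedro new looks roughly like this (the exact files vary by Kedro version):

```
<project_name>/
├── conf/
│   ├── base/          # shared configuration (catalog.yml, parameters.yml)
│   └── local/         # user-specific configuration (credentials, overrides)
├── data/              # local data files, organised by layer
├── notebooks/
└── src/
    ├── <project_name>/
    │   ├── pipelines/
    │   ├── hooks.py     # hook implementations live here
    │   └── settings.py  # hooks are registered here
    └── tests/
```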

Hooks are implemented in hooks.py and should consist of a set of related functions grouped into a class (one class per set of related hooks). A hook is then specified in src/<project_name>/settings.py by registering the hook class: import your newly created hook class and add an instance of it to the HOOKS tuple.
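For example, registration in settings.py might look like the following sketch (the package name my_project is a placeholder; PSIHooks is the class we create later in this post):

```python
# src/my_project/settings.py  (replace "my_project" with your package name)
from my_project.hooks import PSIHooks

# Kedro discovers project hooks through this tuple of class instances
HOOKS = (PSIHooks(),)
```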

There are several types of hooks, depending on what type of event your hook should follow and when it should execute. In this post, I will focus on one specific hook to validate data after it has been loaded.

Using Hooks to Validate Data

One Kedro hook, after_dataset_loaded, allows you to consistently execute a user-defined function every time an entry in your data catalog is loaded. This is helpful in, for example, ensuring the distribution of your data source is as expected. This is a common concern when building machine-learning pipelines, where monitoring data drift is crucial to maintaining the performance and trustworthiness of your model. In this post, we will write a hook to monitor data drift using the Population Stability Index (PSI).

Hook Definition

We will use the after_dataset_loaded hook to ensure our data for a (potential) machine-learning model is consistent. Looking at the definition of the after_dataset_loaded hook:

    @hook_spec
    def after_dataset_loaded(self, dataset_name: str, data: Any) -> None:
        """Hook to be invoked after a dataset is loaded from the catalog.
        Args:
            dataset_name: name of the dataset that was loaded from the catalog.
            data: the actual data that was loaded from the catalog.
        """

we see that the hook requires a dataset name and the data that was loaded from the catalog. (Don't worry, we don't need to specify those ourselves, as this is handled by Kedro; we simply need to implement the interface defined above in our hooks.py file and add the hook_impl decorator to the correctly named function.)

For example, let us create a new class in hooks.py, called PSIHooks and create the required hook:

    from typing import Any

    from kedro.framework.hooks import hook_impl


    class PSIHooks:

        @hook_impl
        def after_dataset_loaded(self, dataset_name: str, data: Any) -> None:
            ...

Let's also assume that our data is a DataFrame whose columns we would like to validate, each against a reference distribution stored as an array. Using this implementation (not mine) of PSI, we can add the following to the body of our hook:

    # (add "import logging" at the top of hooks.py)

    # convert the DataFrame to a numpy matrix
    actual_values = data.values

    # expected_values holds the reference distributions described above
    psi_values = calculate_psi(expected_values, actual_values)

    logging.info(f'Dataset Name: {dataset_name}')
    logging.info('PSI Values')
    logging.info(psi_values)
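For concreteness, here is a minimal sketch of what a calculate_psi function can look like for a single 1-D feature (the linked implementation also handles matrices column-by-column). PSI is the sum over buckets of (actual% - expected%) * ln(actual% / expected%), with bucket edges taken from the reference distribution:

```python
import numpy as np


def calculate_psi(expected, actual, buckets=10):
    """Population Stability Index between a reference and a new 1-D sample."""
    # Bucket edges come from the expected (reference) distribution
    edges = np.percentile(expected, np.linspace(0, 100, buckets + 1))
    edges[0], edges[-1] = -np.inf, np.inf  # catch out-of-range values

    def bucket_fractions(values):
        counts, _ = np.histogram(values, bins=edges)
        # Clip to avoid division by zero / log(0) for empty buckets
        return np.clip(counts / len(values), 1e-6, None)

    e = bucket_fractions(expected)
    a = bucket_fractions(actual)
    return float(np.sum((a - e) * np.log(a / e)))
```

As a common rule of thumb, a PSI below 0.1 is read as no significant shift, while a PSI above 0.25 signals a major shift worth investigating.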

Additionally, you could add conditional logic to determine the reference data needed to validate each individual dataset, and to skip monitoring for datasets that are themselves the result of some data operation.
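One simple way to do the skipping is a small predicate the hook calls first. This is only a sketch: the naming prefix is an assumption about your catalog, not a Kedro requirement.

```python
# Assumption: raw input datasets share a naming prefix in the catalog,
# while derived/intermediate datasets (node outputs) use other names.
MONITORED_PREFIXES = ("raw_",)


def should_monitor(dataset_name: str) -> bool:
    """Return True if this catalog entry should be PSI-checked on load."""
    return dataset_name.startswith(MONITORED_PREFIXES)
```

Inside after_dataset_loaded, return early when should_monitor(dataset_name) is false, so only raw inputs are validated.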

While the above establishes the general PSI calculation, there is no way to keep track of what our PSI is or how it changes over time. In this case, we can use an experiment-tracking framework such as MLflow, Neptune.ai or wandb.ai in the body of our hook to log how our PSI changes over time.
