30 Second Intro to Kedro
Kedro is a fairly un-opinionated Python framework for running data pipelines. At a high level, Kedro is a DAG solver consisting of a series of discrete steps abstracted as Nodes, connected by Datasets abstracted as Catalog entries. Nodes are grouped into higher-order constructs called Pipelines, and the order in which nodes run is determined by the data dependencies between each node's inputs and outputs.
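As a minimal sketch (the function and dataset names here are illustrative, not from any particular project), a node wraps a plain Python function, and a pipeline wires nodes together through named catalog entries:

```python
from kedro.pipeline import Pipeline, node

def preprocess(raw_df):
    # an ordinary Python function becomes a pipeline step
    return raw_df.dropna()

# "raw_data" and "clean_data" refer to entries in the data catalog;
# Kedro resolves the run order from these input/output names
data_pipeline = Pipeline([
    node(preprocess, inputs="raw_data", outputs="clean_data"),
])
```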
Hooks
Hooks, according to the Kedro documentation, allow you to extend the behaviour of Kedro's main execution in an easy and consistent manner. A Hook is built from a specification and an implementation. Below is the general structure of a project created by running `kedro new`.
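The exact layout depends on your Kedro version and the starter template you choose, but it looks roughly like this:

```
new-kedro-project/
├── conf/
│   ├── base/
│   └── local/
├── data/
├── notebooks/
├── src/
│   └── new_kedro_project/
│       ├── hooks.py
│       ├── pipelines/
│       └── settings.py
└── pyproject.toml
```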
Hooks are implemented in `hooks.py` and should consist of a set of related functions grouped into a class (one class per set of related hooks). A hook is then specified in `src/<project_name>/settings.py` by registering the hook class: import your newly created hook class and add an instance of it to the `HOOKS` tuple.
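For example, assuming the `PSIHooks` class we will write later in this post (and an illustrative package name of `my_project`), the registration is a one-liner:

```python
# src/my_project/settings.py
from my_project.hooks import PSIHooks

# Kedro calls every hook class instance registered here
HOOKS = (PSIHooks(),)
```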
There are several types of hooks, depending on what kind of event your hook should follow and when it should execute. In this post, I will focus on one specific hook to validate data after it has been loaded.
Using Hooks to Validate Data
One Kedro hook, `after_dataset_loaded`, allows you to consistently execute a user-defined function every time an entry in your data catalog is loaded. This is helpful, for example, for ensuring that the distribution of your data source is as expected. This can be a common issue when building machine-learning pipelines, where monitoring data drift is crucial to maintaining the performance and trustworthiness of your model. In this post, we will write a hook to monitor data drift using the Population Stability Index (PSI).
Hook Definition
We will be using the `after_dataset_loaded` hook to ensure our data for a (potential) machine-learning model is consistent. If we look at the definition of the `after_dataset_loaded` hook:
```python
@hook_spec
def after_dataset_loaded(self, dataset_name: str, data: Any) -> None:
    """Hook to be invoked after a dataset is loaded from the catalog.

    Args:
        dataset_name: name of the dataset that was loaded from the catalog.
        data: the actual data that was loaded from the catalog.
    """
```
we see that the hook definition requires a dataset name and the data that was loaded from the catalog. (Don't worry, we don't need to actually pass those in ourselves; this is handled by Kedro. We simply need to implement the above-defined interface in our `hooks.py` file and add the `hook_impl` decorator to the correctly-named function.)
For example, let us create a new class in `hooks.py` called `PSIHooks` and create the required hook:
```python
from typing import Any

from kedro.framework.hooks import hook_impl


class PSIHooks:
    @hook_impl
    def after_dataset_loaded(
        self,
        dataset_name: str,
        data: Any,
    ) -> None:
```
Let's also assume that our data is a DataFrame whose columns we would like to validate against a set of expected values stored as arrays. Using this implementation of PSI (not mine), we can add the following to the body of our hook:
```python
import logging  # at the top of hooks.py

# convert the DataFrame to a numpy array
actual_values = data.values
psi_values = calculate_psi(expected_values, actual_values)
logging.info(f'Dataset Name: {dataset_name}')
logging.info('PSI Values')
logging.info(psi_values)
```
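The `calculate_psi` function comes from the implementation linked above. In case it helps, here is a minimal single-column sketch of the same idea; the bucket count, the quantile-based edges, and the clipping floor are my own assumptions, not the exact linked code:

```python
import numpy as np

def calculate_psi(expected: np.ndarray, actual: np.ndarray, buckets: int = 10) -> float:
    """Rough PSI sketch for a single column.

    PSI = sum((actual% - expected%) * ln(actual% / expected%)) over buckets.
    """
    # derive bucket edges from the reference (expected) distribution
    edges = np.percentile(expected, np.linspace(0, 100, buckets + 1))
    edges[0], edges[-1] = -np.inf, np.inf  # catch out-of-range values

    expected_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    actual_pct = np.histogram(actual, bins=edges)[0] / len(actual)

    # clip to avoid division by zero / log(0) in empty buckets
    expected_pct = np.clip(expected_pct, 1e-6, None)
    actual_pct = np.clip(actual_pct, 1e-6, None)

    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))
```

The implementation linked in the post additionally handles multi-column input; the sketch above validates one column at a time.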
Additionally, you could add conditional logic to decide which expected values are used to validate each individual dataset, and to skip monitoring for datasets that are intermediate results of some data operation, as sketched below.
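A rough sketch of that idea (the dataset names, the allow-list, and the `_expected_values` lookup are hypothetical):

```python
# hypothetical allow-list: only monitor raw source datasets,
# not intermediate outputs produced by pipeline nodes
MONITORED_DATASETS = {"customers_raw", "transactions_raw"}

@hook_impl
def after_dataset_loaded(self, dataset_name: str, data: Any) -> None:
    if dataset_name not in MONITORED_DATASETS:
        return  # skip derived/intermediate datasets
    # hypothetical per-dataset reference distributions loaded elsewhere
    expected_values = self._expected_values[dataset_name]
    psi_values = calculate_psi(expected_values, data.values)
    logging.info(f'Dataset Name: {dataset_name}')
    logging.info(psi_values)
```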
While the above establishes the general PSI calculation, there is no way to keep track of what our PSI is, or how it changes over time. In this case, we can use an experiment-tracking framework such as MLflow, Neptune.ai, or wandb.ai in the body of our hook to record each PSI value as it is computed.
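As a rough sketch with MLflow (assuming, as above, one PSI value per column; the run name is illustrative):

```python
import mlflow

# inside the hook body, after computing psi_values
with mlflow.start_run(run_name=f"psi_{dataset_name}", nested=True):
    for column, psi in zip(data.columns, psi_values):
        # one metric per column, so drift can be charted over time
        mlflow.log_metric(f"psi_{column}", float(psi))
```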