DEV Community

Cover image for Tidying up Pipelines with DataClasses

Tidying up Pipelines with DataClasses

sephib profile image Sephi Berry Originally published at ・5 min read


Tidy code makes everyone's life easier.

The code in an ML project will probably be read many times, so making our workflow easier to understand will be appreciated later by everyone on the team.
During ML projects, we need to access data in a similar manner (throughout our workflow) for training, validating and predicting our model and data. A clear semantic for accessing the data allows for easier code management between projects. Additionally naming conventions are also very useful in order to be able to understand and reuse the code in an optimal manner.

There are some tools that can assist in this cleanliness such as the usage of Pipelines and Dataclasses.

MLEngineer is 10% ML 90% Engineer.


Pipeline is a meta object that assists in managing the processes in a ML model. Pipelines can encapsulate separate processes which can later on be combined together.

Forcing a workflow to be implemented within a Pipeline objects can be nuisance at the beginning (especially the conversion between pandas DataFrame and np.ndarray ), but down-the-line it guaranties the quality of the model (no data leakage, modularity etc.). Here is Kevin Markham 4 min. video explaining pipeline advantages.


Another useful Python object to save datasets along the pipeline are dataclasses. Before Python 3.7 you may have been using namedtuple, however after Python 3.7 dataclasses were introduced, and are now a great candidate for storing such data objects. Using dataclasses allows for access consistency to the various datasets throughout the ML Pipeline.


Since we are not analysing any dataset, this blog post is an example of an advance pipeline that incorporates non standard pieces (none standard sklearn modules).

Assuming that we have a classification problem and our data has numeric and categorical column types, the pipeline incorporates:

  1. Preprocess data preparation per column type
  2. Handle the categorical columns using the vtreat package
  3. Run a catboost classifier.

We may build our pipeline as follows:

y = df.pop("label")
X = df.copy(True)

num_pipe = Pipeline([("scaler",StanderdScaler()),
preprocess_pipe = ColumnTransformer(
   transformers=[("num_pipe", num_pipe, X.select_dtypes("number"))]

pipe = Pipeline([("preprocess_pipe", preprocess_pipe),
               ("vtreat", BinomiaOutcomeTreatmentPlan()),

Enter fullscreen mode Exit fullscreen mode

In this pseudo code our Pipeline has some preprocessing to the numeric columns followed by the processing of the categorical columns with the vtreat package (it will pass-through all the non-categorical and numeric columns).


  1. Since catboost does not have a transform method we are going to introduce it later on.
  2. The usage of vtreat is an example of the possibility to use nonstandard modules within the classifications (assuming they follow sklearn paradigms)

So now the time has come to cut up our data...

Test vs. Train vs. Valid

A common workflow when developing an ML model is the necessity to split the date into Test/Train/Valid datasets.

source: Shan-Hung Wu & DataLab, National Tsing Hua University

In a nut shell the difference between the data are:

  1. Test - put aside - don't look until final model estimation
  2. Train - dataset to train model
  3. Valid - dataset to validate model during the training phase (this can be via Cross Validation iteration, GridSearch, etc.)

Each dataset will have similar attributes that we will need to save and access throughout the ML workflow.

In order to prevent confusion lets create a dataclass to save the datasets in a structured manner.

# basic dataclass 
import numpy as np
from dataclasses import dataclass

class Split:
    X: np.ndarray = None
    y: np.array = None
    idx: np.array = None
    pred_class: np.array = None
    pred_proba: np.ndarray = None
    kwargs: Dict = None

    def __init__(self, name:str): = name

Enter fullscreen mode Exit fullscreen mode

Now we can create the training and test datasets as follows:

train = Split(name='train')   
test = Split(name='test')  
Enter fullscreen mode Exit fullscreen mode

Each dataclass will have the following fields:

  1. X - a numpy ndarray storing all the features
  2. y - a numpy array storing the labeling classification
  3. idx - the index for storing the original indexes useful for referencing at the end of the pipe line
  4. pred_class - a numpy array storing the predicted classification
  5. pred_proba - a numpy ndarray for storing the probabilities of the classifications

Additionally we will store a name for the dataclass (in the init function) to easily referencing it along the pipeline.

Splitting In Action

There are several methods that can be used to split the datasets. When data are imbalanced it is important to split the data with a stratified method. In our case, we chose to use StratifiedShuffleSplit however, in contrast to the simple train-test split which returns the datasets themselves, the StratifiedShuffleSplit returns only the indices for each group, thus we will need a helper function to get the dataset themselves (our helper function is nice and minimal for the usage of our dataclasses).

def get_split_from_idx(X, y, split1: Split, split2: Split):
    split1.X, split2.X = X.iloc[split1.idx], X.iloc[split2.idx]
    split1.y, split2.y = y.iloc[split1.idx], y.iloc[split2.idx]
    return split1, split2  

for fold_name, (train.idx, test.idx) in enumerate( StratifiedSplitValid(X, y, n_split=5, train_size=0.8) ):
    train, test = get_split_from_idx(X, y, train, test)  # a helper function

Enter fullscreen mode Exit fullscreen mode

Pipeline in action

Now we can run the first part of our Pipeline

_train_X = pipe.fit_transform(train.X)  

Enter fullscreen mode Exit fullscreen mode

Once we have fit_transform our data (allowing for vtreat magic to work), we can introduce the catboost classifier into our Pipeline.

catboost_clf = CatBoostClassifier()

train_valid = Split(name="train_valid")
valid = Split(name="valid")
for fold_name, (train_valid.idx, valid.idx) in enumerate(StratifiedSplitValid(_train_X, train.y, n_split=10, train_size=0.9) ):
    train_valid, valid = get_split_from_idx(_train_X, train.y, train_valid, valid)

    pipe.steps.append(("catboost_clf",catboost_clf)), train_valid.y,
            catboost_clf__eval_set=[(valid.X, valid.y)],

Enter fullscreen mode Exit fullscreen mode

Notice the two following points:

  1. Using pipe.steps.append we are able to introduce steps into the pipeline that could not be initially part of the workflow.
  2. Adding parameters into the steps within the pipeline requires the use of double dunder for nested paramters.

Finally we can get some results

test.pred_class = pipe.(test.X)  
test.pred_proba = pipe.pred_proba(test.X)[:,1]
Enter fullscreen mode Exit fullscreen mode

Now when we analyse our model we can generate our metrics (e.g. confusion_matrix) by easily referencing to the relevant dataset as follows:

from sklearn.metrics import confusion_matrix
conf_matrix_test = confusion_matrix(y_true=test.y, y_pred=test.pred_class )

Enter fullscreen mode Exit fullscreen mode


This blog post outlines the advantages for using Pipelines and Dataclasses.

Working with the Dataclasses is really a no-brainer since it is very simple and can easily be incorporated into any code base. Pipelines require more effort while integrating them into the code, but the benefits are substantial and well worth it.
I hope the example illustrated the potential for such usage and will inspire and encourage you to try it out.

Discussion (0)

Editor guide