DEV Community

Arka Dash
Arka Dash

Posted on • Edited on

Taming the Machine Learning Pipeline Beast: ZenML Edition

Intro to the Zen of ZenML

Buckle up, because we’re going on a journey from Jupyter jungle to ZenML nirvana. No, ZenML won’t make you a meditation master, but it will make you a pipeline pro. So, set aside your 100+ lines of spaghetti code; it’s time to bring in the big guns.

To follow along, install ZenML (trust me, it’s easier than explaining to your boss why your last model broke). Types matter here, so no freestyle coding; we’ll talk about that as we go.

First Things First: The Sacred pipelines.py

Create a new file called pipelines.py. In this masterpiece, we’ll build our pipeline—something a bit cleaner than a tangled mess of data processing. Start with ZenML’s pipeline decorator:

from zenml import pipeline

@pipeline(name="used_car_price_predictor")
def ml_pipeline():
    # We’ll fill in these dots soon.
    ...
Enter fullscreen mode Exit fullscreen mode

Step 1: Data Ingestion, a.k.a. Opening Pandora’s Zip

Here’s our first ZenML step, where we’ll read in data from a .zip file (because, of course, data never comes in simple CSVs). Meet our data_ingestion_step function, where we import the data and throw it into an artifact—a ZenML term for “we’re passing this mess to the next step, but it’s technically fancy now.”

from zenml import step
import pandas as pd
from typing import Tuple

@step(enable_cache=False)
def data_ingestion_step(file_path: str) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    # Extract zip files and read data with pd.read_csv()
    ...
    return train, test, sample  # This tuple is now an “Artifact” – no fancy unboxing needed
Enter fullscreen mode Exit fullscreen mode

In ml_pipeline, we extract the actual data from the artifact like this:

raw_data_artifacts = data_ingestion_step(file_path="data/playground-series-s4e9.zip")
train, test, sample = raw_data_artifacts
Enter fullscreen mode Exit fullscreen mode

Straightforward Steps (Don’t Get Too Comfortable)

Step 2: Missing Values, Feature Engineering, and Outlier Detection

These steps are relatively painless, but don’t get cocky. Using ZenML’s step decorator, we handle missing values, engineer features, and clean up outliers.

@step(enable_cache=False)
def handle_missing_values_step(df: pd.DataFrame) -> pd.DataFrame:
    # Code to fill missing values
    ...

@step(enable_cache=False)
def feature_engineering_step(df: pd.DataFrame, strategy: str, features: list) -> pd.DataFrame:
    # Log-transform and other fancy tricks
    ...

@step(enable_cache=False)
def outlier_detection_step(df: pd.DataFrame, feature: str, strategy: str, method: str) -> pd.DataFrame:
    # Outlier removal or adjustment
    ...
Enter fullscreen mode Exit fullscreen mode

And in the pipeline:

filled_train = handle_missing_values_step(train)
engineered_train = feature_engineering_step(filled_train, strategy='log', features=['price'])
cleaned_train = outlier_detection_step(df=engineered_train, feature='price', strategy='IQR', method='remove')
Enter fullscreen mode Exit fullscreen mode

Step 3: Data Splitting

Our data is finally clean. Now it’s time to split it into training and testing sets. You’d think this would be the easy part, but you’d be wrong—type casting is key.

X_train, X_test, y_train, y_test = data_splitter(cleaned_train)
Enter fullscreen mode Exit fullscreen mode

The Model Building Labyrinth

Step 4: Building a Model That Doesn’t Break Every Step

Here’s where things get tricky. Sklearn’s RegressorMixin is useful for portability, but ZenML artifacts don’t always play nice. So, we hack it by making a custom PipelineRegressor class:

from sklearn.pipeline import Pipeline
from sklearn.base import RegressorMixin

class PipelineRegressor(Pipeline, RegressorMixin):
    pass
Enter fullscreen mode Exit fullscreen mode

Now, we use this class in our model_building_step. You’ll need to initialize mlflow, log the columns, and wrap up the process:

@step(enable_cache=False)
def model_building_step(x_train: pd.DataFrame, y_train: pd.Series) -> RegressorMixin:
    mlflow.start_run()
    model_pipeline = PipelineRegressor(steps=[...])  # Add preprocessing, etc.
    model_pipeline.fit(x_train, y_train)

    # Log features
    processed_columns = list(model_pipeline.named_steps['preprocessor'].get_feature_names_out())
    mlflow.log_dict({'used_columns': processed_columns}, 'used_columns.json')
    mlflow.end_run()

    return model_pipeline
Enter fullscreen mode Exit fullscreen mode

Evaluating with Just Enough Data to Feel Smart

Step 5: Model Evaluation

With the model built, we make some predictions and log evaluation metrics—if only it were as simple as “look, it’s accurate!” Here’s the ZenML version of that:

@step(enable_cache=False)
def evaluation_step(model_pipeline: RegressorMixin, X_test: pd.DataFrame, y_test: pd.Series) -> pd.DataFrame:
    y_pred = model_pipeline.predict(X_test)
    rmse, r2 = mean_squared_error(y_test, y_pred, squared=False), r2_score(y_test, y_pred)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)

    return pd.DataFrame({"rmse": [rmse], "r2": [r2]})
Enter fullscreen mode Exit fullscreen mode

The End: A.k.a., Our ZenML Workflow is Complete

Congratulations, you made it! Now, run ml_pipeline() and head over to the ZenML dashboard for a DAG view of the process. The MLFlow UI will display metrics, model details, and features in use.

Useful Links

Top comments (0)