ZenML for beautiful beautiful orchestration

Let's build this!

Simple ML Pipeline

To follow along, you should have at least built a Jupyter notebook or be familiar with basic machine learning workflows, from data ingestion/wrangling to model deployment.

ZenML

Now - ZenML is a beautiful tool I just discovered, thanks to @ayush/@FreeCodeCamp for this introductory tutorial.

If you visit its webpage, ZenML is described as "The bridge between ML and Ops" - and it is! I'll show you. First, though, let's review a few ZenML concepts worth noting:

T๐—ต๐—ฒ ๐—ฐ๐—ผ๐—ป๐—ฐ๐—ฒ๐—ฝ๐˜ ๐—ผ๐—ณ ๐—ฎ "step"
A step is a single unit of work in a ZenML pipeline. Step operators, in turn, are specialized components that allow individual pipeline steps to be executed in specific runtime environments optimized for certain workloads. This is particularly useful when a step requires resources not available in the default environment provided by the orchestrator.
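In its simplest form, a step is just a decorated Python function. A minimal sketch (the double step here is hypothetical, purely for illustration):

from zenml import step

@step
def double(x: int) -> int:
    # ZenML tracks this step's inputs, outputs, and metadata automatically.
    return x * 2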

T๐—ต๐—ฒ ๐—ฐ๐—ผ๐—ป๐—ฐ๐—ฒ๐—ฝ๐˜ ๐—ผ๐—ณ ๐—ฎ "๐˜€๐˜๐—ฎ๐—ฐ๐—ธ"
ZenML Stack

A Simple Linear Regression

See this Kaggle Notebook.

It's a simple linear regression that predicts (or best fits) Salary based on Years of Experience and stores the trained model's output. But what if we wanted more? We will always get new data sources; what if we want to use a different prediction model, compare metrics, and promote the best-performing model to "production"?

Essentially, how do we continually orchestrate a machine learning exercise through its lifecycle? MLOps - well, here's where ZenML as an orchestrator tool comes in.

We are going to reproduce this example as a set of steps orchestrated from a single script.

Looking at the notebook above - we had three major steps:

  1. Ingest the Data
  2. Clean/Wrangle/Tidy the Data
  3. Train the model and store the output.

We add one additional step:

  4. Promote the model

Pre-installation and Virtual Environment

Create your virtual environment - the author recommends Python 3.10

python3 -m venv <env_name>
# activate environment (Windows)
<env_name>\Scripts\activate
# activate environment (macOS/Linux)
source <env_name>/bin/activate

Install dependencies

pip install "zenml[server]"
zenml integration install sklearn -y
zenml integration install mlflow -y

Create a stack

## Initialize
zenml init
## Register Local artifact store
zenml artifact-store register <artifact_store_name> --flavor=local
## Register stack
zenml stack register <stack_name> -o default -a <artifact_store_name>
## Set stack
zenml stack set <stack_name>
## Review
zenml stack list

A Simple ML Pipeline

touch <simple_ml_pipeline>.py

where <simple_ml_pipeline> is whatever name you want to give your script.

Enter the following into your script:

## Dependencies
## Import Libraries
import os
import numpy as np
import pandas as pd
from typing import Dict, Tuple
from zenml import Model, pipeline, step
from zenml.client import Client
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import LinearRegression
from zenml.logger import get_logger

logger = get_logger(__name__)

# from sklearn.base import RegressorMixin
df_pth = 'https://github.com/ybifoundation/Dataset/raw/main/Salary%20Data.csv'
local_pth = "./datasets/SalaryData.csv"
local_model_register = "./model_performance.json"
local_model_pth = "./models"

Step 1: Data Ingestion

@step(enable_cache=True)
def load_data() -> pd.DataFrame:
    if os.path.exists(local_pth):
        df = pd.read_csv(local_pth)
    else:
        os.makedirs("./datasets", exist_ok=True)
        df = pd.read_csv(df_pth)
        df.to_csv(local_pth, index=False)
    return df

The @step decorator indicates that the function load_data is a pipeline step. Since the dataset is static, the enable_cache=True parameter means the output of this step will be cached: if the step is run again with the same inputs, the cached output is used instead of re-executing the step.

The os calls check for the file at our local path, loading the salary data if it is found and downloading/saving it otherwise.
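If the data source were not static, you could opt out of caching so the step re-reads the source on every run - a minimal sketch using the same enable_cache parameter (load_data_fresh is a hypothetical variant):

@step(enable_cache=False)
def load_data_fresh() -> pd.DataFrame:
    # With caching disabled, this step re-executes on every pipeline run,
    # picking up any new rows in the source CSV.
    return pd.read_csv(df_pth)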

Now that we have data, we train a simple linear regression model.

Step 2: Train Model

@step
def train_model(data: pd.DataFrame) -> Tuple[Model, Dict[str, float]]:
    y = data['Salary']
    X = data[['Experience Years']]

    X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.7, random_state=234)

    model = LinearRegression()
    model.fit(X_train, y_train)

    y_pred = model.predict(X_test)

    rmse = np.sqrt(mean_squared_error(y_test, y_pred))

    # Create a ZenML Model object
    zenml_model = Model(name="salary_prediction_model", model=model, metadata={"rmse": str(rmse)})

    return zenml_model, {"rmse": rmse}

Here, train_model is a pipeline step where we wisely do not enable caching, to accommodate randomness. As you would expect, we separate the target from the feature variables, split the data (from the load_data step), and train a linear regression, from which we retrieve the RMSE evaluation metric.

Now, the Model object: this is a specialized component that ensures models are versioned, tracked, and integrated into pipelines for reliable model management. See how we assign a model name and also store metadata on the object. Very mindful.

We return the Model object and a dictionary of the metric values.

Step 3: Promote Model

@step
def promote_model(
        model: Model,
        metrics: Dict[str, float],
        stage: str = "production"
) -> bool:
    rmse = metrics["rmse"]

    # Get the ZenML client
    client = Client()

    try:
        # Try to get the previous production model
        previous_production_model = client.get_model_version(name=model.name, version="production")
        previous_production_rmse = float(previous_production_model.run_metadata["rmse"].value)
    except Exception:
        # If there's no production model, set a high RMSE
        previous_production_rmse = float('inf')

    if rmse < previous_production_rmse:
        # Promote the model
        model.set_stage(stage, force=True)
        logger.info(f"Model promoted to {stage}!")
        return True
    else:
        logger.info(
            f"Model not promoted. Current RMSE ({rmse}) is not better than production RMSE ({previous_production_rmse})")
        return False

This step conditionally promotes the model. First, it checks for an existing production model; if none exists, it sets the baseline RMSE to positive infinity.

RMSE is better when smaller, hence the check: if the current model's performance (measured by RMSE) is better, i.e. lower, than the existing production model's, the model is promoted to the production stage. Otherwise, it remains unchanged. This ensures that only models with improved performance metrics are promoted to production!
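Once a version holds the production stage, you can fetch it later, outside the pipeline, using the same Client lookup as in the step above - a hedged sketch (attribute names on the returned version object can vary slightly between ZenML releases):

from zenml.client import Client

client = Client()

# Mirror the lookup in promote_model: fetch the version currently in
# "production" and read back the RMSE we stored as metadata.
prod_version = client.get_model_version(name="salary_prediction_model", version="production")
print(float(prod_version.run_metadata["rmse"].value))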

That ends it for steps, now let's gather all of these steps into a pipeline, and you guessed it correctly, we get to use a pipeline decorator next.

Pipeline

@pipeline
def simple_ml_pipeline():
    dataset = load_data()
    model, metrics = train_model(dataset)
    is_promoted = promote_model(model, metrics)

    return is_promoted

This simple_ml_pipeline function defines a ZenML pipeline that chains the earlier steps; it:

  1. Loads the dataset.
  2. Trains a model using the dataset.
  3. Evaluates and potentially promotes the model to production based on its performance.

This pipeline ensures a streamlined process for loading data, training a model, and promoting it if it meets the performance criteria.
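As an aside, a pipeline can also be configured at call time; a hedged sketch using with_options (I'm assuming its enable_cache parameter here) to force a fresh, cache-free run:

# Run the same pipeline with caching disabled for every step,
# e.g. to force a fresh end-to-end execution.
simple_ml_pipeline.with_options(enable_cache=False)()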

Add

if __name__ == "__main__":
    run = simple_ml_pipeline()

to execute the pipeline when the script is run directly.

Run python <simple_ml_pipeline>.py from your terminal to execute your script. This should print logs like the ones below as it moves through each step.

logs
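You can also inspect runs programmatically; a hedged sketch (I'm assuming the last_run convenience property on the fetched pipeline, which may vary between ZenML versions):

from zenml.client import Client

# Fetch the pipeline by name and check the status of its most recent run.
pipeline_model = Client().get_pipeline("simple_ml_pipeline")
print(pipeline_model.last_run.status)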

Then run zenml up (or zenml up --blocking for Windows users) to view your pipeline in a UI. This should launch a local web server at http://127.0.0.1:8237/ - the default login username is default.

Pipeline UI

That's it, Zen master. Visit simple_ml_pipeline for the full script.

Next, I'll show you how to introduce other ML libraries (decision trees and random forests), set up the steps elegantly, and integrate with MLflow for experiment tracking. Yes, it integrates beautifully with MLflow.

Please note: the promote model step was deliberately rigged (via force=True) to promote your model regardless; this is to prove efficacy at this level.
