huhu wang

Kubeflow deployment and usage of its main features

This article briefly introduces Kubeflow and how to deploy and use it. I have recently been researching the Kubeflow platform and found that there is relatively little information available online; in addition, it iterates quickly, and some of the invocation and usage methods have changed. As part of my work I have run through several of its main features, such as Katib hyperparameter tuning, Kubeflow Pipelines construction, and multi-user setup, with concrete examples below.

Kubeflow

Kubeflow is an open source platform for deploying, managing, and scaling machine learning (ML) workflows on Kubernetes, aiming to simplify the deployment and operation of machine learning projects in production environments. It builds on Kubernetes' container orchestration and resource management: by splitting a machine learning workflow into a series of containerized tasks, Kubeflow can take advantage of Kubernetes' automatic scaling, fault tolerance, and scheduling capabilities to ensure efficient execution of machine learning tasks. Kubeflow provides a complete set of tools and services covering the entire machine learning life cycle, from data preparation and model training to tuning and deployment. This end-to-end approach helps data scientists and engineers move quickly from development to production.

Overall, Kubeflow consists of the following components:

  1. Kubeflow Central Dashboard provides a web interface for accessing Kubeflow and its components. It serves as a centralized hub, unifying various tools and services within the cluster for easier management of the machine learning platform.
  2. Kubeflow integrates with Jupyter Notebooks to offer an interactive environment for data exploration, experimentation, and model development. Notebooks support multiple languages like Python, R, and Scala, enabling collaborative and reproducible ML workflows.
  3. Kubeflow Pipelines allow users to define and run complex ML workflows as Directed Acyclic Graphs (DAGs). These pipelines help automate and manage end-to-end processes like data preprocessing, model training, evaluation, and deployment, enhancing reproducibility and collaboration. The Kubeflow Pipelines SDK is a set of Python packages for efficiently defining and executing ML workflows (a minimal example is sketched just after this list).
  4. Kubeflow Training Operator provides tools for large-scale training of ML models, supporting distributed training with frameworks like TensorFlow, PyTorch, and XGBoost. It leverages Kubernetes' scalability and resource management for efficient model training across clusters.
  5. Kubeflow Serving enables the deployment of trained ML models as scalable, production-ready services, supporting frameworks like TensorFlow Serving, Seldon Core, or custom servers. Models can be deployed for real-time or batch predictions via HTTP endpoints.
  6. Kubeflow Metadata is a centralized repository for managing metadata related to ML experiments, runs, and artifacts, ensuring reproducibility, collaboration, and governance across the workflow.
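
To give a flavor of the Pipelines SDK mentioned in point 3, below is a minimal sketch of a one-step pipeline, assuming the kfp v2 SDK is installed locally; a fuller, more realistic example appears later in this article.

from kfp import dsl, compiler

# A single lightweight Python component.
@dsl.component(base_image='python:3.8-slim')
def say_hello(name: str) -> str:
    message = f'Hello, {name}!'
    print(message)
    return message

# A one-step pipeline that wires the component in.
@dsl.pipeline(name='hello-pipeline')
def hello_pipeline(recipient: str = 'Kubeflow'):
    say_hello(name=recipient)

if __name__ == '__main__':
    # Compile to a YAML package that can be uploaded in the Pipelines UI.
    compiler.Compiler().compile(hello_pipeline, 'hello_pipeline.yaml')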

Environment Setup

The prerequisite for deploying Kubeflow is having a cluster. I am using Kubernetes version 1.26, installed and deployed with kubeadm. The corresponding Kubeflow version is 1.8.1.

NFS Installation

Since deploying Kubeflow requires a default storage class, here's how to install and deploy the NFS file system with the nfs-subdir-external-provisioner.

According to the official documentation, installing Kubeflow using the Kubeflow manifest requires a default storageclass, kustomize, and kubectl tools. To use the default storage class, the nfs-subdir-external-provisioner is deployed. It can automatically create and manage PV and PVC relationships. Below is the process for installing and deploying NFS and the NFS plugin.

  • Install the NFS service: apt install -y nfs-kernel-server.
  • Create a data storage directory: mkdir -p /data/redis.
  • Add the new export entry to /etc/exports: /data/redis 192.168.0.0/24(rw,sync,no_all_squash,no_subtree_check,no_root_squash).
  • Restart the NFS service after completing the above steps (for example with systemctl restart nfs-kernel-server).

Deploying NFS-Subdir-External-Provisioner

NFS-Subdir-External-Provisioner is a dynamic volume provisioner that uses an existing and configured NFS server to support dynamic provisioning of Kubernetes persistent volumes through persistent volume claims.

Before deploying the NFS-subdir service, ensure you have a node with NFS service and know the corresponding storage directory. In the example above, the node and directory are 192.168.0.208 and /data/redis, respectively.

  • Create the service account and RBAC rules:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-client-provisioner
  namespace: default  # replace with the namespace you want to deploy into
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-client-provisioner-runner
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-client-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    namespace: default
roleRef:
  kind: ClusterRole
  name: nfs-client-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  namespace: default
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    namespace: default
roleRef:
  kind: Role
  name: leader-locking-nfs-client-provisioner
  apiGroup: rbac.authorization.k8s.io
  • Deploy NFS-Subdir-External-Provisioner and modify the configuration parameters as needed:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-client-provisioner
  labels:
    app: nfs-client-provisioner
spec:
  replicas: 1
  strategy:
    type: Recreate  # Set upgrade strategy to Recreate (default is RollingUpdate)
  selector:
    matchLabels:
      app: nfs-client-provisioner
  template:
    metadata:
      labels:
        app: nfs-client-provisioner
    spec:
      serviceAccountName: nfs-client-provisioner
      containers:
        - name: nfs-client-provisioner
          # image: gcr.io/k8s-staging-sig-storage/nfs-subdir-external-provisioner:v4.0.0
          image: registry.cn-beijing.aliyuncs.com/xngczl/nfs-subdir-external-provisione:v4.0.0
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME  # Name of the provisioner, must match the name set in the storage class
              value: nfs-client
            - name: NFS_SERVER  # NFS server address, must match the value in the volumes parameter
              value: 192.168.0.208
            - name: NFS_PATH  # NFS server data storage directory, must match the value in the volumes parameter
              value: /data/redis
      volumes:
        - name: nfs-client-root
          nfs:
            server: 192.168.0.208  # NFS server address
            path: /data/redis  # NFS server data storage directory
  • Create NFS StorageClass:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: nfs-storage
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"  # Set as the default storage class
provisioner: nfs-client  # Must match the provisioner name set in the deployment
parameters:
  archiveOnDelete: "true"  # Set to "false" to delete PVC without retaining data; "true" retains data

Afterwards, apply the above files using kubectl apply -f. Also, grant permissions to the NFS storage directory with chmod 777 /data/redis.

Installing Kubeflow

Download the kustomize binary package kustomize_v5.0.3_linux_amd64.tar.gz (the version linked from the Kubeflow manifests project on GitHub) and extract it to the /usr/local/bin directory. Also download the official Kubeflow manifests package manifests-1.8.1.tar.gz.

Modify Default StorageClass

Edit the following YAML files in the manifests-1.8.1 directory, adding storageClassName: nfs-storage to the PVC spec in each:

  • apps/katib/upstream/components/mysql/pvc.yaml
  • common/oidc-client/oidc-authservice/base/pvc.yaml
  • apps/pipeline/upstream/third-party/minio/base/minio-pvc.yaml
  • apps/pipeline/upstream/third-party/mysql/base/mysql-pv-claim.yaml

Modify APP_SECURE_COOKIES

Edit the configuration file to set APP_SECURE_COOKIES=false, which disables secure cookies; this is needed when the dashboard will be accessed over plain HTTP rather than HTTPS.

Open the file with:
vim ./apps/jupyter/jupyter-web-app/upstream/base/params.env

Set APP_SECURE_COOKIES to false to ensure dashboard access to Kubeflow after deployment. You can also check for other instances that may need this change using the following command:

find ./ -type f -exec grep -l "APP_SECURE_COOKIES" {} \;

In most cases, only the Jupyter Web App (JWP) configuration needs this update.

JWP

Deploy Kubeflow

After making the necessary configuration changes, navigate to the manifest directory and run the following command to automatically deploy Kubeflow. Due to the large number of dependencies, issues may arise during installation. The previous configurations should resolve most problems, but if the installation fails or pods do not run correctly, further troubleshooting may be required.

while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done

Modify SVC to NodePort Mode

Since the deployed services are all of the ClusterIP type, they cannot be accessed externally. Therefore, you need to manually change the istio-ingressgateway service to NodePort type.
kubectl edit svc -n istio-system istio-ingressgateway

The default login credentials are:

  • Username: user@example.com
  • Password: 12341234

Login

Using Kubeflow

Kubeflow Overview

Below is an introduction to the key components of Kubeflow:

  • Notebook Servers: These serve as tools for managing interactive experiments, helping researchers quickly conduct algorithm experiments. They also provide unified document management capabilities.
  • AutoML: This automates processes such as feature engineering, model selection, hyperparameter tuning, and model evaluation, reducing the number of manual experiments required.
  • Pipeline: An engineering tool that organizes the different stages of an algorithmic workflow into a topology diagram. It can be combined with Argo to implement MLOps.
  • Serverless: This allows models to be deployed directly as services, shortening the path from experimentation to production.

Kubeflow Components

Model Development - Notebook

Kubeflow Notebooks is a critical component of the Kubeflow platform, enabling data scientists and ML engineers to run Jupyter Notebooks on Kubernetes. This significantly simplifies managing and using Jupyter Notebooks in cloud environments. With Kubeflow Notebooks, users can easily create and manage multiple Jupyter Notebook instances within a Kubernetes cluster. These instances can be configured with specific resources such as CPU, memory, and GPU, ensuring efficient resource isolation and utilization in a multi-user environment.

Notebook Setup

Create a new task through the notebook interface:
New Task

In the configuration page, you can set the name and allocate resources like CPU and GPU. After creation, the corresponding pod will appear. For GPU usage, refer to the official documentation.
Pod Configuration
GPU Configuration

Click "connect" to link to the corresponding pod. At this step, you might encounter issues with pulling images, requiring manual image pulls to the specified node before the pod can run.

Model Training


When creating a notebook, you can select an image with TensorFlow, allowing you to use the framework directly. Other images like PyTorch-CUDA are also available. After setting up, you can run a simple training code as shown below:

import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout, Conv2D, MaxPooling2D

# Load the MNIST dataset
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Preprocess the data: reshape and normalize
train_images = train_images.reshape(-1, 28, 28, 1).astype('float32') / 255.0
test_images = test_images.reshape(-1, 28, 28, 1).astype('float32') / 255.0

# Convert the labels to one-hot encoding
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)

# Build the model
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    MaxPooling2D(pool_size=(2, 2)),
    Conv2D(64, kernel_size=(3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(10, activation='softmax')
])

# Compile the model
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Train the model
model.fit(train_images, train_labels, epochs=10, batch_size=64, validation_data=(test_images, test_labels))

# Evaluate the model
test_loss, test_accuracy = model.evaluate(test_images, test_labels, verbose=0)
print(f"Test accuracy: {test_accuracy:.4f}")

# Save the model
model.save('mnist_cnn_model.keras')
print("Model saved successfully")

After running the training, we obtain the test accuracy and the saved model file. We can then open a new Jupyter notebook and use the saved model for inference on our own images:

import numpy as np
import tensorflow as tf
from PIL import Image, ImageOps, ImageFilter
import matplotlib.pyplot as plt

# Load the saved model
loaded_model = tf.keras.models.load_model('mnist_cnn_model.keras')

# Define your own list of image file names
image_files = [
    'mnist_test_0_label_9.png',
    'mnist_test_2_label_8.png',
    'mnist_test_4_label_2.png',
    'mnist_test_1_label_0.png',
    'mnist_test_3_label_7.png'
]
# image_files = ['图片.jpg', '123.jpg', 'third.jpg', '9.jpg']

def preprocess_image(img):
    # Convert to grayscale
    img = img.convert('L')
    # Automatic contrast enhancement
    img = ImageOps.autocontrast(img)
    # Crop the digit's edges and center it
    img = img.crop(img.getbbox())  # crop the non-blank region
    img = img.resize((20, 20), Image.Resampling.LANCZOS)  # resize while preserving as much information as possible
    background = Image.new('L', (28, 28), 0)  # create a black background
    offset = ((28 - img.size[0]) // 2, (28 - img.size[1]) // 2)
    background.paste(img, offset)  # paste the image onto the background to center it
    return background

# Create a figure with 5 rows and 2 columns of subplots
fig, axs = plt.subplots(5, 2, figsize=(10, 25))
# fig, axs = plt.subplots(4, 2, figsize=(10, 25))

for i, file in enumerate(image_files):
    # Load the image
    img = Image.open(file)
    # Preprocess the image
    processed_img = preprocess_image(img)
    # Convert the image to an array and normalize it, ensuring a shape of (28, 28, 1)
    img_array = np.array(processed_img).reshape(1, 28, 28, 1).astype('float32') / 255.0
    # Run the prediction
    predictions = loaded_model.predict(img_array)
    # Get the predicted digit
    predicted_digit = np.argmax(predictions[0])
    # Show the original image
    axs[i, 0].imshow(img, cmap='gray')
    axs[i, 0].set_title(f'Original image: {file}')
    axs[i, 0].axis('off')
    # Show the processed image
    axs[i, 1].imshow(processed_img, cmap='gray')
    axs[i, 1].set_title(f'Predicted digit: {predicted_digit}')
    axs[i, 1].axis('off')
    print(f"Predicted digit for {file}: {predicted_digit}")

plt.tight_layout()
plt.show()


GPU Training

Using a GPU image requires GPU resources in our cluster, as shown below:
GPU Cluster

When selecting a GPU image, if you want to add a GPU but get a prompt indicating that no GPUs are available in the cluster, you need to physically install a GPU on a node and then add an operator to the cluster to use the GPU resources.

Below is the method for installing the NVIDIA GPU Operator:
Reference: Official NVIDIA Operator Installation Guide

Download and install Helm 3:

curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 \
  && chmod 700 get_helm.sh \
  && ./get_helm.sh

Check whether Node Feature Discovery (NFD) is already running in the cluster; the command below prints true if NFD labels are present, in which case the operator's bundled NFD should be disabled (--set nfd.enabled=false) when installing:

kubectl get nodes -o json | jq '.items[].metadata.labels | keys | any(startswith("feature.node.kubernetes.io"))'

Add the NVIDIA Helm repository:

helm repo add nvidia https://helm.ngc.nvidia.com/nvidia \
  && helm repo update

Deploy the GPU Operator:

helm install --wait --generate-name \
  -n gpu-operator --create-namespace \
  nvidia/gpu-operator \
  --set driver.version=535

Note that the driver version should match your specific GPU model. For instance, I used version 535 because I have an A800 GPU. Make sure to select the correct driver version based on your NVIDIA GPU model.

After running the commands, you should see the following:
Deployment Success

The node will now show schedulable resources as nvidia.com/gpu:
GPU Resources

Open Kubeflow to start using the GPU for training tasks. On this page, specify an image that includes CUDA, and select the available NVIDIA GPU in the cluster in the GPU configuration.
Kubeflow GPU Setup

Once set up, enter Jupyter and create a Python 3 environment. You can then develop machine learning code and use the nvidia-smi command within the pod to check and utilize the GPU.
NVIDIA-SMI Command

Below is the code for training using the GPU:

import os
import tensorflow as tf
import time
import numpy as np
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt

print("==== Checking GPU availability ====")
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

# Check whether a GPU is available
# (tf.test.is_gpu_available() is deprecated; tf.config.list_physical_devices('GPU') is the newer check)
if tf.test.is_gpu_available():
    print("\033[1;32m[GPU available] Will compare GPU and CPU training\033[0m")
    gpu_device = tf.config.list_physical_devices('GPU')[0]
    print(f"Available GPU: {gpu_device}")
else:
    print("\033[1;31m[GPU unavailable] Only CPU training is possible\033[0m")
    exit()

print("\n==== Loading and preprocessing data ====")
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
train_images = train_images.reshape((60000, 28, 28, 1)).astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1)).astype('float32') / 255

def create_model():
    model = models.Sequential([
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(10, activation='softmax')
    ])
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# GPU training
print("\n==== Starting GPU training ====")
with tf.device('/GPU:0'):
    gpu_model = create_model()
    start_time = time.time()
    gpu_history = gpu_model.fit(train_images, train_labels, epochs=10,
                                validation_split=0.2, batch_size=64, verbose=1)
    gpu_time = time.time() - start_time

# CPU training
print("\n==== Starting CPU training ====")
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # hide the GPU (has no effect once TF is initialized; the device context below is what forces CPU)
with tf.device('/CPU:0'):
    cpu_model = create_model()
    start_time = time.time()
    cpu_history = cpu_model.fit(train_images, train_labels, epochs=10,
                                validation_split=0.2, batch_size=64, verbose=1)
    cpu_time = time.time() - start_time

# Compare the results
print("\n==== Training time comparison ====")
print(f"\033[1;34mGPU training time: {gpu_time:.2f} s\033[0m")
print(f"\033[1;34mCPU training time: {cpu_time:.2f} s\033[0m")
print(f"\033[1;32mGPU speedup: {cpu_time / gpu_time:.2f}x\033[0m")

# Plot the loss and accuracy curves of the training process
def plot_history(history, title):
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    epochs = range(1, len(acc) + 1)

    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, loss, 'bo-', label='Training loss')
    plt.plot(epochs, val_loss, 'ro-', label='Validation loss')
    plt.title(f'{title} - Training and validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs, acc, 'bo-', label='Training accuracy')
    plt.plot(epochs, val_acc, 'ro-', label='Validation accuracy')
    plt.title(f'{title} - Training and validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

print("\n==== Visualizing GPU training ====")
plot_history(gpu_history, "GPU Training")

print("\n==== Visualizing CPU training ====")
plot_history(cpu_history, "CPU Training")

# Evaluate the GPU-trained model
print("\n==== Evaluating the GPU-trained model ====")
test_loss, test_acc = gpu_model.evaluate(test_images, test_labels, verbose=0)
print(f'\n\033[1;32mTest accuracy: {test_acc:.4f}\033[0m')

# Save the GPU-trained model
print("\n==== Saving the GPU-trained model ====")
gpu_model.save('mnist_model_gpu.keras')
print("Model saved as mnist_model_gpu.keras")

This code demonstrates the significant computational power of GPUs compared to CPUs, especially in deep learning tasks. The code first checks for available GPUs in the environment and then trains a simple Convolutional Neural Network (CNN) on both GPU and CPU, using the MNIST handwritten digits dataset for classification.

GPU vs CPU Training
Training Output

During training, you can monitor GPU utilization using the nvidia-smi command, which provides real-time insights into GPU usage.

nvidia-smi Output

As shown in the image, GPU utilization during training is around 9%, indicating that the task successfully leveraged the GPU for computation.

Pipeline

Kubeflow Pipelines is a core module within the Kubeflow project, focused on building, deploying, and managing complex machine learning workflows. It offers a comprehensive set of tools for designing and automating ML pipelines, allowing data scientists and engineers to efficiently manage and monitor the training and deployment processes of ML models. With Kubeflow Pipelines, users can easily define, share, reuse, and automate complex workflows.

In the pipeline interface, you can upload pre-built .gz or .yaml files. Once the pipeline is created, it will be visually represented in the graphical interface as shown below. For detailed instructions on creating pipelines, refer to the official documentation.

Pipeline Interface

import kfp
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model

# Step 1: download and preprocess the data
@component(
    base_image='python:3.8-slim',
    packages_to_install=[
        'pandas',
        'scikit-learn',
        'joblib',
        'numpy',
        'requests'
    ]
)
def preprocess_data_op(output_data: Output[Dataset]):
    print("Starting preprocess_data_op...")
    try:
        import pandas as pd
        print("Successfully imported pandas.")
    except ImportError as e:
        print(f"Failed to import pandas: {e}")
        raise e
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    import os

    print("Downloading the dataset...")
    url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"
    columns = ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome']
    data = pd.read_csv(url, names=columns)
    print("Dataset downloaded.")

    # Data cleaning and feature engineering
    print("Cleaning data and engineering features...")
    X = data.drop('Outcome', axis=1)
    y = data['Outcome']

    # Standardize the features
    print("Standardizing features...")
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Split into training and test sets
    print("Splitting into training and test sets...")
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

    # Save the preprocessed data
    print("Saving preprocessed data...")
    os.makedirs(output_data.path, exist_ok=True)
    pd.DataFrame(X_train).to_csv(os.path.join(output_data.path, 'X_train.csv'), index=False)
    pd.DataFrame(X_test).to_csv(os.path.join(output_data.path, 'X_test.csv'), index=False)
    pd.DataFrame(y_train).to_csv(os.path.join(output_data.path, 'y_train.csv'), index=False)
    pd.DataFrame(y_test).to_csv(os.path.join(output_data.path, 'y_test.csv'), index=False)
    print(f"Preprocessing finished; data saved to {output_data.path}.")

# Step 2: train the model
@component(
    base_image='python:3.8-slim',
    packages_to_install=[
        'pandas',
        'scikit-learn',
        'joblib',
        'numpy'
    ]
)
def train_model_op(input_data: Input[Dataset], output_model: Output[Model]):
    print("Starting train_model_op...")
    try:
        import pandas as pd
        print("Successfully imported pandas.")
    except ImportError as e:
        print(f"Failed to import pandas: {e}")
        raise e
    from sklearn.linear_model import LogisticRegression
    import joblib
    import os

    print("Loading training data...")
    X_train = pd.read_csv(os.path.join(input_data.path, 'X_train.csv'))
    y_train = pd.read_csv(os.path.join(input_data.path, 'y_train.csv'))

    # Train the model
    print("Training the model...")
    model = LogisticRegression()
    model.fit(X_train, y_train.values.ravel())

    # Create the output directory and save the model
    os.makedirs(output_model.path, exist_ok=True)  # make sure the output directory exists
    model_path = os.path.join(output_model.path, 'trained_model.joblib')
    joblib.dump(model, model_path)
    print(f"Training finished; model saved to {model_path}.")

# Step 3: evaluate the model
@component(
    base_image='python:3.8-slim',
    packages_to_install=[
        'pandas',
        'scikit-learn',
        'joblib',
        'numpy'
    ]
)
def evaluate_model_op(input_data: Input[Dataset], input_model: Input[Model]):
    print("Starting evaluate_model_op...")
    try:
        import pandas as pd
        print("Successfully imported pandas.")
    except ImportError as e:
        print(f"Failed to import pandas: {e}")
        raise e
    from sklearn.metrics import accuracy_score
    import joblib
    import os  # needed for path handling

    print("Loading test data and model...")
    X_test = pd.read_csv(os.path.join(input_data.path, 'X_test.csv'))
    y_test = pd.read_csv(os.path.join(input_data.path, 'y_test.csv'))
    model = joblib.load(os.path.join(input_model.path, 'trained_model.joblib'))

    # Predict and evaluate
    print("Running prediction and evaluation...")
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Model accuracy: {accuracy}")

# Step 4: pipeline definition
@dsl.pipeline(
    name='Diabetes Classifier Pipeline',
    description='A pipeline to train and evaluate a diabetes classifier model'
)
def diabetes_pipeline():
    preprocess = preprocess_data_op()
    train = train_model_op(input_data=preprocess.outputs['output_data'])
    evaluate = evaluate_model_op(input_data=preprocess.outputs['output_data'], input_model=train.outputs['output_model'])

# Compile the pipeline
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(diabetes_pipeline, 'diabetes_pipeline.yaml')

The above is a usable pipeline file, which can be opened in the Pipelines module. The main workflow is shown as follows:

Pipeline Workflow

After running the pipeline, the results are as follows:

Pipeline Results

This Python file primarily defines a Kubeflow Pipelines (KFP) pipeline for data processing, machine learning model training, and evaluation for a diabetes classification task.

  • The file first defines the preprocess_data_op component, which downloads the diabetes dataset and performs data preprocessing. This includes data cleaning, feature standardization, and splitting into training and test sets. The preprocessed data is saved to a specified output path for later use in model training and evaluation.

  • Next, the train_model_op component is defined, which uses the preprocessed training data to train a logistic regression model. The trained model is saved to a specified output path. Using the scikit-learn library, this component quickly trains the model, providing a trained model file for the subsequent evaluation step.

  • The evaluate_model_op component is also defined in the file, which loads the test data and the trained model, then uses the test data to make predictions and evaluate the model. This component calculates and outputs the model's accuracy as a performance measure, helping to assess the model's effectiveness.

  • Finally, the file defines a pipeline named diabetes_pipeline, which links the above three components together. Using the dsl.pipeline decorator, the data preprocessing, model training, and model evaluation steps are combined into a complete workflow. This pipeline is ultimately compiled into a YAML file (diabetes_pipeline.yaml) that can be deployed and run in a Kubeflow environment.

Overall, this example demonstrates the basic usage of Kubeflow Pipelines (which runs on Argo Workflows under the hood), achieving automated workflow management in a Kubernetes environment by defining simple tasks and their dependencies.
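
Besides uploading the compiled YAML through the web UI, the same package can be submitted programmatically with the KFP SDK. The following is a minimal sketch, assuming it runs from a notebook inside the cluster; the API host shown and the experiment name are placeholders to adapt to your own installation and authentication setup.

import kfp

# Connect to the Kubeflow Pipelines API. The host below is a placeholder for
# the in-cluster service; adjust it (and add authentication) to match your
# deployment.
client = kfp.Client(host="http://ml-pipeline.kubeflow.svc.cluster.local:8888")

# Submit the compiled pipeline package as a one-off run.
run = client.create_run_from_pipeline_package(
    "diabetes_pipeline.yaml",
    arguments={},                      # this pipeline takes no parameters
    run_name="diabetes-classifier-run",
    experiment_name="diabetes-demo",   # hypothetical experiment name
)
print(f"Started run: {run.run_id}")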

For pipelines that are run repeatedly or share common preprocessing steps, Kubeflow Pipelines can reuse the cached results of previous executions of an identical step instead of recomputing them, which improves efficiency.

Pipeline Caching
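
Caching can also be controlled from the SDK. The sketch below assumes the KFP v2 SDK and reuses the components defined in the pipeline example above; set_caching_options toggles whether an individual task may be served from the cache.

from kfp import dsl

# Assumes preprocess_data_op, train_model_op and evaluate_model_op are the
# components defined in the pipeline example above.
@dsl.pipeline(name='diabetes-pipeline-with-caching')
def diabetes_pipeline_cached():
    preprocess = preprocess_data_op()
    preprocess.set_caching_options(True)   # reuse cached preprocessing output when inputs are unchanged
    train = train_model_op(input_data=preprocess.outputs['output_data'])
    train.set_caching_options(False)       # always retrain; never serve this step from cache
    evaluate_model_op(
        input_data=preprocess.outputs['output_data'],
        input_model=train.outputs['output_model']
    )

Caching for a whole run can also be overridden at submission time via the enable_caching argument of create_run_from_pipeline_package.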

Katib

AutoML is a popular area in machine learning, mainly used for model optimization and hyperparameter tuning. Here, Katib, a Kubernetes-based AutoML project, is used. For more details, visit Katib on GitHub.

Katib mainly provides hyperparameter tuning, early stopping, and neural architecture search. It supports various hyperparameter optimization algorithms, including random search, Bayesian optimization, and HyperBand. These algorithms help developers efficiently search for the best hyperparameter configurations within a given parameter space. By defining experiments, users can specify the hyperparameters to be tuned, their ranges, the optimization objective (e.g., accuracy or loss), and the search algorithm to use. Katib also supports early stopping strategies, allowing experiments with poor performance to be dynamically terminated during training, thus saving computational resources. Additionally, Katib offers neural architecture search (NAS) capabilities, allowing users to explore different network architectures to further enhance model performance.

Create a new notebook as described in the Notebook Setup section above. After creating a Python file, run the following command in the terminal to install the necessary library.

pip install kubeflow-katib

import kubeflow.katib as katib

# Step 1. Create an objective function.
def objective(parameters):
    # Import required packages.
    import time
    time.sleep(5)
    # Calculate objective function.
    result = 4 * int(parameters["a"]) - float(parameters["b"]) ** 2
    # Katib parses metrics in this format: <metric-name>=<metric-value>.
    print(f"result={result}")

# Step 2. Create HyperParameter search space.
parameters = {
    "a": katib.search.int(min=10, max=20),
    "b": katib.search.double(min=0.1, max=0.2)
}

# Step 3. Create Katib Experiment.
katib_client = katib.KatibClient()
name = "tune-experiment"
katib_client.tune(
    name=name,
    objective=objective,
    parameters=parameters,
    objective_metric_name="result",
    max_trial_count=12
)

# Step 4. Get the best HyperParameters.
print(katib_client.get_optimal_hyperparameters(name))

After running, you can view the parameter optimization files and the overall workflow in AutoML.

AutoML Workflow
Parameter Optimization

This code automatically performs hyperparameter optimization by defining an objective function and using the default random search algorithm. The objective function is of the form result = 4a - b^2, where parameter a is an integer ranging from 10 to 20, and parameter b is a floating-point number ranging from 0.1 to 0.2. The code first defines the objective function and then creates the search space for the hyperparameters. Next, Kubeflow Katib's KatibClient is used to start the experiment, specifying the objective function, parameter ranges, and the maximum number of trials (12). Finally, the katib_client.get_optimal_hyperparameters(name) method retrieves and outputs the optimal combination of hyperparameters, achieving automated hyperparameter tuning for the specified objective function.

Optimal Parameters
Hyperparameter Tuning

The value range for a is 10-20, and for b it is 0.1-0.2. Under these conditions, the random search algorithm is used to find the maximum value of the objective function (Katib maximizes the reported metric by default).

Below is an example of a more complex function and the use of other algorithms for computation:

Complex Function

import kubeflow.katib as katib

# Step 1. Create an objective function.
def objective(parameters):
    # Import required packages.
    import time
    import math
    time.sleep(5)
    # Calculate a more complex objective function.
    a = int(parameters["a"])
    b = float(parameters["b"])
    # A more complex objective: a combination of polynomial, logarithmic, and trigonometric terms
    result = (3 * a ** 2 + 2 * b - math.sin(a * b)) / (1 + math.log(b + 0.1)) + math.sqrt(abs(a - b))
    # Katib parses metrics in this format: <metric-name>=<metric-value>.
    print(f"result={result}")

# Step 2. Create HyperParameter search space.
parameters = {
    "a": katib.search.int(min=10, max=20),
    "b": katib.search.double(min=0.1, max=0.2)
}

# Step 3. Create Katib Experiment.
katib_client = katib.KatibClient()
name = "tune-experiment-for-kalibbase"
katib_client.tune(
    name=name,
    objective=objective,
    parameters=parameters,
    algorithm_name="bayesianoptimization",
    objective_metric_name="result",
    # objective_type="minimize",  # uncomment to minimize the objective instead
    max_trial_count=12
)

# Step 4. Get the best HyperParameters.
print(katib_client.get_optimal_hyperparameters(name))

Here the more complex objective function is optimized using the Bayesian optimization algorithm:

Bayesian Search Algorithm
