This article briefly introduces Kubeflow and how to deploy and use it. While researching the Kubeflow platform recently, I found that there is relatively little information available online; in addition, the project iterates quickly, and some of its invocation and usage patterns have changed between versions. As part of my work I have run through its main features, such as Katib hyperparameter tuning, Kubeflow Pipelines construction, and multi-user setup, and I walk through concrete cases below.
kubeflow
Kubeflow is an open source platform for deploying, managing, and scaling machine learning (ML) workflows on Kubernetes, and it aims to simplify running ML projects in production. It builds on Kubernetes' container orchestration and resource management: by splitting an ML workflow into a series of containerized tasks, Kubeflow can take advantage of Kubernetes' automatic scaling, fault tolerance, and scheduling to execute ML tasks efficiently. Kubeflow provides a complete set of tools and services covering the entire ML life cycle, from data preparation and model training to tuning and deployment. This end-to-end solution helps data scientists and engineers move quickly from development to production.
Overall, Kubeflow consists of the following components:
- Kubeflow Central Dashboard provides a web interface for accessing Kubeflow and its components. It serves as a centralized hub, unifying various tools and services within the cluster for easier management of the machine learning platform.
- Kubeflow integrates with Jupyter Notebooks to offer an interactive environment for data exploration, experimentation, and model development. Notebooks support multiple languages like Python, R, and Scala, enabling collaborative and reproducible ML workflows.
- Kubeflow Pipelines allow users to define and run complex ML workflows as Directed Acyclic Graphs (DAGs). These pipelines help automate and manage end-to-end processes like data preprocessing, model training, evaluation, and deployment, enhancing reproducibility and collaboration. The Kubeflow Pipelines SDK is a set of Python packages for efficiently defining and executing ML workflows.
- Kubeflow Training Operator provides tools for large-scale training of ML models, supporting distributed training with frameworks like TensorFlow, PyTorch, and XGBoost. It leverages Kubernetes' scalability and resource management for efficient model training across clusters.
- Kubeflow Serving enables the deployment of trained ML models as scalable, production-ready services, supporting frameworks like TensorFlow Serving, Seldon Core, or custom servers. Models can be deployed for real-time or batch predictions via HTTP endpoints.
- Kubeflow Metadata is a centralized repository for managing metadata related to ML experiments, runs, and artifacts, ensuring reproducibility, collaboration, and governance across the workflow.
Environment Setup
The prerequisite for deploying Kubeflow is having a cluster. I am using Kubernetes version 1.26, installed and deployed with kubeadm. The corresponding Kubeflow version is 1.8.1.
NFS Installation
Since deploying Kubeflow requires a default storage class, here's how to install and deploy the NFS file system with the nfs-subdir-external-provisioner.
According to the official documentation, installing Kubeflow using the Kubeflow manifest requires a default storageclass, kustomize, and kubectl tools. To use the default storage class, the nfs-subdir-external-provisioner is deployed. It can automatically create and manage PV and PVC relationships. Below is the process for installing and deploying NFS and the NFS plugin.
- Install the NFS service: apt install -y nfs-kernel-server
- Create a data storage directory: mkdir -p /data/redis
- Add the new export to /etc/exports: /data/redis 192.168.0.0/24(rw,sync,no_all_squash,no_subtree_check,no_root_squash)
- Restart the NFS service after completing the above steps; the consolidated commands are shown below.
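Putting the steps together, a minimal setup on the NFS server node looks like this (a sketch assuming Debian/Ubuntu and the example subnet 192.168.0.0/24):
# Install the NFS server
apt install -y nfs-kernel-server
# Create the export directory and open permissions for the provisioner
mkdir -p /data/redis
chmod 777 /data/redis
# Export the directory to the cluster subnet
echo '/data/redis 192.168.0.0/24(rw,sync,no_all_squash,no_subtree_check,no_root_squash)' >> /etc/exports
# Reload exports and restart the NFS service
exportfs -ra
systemctl restart nfs-kernel-server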
Deploying NFS-Subdir-External-Provisioner
NFS-Subdir-External-Provisioner is a dynamic volume provisioner that uses an existing and configured NFS server to support dynamic provisioning of Kubernetes persistent volumes through persistent volume claims.
Before deploying the NFS-subdir service, ensure you have a node running the NFS service and know its export directory. In the example above, these are 192.168.0.208 and /data/redis, respectively.
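You can check from any cluster node that the export is visible (requires the NFS client utilities, e.g. the nfs-common package):
showmount -e 192.168.0.208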
- Create the service account and RBAC resources:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: nfs-client-provisioner
  namespace: default  # replace with the namespace you want to deploy into
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: nfs-client-provisioner-runner
rules:
  - apiGroups: [""]
    resources: ["persistentvolumes"]
    verbs: ["get", "list", "watch", "create", "delete"]
  - apiGroups: [""]
    resources: ["persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "update"]
  - apiGroups: ["storage.k8s.io"]
    resources: ["storageclasses"]
    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["events"]
    verbs: ["create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: run-nfs-client-provisioner
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    namespace: default
roleRef:
  kind: ClusterRole
  name: nfs-client-provisioner-runner
  apiGroup: rbac.authorization.k8s.io
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["get", "list", "watch", "create", "update", "patch"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: leader-locking-nfs-client-provisioner
  namespace: default
subjects:
  - kind: ServiceAccount
    name: nfs-client-provisioner
    namespace: default
roleRef:
  kind: Role
  name: leader-locking-nfs-client-provisioner
  apiGroup: rbac.authorization.k8s.io
- Deploy NFS-Subdir-External-Provisioner and modify the configuration parameters as needed:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-client-provisioner
  labels:
    app: nfs-client-provisioner
spec:
  replicas: 1
  strategy:
    type: Recreate  # Set upgrade strategy to Recreate (default is RollingUpdate)
  selector:
    matchLabels:
      app: nfs-client-provisioner
  template:
    metadata:
      labels:
        app: nfs-client-provisioner
    spec:
      serviceAccountName: nfs-client-provisioner
      containers:
        - name: nfs-client-provisioner
          # image: gcr.io/k8s-staging-sig-storage/nfs-subdir-external-provisioner:v4.0.0
          image: registry.cn-beijing.aliyuncs.com/xngczl/nfs-subdir-external-provisione:v4.0.0
          volumeMounts:
            - name: nfs-client-root
              mountPath: /persistentvolumes
          env:
            - name: PROVISIONER_NAME  # Name of the provisioner, must match the name set in the storage class
              value: nfs-client
            - name: NFS_SERVER  # NFS server address, must match the value in the volumes parameter
              value: 192.168.0.208
            - name: NFS_PATH  # NFS server data storage directory, must match the value in the volumes parameter
              value: /data/redis
      volumes:
        - name: nfs-client-root
          nfs:
            server: 192.168.0.208  # NFS server address
            path: /data/redis  # NFS server data storage directory
- Create NFS StorageClass:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: nfs-storage
  annotations:
    storageclass.kubernetes.io/is-default-class: "true"  # Set as the default storage class
provisioner: nfs-client  # Must match the provisioner name set in the deployment
parameters:
  archiveOnDelete: "true"  # "true" archives (retains) data when a PVC is deleted; "false" deletes it without retaining data
Afterwards, apply the above files using kubectl apply -f. Also, grant permissions to the NFS storage directory with chmod 777 /data/redis.
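A quick way to confirm the provisioner and storage class are in place (the file names below are just whatever you saved the manifests above as):
kubectl apply -f nfs-rbac.yaml -f nfs-provisioner.yaml -f nfs-storageclass.yaml
kubectl get storageclass          # nfs-storage should be marked as (default)
kubectl get pods -l app=nfs-client-provisioner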
Installing Kubeflow
Download the kustomize binary package kustomize_v5.0.3_linux_amd64.tar.gz linked from the Kubeflow manifests project's GitHub repository, and extract it into the /usr/local/bin directory. Then download the official Kubeflow manifests package manifests-1.8.1.tar.gz.
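For reference, the downloads typically look like this (the URLs follow the usual GitHub release patterns for kustomize and kubeflow/manifests; double-check them against the current release pages):
wget https://github.com/kubernetes-sigs/kustomize/releases/download/kustomize/v5.0.3/kustomize_v5.0.3_linux_amd64.tar.gz
tar -xzf kustomize_v5.0.3_linux_amd64.tar.gz -C /usr/local/bin/
wget -O manifests-1.8.1.tar.gz https://github.com/kubeflow/manifests/archive/refs/tags/v1.8.1.tar.gz
tar -xzf manifests-1.8.1.tar.gz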
Modify Default StorageClass
Add storageClassName: nfs-storage to the following YAML files in the manifests-1.8.1 directory (a sketch of the change follows the list):
apps/katib/upstream/components/mysql/pvc.yaml
common/oidc-client/oidc-authservice/base/pvc.yaml
apps/pipeline/upstream/third-party/minio/base/minio-pvc.yaml
apps/pipeline/upstream/third-party/mysql/base/mysql-pv-claim.yaml
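The change itself is a single field under spec. As a sketch, a PVC from the list ends up looking like this (keep the metadata and size values already present in the upstream file):
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: katib-mysql          # keep the name already in the file
spec:
  storageClassName: nfs-storage   # the added line
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi          # keep the size already in the file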
Modify APP_SECURE_COOKIES
Edit the configuration file to set APP_SECURE_COOKIES=false, which disables the use of secure cookies.
Open the file with:
vim ./apps/jupyter/jupyter-web-app/upstream/base/params.env
Set APP_SECURE_COOKIES to false to ensure the Kubeflow dashboard is accessible after deployment. You can also check for other files that may need this change using the following command:
find ./ -type f -exec grep -l "APP_SECURE_COOKIES" {} \;
In most cases, only the Jupyter Web App (JWP) configuration needs this update.
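If other apps also carry the setting, a find/sed one-liner can flip them all at once (a convenience command, not part of the original guide):
find ./ -type f -name "params.env" -exec sed -i 's/APP_SECURE_COOKIES=true/APP_SECURE_COOKIES=false/g' {} \;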
Deploy Kubeflow
After making the necessary configuration changes, navigate to the manifest directory and run the following command to automatically deploy Kubeflow. Due to the large number of dependencies, issues may arise during installation. The previous configurations should resolve most problems, but if the installation fails or pods do not run correctly, further troubleshooting may be required.
while ! kustomize build example | kubectl apply -f -; do echo "Retrying to apply resources"; sleep 10; done
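Once the loop finishes, check that the pods in the core namespaces reach the Running state before moving on (namespace names per the standard manifests install):
kubectl get pods -n cert-manager
kubectl get pods -n istio-system
kubectl get pods -n auth
kubectl get pods -n knative-serving
kubectl get pods -n kubeflow
kubectl get pods -n kubeflow-user-example-com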
Modify SVC to NodePort Mode
Since the deployed services are all of the ClusterIP type, they cannot be accessed externally. Therefore, you need to manually change the istio-ingressgateway service in the istio-system namespace to the NodePort type:
kubectl edit svc -n istio-system istio-ingressgateway
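Alternatively, patch the service type directly and then look up the node port that maps to port 80 (standard kubectl commands):
kubectl patch svc istio-ingressgateway -n istio-system -p '{"spec": {"type": "NodePort"}}'
kubectl get svc istio-ingressgateway -n istio-system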
The default login credentials are:
- Username: user@example.com
- Password: 12341234
Using Kubeflow
Below is an introduction to the key components of Kubeflow:
- Notebook Servers: These serve as tools for managing interactive experiments, helping researchers quickly conduct algorithm experiments. They also provide unified document management capabilities.
- AutoML: This automates processes such as feature engineering, model selection, hyperparameter tuning, and model evaluation, reducing the number of manual experiments required.
- Pipeline: An engineering tool that organizes the different stages of an algorithmic workflow into a topology diagram. It can be combined with Argo to implement MLOps.
- Serverless: This allows models to be deployed directly as services, shortening the path from experimentation to production.
Model Development - Notebook
Kubeflow Notebooks is a critical component of the Kubeflow platform, enabling data scientists and ML engineers to run Jupyter Notebooks on Kubernetes. This significantly simplifies managing and using Jupyter Notebooks in cloud environments. With Kubeflow Notebooks, users can easily create and manage multiple Jupyter Notebook instances within a Kubernetes cluster. These instances can be configured with specific resources such as CPU, memory, and GPU, ensuring efficient resource isolation and utilization in a multi-user environment.
Create a new task through the notebook interface:
In the configuration page, you can set the name and allocate resources like CPU and GPU. After creation, the corresponding pod will appear. For GPU usage, refer to the official documentation.
Click "connect" to link to the corresponding pod. At this step, you might encounter issues with pulling images, requiring manual image pulls to the specified node before the pod can run.
Model Training
When creating a notebook, you can select an image with TensorFlow, allowing you to use the framework directly. Other images like PyTorch-CUDA are also available. After setting up, you can run a simple training code as shown below:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten, Dropout, Conv2D, MaxPooling2D

# Load the MNIST dataset
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Preprocess the data: reshape and normalize
train_images = train_images.reshape(-1, 28, 28, 1).astype('float32') / 255.0
test_images = test_images.reshape(-1, 28, 28, 1).astype('float32') / 255.0

# Convert the labels to one-hot encoding
train_labels = to_categorical(train_labels, 10)
test_labels = to_categorical(test_labels, 10)

# Build the model
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    MaxPooling2D(pool_size=(2, 2)),
    Conv2D(64, kernel_size=(3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(10, activation='softmax')
])

# Compile the model
model.compile(optimizer='adam',
              loss='categorical_crossentropy',
              metrics=['accuracy'])

# Train the model
model.fit(train_images, train_labels, epochs=10, batch_size=64, validation_data=(test_images, test_labels))

# Evaluate the model
test_loss, test_accuracy = model.evaluate(test_images, test_labels, verbose=0)
print(f"Test accuracy: {test_accuracy:.4f}")

# Save the model
model.save('mnist_cnn_model.keras')
print("Model saved successfully")
The training runs as expected. We can also open a new Jupyter notebook and load the saved model for inference:
import numpy as np
import tensorflow as tf
from PIL import Image, ImageOps, ImageFilter
import matplotlib.pyplot as plt

# Load the saved model
loaded_model = tf.keras.models.load_model('mnist_cnn_model.keras')

# Define your own list of image file names
image_files = [
    'mnist_test_0_label_9.png',
    'mnist_test_2_label_8.png',
    'mnist_test_4_label_2.png',
    'mnist_test_1_label_0.png',
    'mnist_test_3_label_7.png'
]
# image_files = ['图片.jpg', '123.jpg', 'third.jpg', '9.jpg']

def preprocess_image(img):
    # Convert to grayscale
    img = img.convert('L')
    # Automatic contrast enhancement
    img = ImageOps.autocontrast(img)
    # Crop the digit and center it
    img = img.crop(img.getbbox())  # crop the non-blank region
    img = img.resize((20, 20), Image.Resampling.LANCZOS)  # resize while keeping as much detail as possible
    background = Image.new('L', (28, 28), 0)  # create a black background
    offset = ((28 - img.size[0]) // 2, (28 - img.size[1]) // 2)
    background.paste(img, offset)  # paste the digit onto the background so it is centered
    return background

# Create a figure with 5 rows and 2 columns of subplots
fig, axs = plt.subplots(5, 2, figsize=(10, 25))
# fig, axs = plt.subplots(4, 2, figsize=(10, 25))

for i, file in enumerate(image_files):
    # Load the image
    img = Image.open(file)
    # Preprocess the image
    processed_img = preprocess_image(img)
    # Convert to an array and normalize, ensuring shape (28, 28, 1)
    img_array = np.array(processed_img).reshape(1, 28, 28, 1).astype('float32') / 255.0
    # Run the prediction
    predictions = loaded_model.predict(img_array)
    # Get the predicted digit
    predicted_digit = np.argmax(predictions[0])
    # Show the original image
    axs[i, 0].imshow(img, cmap='gray')
    axs[i, 0].set_title(f'Original image: {file}')
    axs[i, 0].axis('off')
    # Show the processed image
    axs[i, 1].imshow(processed_img, cmap='gray')
    axs[i, 1].set_title(f'Prediction: {predicted_digit}')
    axs[i, 1].axis('off')
    print(f"{file} predicted digit: {predicted_digit}")

plt.tight_layout()
plt.show()
GPU Training
Using a GPU image requires GPU resources in our cluster, as shown below:
When selecting a GPU image, if you want to add a GPU but get a prompt indicating that no GPUs are available in the cluster, you need to physically install a GPU on a node and then add an operator to the cluster to use the GPU resources.
Below is the method for installing the NVIDIA GPU Operator:
Reference: Official NVIDIA Operator Installation Guide
Download and prepare Helm 3:
curl -fsSL -o get_helm.sh https://raw.githubusercontent.com/helm/helm/master/scripts/get-helm-3 \
  && chmod 700 get_helm.sh \
  && ./get_helm.sh
Check whether Node Feature Discovery (NFD) is already deployed; if the following command returns true, it is already running and should be disabled before installing the operator:
kubectl get nodes -o json | jq '.items[].metadata.labels | keys | any(startswith("feature.node.kubernetes.io"))'
Add the NVIDIA Helm repository:
helm repo add nvidia https://helm.ngc.nvidia.com/nvidia \
  && helm repo update
Deploy the GPU Operator:
helm install --wait --generate-name \
  -n gpu-operator --create-namespace \
  nvidia/gpu-operator \
  --set driver.version=535
Note that the driver version should match your specific GPU model. For instance, I used version 535 because I have an A800 GPU. Make sure to select the correct driver version based on your NVIDIA GPU model.
After running the commands, you should see the following:
The node will now show nvidia.com/gpu as a schedulable resource:
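You can confirm the advertised GPU capacity directly from the node objects (plain kubectl plus the jq tool already used above; <gpu-node-name> is a placeholder):
kubectl describe node <gpu-node-name> | grep nvidia.com/gpu
kubectl get nodes -o json | jq '.items[].status.allocatable["nvidia.com/gpu"]'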
Open Kubeflow to start using the GPU for training tasks. On this page, specify an image that includes CUDA, and select the available NVIDIA GPU in the cluster in the GPU configuration.
Once set up, enter Jupyter and create a Python 3 environment. You can then develop machine learning code and use the nvidia-smi command within the pod to check and utilize the GPU.
Below is the code for training using the GPU:
import os
import time

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt

print("==== Checking GPU availability ====")
os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

# Check whether a GPU is available
if tf.test.is_gpu_available():
    print("\033[1;32m[GPU available] Will compare GPU and CPU training\033[0m")
    gpu_device = tf.config.list_physical_devices('GPU')[0]
    print(f"Available GPU: {gpu_device}")
else:
    print("\033[1;31m[GPU unavailable] Only CPU training is possible\033[0m")
    exit()

print("\n==== Loading and preprocessing data ====")
(train_images, train_labels), (test_images, test_labels) = tf.keras.datasets.mnist.load_data()
train_images = train_images.reshape((60000, 28, 28, 1)).astype('float32') / 255
test_images = test_images.reshape((10000, 28, 28, 1)).astype('float32') / 255

def create_model():
    model = models.Sequential([
        layers.Conv2D(32, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(10, activation='softmax')
    ])
    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])
    return model

# GPU training
print("\n==== Starting GPU training ====")
with tf.device('/GPU:0'):
    gpu_model = create_model()
    start_time = time.time()
    gpu_history = gpu_model.fit(train_images, train_labels, epochs=10,
                                validation_split=0.2, batch_size=64, verbose=1)
    gpu_time = time.time() - start_time

# CPU training
print("\n==== Starting CPU training ====")
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'  # hide the GPU
with tf.device('/CPU:0'):
    cpu_model = create_model()
    start_time = time.time()
    cpu_history = cpu_model.fit(train_images, train_labels, epochs=10,
                                validation_split=0.2, batch_size=64, verbose=1)
    cpu_time = time.time() - start_time

# Compare the results
print("\n==== Training time comparison ====")
print(f"\033[1;34mGPU training time: {gpu_time:.2f} s\033[0m")
print(f"\033[1;34mCPU training time: {cpu_time:.2f} s\033[0m")
print(f"\033[1;32mGPU speedup: {cpu_time / gpu_time:.2f}x\033[0m")

# Plot the loss and accuracy curves for a training run
def plot_history(history, title):
    acc = history.history['accuracy']
    val_acc = history.history['val_accuracy']
    loss = history.history['loss']
    val_loss = history.history['val_loss']
    epochs = range(1, len(acc) + 1)
    plt.figure(figsize=(12, 5))
    plt.subplot(1, 2, 1)
    plt.plot(epochs, loss, 'bo-', label='Training loss')
    plt.plot(epochs, val_loss, 'ro-', label='Validation loss')
    plt.title(f'{title} - Training and validation loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.subplot(1, 2, 2)
    plt.plot(epochs, acc, 'bo-', label='Training accuracy')
    plt.plot(epochs, val_acc, 'ro-', label='Validation accuracy')
    plt.title(f'{title} - Training and validation accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

print("\n==== Visualizing the GPU training run ====")
plot_history(gpu_history, "GPU Training")
print("\n==== Visualizing the CPU training run ====")
plot_history(cpu_history, "CPU Training")

# Evaluate the GPU-trained model
print("\n==== Evaluating the GPU-trained model ====")
test_loss, test_acc = gpu_model.evaluate(test_images, test_labels, verbose=0)
print(f'\n\033[1;32mTest accuracy: {test_acc:.4f}\033[0m')

# Save the GPU-trained model
print("\n==== Saving the GPU-trained model ====")
gpu_model.save('mnist_model_gpu.keras')
print("Model saved as mnist_model_gpu.keras")
This code demonstrates the significant computational power of GPUs compared to CPUs, especially in deep learning tasks. The code first checks for available GPUs in the environment and then trains a simple Convolutional Neural Network (CNN) on both GPU and CPU, using the MNIST handwritten digits dataset for classification.
During training, you can monitor GPU utilization using the nvidia-smi command, which provides real-time insights into GPU usage.
As shown in the image, GPU utilization during training is around 9%, indicating that the task successfully leveraged the GPU for computation.
Pipeline
Kubeflow Pipelines is a core module within the Kubeflow project, focused on building, deploying, and managing complex machine learning workflows. It offers a comprehensive set of tools for designing and automating ML pipelines, allowing data scientists and engineers to efficiently manage and monitor the training and deployment processes of ML models. With Kubeflow Pipelines, users can easily define, share, reuse, and automate complex workflows.
In the pipeline interface, you can upload pre-built .gz or .yaml files. Once the pipeline is created, it will be visually represented in the graphical interface as shown below. For detailed instructions on creating pipelines, refer to the official documentation.
import kfp
from kfp import dsl
from kfp.dsl import component, Input, Output, Dataset, Model

# Step 1: data download and preprocessing
@component(
    base_image='python:3.8-slim',
    packages_to_install=[
        'pandas',
        'scikit-learn',
        'joblib',
        'numpy',
        'requests'
    ]
)
def preprocess_data_op(output_data: Output[Dataset]):
    print("Starting preprocess_data_op...")
    try:
        import pandas as pd
        print("Successfully imported pandas.")
    except ImportError as e:
        print(f"Failed to import pandas: {e}")
        raise e
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    import os

    print("Downloading the dataset...")
    url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/pima-indians-diabetes.data.csv"
    columns = ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome']
    data = pd.read_csv(url, names=columns)
    print("Dataset downloaded.")

    # Data cleaning and feature engineering
    print("Cleaning data and engineering features...")
    X = data.drop('Outcome', axis=1)
    y = data['Outcome']

    # Standardize the features
    print("Standardizing features...")
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Split into training and test sets
    print("Splitting into training and test sets...")
    X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.2, random_state=42)

    # Save the preprocessed data
    print("Saving the preprocessed data...")
    os.makedirs(output_data.path, exist_ok=True)
    pd.DataFrame(X_train).to_csv(os.path.join(output_data.path, 'X_train.csv'), index=False)
    pd.DataFrame(X_test).to_csv(os.path.join(output_data.path, 'X_test.csv'), index=False)
    pd.DataFrame(y_train).to_csv(os.path.join(output_data.path, 'y_train.csv'), index=False)
    pd.DataFrame(y_test).to_csv(os.path.join(output_data.path, 'y_test.csv'), index=False)
    print(f"Preprocessing finished; data saved to {output_data.path}.")
# Step 2: model training
@component(
    base_image='python:3.8-slim',
    packages_to_install=[
        'pandas',
        'scikit-learn',
        'joblib',
        'numpy'
    ]
)
def train_model_op(input_data: Input[Dataset], output_model: Output[Model]):
    print("Starting train_model_op...")
    try:
        import pandas as pd
        print("Successfully imported pandas.")
    except ImportError as e:
        print(f"Failed to import pandas: {e}")
        raise e
    from sklearn.linear_model import LogisticRegression
    import joblib
    import os

    print("Loading the training data...")
    X_train = pd.read_csv(os.path.join(input_data.path, 'X_train.csv'))
    y_train = pd.read_csv(os.path.join(input_data.path, 'y_train.csv'))

    # Train the model
    print("Training the model...")
    model = LogisticRegression()
    model.fit(X_train, y_train.values.ravel())

    # Create the output directory and save the model
    os.makedirs(output_model.path, exist_ok=True)  # make sure the output directory exists
    model_path = os.path.join(output_model.path, 'trained_model.joblib')
    joblib.dump(model, model_path)
    print(f"Training finished; model saved to {model_path}.")
# Step 3: model evaluation
@component(
    base_image='python:3.8-slim',
    packages_to_install=[
        'pandas',
        'scikit-learn',
        'joblib',
        'numpy'
    ]
)
def evaluate_model_op(input_data: Input[Dataset], input_model: Input[Model]):
    print("Starting evaluate_model_op...")
    try:
        import pandas as pd
        print("Successfully imported pandas.")
    except ImportError as e:
        print(f"Failed to import pandas: {e}")
        raise e
    from sklearn.metrics import accuracy_score
    import joblib
    import os  # needed for the path handling below

    print("Loading the test data and the model...")
    X_test = pd.read_csv(os.path.join(input_data.path, 'X_test.csv'))
    y_test = pd.read_csv(os.path.join(input_data.path, 'y_test.csv'))
    model = joblib.load(os.path.join(input_model.path, 'trained_model.joblib'))

    # Predict and evaluate
    print("Running predictions and evaluating the model...")
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Model accuracy: {accuracy}")
# Step 4: pipeline definition
@dsl.pipeline(
    name='Diabetes Classifier Pipeline',
    description='A pipeline to train and evaluate a diabetes classifier model'
)
def diabetes_pipeline():
    preprocess = preprocess_data_op()
    train = train_model_op(input_data=preprocess.outputs['output_data'])
    evaluate = evaluate_model_op(input_data=preprocess.outputs['output_data'], input_model=train.outputs['output_model'])

# Compile the pipeline
if __name__ == "__main__":
    kfp.compiler.Compiler().compile(diabetes_pipeline, 'diabetes_pipeline.yaml')
The above is a usable pipeline file, which can be opened in the Pipelines module. The main workflow is shown as follows:
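Besides uploading the compiled YAML through the Pipelines UI, you can also submit it programmatically with the KFP SDK client. A sketch is shown below; the host URL and authentication depend on how your Kubeflow gateway is exposed, and the endpoint here is a hypothetical example:
import kfp

client = kfp.Client(host="http://<istio-ingressgateway-nodeport>/pipeline")  # hypothetical endpoint
run = client.create_run_from_pipeline_package(
    'diabetes_pipeline.yaml',
    arguments={},
    run_name='diabetes-classifier-run'
)
print(run.run_id)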
After running the pipeline, the results are as follows:
This Python file primarily defines a Kubeflow Pipelines (KFP) pipeline for data processing, machine learning model training, and evaluation for a diabetes classification task.
The file first defines the preprocess_data_op component, which downloads the diabetes dataset and performs data preprocessing: data cleaning, feature standardization, and splitting into training and test sets. The preprocessed data is saved to a specified output path for later use in model training and evaluation.
Next, the train_model_op component is defined. It uses the preprocessed training data to train a logistic regression model with scikit-learn and saves the trained model to a specified output path, providing a model file for the subsequent evaluation step.
The evaluate_model_op component then loads the test data and the trained model, makes predictions on the test data, and calculates and prints the model's accuracy as a performance measure, helping to assess the model's effectiveness.
Finally, the file defines a pipeline named diabetes_pipeline that links the three components together. Using the dsl.pipeline decorator, the preprocessing, training, and evaluation steps are combined into a complete workflow, which is compiled into a YAML file (diabetes_pipeline.yaml) that can be uploaded and run in a Kubeflow environment.
Overall, this example demonstrates the basic usage of Kubeflow Pipelines, which runs on Argo Workflows under the hood, achieving automated workflow management in a Kubernetes environment by defining simple tasks and their dependencies.
For tasks that need to be executed multiple times or have common preprocessing steps, previously processed caches can be reused based on their names to improve training efficiency.
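In KFP v2 this caching is controlled per task; if a particular step should always re-run, you can opt it out explicitly when wiring the pipeline (set_caching_options is part of the KFP SDK, and the snippet below reuses the component names from the example above):
def diabetes_pipeline():
    preprocess = preprocess_data_op()
    train = train_model_op(input_data=preprocess.outputs['output_data'])
    # Always retrain even when inputs are unchanged; preprocessing stays cached
    train.set_caching_options(False)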
Katib
AutoML is a popular area in machine learning, mainly used for model optimization and hyperparameter tuning. Here, Katib, a Kubernetes-based AutoML project, is used. For more details, visit Katib on GitHub.
Katib mainly provides hyperparameter tuning, early stopping, and neural architecture search. It supports various hyperparameter optimization algorithms, including random search, Bayesian optimization, and HyperBand. These algorithms help developers efficiently search for the best hyperparameter configurations within a given parameter space. By defining experiments, users can specify the hyperparameters to be tuned, their ranges, the optimization objective (e.g., accuracy or loss), and the search algorithm to use. Katib also supports early stopping strategies, allowing experiments with poor performance to be dynamically terminated during training, thus saving computational resources. Additionally, Katib offers neural architecture search (NAS) capabilities, allowing users to explore different network architectures to further enhance model performance.
Create a new notebook as described in the Notebook section above. After creating a Python file, run the following command in the terminal to install the necessary library.
pip install kubeflow-katib
import kubeflow.katib as katib

# Step 1. Create an objective function.
def objective(parameters):
    # Import required packages.
    import time
    time.sleep(5)
    # Calculate the objective function.
    result = 4 * int(parameters["a"]) - float(parameters["b"]) ** 2
    # Katib parses metrics in this format: <metric-name>=<metric-value>.
    print(f"result={result}")

# Step 2. Create the hyperparameter search space.
parameters = {
    "a": katib.search.int(min=10, max=20),
    "b": katib.search.double(min=0.1, max=0.2)
}

# Step 3. Create the Katib Experiment.
katib_client = katib.KatibClient()
name = "tune-experiment"
katib_client.tune(
    name=name,
    objective=objective,
    parameters=parameters,
    objective_metric_name="result",
    max_trial_count=12
)

# Step 4. Get the best hyperparameters.
print(katib_client.get_optimal_hyperparameters(name))
After running, you can view the parameter optimization files and the overall workflow in AutoML.
This code performs automatic hyperparameter optimization by defining an objective function and using the default random search algorithm. The objective function has the form result = 4a - b^2, where parameter a is an integer ranging from 10 to 20 and parameter b is a floating-point number ranging from 0.1 to 0.2. The code first defines the objective function and then creates the hyperparameter search space. Next, Kubeflow Katib's KatibClient is used to start the experiment, specifying the objective function, parameter ranges, and the maximum number of trials (12). Finally, katib_client.get_optimal_hyperparameters(name) retrieves and prints the optimal combination of hyperparameters, automating the tuning of the specified objective function.
With a in the range 10-20 and b in the range 0.1-0.2, the random search algorithm looks for the maximum value of the objective function under these conditions.
Below is an example of a more complex function and the use of other algorithms for computation:
import kubeflow.katib as katib

# Step 1. Create an objective function.
def objective(parameters):
    # Import required packages.
    import time
    import math
    time.sleep(5)
    # Calculate a more complex objective function.
    a = int(parameters["a"])
    b = float(parameters["b"])
    # A more complex objective: a combination of polynomial, logarithmic, and trigonometric terms
    result = (3 * a ** 2 + 2 * b - math.sin(a * b)) / (1 + math.log(b + 0.1)) + math.sqrt(abs(a - b))
    # Katib parses metrics in this format: <metric-name>=<metric-value>.
    print(f"result={result}")

# Step 2. Create the hyperparameter search space.
parameters = {
    "a": katib.search.int(min=10, max=20),
    "b": katib.search.double(min=0.1, max=0.2)
}

# Step 3. Create the Katib Experiment.
katib_client = katib.KatibClient()
name = "tune-experiment-for-kalibbase"
katib_client.tune(
    name=name,
    objective=objective,
    parameters=parameters,
    algorithm_name="bayesianoptimization",
    objective_metric_name="result",
    # objective_type="minimize",  # uncomment to minimize the objective instead
    max_trial_count=12
)

# Step 4. Get the best hyperparameters.
print(katib_client.get_optimal_hyperparameters(name))
Using a more complex function to optimize parameters, the Bayesian search algorithm is employed: