Kubeflow: Kubeflow Pipelines (KFP)
Kubeflow Pipelines (KFP) is a platform for building and deploying portable, scalable machine learning (ML) workflows based on Docker containers. It enables end-to-end orchestration of ML tasks, making it easier to manage, track, and reproduce complex ML experiments.
1. Key Concepts in Kubeflow Pipelines
- Pipeline: An end-to-end ML workflow, defined as a sequence of steps.
- Component: A self-contained piece of code (typically a Python function) that performs one step in an ML workflow. Each component runs in its own Docker container.
- Task: An execution of a component within a pipeline.
- Artifacts: Inputs and outputs of pipeline components (e.g., datasets, models, metrics). KFP tracks these artifacts.
- Directed Acyclic Graph (DAG): Pipelines are defined as DAGs of components, where edges represent data dependencies (outputs of one component become inputs to another).
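To make the DAG idea concrete, the following is a minimal sketch in plain Python (standard library only, no KFP involved) of how data dependencies determine a valid execution order. The task names and the `dependencies` mapping are hypothetical, and this is not how KFP schedules work internally; it only illustrates why the graph must be acyclic.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical pipeline: each task maps to the set of tasks whose outputs it consumes.
dependencies = {
    "load_data": set(),
    "preprocess": {"load_data"},
    "train": {"preprocess"},
    "evaluate": {"train", "preprocess"},
}


def execution_order(deps):
    """Return one valid order in which the tasks could run."""
    return list(TopologicalSorter(deps).static_order())


def has_cycle(deps):
    """Return True if the dependency graph contains a cycle (so it is not a DAG)."""
    try:
        execution_order(deps)
    except CycleError:
        return True
    return False


order = execution_order(dependencies)
print(order)  # ['load_data', 'preprocess', 'train', 'evaluate']
print(has_cycle({"a": {"b"}, "b": {"a"}}))  # True
```

A task becomes runnable only after every task it depends on has finished, which is why a cycle would leave the tasks involved waiting on each other forever.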
2. Setting up Kubeflow Pipelines (Conceptual)
Before running pipelines, you need a running Kubeflow installation on a Kubernetes cluster. This typically involves:
1. Kubernetes Cluster: A cluster (e.g., Minikube, GKE, EKS, AKS) with Kubeflow installed.
2. Kubeflow Pipelines UI: Access to the KFP UI, which provides a dashboard for managing and visualizing pipelines.
3. KFP SDK: The Python SDK for defining and compiling pipelines (pip install kfp).
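Because the SDK syntax differs between major versions, it can help to check which one is installed before writing pipelines. The sketch below uses only the standard library; the helper name `installed_major_version` is hypothetical, and the check assumes the package version starts with a numeric major component (as in `2.x.y`).

```python
from importlib import metadata


def installed_major_version(package: str = "kfp"):
    """Return the installed package's major version as an int, or None if it is absent."""
    try:
        version = metadata.version(package)
    except metadata.PackageNotFoundError:
        return None
    return int(version.split(".")[0])


major = installed_major_version()
if major is None:
    print("kfp is not installed; run: pip install kfp")
elif major < 2:
    print(f"kfp {major}.x found; decorator-based examples written for v2 may not work as-is")
else:
    print(f"kfp {major}.x found")
```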
3. Defining a Simple Pipeline
A pipeline is defined using the KFP SDK in Python. You define components as Python functions annotated with @dsl.component and then compose them into a pipeline function annotated with @dsl.pipeline.
Example: A Simple "Hello World" Pipeline
This pipeline demonstrates two simple components that pass a string between them.
import kfp
from kfp import dsl
from kfp.compiler import Compiler  # Required for compiling the pipeline

# --- Define Components ---
# A component is a Python function decorated with @dsl.component.
# Inputs and outputs are type-hinted.
@dsl.component
def greet_op(name: str) -> str:
    """A component that greets a given name."""
    print(f"Hello, {name}!")
    greeting = f"Hello, {name} from component!"
    # Outputs are returned from the function
    return greeting

@dsl.component
def capitalize_op(input_string: str) -> str:
    """A component that converts an input string to upper case."""
    print(f"Received: {input_string}")
    capitalized = input_string.upper()
    return capitalized

# --- Define the Pipeline ---
# A pipeline is a Python function decorated with @dsl.pipeline.
# It defines the workflow using component tasks.
# KFP v2 pipeline names should use lowercase letters, digits, and hyphens.
@dsl.pipeline(
    name='hello-world-kfp-pipeline',
    description='A simple pipeline demonstrating component interaction.'
)
def hello_world_pipeline(person_name: str = 'World'):
    """
    This pipeline greets a person and then capitalizes the greeting.
    """
    # Create a task from the 'greet_op' component.
    # The output of greet_task can be accessed via greet_task.output.
    greet_task = greet_op(name=person_name)
    # Create a task from the 'capitalize_op' component.
    # Pass the output of greet_task as input to capitalize_op.
    capitalize_task = capitalize_op(input_string=greet_task.output)
    # To log the final value, print it inside a component
    # or add a dedicated logger component.

# --- Compile the Pipeline ---
# Compiling converts the Python pipeline definition into a YAML file
# that can be uploaded to the Kubeflow Pipelines UI.
pipeline_filename = 'hello_world_pipeline.yaml'
compiler = Compiler()
compiler.compile(hello_world_pipeline, pipeline_filename)
print(f"Pipeline compiled to {pipeline_filename}")

# To run this pipeline, you would typically upload 'hello_world_pipeline.yaml'
# to your Kubeflow Pipelines UI and create an experiment run.
# Or, if using the KFP client directly:
# client = kfp.Client()
# client.create_run_from_pipeline_func(hello_world_pipeline, arguments={'person_name': 'KFP User'})
4. Components with Inputs, Outputs, and Artifacts
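Components can have various types of inputs and outputs: parameters such as strings, numbers, and booleans, and artifact types such as Dataset, Model, and Metrics. Parameters are passed by value, while artifacts are read from and written to a file path that KFP supplies (the artifact's path attribute).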
import kfp
from kfp import dsl
from kfp.compiler import Compiler
from kfp.dsl import Input, Output, Model, Dataset, Metrics  # For typing artifacts
from typing import NamedTuple

# Define a component that generates data and a simple model.
# packages_to_install lists the libraries the component imports at runtime.
@dsl.component(packages_to_install=['pandas', 'numpy', 'scikit-learn', 'joblib'])
def data_gen_and_train_op(
    message: str,
    output_dataset: Output[Dataset],  # Declare output artifact of type Dataset
    output_model: Output[Model]  # Declare output artifact of type Model
):
    """Generates dummy data and 'trains' a dummy model."""
    import pandas as pd
    import numpy as np
    import joblib  # For saving the model
    from sklearn.dummy import DummyClassifier

    print(message)
    # Simulate data generation
    data = pd.DataFrame({
        'feature1': np.random.rand(10),
        'feature2': np.random.rand(10),
        'target': np.random.randint(0, 2, 10)
    })
    data.to_csv(output_dataset.path, index=False)  # Save data to artifact path
    print(f"Dummy dataset saved to {output_dataset.path}")

    # Simulate model training with a scikit-learn classifier that predicts
    # random labels. A class defined inside this function could not be
    # pickled by joblib, so an importable model class is used instead.
    model = DummyClassifier(strategy='uniform')
    model.fit(data[['feature1', 'feature2']], data['target'])
    joblib.dump(model, output_model.path)  # Save model to artifact path
    print(f"Dummy model saved to {output_model.path}")
@dsl.component(packages_to_install=['pandas', 'scikit-learn', 'joblib'])
def evaluate_op(
    input_dataset: Input[Dataset],  # Declare input artifact of type Dataset
    input_model: Input[Model],  # Declare input artifact of type Model
    output_metrics: Output[Metrics]  # Declare output artifact of type Metrics
) -> NamedTuple('EvaluationOutput', [('accuracy', float)]):
    """Evaluates the dummy model on the dummy dataset."""
    from collections import namedtuple
    import pandas as pd
    import joblib
    from sklearn.metrics import accuracy_score

    # Load data and model from artifact paths
    data = pd.read_csv(input_dataset.path)
    model = joblib.load(input_model.path)
    X = data[['feature1', 'feature2']]
    y_true = data['target']
    y_pred = model.predict(X)
    accuracy = float(accuracy_score(y_true, y_pred))
    print(f"Model Accuracy: {accuracy:.4f}")

    # Record metrics on the Metrics artifact so KFP can track them
    output_metrics.log_metric('accuracy', accuracy)
    output_metrics.log_metric('dataset_size', len(data))
    print("Metrics logged to the output_metrics artifact")

    # Return accuracy as a NamedTuple output
    evaluation_output = namedtuple('EvaluationOutput', ['accuracy'])
    return evaluation_output(accuracy)
# A small component that prints the accuracy. KFP v2 has no dsl.ContainerOp,
# so the value is passed to a regular component as a parameter.
@dsl.component
def print_accuracy_op(accuracy: float):
    """Prints the accuracy produced by the evaluation step."""
    print(f"Final Accuracy from Pipeline: {accuracy}")

@dsl.pipeline(
    name='ml-pipeline-with-artifacts',
    description='A pipeline demonstrating data generation, training, and evaluation with artifacts.'
)
def ml_pipeline_artifacts(initial_message: str = 'Starting ML process!'):
    data_train_task = data_gen_and_train_op(message=initial_message)
    evaluate_task = evaluate_op(
        input_dataset=data_train_task.outputs['output_dataset'],  # Use output artifact from previous task
        input_model=data_train_task.outputs['output_model']
    )
    # Pass the 'accuracy' parameter output to the printing component
    print_accuracy_task = print_accuracy_op(accuracy=evaluate_task.outputs['accuracy'])

pipeline_filename_ml = 'ml_pipeline_artifacts.yaml'
Compiler().compile(ml_pipeline_artifacts, pipeline_filename_ml)
print(f"ML Pipeline with artifacts compiled to {pipeline_filename_ml}")
5. Advanced Pipeline Features (Conceptual)
- Conditional Execution: Running steps only if certain conditions are met (dsl.If, or dsl.Condition in older SDK releases).
- Loops: Running a step multiple times with different parameters (dsl.ParallelFor).
- Volume Mounts: Sharing data between components using persistent volumes.
- Resource Customization: Specifying CPU, memory, and GPU requirements for each component.
- Caching: KFP can cache component runs to avoid re-executing steps with the same inputs, speeding up development.
Further Topics:
- Building Custom Components (from Python functions, Docker images, or component YAMLs).
- Managing pipeline versions and experiments.
- Working with the KFP SDK client (kfp.Client) to interact with KFP programmatically.
- Integration with other Kubeflow components (KServe, formerly KFServing, and Katib).
- Debugging pipelines.
Kubeflow Pipelines simplifies the development, deployment, and management of scalable and reproducible ML workflows on Kubernetes, making it a cornerstone for MLOps.