A hands-on tutorial repository to learn ZenML from scratch. This repository contains progressively complex examples that teach you the core concepts of ZenML pipelines.
ZenML is an extensible, open-source MLOps framework for creating portable, production-ready ML pipelines. This repository provides a structured learning path with 6 practical examples that build on each other. After completing this tutorial, feel free to explore an end-to-end MLOps project using ZenML with data drift simulation here.
Before starting, make sure you have:
- Python 3.8 or higher installed
- Basic understanding of Python decorators
- Familiarity with ML/data science concepts (helpful but not required)
```bash
git clone https://github.com/ChristusJoy/learning-zenml.git
cd learning-zenml
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install zenml
zenml init
```

This creates a `.zen` directory to track your ZenML configuration.

```bash
zenml login --local
```

This launches a local dashboard where you can visualize your pipelines, runs, and artifacts.
Follow these examples in order. Each builds on concepts from the previous one.
File: hello_pipeline.py
Concepts Covered:
- `@step` decorator - defines a unit of work
- `@pipeline` decorator - chains steps together
- Running a pipeline
- Accessing step outputs
Code Walkthrough:
```python
from zenml import pipeline, step
from zenml.logger import get_logger

logger = get_logger(__name__)

@step
def say_hello() -> str:
    logger.info("Executing say_hello step")
    return "Hello World!"

@pipeline
def hello_pipeline() -> str:
    message = say_hello()
    return message
```

Key Takeaways:
- A step is a Python function decorated with `@step` - it's the smallest unit of work
- A pipeline is a function decorated with `@pipeline` that orchestrates steps
- Steps must have type hints for inputs and outputs
- ZenML automatically tracks all executions
Run It:
```bash
python hello_pipeline.py
```

What Happens:
- ZenML registers the pipeline and step
- The step executes and returns "Hello World!"
- The output is stored as an artifact in ZenML
- You can retrieve the output programmatically
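Retrieving an output programmatically can be sketched with ZenML's Client API. This is a sketch, not code from the repository: it assumes ZenML is installed, the pipeline has run at least once, and the attribute names (`last_run`, `steps`, `output.load()`) match your ZenML version.

```python
def get_last_output(pipeline_name: str, step_name: str):
    """Load the most recent output artifact of a step.

    Sketch only: assumes ZenML is installed, the pipeline has run at
    least once, and attribute names match recent ZenML versions.
    """
    from zenml.client import Client

    run = Client().get_pipeline(pipeline_name).last_run
    return run.steps[step_name].output.load()
```

For example, `get_last_output("hello_pipeline", "say_hello")` would load the stored "Hello World!" artifact.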
File: io_pipeline.py
Concepts Covered:
- Steps with multiple outputs
- Passing data between steps
- Using `Annotated` for named outputs
- Tuple unpacking for multiple returns
Code Walkthrough:
```python
from typing import Tuple
from typing_extensions import Annotated
from zenml import pipeline, step

@step
def load_data() -> Tuple[
    Annotated[list[int], "features"],
    Annotated[list[int], "labels"]
]:
    return [1, 2, 3, 4], [1, 0, 1, 0]

@step
def count_rows(features: list[int], labels: list[int]) -> Annotated[int, "row_count"]:
    return len(features)

@pipeline
def io_pipeline() -> int:
    features, labels = load_data()
    row_count = count_rows(features, labels)
    return row_count
```

Key Takeaways:
- Use `Annotated[type, "name"]` to give outputs meaningful names
- Multiple outputs use `Tuple` with named annotations
- Outputs from one step become inputs to another
- ZenML tracks the data lineage - which step produced which data
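The output names are ordinary `typing` metadata rather than ZenML magic. A quick standalone check (no ZenML required) shows the annotation metadata that ZenML reads to name the artifact:

```python
from typing import get_type_hints

from typing_extensions import Annotated


def load_data() -> Annotated[list, "features"]:
    return [1, 2, 3]


# include_extras=True preserves the Annotated metadata
hints = get_type_hints(load_data, include_extras=True)
output_name = hints["return"].__metadata__[0]  # -> "features"
```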
Run It:
```bash
python io_pipeline.py
```

What Happens:
- `load_data()` produces two artifacts: "features" and "labels"
- Both artifacts are passed to `count_rows()`
- The final "row_count" artifact is stored
- All data dependencies are tracked automatically
File: param_pipeline.py
Concepts Covered:
- Pipeline parameters
- Step parameters with defaults
- Configuring pipelines at runtime
Code Walkthrough:
```python
from typing_extensions import Annotated
from zenml import pipeline, step

@step
def multiply(number: int, factor: int = 2) -> Annotated[int, "product"]:
    result = number * factor
    return result

@pipeline
def param_pipeline(number: int = 3, factor: int = 2) -> int:
    result = multiply(number=number, factor=factor)
    return result

# Run with custom parameters
run = param_pipeline(number=5, factor=10)
```

Key Takeaways:
- Pipelines can accept runtime parameters
- Steps can have default values for parameters
- Parameters are tracked with each run for reproducibility
- Different runs can use different configurations
Run It:
```bash
python param_pipeline.py
```

Experiment:
- Modify the `number` and `factor` values in the script
- Run multiple times and compare results in the dashboard
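To compare several configurations side by side, you can simply call the pipeline in a loop, since each invocation becomes its own tracked run. The `sweep` helper below is a hypothetical sketch, not part of ZenML:

```python
def sweep(pipeline_fn, factors=(2, 5, 10), number=4):
    """Call a configurable pipeline once per factor value.

    Hypothetical helper: each invocation of a ZenML pipeline function
    creates a separate, fully tracked run, so the resulting runs can be
    compared in the dashboard.
    """
    for factor in factors:
        pipeline_fn(number=number, factor=factor)
```

For example, `sweep(param_pipeline)` would produce three runs with factors 2, 5, and 10.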
File: cache_pipeline.py
Concepts Covered:
- Step caching with `enable_cache=True`
- How ZenML skips redundant computations
- When caching is triggered
Code Walkthrough:
```python
import time

from typing_extensions import Annotated
from zenml import pipeline, step
from zenml.logger import get_logger

logger = get_logger(__name__)

@step(enable_cache=True)
def slow_step() -> Annotated[int, "answer"]:
    logger.info("Actually computing result... (sleeping 3 seconds)")
    time.sleep(3)
    return 42

@pipeline
def cache_pipeline():
    slow_step()

# First run - takes 3 seconds
cache_pipeline()

# Second run - instant (cached!)
cache_pipeline()
```

Key Takeaways:
- Caching prevents re-execution when inputs haven't changed
- First run executes fully; subsequent runs use cached results
- Caching is determined by: step code, input data, and parameters
- Saves time and compute resources in iterative development
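The "step code, input data, and parameters" rule can be illustrated with a toy fingerprint. This is an illustration only, not ZenML's actual cache implementation, but it shows why changing any of the three invalidates the cache:

```python
import hashlib


def toy_cache_key(fn, args: tuple, params: dict) -> str:
    """Toy fingerprint of step code, inputs, and parameters.

    Illustration only - ZenML's real caching logic is more involved.
    """
    payload = (
        fn.__code__.co_code                       # compiled step code
        + repr(args).encode()                     # input data
        + repr(sorted(params.items())).encode()   # parameters
    )
    return hashlib.sha256(payload).hexdigest()


def slow_step(x, factor=2):
    return x * factor


k1 = toy_cache_key(slow_step, (3,), {"factor": 2})
k2 = toy_cache_key(slow_step, (3,), {"factor": 2})  # same inputs -> same key
k3 = toy_cache_key(slow_step, (3,), {"factor": 5})  # changed param -> new key
```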
Run It:
```bash
python cache_pipeline.py
```

Observe:
- Run 1: You'll see the "Actually computing..." message and 3-second delay
- Run 2: Instant completion - the step was skipped entirely!
File: meta_pipeline.py
Concepts Covered:
- Logging metadata with `log_metadata()`
- Tracking metrics, hyperparameters, or any key-value data
- Viewing metadata in the dashboard
Code Walkthrough:
```python
from typing_extensions import Annotated
from zenml import log_metadata, pipeline, step

@step
def compute_accuracy() -> Annotated[float, "accuracy_metric"]:
    acc = 0.93
    # Log metadata - visible in ZenML dashboard
    log_metadata({"accuracy": acc})
    return acc

@pipeline
def meta_pipeline() -> float:
    accuracy = compute_accuracy()
    return accuracy
```

Key Takeaways:
- `log_metadata()` attaches key-value pairs to steps/runs
- Metadata appears in the ZenML dashboard as cards
- Useful for logging: metrics, hyperparameters, dataset stats, model info
- Fully searchable and comparable across runs
Run It:
```bash
python meta_pipeline.py
```

What to Check:
- Open the ZenML dashboard
- Navigate to the run
- See the "accuracy" metadata card on the step
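Logged metadata can also be read back in code via ZenML's Client API. This is a sketch, not code from the repository: it assumes ZenML is installed, a run has completed, and the `run_metadata` attribute matches your ZenML version.

```python
def get_step_metadata(pipeline_name: str):
    """Collect metadata logged on each step of the latest run.

    Sketch only: assumes ZenML is installed, the pipeline has run,
    and the `run_metadata` attribute exists in your ZenML version.
    """
    from zenml.client import Client

    run = Client().get_pipeline(pipeline_name).last_run
    return {name: step.run_metadata for name, step in run.steps.items()}
```

For example, `get_step_metadata("meta_pipeline")` would include the logged "accuracy" value.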
File: tagged_pipeline.py
Concepts Covered:
- Artifact configuration with `ArtifactConfig`
- Static tags on artifacts
- Dynamic tagging with `add_tags()`
- Cascade tags at pipeline level
- Using pandas DataFrames as artifacts
Code Walkthrough:
```python
import pandas as pd
from typing_extensions import Annotated
from zenml import ArtifactConfig, Tag, add_tags, pipeline, step

@step
def create_raw_data() -> Annotated[
    pd.DataFrame,
    ArtifactConfig(name="raw_data", tags=["raw", "input"])
]:
    data = pd.DataFrame({
        "feature_1": [1, 2, 3, 4, 5],
        "feature_2": [10, 20, 30, 40, 50],
        "target": [0, 1, 0, 1, 0]
    })
    return data

@step
def process_data(raw_data: pd.DataFrame) -> Annotated[
    pd.DataFrame,
    ArtifactConfig(name="processed_data", tags=["processed"])
]:
    processed = raw_data.copy()
    # Normalize features
    processed["feature_1"] = processed["feature_1"] / processed["feature_1"].max()
    processed["feature_2"] = processed["feature_2"] / processed["feature_2"].max()
    # Add dynamic tags
    add_tags(tags=["normalized", "ready_for_training"], infer_artifact=True)
    return processed

# Pipeline-level tags cascade to all artifacts
@pipeline(tags=["tutorial", Tag(name="experiment", cascade=True)])
def tagged_pipeline():
    raw_data = create_raw_data()
    processed_data = process_data(raw_data)
    return processed_data
```

Key Takeaways:
- `ArtifactConfig` lets you name artifacts and add static tags
- `add_tags()` adds tags dynamically during step execution
- `Tag(name="...", cascade=True)` applies tags to all artifacts in a pipeline
- Tags help organize, filter, and search artifacts
- ZenML handles pandas DataFrames (and many other types) automatically
Run It:
```bash
python tagged_pipeline.py
```

Explore:
- Check the dashboard for artifact tags
- Filter artifacts by tag in the UI
- Run multiple times to see tags accumulate
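Tag filtering is available programmatically as well. A sketch using the Client API, under the assumption that `Client.list_artifact_versions` accepts a `tag` filter in your ZenML version:

```python
def artifacts_with_tag(tag: str):
    """Return the names of artifact versions carrying a given tag.

    Sketch only: assumes ZenML is installed and that
    Client.list_artifact_versions supports a `tag` filter.
    """
    from zenml.client import Client

    return [av.name for av in Client().list_artifact_versions(tag=tag)]
```

For example, `artifacts_with_tag("raw")` would list "raw_data" after a run of this pipeline.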
The ZenML dashboard provides a visual interface to explore your ML workflows.
```bash
zenml login --local
```

| Feature | Description |
|---|---|
| Pipelines | All registered pipelines |
| Runs | Execution history with status |
| DAG View | Visual step dependencies |
| Artifacts | All produced data with lineage |
| Metadata | Logged metrics and info |
| Tags | Filter and organize artifacts |
| Concept | Description | Example |
|---|---|---|
| Step | Single unit of work | @step def my_step(): ... |
| Pipeline | Chain of steps | @pipeline def my_pipeline(): ... |
| Artifact | Data produced by steps | Return values from steps |
| Annotated | Named outputs | Annotated[int, "count"] |
| Caching | Skip redundant work | @step(enable_cache=True) |
| Metadata | Key-value tracking | log_metadata({"key": value}) |
| Tags | Organize artifacts | ArtifactConfig(tags=["tag1"]) |
1. hello_pipeline.py → Understand `@step` and `@pipeline` basics
2. io_pipeline.py → Learn data flow between steps
3. param_pipeline.py → Make pipelines configurable
4. cache_pipeline.py → Optimize with caching
5. meta_pipeline.py → Track metrics and metadata
6. tagged_pipeline.py → Organize with tags (advanced)
After completing this tutorial, check out my Vehicle Insurance ZenML project, which walks through an end-to-end pipeline for a sample project.
- Stacks & Components - Deploy to cloud infrastructure
- Model Registry - Version and manage models
- Experiment Tracking - Integrate with MLflow, W&B
- Deployment - Serve models in production
- Custom Materializers - Handle custom data types
This project is licensed under the MIT License - see the LICENSE file for details.