Use this file to discover all available pages before exploring further.
Materializers handle the serialization and deserialization of artifacts passed between pipeline steps. Custom materializers allow you to work with any data type, from simple objects to complex ML models.
All materializers inherit from BaseMaterializer and implement key methods:
from typing import Any, Type

from zenml.enums import ArtifactType
from zenml.materializers import BaseMaterializer


# NOTE(review): this definition shadows the imported ``BaseMaterializer``;
# it appears to be a simplified outline of the interface shown for
# illustration -- confirm against the real class before relying on it.
class BaseMaterializer:
    """Base class for all materializers.

    Subclasses declare which Python types they handle and override
    ``save`` and ``load`` to write and read artifacts at ``self.uri``.
    """

    # Required class attributes
    ASSOCIATED_TYPES = ()  # Types this materializer handles
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.BASE  # Category

    def __init__(self, uri: str, artifact_store=None):
        """Initialize with storage location.

        Args:
            uri: URI where artifact will be stored
            artifact_store: The artifact store instance
        """
        self.uri = uri
        self._artifact_store = artifact_store

    def save(self, data: Any) -> None:
        """Save artifact data to self.uri.

        Args:
            data: The object to persist.

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError

    def load(self, data_type: Type[Any]) -> Any:
        """Load artifact data from self.uri.

        Args:
            data_type: The type the caller expects back.

        Returns:
            The deserialized object (in subclasses).

        Raises:
            NotImplementedError: Always; subclasses must override.
        """
        raise NotImplementedError
For collections (lists, DataFrames), implement item counting and item loading:
from typing import Optional


class PandasMaterializer(BaseMaterializer):
    """Materializer sketch showing item-level access for pandas objects."""

    # ... save/load methods ...

    def get_item_count(self, data: Any) -> Optional[int]:
        """Return number of items in the artifact.

        Args:
            data: The pandas DataFrame or Series

        Returns:
            Number of items (rows for Series, columns for DataFrame),
            or None when ``data`` is neither.
        """
        import pandas as pd

        if isinstance(data, pd.Series):
            return len(data)
        if isinstance(data, pd.DataFrame):
            # NOTE(review): items are columns for a DataFrame here; confirm
            # this matches how callers map over the artifact.
            return int(data.shape[1])  # Number of columns
        return None

    def load_item(self, data_type: Type[Any], index: int) -> Any:
        """Load a specific item from the collection.

        The item is selected by position so that every index in
        ``range(get_item_count(data))`` is valid, regardless of the
        labels used for columns or for the Series index.

        Args:
            data_type: Expected type
            index: Zero-based position of the item to load

        Returns:
            The specific item: a column for a DataFrame, an element for a
            Series, or ``data[index]`` for any other loaded object.
        """
        import pandas as pd

        # Load full data
        data = self.load(data_type)

        # ``data[index]`` would be a label lookup on pandas objects and
        # fail for named columns; use positional access instead.
        if isinstance(data, pd.DataFrame):
            return data.iloc[:, index]
        if isinstance(data, pd.Series):
            return data.iloc[index]
        return data[index]
This enables mapping operations over artifacts:
@step
def process_row(row: pd.Series) -> dict:
    """Convert one row into a plain dictionary."""
    as_dict = row.to_dict()
    return as_dict


@pipeline
def batch_processing_pipeline():
    """Load a DataFrame and fan ``process_row`` out over it."""
    frame = load_dataframe()

    # Map over each row
    mapped = process_row.map(frame)
class CustomMaterializer(BaseMaterializer):
    """Materializer that falls back to JSON when an optional library is missing."""

    def __init__(self, uri: str, artifact_store=None):
        """Probe for ``special_library`` and record whether it is importable.

        Args:
            uri: URI where the artifact will be stored.
            artifact_store: The artifact store instance.
        """
        super().__init__(uri, artifact_store)

        try:
            # Imported only to find out whether the package is installed.
            import special_library
        except ImportError:
            self.special_library_available = False

            from zenml.logger import get_logger

            get_logger(__name__).warning(
                "special_library not installed. Install it with: "
                "pip install special_library"
            )
        else:
            self.special_library_available = True

    def save(self, data: Any) -> None:
        """Persist ``data`` with the best format available.

        Args:
            data: The object to persist.
        """
        if not self.special_library_available:
            # Fall back to standard format
            self._save_with_json(data)
            return

        # Use efficient format
        self._save_with_special_library(data)
For materializers that need local file operations:
import os
import shutil


class ComplexMaterializer(BaseMaterializer):
    """Materializer that stages intermediate files in a local temp directory."""

    def save(self, data: Any) -> None:
        """Save using temporary directory for intermediate files.

        Args:
            data: The object to persist.
        """
        # Get temporary directory that cleans up after step finishes
        with self.get_temporary_directory(
            delete_at_exit=False,
            delete_after_step_execution=True,
        ) as temp_dir:
            # Do complex processing in temp directory
            temp_file = os.path.join(temp_dir, "intermediate.tmp")
            # ... processing ...

            # Copy final result to artifact store
            final_path = os.path.join(self.uri, "data.bin")
            with open(temp_file, "rb") as src:
                with self.artifact_store.open(final_path, "wb") as dst:
                    # Stream in chunks so a large intermediate file is
                    # never held in memory all at once.
                    shutil.copyfileobj(src, dst)