Use this file to discover all available pages before exploring further.
ZenML allows you to deploy pipelines as persistent services that can be invoked on-demand. This is useful for inference pipelines, real-time processing, or any workflow that needs to respond to external triggers.
Ensure your pipeline accepts parameters for flexible invocation:
3
from typing import Optional, Annotatedfrom zenml import pipeline, step@stepdef simple_step(name: Optional[str] = None) -> str: """A simple step that returns a greeting.""" if name: message = f"Hello {name}! Welcome to ZenML" else: message = "Hello from ZenML!" print(message) return message@pipelinedef simple_pipeline(name: Optional[str] = None) -> Annotated[str, "greeting"]: """A pipeline that can be deployed and invoked. Args: name: Optional name to personalize the greeting Returns: A greeting message as an artifact """ greeting = simple_step(name=name) return greeting
from typing import List, Dictfrom zenml import pipeline, step@stepdef load_model(version: str = "latest"): """Load model from registry.""" # Load model logic return model@stepdef predict(model, data: List[Dict]) -> List[float]: """Generate predictions.""" # Inference logic return predictions@pipelinedef inference_pipeline( model_version: str = "latest", data_source: str = "api"): """Production inference pipeline. Args: model_version: Version of model to use data_source: Where to get inference data from """ model = load_model(version=model_version) data = load_inference_data(source=data_source) predictions = predict(model, data) store_predictions(predictions)
Deploy and invoke:
# Deployzenml pipeline deploy pipelines.inference_pipeline.inference_pipeline \ -n inference_api# Invoke with specific model versionzenml deployment invoke inference_api \ --model_version="v2.1.0" \ --data_source="live"
from datetime import datetime@pipelinedef batch_processing_pipeline( batch_id: str, process_date: str, num_records: int = 1000): """Process batches of data. Args: batch_id: Unique identifier for this batch process_date: Date to process (YYYY-MM-DD) num_records: Number of records to process """ data = fetch_batch_data(batch_id, process_date, num_records) processed = process_batch(data) results = analyze_batch(processed) store_results(results, batch_id)
# View deployment statuszenml deployment describe my_pipeline# List runs for a deploymentzenml pipeline runs list --deployment=my_pipeline# View logs for a specific runzenml pipeline runs logs RUN_ID