Pipelines¶
The pipelines module provides DAG-based workflow orchestration with automatic parallelism, multi-turn conversation support, and usage tracking.
Pipeline¶
fastroai.pipelines.Pipeline
¶
Declarative DAG pipeline for multi-step AI workflows.
Features:

- Automatic parallelism from dependencies
- Type-safe dependency access
- Early termination on INCOMPLETE status
- Aggregated usage tracking
- Distributed tracing
Examples:

```python
pipeline = Pipeline(
    name="document_processor",
    steps={
        "extract": ExtractStep(),
        "classify": ClassifyStep(),
        "summarize": SummarizeStep(),
    },
    dependencies={
        "classify": ["extract"],
        "summarize": ["classify"],
    },
)

result = await pipeline.execute({"document": doc}, deps, tracer)
summary = result.output
```
Parallelism example - steps at the same level run in parallel:

```python
dependencies = {
    "classify": ["extract"],
    "fetch_market": ["classify"],
    "fetch_user": ["classify"],  # Same dep as above
    "calculate": ["fetch_market", "fetch_user"],
}

# Execution:
# Level 0: extract
# Level 1: classify
# Level 2: fetch_market, fetch_user (PARALLEL)
# Level 3: calculate
```
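The level grouping above can be sketched in plain Python. This is an illustration of the scheduling idea, not the library's implementation: a step is ready once all of its dependencies have completed, and all ready steps form one level.

```python
def execution_levels(steps, dependencies):
    """Group steps into levels; steps in the same level can run in parallel."""
    deps = {s: set(dependencies.get(s, [])) for s in steps}
    levels = []
    done = set()
    while len(done) < len(steps):
        # A step is ready when every dependency has already finished.
        ready = sorted(s for s in steps if s not in done and deps[s] <= done)
        if not ready:
            raise ValueError("cycle detected")
        levels.append(ready)
        done |= set(ready)
    return levels

steps = ["extract", "classify", "fetch_market", "fetch_user", "calculate"]
dependencies = {
    "classify": ["extract"],
    "fetch_market": ["classify"],
    "fetch_user": ["classify"],
    "calculate": ["fetch_market", "fetch_user"],
}

levels = execution_levels(steps, dependencies)
# levels == [["extract"], ["classify"], ["fetch_market", "fetch_user"], ["calculate"]]
```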
__init__(name, steps, dependencies=None, output_step=None, config=None, step_configs=None)
¶
Initialize Pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Pipeline name (for tracing). | *required* |
| `steps` | `dict[str, BaseStep[DepsT, Any]]` | Dict of step_id -> step instance. | *required* |
| `dependencies` | `dict[str, list[str]] \| None` | Dict of step_id -> [dependency_ids]. | `None` |
| `output_step` | `str \| None` | Which step's output is the pipeline output. Defaults to last step in topological order. | `None` |
| `config` | `PipelineConfig \| None` | Default configuration for all steps (timeout, retries, budget). | `None` |
| `step_configs` | `dict[str, StepConfig] \| None` | Per-step configuration overrides. | `None` |
Config Resolution (most specific wins):

1. Pipeline default config
2. Step class config (if step has .config attribute)
3. step_configs[step_id] override
4. Per-call overrides via ctx.run(timeout=..., retries=...)
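The layered resolution can be sketched as a merge where later, more specific layers override earlier ones. This is an illustrative sketch of the precedence rule, not the library's code; the layer names are hypothetical:

```python
def resolve_config(*layers):
    """Merge config layers; a later layer's non-None values win."""
    resolved = {}
    for layer in layers:
        for key, value in (layer or {}).items():
            if value is not None:
                resolved[key] = value
    return resolved

pipeline_default = {"timeout": 60.0, "retries": 0}
step_class_config = {"retries": 2}       # e.g. a .config attribute on the step class
step_configs_override = {"timeout": 30.0}
per_call = {"retries": 3}                # e.g. ctx.run(..., retries=3)

config = resolve_config(pipeline_default, step_class_config,
                        step_configs_override, per_call)
# config == {"timeout": 30.0, "retries": 3}
```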
Raises:

| Type | Description |
|---|---|
| `PipelineValidationError` | Invalid deps, unknown output_step, or cycles. |
execute(input_data, deps, tracer=None)
async
¶
Execute the pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `input_data` | `InputT` | Input accessible via context.get_input(). | *required* |
| `deps` | `DepsT` | Your deps accessible via context.deps. | *required* |
| `tracer` | `Tracer \| None` | For distributed tracing. | `None` |
Returns:

| Type | Description |
|---|---|
| `PipelineResult[OutputT]` | PipelineResult with output and usage. |
Raises:

| Type | Description |
|---|---|
| `StepExecutionError` | If any step fails. |
Examples:

```python
# Basic execution
result = await pipeline.execute(
    {"document": "Hello world"},
    deps=MyDeps(api_key="..."),
)
print(result.output)

# With tracing
from fastroai import SimpleTracer

tracer = SimpleTracer()
result = await pipeline.execute(
    {"document": doc},
    deps=deps,
    tracer=tracer,
)

# Handle early termination (multi-turn)
if result.stopped_early:
    missing = result.conversation_state.context["missing"]
    return {"status": "incomplete", "missing": missing}

# Access usage metrics
if result.usage:
    print(f"Cost: ${result.usage.total_cost_dollars:.6f}")
    print(f"Tokens: {result.usage.total_input_tokens}")
```
PipelineResult¶
fastroai.pipelines.PipelineResult
¶
Result from pipeline execution.
Attributes:

| Name | Type | Description |
|---|---|---|
| `output` | `OutputT \| None` | Final step's output, or None if stopped early. |
| `step_outputs` | `dict[str, Any]` | All step outputs by ID. |
| `conversation_state` | `ConversationState[Any] \| None` | ConversationState if a step returned one. |
| `usage` | `PipelineUsage \| None` | Aggregated usage metrics. |
| `stopped_early` | `bool` | True if stopped due to INCOMPLETE status. |
Examples:

```python
result = await pipeline.execute(data, deps)

if result.stopped_early:
    missing = result.conversation_state.context["missing"]
    return {"status": "incomplete", "missing": missing}

print(f"Cost: ${result.usage.total_cost_dollars:.6f}")
return {"status": "complete", "output": result.output}
```
BaseStep¶
fastroai.pipelines.BaseStep
¶
Abstract base class for pipeline steps.
A step is one unit of work. It:

- Receives context with inputs and dependencies
- Does something (AI call, computation, API call)
- Returns typed output
Steps should be stateless. Any state goes in deps or inputs.
Examples:

```python
class ExtractStep(BaseStep[MyDeps, ExtractionResult]):
    '''Extract entities from document.'''

    def __init__(self):
        self.agent = FastroAgent(system_prompt="Extract entities.")

    async def execute(self, context: StepContext[MyDeps]) -> ExtractionResult:
        document = context.get_input("document")
        response = await self.agent.run(f"Extract: {document}")
        return ExtractionResult.model_validate_json(response.content)
```
execute(context)
abstractmethod
async
¶
Execute step logic.
Override this method to implement your step's behavior.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | `StepContext[DepsT]` | Execution context with inputs, deps, and step outputs. | *required* |
Returns:

| Type | Description |
|---|---|
| `OutputT` | The step's typed output. |
StepContext¶
fastroai.pipelines.StepContext
¶
Execution context provided to pipeline steps.
Provides access to:

- Pipeline inputs (the data passed to execute())
- Outputs from dependency steps
- Application dependencies (your db session, user, etc.)
- Tracer for custom spans
Examples:

```python
class ProcessStep(BaseStep[MyDeps, Result]):
    async def execute(self, context: StepContext[MyDeps]) -> Result:
        # Get pipeline input
        document = context.get_input("document")

        # Get output from dependency step
        classification = context.get_dependency("classify", Classification)

        # Access your deps
        db = context.deps.session
        user_id = context.deps.user_id

        # Custom tracing (fall back to an unspanned call if no tracer)
        if context.tracer:
            async with context.tracer.span("custom_operation"):
                result = await process(document)
        else:
            result = await process(document)
        return result
```
step_id
property
¶
Current step's ID.
Returns:

| Type | Description |
|---|---|
| `str` | The step identifier string. |
deps
property
¶
Application dependencies (your session, user, etc.).
Returns:

| Type | Description |
|---|---|
| `DepsT` | The dependencies object passed to pipeline.execute(). |
tracer
property
¶
Tracer for custom spans.
Returns:

| Type | Description |
|---|---|
| `Tracer \| None` | The tracer instance, or None if no tracing. |
usage
property
¶
Accumulated usage from all ctx.run() calls in this step.
Returns:

| Type | Description |
|---|---|
| `StepUsage` | StepUsage with aggregated tokens and cost. |
config
property
¶
Configuration for this step (timeout, retries, budget).
Returns:

| Type | Description |
|---|---|
| `StepConfig` | The resolved StepConfig for this step. |
__init__(step_id, inputs, deps, step_outputs, tracer=None, config=None)
¶
Initialize step context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `step_id` | `str` | Unique identifier for this step. | *required* |
| `inputs` | `dict[str, Any]` | Pipeline inputs passed to execute(). | *required* |
| `deps` | `DepsT` | Application dependencies (db session, user, etc.). | *required* |
| `step_outputs` | `dict[str, Any]` | Outputs from completed dependency steps. | *required* |
| `tracer` | `Tracer \| None` | Optional tracer for distributed tracing. | `None` |
| `config` | `StepConfig \| None` | Step configuration (timeout, retries, budget). | `None` |
get_input(key, default=None)
¶
Get value from pipeline inputs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `key` | `str` | The input key to retrieve. | *required* |
| `default` | `Any` | Value to return if key not found. | `None` |

Returns:

| Type | Description |
|---|---|
| `Any` | The input value, or default if not present. |
get_dependency(step_id, output_type=None)
¶
Get output from a dependency step.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `step_id` | `str` | ID of the dependency step. | *required* |
| `output_type` | `type[T] \| None` | Expected type (for IDE/type checker, not enforced). | `None` |

Returns:

| Type | Description |
|---|---|
| `T` | The output from the dependency step. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | If step_id not in dependencies or hasn't run. |
get_dependency_or_none(step_id, output_type=None)
¶
Get output from a dependency step, or None if not available.
Use this for optional dependencies that may not have run.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `step_id` | `str` | ID of the dependency step. | *required* |
| `output_type` | `type[T] \| None` | Expected type (for IDE/type checker, not enforced). | `None` |

Returns:

| Type | Description |
|---|---|
| `T \| None` | The output from the dependency step, or None if not available. |
Examples:

```python
class EnhanceStep(BaseStep[MyDeps, str]):
    async def execute(self, ctx: StepContext[MyDeps]) -> str:
        # Required dependency
        base_content = ctx.get_dependency("extract")

        # Optional dependency - might not exist
        metadata = ctx.get_dependency_or_none("fetch_metadata", dict)
        if metadata:
            return f"{base_content} (with metadata)"
        return base_content
```
run(agent, message, *, timeout=None, retries=None)
async
¶
Run an agent with automatic tracer, usage tracking, and config.
This is THE way to call agents from within a step. It:

- Passes deps and tracer automatically
- Accumulates usage in ctx.usage
- Enforces cost budget (raises CostBudgetExceededError if exceeded)
- Supports timeout and retries (from config or per-call override)
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `agent` | `FastroAgent[OutputT]` | The FastroAgent to run. | *required* |
| `message` | `str` | The message/prompt to send. | *required* |
| `timeout` | `float \| None` | Per-call timeout override (seconds). Uses config if None. | `None` |
| `retries` | `int \| None` | Per-call retries override. Uses config if None. | `None` |
Returns:

| Type | Description |
|---|---|
| `ChatResponse[OutputT]` | ChatResponse with output, content, usage data, etc. |

Raises:

| Type | Description |
|---|---|
| `CostBudgetExceededError` | If cost_budget is set and exceeded. |
| `TimeoutError` | If timeout exceeded after all retries. |
Examples:

```python
class MyStep(BaseStep[MyDeps, str]):
    classifier = FastroAgent(model="gpt-4o-mini", output_type=Category)
    writer = FastroAgent(model="gpt-4o")

    async def execute(self, ctx: StepContext[MyDeps]) -> str:
        # Both calls tracked in ctx.usage
        category = await ctx.run(self.classifier, "Classify this")
        result = await ctx.run(self.writer, f"Write about {category.output}")
        return result.content

# With per-call overrides:
response = await ctx.run(agent, "msg", timeout=30.0, retries=2)
```
step¶
fastroai.pipelines.step(func=None, *, timeout=None, retries=0, retry_delay=1.0, cost_budget=None)
¶
Decorator to create a pipeline step from a function.
Can be used with or without arguments:

```python
@step
async def my_step(ctx): ...

@step(timeout=30.0, retries=2)
async def my_step(ctx): ...
```
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `func` | `Callable[..., Any] \| None` | The function to wrap (when used without parentheses). | `None` |
| `timeout` | `float \| None` | Maximum execution time in seconds. | `None` |
| `retries` | `int` | Number of retry attempts on failure. | `0` |
| `retry_delay` | `float` | Base delay between retries (exponential backoff). | `1.0` |
| `cost_budget` | `int \| None` | Maximum cost in microcents for this step. | `None` |
Returns:

| Type | Description |
|---|---|
| `_FunctionStep \| Callable[[Callable[..., Any]], _FunctionStep]` | A BaseStep instance that can be used in a Pipeline. |
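The dual-use signature (bare `@step` or `@step(...)`) is a standard Python decorator pattern: when `func` is passed, the decorator was applied directly; otherwise it returns a decorator awaiting the function. A generic sketch of the pattern, not the library's implementation (the stored `config` dict here is hypothetical):

```python
import functools

def step(func=None, *, timeout=None, retries=0):
    """Usable as bare @step or as @step(timeout=..., retries=...)."""
    def wrap(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)
        # Attach the configuration so a pipeline could read it later.
        wrapper.config = {"timeout": timeout, "retries": retries}
        return wrapper
    if func is not None:
        return wrap(func)   # bare @step: decorate immediately
    return wrap             # @step(...): return the real decorator

@step
def plain(ctx):
    return "plain"

@step(timeout=30.0, retries=2)
def configured(ctx):
    return "configured"

# plain(None) == "plain"; configured.config["retries"] == 2
```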
Configuration¶
StepConfig¶
fastroai.pipelines.StepConfig
dataclass
¶
Configuration for a pipeline step.
Attributes:

| Name | Type | Description |
|---|---|---|
| `timeout` | `float \| None` | Maximum time in seconds for step execution. None = no timeout. |
| `retries` | `int` | Number of retry attempts on failure. 0 = no retries. |
| `retry_delay` | `float` | Delay in seconds between retry attempts. |
| `cost_budget` | `int \| None` | Maximum cost in microcents. None = no budget limit. |
Examples:

```python
# Step with 30s timeout and 2 retries
config = StepConfig(timeout=30.0, retries=2)

# Step with cost budget of $0.10 (10 cents = 100_000 microcents)
config = StepConfig(cost_budget=100_000)
```
PipelineConfig¶
fastroai.pipelines.PipelineConfig
dataclass
¶
Configuration for a pipeline with additional options.
Inherits all StepConfig fields plus pipeline-specific options.
Attributes:

| Name | Type | Description |
|---|---|---|
| `trace` | `bool` | Whether to enable tracing for this pipeline. |
| `on_error` | `Literal['fail', 'continue']` | Error handling strategy: "fail" stops the pipeline on the first error (default); "continue" keeps executing other steps on error. |
Examples:

```python
# Pipeline with tracing and 60s timeout
config = PipelineConfig(trace=True, timeout=60.0)

# Pipeline that continues on errors
config = PipelineConfig(on_error="continue")
```
Conversation State¶
ConversationStatus¶
fastroai.pipelines.ConversationStatus
¶
Status of multi-turn conversation gathering.
Attributes:

| Name | Description |
|---|---|
| `COMPLETE` | All required information has been gathered. The pipeline proceeds to subsequent steps. |
| `INCOMPLETE` | More information is needed from the user. The pipeline pauses and returns partial state. |
ConversationState¶
fastroai.pipelines.ConversationState
¶
Signal for multi-turn conversation steps.
When a step returns ConversationState with INCOMPLETE status, the pipeline stops early. Partial data and context are preserved.
Examples:

```python
class GatherInfoStep(BaseStep[MyDeps, ConversationState[UserInfo]]):
    async def execute(self, context) -> ConversationState[UserInfo]:
        info = await self._extract(context.get_input("message"))
        if info.is_complete():
            return ConversationState(
                status=ConversationStatus.COMPLETE,
                data=info,
            )
        return ConversationState(
            status=ConversationStatus.INCOMPLETE,
            data=info,  # Partial data
            context={"missing": info.missing_fields()},
        )
```
Usage Tracking¶
StepUsage¶
fastroai.pipelines.StepUsage
¶
Usage metrics for a single pipeline step.
Automatically extracted from ChatResponse when using AgentStep.
Examples:

```python
# From ChatResponse
usage = StepUsage.from_chat_response(response)

# Manual creation
usage = StepUsage(
    input_tokens=100,
    output_tokens=50,
    cost_microcents=175,
    processing_time_ms=500,
    model="gpt-4o",
)

# Combine usages
total = usage1 + usage2
```
__add__(other)
¶
Combine two StepUsage instances (as in `total = usage1 + usage2` above).
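Presumably the addition sums the numeric metrics of both records. A plain-Python sketch of that behavior, with hypothetical fields mirroring the StepUsage example above (not the library's code):

```python
from dataclasses import dataclass

@dataclass
class UsageSketch:
    """Stand-in for StepUsage, to illustrate metric summation."""
    input_tokens: int = 0
    output_tokens: int = 0
    cost_microcents: int = 0

    def __add__(self, other):
        # Sum each numeric metric field-by-field.
        return UsageSketch(
            input_tokens=self.input_tokens + other.input_tokens,
            output_tokens=self.output_tokens + other.output_tokens,
            cost_microcents=self.cost_microcents + other.cost_microcents,
        )

total = UsageSketch(100, 50, 175) + UsageSketch(200, 80, 300)
# total == UsageSketch(input_tokens=300, output_tokens=130, cost_microcents=475)
```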
from_chat_response(response)
classmethod
¶
Create StepUsage from a ChatResponse.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `response` | `ChatResponse[Any]` | ChatResponse from an agent run. | *required* |
Returns:

| Type | Description |
|---|---|
| `StepUsage` | StepUsage with metrics extracted from the response. |
PipelineUsage¶
fastroai.pipelines.PipelineUsage
¶
Aggregated usage across all pipeline steps.
Examples:

```python
# From step usages
usage = PipelineUsage.from_step_usages({
    "extract": StepUsage(cost_microcents=100, ...),
    "classify": StepUsage(cost_microcents=200, ...),
})

print(f"Total cost: ${usage.total_cost_dollars:.6f}")
print(f"Steps: {list(usage.steps.keys())}")
```
total_cost_dollars
property
¶
Total cost in dollars for display purposes.
Returns:

| Type | Description |
|---|---|
| `float` | Total cost as a float in dollars. |
Note
Use total_cost_microcents for calculations to avoid floating-point errors.
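Assuming the conversion implied by the StepConfig example above (100_000 microcents = $0.10, i.e. 1_000_000 microcents per dollar), the reason to prefer integer microcents is that sums stay exact; converting to dollars happens once, at display time. A small sketch:

```python
# Conversion factor implied by the StepConfig example (an assumption,
# not a documented library constant).
MICROCENTS_PER_DOLLAR = 1_000_000

step_costs = [175, 300, 25]          # per-step costs in microcents
total_microcents = sum(step_costs)   # exact integer arithmetic
total_dollars = total_microcents / MICROCENTS_PER_DOLLAR

# total_microcents == 500; total_dollars == 0.0005
```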
from_step_usages(step_usages)
classmethod
¶
Aggregate metrics from individual step usages.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `step_usages` | `dict[str, StepUsage]` | Dict mapping step IDs to their usage metrics. | *required* |

Returns:

| Type | Description |
|---|---|
| `PipelineUsage` | PipelineUsage with summed totals and per-step breakdown. |
Examples:

```python
step_usages = {
    "extract": StepUsage(input_tokens=100, cost_microcents=50),
    "classify": StepUsage(input_tokens=200, cost_microcents=100),
}
usage = PipelineUsage.from_step_usages(step_usages)

print(f"Total tokens: {usage.total_input_tokens}")  # 300
print(f"Total cost: ${usage.total_cost_dollars:.6f}")

# Access per-step breakdown
for step_id, step_usage in usage.steps.items():
    print(f"  {step_id}: {step_usage.cost_microcents} microcents")
```
Errors¶
StepExecutionError¶
fastroai.pipelines.StepExecutionError
¶
Raised when a pipeline step fails during execution.
Attributes:

| Name | Description |
|---|---|
| `step_id` | The ID of the step that failed. |
| `original_error` | The underlying exception that caused the failure. |
Examples:

```python
try:
    result = await pipeline.execute(inputs, deps)
except StepExecutionError as e:
    print(f"Step '{e.step_id}' failed: {e.original_error}")
```