Pipelines

The pipelines module provides DAG-based workflow orchestration with automatic parallelism, multi-turn conversation support, and usage tracking.

Pipeline

fastroai.pipelines.Pipeline

Declarative DAG pipeline for multi-step AI workflows.

Features:
- Automatic parallelism from dependencies
- Type-safe dependency access
- Early termination on INCOMPLETE status
- Aggregated usage tracking
- Distributed tracing

Examples:

pipeline = Pipeline(
    name="document_processor",
    steps={
        "extract": ExtractStep(),
        "classify": ClassifyStep(),
        "summarize": SummarizeStep(),
    },
    dependencies={
        "classify": ["extract"],
        "summarize": ["classify"],
    },
)

result = await pipeline.execute({"document": doc}, deps, tracer)
summary = result.output

Parallelism example - steps at the same level run in parallel:

dependencies = {
    "classify": ["extract"],
    "fetch_market": ["classify"],
    "fetch_user": ["classify"],  # Same dep as above
    "calculate": ["fetch_market", "fetch_user"],
}
# Execution:
# Level 0: extract
# Level 1: classify
# Level 2: fetch_market, fetch_user (PARALLEL)
# Level 3: calculate
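
The level grouping above can be reproduced with a short sketch. This is not FastroAI's scheduler: `execution_levels` is a hypothetical helper that places each step one level below its deepest dependency, and it assumes the graph has no cycles.

```python
from collections import defaultdict


def execution_levels(
    step_ids: list[str], dependencies: dict[str, list[str]]
) -> list[list[str]]:
    """Group step IDs into levels; steps in one level do not depend on each other.

    Hypothetical helper for illustration only. Assumes an acyclic graph.
    """
    level_of: dict[str, int] = {}

    def level(step_id: str) -> int:
        # A step without dependencies sits at level 0; any other step sits
        # one level below its deepest dependency.
        if step_id not in level_of:
            deps = dependencies.get(step_id, [])
            level_of[step_id] = 1 + max((level(d) for d in deps), default=-1)
        return level_of[step_id]

    grouped: dict[int, list[str]] = defaultdict(list)
    for step_id in step_ids:
        grouped[level(step_id)].append(step_id)
    return [grouped[i] for i in sorted(grouped)]


steps = ["extract", "classify", "fetch_market", "fetch_user", "calculate"]
dependencies = {
    "classify": ["extract"],
    "fetch_market": ["classify"],
    "fetch_user": ["classify"],
    "calculate": ["fetch_market", "fetch_user"],
}
levels = execution_levels(steps, dependencies)
```

Steps that share a level could then be awaited together, for example with `asyncio.gather`.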

__init__(name, steps, dependencies=None, output_step=None, config=None, step_configs=None)

Initialize Pipeline.

Parameters:

- name (str, required): Pipeline name (for tracing).
- steps (dict[str, BaseStep[DepsT, Any]], required): Dict of step_id -> step instance.
- dependencies (dict[str, list[str]] | None, default None): Dict of step_id -> [dependency_ids].
- output_step (str | None, default None): Which step's output is the pipeline output. Defaults to the last step in topological order.
- config (PipelineConfig | None, default None): Default configuration for all steps (timeout, retries, budget).
- step_configs (dict[str, StepConfig] | None, default None): Per-step configuration overrides.

Config Resolution (later entries override earlier ones, so the most specific wins):
1. Pipeline default config
2. Step class config (if the step has a .config attribute)
3. step_configs[step_id] override
4. Per-call overrides via ctx.run(timeout=..., retries=...)
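
A minimal sketch of this layering, under stated assumptions: `Config` and `resolve` are hypothetical stand-ins, not FastroAI classes, and the sketch treats `None` as "not set at this layer". The real StepConfig uses `retries=0` as its default, so the library must tell "unset" from "set to the default" in some other way.

```python
from dataclasses import dataclass, fields, replace
from typing import Optional


@dataclass(frozen=True)
class Config:
    """Hypothetical stand-in for StepConfig; None means "not set at this layer"."""

    timeout: Optional[float] = None
    retries: Optional[int] = None


def resolve(*layers: Optional[Config]) -> Config:
    """Merge layers in order, so a later layer overrides an earlier one."""
    resolved = Config()
    for layer in layers:
        if layer is None:
            continue
        # Only the fields a layer actually sets take part in the override.
        overrides = {
            f.name: getattr(layer, f.name)
            for f in fields(layer)
            if getattr(layer, f.name) is not None
        }
        resolved = replace(resolved, **overrides)
    return resolved


pipeline_default = Config(timeout=30.0, retries=1)
step_class_config = None              # the step defines no .config
step_override = Config(timeout=60.0)  # plays the role of step_configs["classify"]
per_call = Config(retries=3)          # plays the role of ctx.run(retries=3)

effective = resolve(pipeline_default, step_class_config, step_override, per_call)
```

Here the timeout comes from the per-step override and the retry count from the per-call override, while the pipeline default only fills in what no later layer sets.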

Raises:

- PipelineValidationError: If dependencies reference unknown steps, output_step names an unknown step, or the dependency graph contains a cycle.
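
The checks behind this error can be sketched with the standard library. `validate_graph` is a hypothetical helper that raises ValueError rather than PipelineValidationError, and it makes no claim about how FastroAI performs its own validation.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Optional


def validate_graph(
    step_ids: set[str],
    dependencies: dict[str, list[str]],
    output_step: Optional[str] = None,
) -> list[str]:
    """Return a topological order, or raise ValueError describing the problem."""
    for step_id, deps in dependencies.items():
        unknown = ({step_id} | set(deps)) - step_ids
        if unknown:
            raise ValueError(f"unknown step(s) in dependencies: {sorted(unknown)}")
    if output_step is not None and output_step not in step_ids:
        raise ValueError(f"unknown output_step: {output_step!r}")

    # TopologicalSorter expects a mapping of node -> predecessors.
    graph = {step_id: dependencies.get(step_id, []) for step_id in step_ids}
    try:
        # static_order() raises CycleError when the graph contains a cycle.
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        raise ValueError(f"dependency cycle: {exc.args[1]}") from exc


order = validate_graph({"extract", "classify"}, {"classify": ["extract"]})
```

The last entry of the returned order is a natural candidate for the default output step.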

Examples:

pipeline = Pipeline(
    name="processor",
    steps={"extract": ExtractStep(), "classify": ClassifyStep()},
    config=PipelineConfig(timeout=30.0, retries=1),
    step_configs={"classify": StepConfig(timeout=60.0)},  # Override
)

execute(input_data, deps, tracer=None) async

Execute the pipeline.

Parameters:

- input_data (InputT, required): Input accessible via context.get_input().
- deps (DepsT, required): Your deps accessible via context.deps.
- tracer (Tracer | None, default None): For distributed tracing.

Returns:

- PipelineResult[OutputT]: PipelineResult with output and usage.

Raises:

- StepExecutionError: If any step fails.

Examples:

# Basic execution
result = await pipeline.execute(
    {"document": "Hello world"},
    deps=MyDeps(api_key="..."),
)
print(result.output)

# With tracing
from fastroai import SimpleTracer

tracer = SimpleTracer()
result = await pipeline.execute(
    {"document": doc},
    deps=deps,
    tracer=tracer,
)

# Handle early termination (multi-turn)
if result.stopped_early:
    missing = result.conversation_state.context["missing"]
    return {"status": "incomplete", "missing": missing}

# Access usage metrics
if result.usage:
    print(f"Cost: ${result.usage.total_cost_dollars:.6f}")
    print(f"Tokens: {result.usage.total_input_tokens}")

PipelineResult

fastroai.pipelines.PipelineResult

Result from pipeline execution.

Attributes:

- output (OutputT | None): Final step's output, or None if stopped early.
- step_outputs (dict[str, Any]): All step outputs by ID.
- conversation_state (ConversationState[Any] | None): ConversationState if a step returned one.
- usage (PipelineUsage | None): Aggregated usage metrics.
- stopped_early (bool): True if stopped due to INCOMPLETE status.

Examples:

result = await pipeline.execute(data, deps)

if result.stopped_early:
    missing = result.conversation_state.context["missing"]
    return {"status": "incomplete", "missing": missing}

# usage is typed PipelineUsage | None, so guard before reading it
if result.usage:
    print(f"Cost: ${result.usage.total_cost_dollars:.6f}")
return {"status": "complete", "output": result.output}

BaseStep

fastroai.pipelines.BaseStep

Abstract base class for pipeline steps.

A step is one unit of work. It:
- Receives context with inputs and dependencies
- Does something (AI call, computation, API call)
- Returns typed output

Steps should be stateless. Any state goes in deps or inputs.

Examples:

class ExtractStep(BaseStep[MyDeps, ExtractionResult]):
    '''Extract entities from document.'''

    def __init__(self):
        self.agent = FastroAgent(system_prompt="Extract entities.")

    async def execute(self, context: StepContext[MyDeps]) -> ExtractionResult:
        document = context.get_input("document")
        response = await self.agent.run(f"Extract: {document}")
        return ExtractionResult.model_validate_json(response.content)

execute(context) abstractmethod async

Execute step logic.

Override this method to implement your step's behavior.

Parameters:

- context (StepContext[DepsT], required): Execution context with inputs, deps, and step outputs.

Returns:

- OutputT: The step's typed output.

StepContext

fastroai.pipelines.StepContext

Execution context provided to pipeline steps.

Provides access to:
- Pipeline inputs (the data passed to execute())
- Outputs from dependency steps
- Application dependencies (your db session, user, etc.)
- Tracer for custom spans

Examples:

class ProcessStep(BaseStep[MyDeps, Result]):
    async def execute(self, context: StepContext[MyDeps]) -> Result:
        # Get pipeline input
        document = context.get_input("document")

        # Get output from dependency step
        classification = context.get_dependency("classify", Classification)

        # Access your deps
        db = context.deps.session
        user_id = context.deps.user_id

        # Custom tracing, with an untraced fallback so result is always bound
        if context.tracer:
            async with context.tracer.span("custom_operation"):
                result = await process(document)
        else:
            result = await process(document)

        return result

step_id property

Current step's ID.

Returns:

- str: The step identifier string.

deps property

Application dependencies (your session, user, etc.).

Returns:

- DepsT: The dependencies object passed to pipeline.execute().

tracer property

Tracer for custom spans.

Returns:

- Tracer | None: The tracer instance, or None if no tracing.

usage property

Accumulated usage from all ctx.run() calls in this step.

Returns:

- StepUsage: StepUsage with aggregated tokens and cost.

config property

Configuration for this step (timeout, retries, budget).

Returns:

- StepConfig: The resolved StepConfig for this step.

__init__(step_id, inputs, deps, step_outputs, tracer=None, config=None)

Initialize step context.

Parameters:

- step_id (str, required): Unique identifier for this step.
- inputs (dict[str, Any], required): Pipeline inputs passed to execute().
- deps (DepsT, required): Application dependencies (db session, user, etc.).
- step_outputs (dict[str, Any], required): Outputs from completed dependency steps.
- tracer (Tracer | None, default None): Optional tracer for distributed tracing.
- config (StepConfig | None, default None): Step configuration (timeout, retries, budget).

get_input(key, default=None)

Get value from pipeline inputs.

Parameters:

- key (str, required): The input key to retrieve.
- default (Any, default None): Value to return if key not found.

Returns:

- Any: The input value, or default if not present.

Examples:

class ProcessStep(BaseStep[MyDeps, str]):
    async def execute(self, ctx: StepContext[MyDeps]) -> str:
        # Get required input
        document = ctx.get_input("document")

        # Get optional input with default
        format_type = ctx.get_input("format", "json")

        return f"Processing {document} as {format_type}"

get_dependency(step_id, output_type=None)

Get output from a dependency step.

Parameters:

- step_id (str, required): ID of the dependency step.
- output_type (type[T] | None, default None): Expected type (for IDE/type checker, not enforced).

Returns:

- T: The output from the dependency step.

Raises:

- ValueError: If step_id is not in dependencies or hasn't run.

Examples:

# With type hint (IDE knows extraction is ExtractionResult)
extraction = context.get_dependency("extract", ExtractionResult)
extraction.entities  # Autocomplete works!

get_dependency_or_none(step_id, output_type=None)

Get output from a dependency step, or None if not available.

Use this for optional dependencies that may not have run.

Parameters:

- step_id (str, required): ID of the dependency step.
- output_type (type[T] | None, default None): Expected type (for IDE/type checker, not enforced).

Returns:

- T | None: The output from the dependency step, or None if not available.

Examples:

class EnhanceStep(BaseStep[MyDeps, str]):
    async def execute(self, ctx: StepContext[MyDeps]) -> str:
        # Required dependency
        base_content = ctx.get_dependency("extract")

        # Optional dependency - might not exist
        metadata = ctx.get_dependency_or_none("fetch_metadata", dict)

        if metadata is not None:
            return f"{base_content} (with metadata)"
        return base_content
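
The "not enforced" behaviour of output_type in both lookups can be illustrated with a small sketch. `SketchContext` is a hypothetical stand-in, not the real StepContext: it assumes the type argument exists purely for the type checker, which is what the parameter description states.

```python
from typing import Any, Optional, TypeVar, cast

T = TypeVar("T")


class SketchContext:
    """Hypothetical stand-in for StepContext, limited to dependency lookup."""

    def __init__(self, step_outputs: dict[str, Any]) -> None:
        self._step_outputs = step_outputs

    def get_dependency(
        self, step_id: str, output_type: Optional[type[T]] = None
    ) -> T:
        if step_id not in self._step_outputs:
            raise ValueError(f"Step {step_id!r} is not a dependency or has not run")
        # cast() only informs the type checker; nothing is verified at runtime.
        return cast(T, self._step_outputs[step_id])

    def get_dependency_or_none(
        self, step_id: str, output_type: Optional[type[T]] = None
    ) -> Optional[T]:
        # Missing steps yield None instead of raising.
        return cast(Optional[T], self._step_outputs.get(step_id))


ctx = SketchContext({"extract": {"entities": ["Ada"]}})
extraction = ctx.get_dependency("extract", dict)
mistyped = ctx.get_dependency("extract", int)  # no error: the type is not checked
metadata = ctx.get_dependency_or_none("fetch_metadata", dict)
```

Because nothing is checked at runtime, a wrong output_type only misleads the type checker; validate the value explicitly if the shape matters.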

run(agent, message, *, timeout=None, retries=None) async

Run an agent with automatic tracer, usage tracking, and config.

This is THE way to call agents from within a step. It:
- Passes deps and tracer automatically
- Accumulates usage in ctx.usage
- Enforces cost budget (raises CostBudgetExceededError if exceeded)
- Supports timeout and retries (from config or per-call override)

Parameters:

- agent (FastroAgent[OutputT], required): The FastroAgent to run.
- message (str, required): The message/prompt to send.
- timeout (float | None, default None): Per-call timeout override (seconds). Uses config if None.
- retries (int | None, default None): Per-call retries override. Uses config if None.

Returns:

- ChatResponse[OutputT]: ChatResponse with output, content, usage data, etc.

Raises:

- CostBudgetExceededError: If cost_budget is set and exceeded.
- TimeoutError: If timeout exceeded after all retries.

Examples:

class MyStep(BaseStep[MyDeps, str]):
    classifier = FastroAgent(model="gpt-4o-mini", output_type=Category)
    writer = FastroAgent(model="gpt-4o")

    async def execute(self, ctx: StepContext[MyDeps]) -> str:
        # Both calls tracked in ctx.usage
        category = await ctx.run(self.classifier, "Classify this")
        result = await ctx.run(self.writer, f"Write about {category.output}")
        return result.content

# With per-call overrides:
response = await ctx.run(agent, "msg", timeout=30.0, retries=2)
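
How a timeout and a retry count can combine is sketched below with asyncio alone. `run_with_retries` is a hypothetical helper, not part of FastroAI. It assumes that the timeout applies to each attempt separately, that any exception triggers a retry, and that the delay doubles after each failure; the library's actual policy may differ.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def run_with_retries(
    call: Callable[[], Awaitable[T]],
    *,
    timeout: Optional[float] = None,
    retries: int = 0,
    retry_delay: float = 0.0,
) -> T:
    """Try `call` up to retries + 1 times, applying `timeout` to each attempt."""
    for attempt in range(retries + 1):
        try:
            # With timeout=None, wait_for simply awaits the call.
            return await asyncio.wait_for(call(), timeout)
        except Exception:
            if attempt == retries:
                raise  # out of attempts: surface the last error
            # Assumed backoff: the delay doubles after every failed attempt.
            await asyncio.sleep(retry_delay * 2**attempt)
    raise AssertionError("unreachable")


attempts = 0


async def flaky() -> str:
    """Fail twice, then succeed."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(run_with_retries(flaky, timeout=1.0, retries=2))
```

With retries=2 the call is attempted three times in total, which is just enough for `flaky` to succeed.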

step

fastroai.pipelines.step(func=None, *, timeout=None, retries=0, retry_delay=1.0, cost_budget=None)

step(func: Callable[..., OutputT]) -> _FunctionStep
step(
    func: None = None,
    *,
    timeout: float | None = None,
    retries: int = 0,
    retry_delay: float = 1.0,
    cost_budget: int | None = None,
) -> Callable[[Callable[..., OutputT]], _FunctionStep]

Decorator to create a pipeline step from a function.

Can be used with or without arguments:

@step
async def my_step(ctx): ...

@step(timeout=30.0, retries=2)
async def my_step(ctx): ...

Parameters:

- func (Callable[..., Any] | None, default None): The function to wrap (when used without parentheses).
- timeout (float | None, default None): Maximum execution time in seconds.
- retries (int, default 0): Number of retry attempts on failure.
- retry_delay (float, default 1.0): Base delay between retries (exponential backoff).
- cost_budget (int | None, default None): Maximum cost in microcents for this step.

Returns:

- _FunctionStep | Callable[[Callable[..., Any]], _FunctionStep]: A BaseStep instance that can be used in a Pipeline, or, when step is called with arguments, a decorator that produces one.
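
Supporting both `@step` and `@step(...)` is a common decorator pattern, sketched here with hypothetical names. `FunctionStep` stands in for the private `_FunctionStep` class, whose internals are not documented here, and only two of the configuration parameters are modelled.

```python
import functools
from typing import Any, Callable, Optional


class FunctionStep:
    """Hypothetical wrapper holding a function and its configuration."""

    def __init__(
        self, func: Callable[..., Any], timeout: Optional[float], retries: int
    ) -> None:
        functools.update_wrapper(self, func)  # keep __name__, __doc__, etc.
        self.func = func
        self.timeout = timeout
        self.retries = retries


def step(
    func: Optional[Callable[..., Any]] = None,
    *,
    timeout: Optional[float] = None,
    retries: int = 0,
):
    def wrap(f: Callable[..., Any]) -> FunctionStep:
        return FunctionStep(f, timeout, retries)

    # Bare @step passes the function directly; @step(...) passes func=None
    # and expects a decorator back.
    return wrap if func is None else wrap(func)


@step
async def plain(ctx): ...


@step(timeout=30.0, retries=2)
async def configured(ctx): ...
```

Making the configuration keyword-only is what keeps the two call forms unambiguous: a positional argument can only be the decorated function.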

Configuration

StepConfig

fastroai.pipelines.StepConfig dataclass

Configuration for a pipeline step.

Attributes:

- timeout (float | None): Maximum time in seconds for step execution. None = no timeout.
- retries (int): Number of retry attempts on failure. 0 = no retries.
- retry_delay (float): Delay in seconds between retry attempts.
- cost_budget (int | None): Maximum cost in microcents. None = no budget limit.

Examples:

# Step with 30s timeout and 2 retries
config = StepConfig(timeout=30.0, retries=2)

# Step with cost budget of $0.10 (10 cents = 100_000 microcents)
config = StepConfig(cost_budget=100_000)
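
The budget example implies a conversion rate of 1,000,000 microcents per dollar ($0.10 = 100_000). Taking that rate as an assumption, the arithmetic can be sketched with hypothetical helpers that use `Decimal` so no float rounding creeps in:

```python
from decimal import Decimal

# Assumption derived from the example above: $0.10 == 100_000 microcents,
# which implies 1_000_000 microcents per dollar.
MICROCENTS_PER_DOLLAR = 1_000_000


def dollars_to_microcents(amount: str) -> int:
    """Convert a decimal dollar string to integer microcents (fractions truncated)."""
    return int(Decimal(amount) * MICROCENTS_PER_DOLLAR)


def microcents_to_dollars(microcents: int) -> Decimal:
    """Convert integer microcents back to an exact decimal dollar amount."""
    return Decimal(microcents) / MICROCENTS_PER_DOLLAR


budget = dollars_to_microcents("0.10")
```

The resulting integer is what would be passed as cost_budget.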

PipelineConfig

fastroai.pipelines.PipelineConfig dataclass

Configuration for a pipeline with additional options.

Inherits all StepConfig fields plus pipeline-specific options.

Attributes:

- trace (bool): Whether to enable tracing for this pipeline.
- on_error (Literal['fail', 'continue']): Error handling strategy.
  - "fail": Stop pipeline on first error (default)
  - "continue": Continue executing other steps on error

Examples:

# Pipeline with tracing and 60s timeout
config = PipelineConfig(trace=True, timeout=60.0)

# Pipeline that continues on errors
config = PipelineConfig(on_error="continue")
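
The difference between the two on_error strategies can be sketched for steps that do not depend on each other. `run_steps` is a hypothetical helper; how FastroAI treats the dependents of a failed step under "continue" is not specified here, so the sketch leaves dependencies out entirely.

```python
import asyncio
from typing import Any, Awaitable, Callable, Literal


async def run_steps(
    steps: dict[str, Callable[[], Awaitable[Any]]],
    on_error: Literal["fail", "continue"] = "fail",
) -> tuple[dict[str, Any], dict[str, Exception]]:
    """Run independent steps in order and return (outputs, errors)."""
    outputs: dict[str, Any] = {}
    errors: dict[str, Exception] = {}
    for step_id, run in steps.items():
        try:
            outputs[step_id] = await run()
        except Exception as exc:
            if on_error == "fail":
                raise  # stop at the first failure
            errors[step_id] = exc  # record the failure and move on
    return outputs, errors


async def ok() -> str:
    return "done"


async def boom() -> str:
    raise RuntimeError("boom")


outputs, errors = asyncio.run(
    run_steps({"first": boom, "second": ok}, on_error="continue")
)
```

Under "continue" the second step still runs and the failure is collected; under "fail" the RuntimeError would propagate immediately.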

Conversation State

ConversationStatus

fastroai.pipelines.ConversationStatus

Status of multi-turn conversation gathering.

Attributes:

- COMPLETE: All required information has been gathered. The pipeline proceeds to subsequent steps.
- INCOMPLETE: More information is needed from the user. The pipeline pauses and returns partial state.

ConversationState

fastroai.pipelines.ConversationState

Signal for multi-turn conversation steps.

When a step returns ConversationState with INCOMPLETE status, the pipeline stops early. Partial data and context are preserved.

Examples:

class GatherInfoStep(BaseStep[MyDeps, ConversationState[UserInfo]]):
    async def execute(self, context) -> ConversationState[UserInfo]:
        info = await self._extract(context.get_input("message"))

        if info.is_complete():
            return ConversationState(
                status=ConversationStatus.COMPLETE,
                data=info,
            )

        return ConversationState(
            status=ConversationStatus.INCOMPLETE,
            data=info,  # Partial data
            context={"missing": info.missing_fields()},
        )
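
The early-stop behaviour can be sketched without the library. `Status`, `State`, and `run_until_incomplete` are hypothetical stand-ins for ConversationStatus, ConversationState, and the pipeline's execution loop, and the sketch runs steps sequentially rather than by dependency level.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable


class Status(Enum):
    COMPLETE = "complete"
    INCOMPLETE = "incomplete"


@dataclass
class State:
    """Hypothetical stand-in for ConversationState."""

    status: Status
    data: Any = None
    context: dict[str, Any] = field(default_factory=dict)


def run_until_incomplete(
    steps: dict[str, Callable[[], Any]],
) -> tuple[dict[str, Any], bool]:
    """Run steps in order; return (outputs, stopped_early)."""
    outputs: dict[str, Any] = {}
    for step_id, run in steps.items():
        result = run()
        outputs[step_id] = result  # partial data is kept even when stopping
        if isinstance(result, State) and result.status is Status.INCOMPLETE:
            return outputs, True
    return outputs, False


steps = {
    "gather": lambda: State(
        Status.INCOMPLETE, data={"name": "Ada"}, context={"missing": ["email"]}
    ),
    "book": lambda: "booked",
}
outputs, stopped_early = run_until_incomplete(steps)
```

The caller can read the missing fields from the stored state, ask the user for them, and run the pipeline again with the extra input.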

Usage Tracking

StepUsage

fastroai.pipelines.StepUsage

Usage metrics for a single pipeline step.

Automatically extracted from ChatResponse when using AgentStep.

Examples:

# From ChatResponse
usage = StepUsage.from_chat_response(response)

# Manual creation
usage = StepUsage(
    input_tokens=100,
    output_tokens=50,
    cost_microcents=175,
    processing_time_ms=500,
    model="gpt-4o",
)

# Combine usages
total = usage1 + usage2

__add__(other)

Combine two StepUsage instances.

Parameters:

- other (StepUsage, required): Another StepUsage to add.

Returns:

- StepUsage: New StepUsage with summed metrics.

Examples:

usage1 = StepUsage(input_tokens=100, cost_microcents=50)
usage2 = StepUsage(input_tokens=200, cost_microcents=100)

total = usage1 + usage2
print(total.input_tokens)  # 300
print(total.cost_microcents)  # 150
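
Field-wise addition of this kind is straightforward to model. `Usage` below is a hypothetical two-field stand-in for StepUsage, shown only to illustrate how `__add__` lets a collection of per-step usages be totalled with `sum`.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Usage:
    """Hypothetical stand-in for StepUsage with two of its fields."""

    input_tokens: int = 0
    cost_microcents: int = 0

    def __add__(self, other: "Usage") -> "Usage":
        if not isinstance(other, Usage):
            return NotImplemented
        # Return a new instance; neither operand is modified.
        return Usage(
            input_tokens=self.input_tokens + other.input_tokens,
            cost_microcents=self.cost_microcents + other.cost_microcents,
        )


per_step = {"extract": Usage(100, 50), "classify": Usage(200, 100)}

# sum() needs an explicit start value, because the default start of 0
# cannot be added to a Usage.
total = sum(per_step.values(), Usage())
```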

from_chat_response(response) classmethod

Create StepUsage from a ChatResponse.

Parameters:

- response (ChatResponse[Any], required): ChatResponse from an agent run.

Returns:

- StepUsage: StepUsage with metrics extracted from the response.

Examples:

response = await agent.run("Hello")
usage = StepUsage.from_chat_response(response)

print(f"Tokens: {usage.input_tokens} in, {usage.output_tokens} out")
print(f"Cost: {usage.cost_microcents} microcents")

PipelineUsage

fastroai.pipelines.PipelineUsage

Aggregated usage across all pipeline steps.

Examples:

# From step usages
usage = PipelineUsage.from_step_usages({
    "extract": StepUsage(cost_microcents=100, ...),
    "classify": StepUsage(cost_microcents=200, ...),
})

print(f"Total cost: ${usage.total_cost_dollars:.6f}")
print(f"Steps: {list(usage.steps.keys())}")

total_cost_dollars property

Total cost in dollars for display purposes.

Returns:

- float: Total cost as a float in dollars.

Note

Use total_cost_microcents for calculations to avoid floating-point errors.
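
A two-line comparison shows why integers are the safer basis for calculations. The microcent amounts assume 1,000,000 microcents per dollar, the rate implied by the StepConfig budget example.

```python
# Dollars as floats: binary floating point cannot represent 0.10 exactly,
# so the sum does not compare equal to 0.30.
float_matches = (0.10 + 0.20) == 0.30

# The same amounts as integer microcents: integer addition is exact.
int_matches = (100_000 + 200_000) == 300_000
```

Here `float_matches` is False and `int_matches` is True, so comparisons such as budget checks are best done on the integer value, with dollars reserved for display.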

from_step_usages(step_usages) classmethod

Aggregate metrics from individual step usages.

Parameters:

- step_usages (dict[str, StepUsage], required): Dict mapping step IDs to their usage metrics.

Returns:

- PipelineUsage: PipelineUsage with summed totals and per-step breakdown.

Examples:

step_usages = {
    "extract": StepUsage(input_tokens=100, cost_microcents=50),
    "classify": StepUsage(input_tokens=200, cost_microcents=100),
}

usage = PipelineUsage.from_step_usages(step_usages)
print(f"Total tokens: {usage.total_input_tokens}")  # 300
print(f"Total cost: ${usage.total_cost_dollars:.6f}")

# Access per-step breakdown
for step_id, step_usage in usage.steps.items():
    print(f"  {step_id}: {step_usage.cost_microcents} microcents")

Errors

StepExecutionError

fastroai.pipelines.StepExecutionError

Raised when a pipeline step fails during execution.

Attributes:

- step_id: The ID of the step that failed.
- original_error: The underlying exception that caused the failure.

Examples:

try:
    result = await pipeline.execute(inputs, deps)
except StepExecutionError as e:
    print(f"Step '{e.step_id}' failed: {e.original_error}")
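
An exception that carries these two attributes can be sketched as follows. `StepError` and `run_step` are hypothetical; the sketch assumes the wrapper chains the original exception with `raise ... from`, which is idiomatic Python but not confirmed for FastroAI.

```python
from typing import Any, Callable


class StepError(Exception):
    """Hypothetical stand-in for StepExecutionError."""

    def __init__(self, step_id: str, original_error: Exception) -> None:
        super().__init__(f"Step '{step_id}' failed: {original_error}")
        self.step_id = step_id
        self.original_error = original_error


def run_step(step_id: str, func: Callable[[], Any]) -> Any:
    try:
        return func()
    except Exception as exc:
        # "from exc" keeps the original exception attached as __cause__.
        raise StepError(step_id, exc) from exc


def failing() -> None:
    raise KeyError("document")


try:
    run_step("extract", failing)
except StepError as err:
    caught = err
```

Callers can branch on `type(caught.original_error)` to separate, say, missing input from a network failure.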
