Skip to content

Agent

The agent module provides FastroAgent, a wrapper around PydanticAI's Agent with automatic cost calculation, distributed tracing, and a consistent response format.

FastroAgent

fastroai.agent.FastroAgent

AI agent with usage tracking, cost calculation, and tracing.

Wraps PydanticAI's Agent to provide: - Automatic cost calculation in microcents - Optional distributed tracing - Streaming and non-streaming modes - Consistent ChatResponse format - Structured output support via output_type

The agent is STATELESS regarding conversation history. Callers load history from their storage and pass it to run().

Examples:

# Basic usage (returns string)
agent = FastroAgent(
    model="openai:gpt-4o",
    system_prompt="You are helpful.",
)
response = await agent.run("Hello!")
print(response.content)
print(f"Cost: ${response.cost_dollars:.6f}")

# With structured output
from pydantic import BaseModel

class Answer(BaseModel):
    value: int
    explanation: str

agent = FastroAgent(
    model="openai:gpt-4o",
    output_type=Answer,
)
response = await agent.run("What is 2+2?")
print(response.output.value)  # 4

# With conversation history (you load it)
history = await my_memory_service.load(user_id)
response = await agent.run("Continue", message_history=history)
await my_memory_service.save(user_id, "Continue", response.content)

# With tracing
from fastroai import SimpleTracer
tracer = SimpleTracer()
response = await agent.run("Hello", tracer=tracer)

# With custom deps for tools
response = await agent.run("Search for news", deps=MyDeps(api_key="..."))

# Streaming
async for chunk in agent.run_stream("Tell me a story"):
    if chunk.is_final:
        print(f"\nCost: ${chunk.usage_data.cost_dollars:.6f}")
    else:
        print(chunk.content, end="", flush=True)

agent property

Access the underlying PydanticAI agent.

Returns:

Type Description
Agent[Any, OutputT]

The wrapped PydanticAI Agent instance.

__init__(config=None, agent=None, output_type=None, toolsets=None, cost_calculator=None, on_before_dispatch=None, on_after_dispatch=None, **kwargs)

Initialize FastroAgent.

Parameters:

Name Type Description Default
config AgentConfig | None

Agent configuration. If None, creates from kwargs.

None
agent Agent[Any, OutputT] | None

Pre-configured PydanticAI Agent (escape hatch). If provided, config is only used for cost calculation.

None
output_type type[OutputT] | None

Pydantic model for structured output. Defaults to str.

None
toolsets list[AbstractToolset] | None

Tool sets available to the agent.

None
cost_calculator CostCalculator | None

Cost calculator. Default uses standard pricing.

None
on_before_dispatch Callable[[], Awaitable[None]] | None

Optional async callback fired immediately before each agent.run() attempt inside the StepContext retry loop. Raise DispatchSkippedError (or a subclass) to short-circuit the dispatch — the retry loop propagates that exception without retrying. Use this for guards that should fail fast: circuit breakers, rate limiters, kill switches. Fires per attempt (every retry triggers the hook again).

None
on_after_dispatch Callable[[Exception | None], Awaitable[None]] | None

Optional async callback fired immediately after each agent.run() attempt completes. Receives the exception that was raised, or None on success. Use this for outcome signaling: breaker recording, retry-budget tracking, alerting. Fires per attempt (every retry triggers the hook again). Does NOT fire when on_before_dispatch raises (that short-circuits dispatch entirely) or when a pre-flight guard like CostBudgetExceededError rejects before dispatch.

None
**kwargs Any

Passed to AgentConfig if config is None. Common: model, system_prompt, temperature, max_tokens, timeout, timeout_name.

{}

Examples:

# Using config object
config = AgentConfig(model="gpt-4o", temperature=0.3)
agent = FastroAgent(config=config)

# Using kwargs (simpler)
agent = FastroAgent(model="gpt-4o", temperature=0.5)

# With structured output
agent = FastroAgent(model="gpt-4o", output_type=MyResponseModel)

# Custom pricing override (e.g., volume discount)
calc = CostCalculator(pricing_overrides={
    "gpt-4o": {"input_per_mtok": 2.00, "output_per_mtok": 8.00}
})
agent = FastroAgent(cost_calculator=calc)

# Escape hatch: your own PydanticAI agent
from pydantic_ai import Agent
pydantic_agent = Agent(model="gpt-4o", output_type=MyType)
agent = FastroAgent(agent=pydantic_agent)

# With dispatch hooks for circuit-breaker integration
async def before():
    await breaker.check()  # may raise DispatchSkippedError subclass

async def after(exc):
    if exc is None:
        await breaker.record_success()
    else:
        await breaker.record_failure(categorize(exc))

agent = FastroAgent(
    model="gpt-4o",
    timeout=300,
    timeout_name="my_step.dispatch",
    on_before_dispatch=before,
    on_after_dispatch=after,
)

run(message, deps=None, message_history=None, model_settings=None, tracer=None, **kwargs) async

Execute a single agent interaction.

Parameters:

Name Type Description Default
message str

User message to send.

required
deps DepsT | None

Dependencies passed to tools. Can be any type.

None
message_history list[ModelMessage] | None

Previous messages (you load these from your storage).

None
model_settings ModelSettings | None

Runtime model config overrides.

None
tracer Tracer | None

Tracer for distributed tracing.

None
**kwargs Any

Passed to PydanticAI Agent.run().

{}

Returns:

Type Description
ChatResponse[OutputT]

ChatResponse with content, usage, cost, and trace_id.

Examples:

# Simple usage
response = await agent.run("Hello!")
print(response.content)
print(f"Cost: ${response.cost_dollars:.6f}")

# With conversation history
history = await memory.load(user_id)
response = await agent.run("Continue", message_history=history)
await memory.save(user_id, "Continue", response.content)

# With tracing
tracer = SimpleTracer()
response = await agent.run("Hello", tracer=tracer)
print(f"Trace ID: {response.trace_id}")

run_stream(message, deps=None, message_history=None, model_settings=None, tracer=None, **kwargs) async

Execute a streaming agent interaction.

Yields StreamChunk objects as the response is generated. The final chunk has is_final=True and includes complete usage data.

Parameters:

Name Type Description Default
message str

User message to send.

required
deps DepsT | None

Dependencies passed to tools.

None
message_history list[ModelMessage] | None

Previous messages.

None
model_settings ModelSettings | None

Runtime model config overrides.

None
tracer Tracer | None

Tracer for distributed tracing.

None
**kwargs Any

Passed to PydanticAI Agent.run_stream().

{}

Yields:

Type Description
AsyncGenerator[StreamChunk[OutputT], None]

StreamChunk objects. Final chunk has usage_data.

Examples:

async for chunk in agent.run_stream("Tell me a story"):
    if chunk.is_final:
        print(f"\nCost: ${chunk.usage_data.cost_dollars:.6f}")
    else:
        print(chunk.content, end="", flush=True)

as_step(prompt)

Turn this agent into a pipeline step.

Creates a BaseStep that runs this agent with the given prompt and returns the agent's output directly.

Parameters:

Name Type Description Default
prompt Callable[[StepContext[DepsT]], str] | str

Either a static string or a function that builds the prompt from the step context.

required

Returns:

Type Description
AgentStepWrapper[DepsT, OutputT]

A BaseStep that can be used in a Pipeline.

Examples:

# Static prompt
agent = FastroAgent(model="gpt-4o", system_prompt="Summarize text.")
step = agent.as_step("Summarize the document.")

# Dynamic prompt from context
agent = FastroAgent(model="gpt-4o", system_prompt="Summarize text.")
step = agent.as_step(lambda ctx: f"Summarize: {ctx.get_input('doc')}")

# With structured output
agent = FastroAgent(model="gpt-4o", output_type=Summary)
step = agent.as_step(lambda ctx: f"Summarize: {ctx.get_input('doc')}")
# step returns Summary directly

# Use in pipeline
pipeline = Pipeline(
    name="summarizer",
    steps={"summarize": step},
)

AgentConfig

fastroai.agent.AgentConfig

Configuration for FastroAgent instances.

All parameters have sensible defaults. Override as needed.

Examples:

# Minimal - uses all defaults
config = AgentConfig()

# Custom configuration
config = AgentConfig(
    model="anthropic:claude-3-5-sonnet",
    system_prompt="You are a financial advisor.",
    temperature=0.3,
)

# Use with agent
agent = FastroAgent(config=config)

# Or pass kwargs directly to FastroAgent
agent = FastroAgent(model="openai:gpt-4o-mini", temperature=0.5)

get_effective_system_prompt()

Get system prompt, using default if not set.

Returns:

Type Description
str

The configured system prompt or DEFAULT_SYSTEM_PROMPT.

ChatResponse

fastroai.agent.ChatResponse

Response from an AI agent interaction.

Contains the response content plus comprehensive usage metrics for billing, analytics, and debugging.

Examples:

response = await agent.run("What is 2+2?")

print(f"Answer: {response.content}")
print(f"Cost: ${response.cost_dollars:.6f}")
print(f"Tokens: {response.total_tokens}")

# Check cache effectiveness
if response.cache_read_tokens > 0:
    cache_ratio = response.cache_read_tokens / response.input_tokens
    print(f"Cache hit ratio: {cache_ratio:.1%}")

if response.tool_calls:
    for call in response.tool_calls:
        print(f"Used tool: {call['tool_name']}")

# With structured output
from pydantic import BaseModel

class Answer(BaseModel):
    value: int
    explanation: str

agent = FastroAgent(output_type=Answer)
response = await agent.run("What is 2+2?")
print(response.output.value)  # 4
print(response.output.explanation)  # "2 plus 2 equals 4"
Note

Why microcents? Floating-point math has precision errors (0.1 + 0.2 = 0.30000000000000004). With integers, precision is exact. For billing systems, this matters.

cost_dollars property

Cost in dollars for display purposes.

Returns:

Type Description
float

Cost as a float in dollars.

Note

Use cost_microcents for calculations to avoid floating-point errors.

StreamChunk

fastroai.agent.StreamChunk

A chunk in a streaming response.

Most chunks have content with is_final=False. The last chunk has is_final=True with complete usage data.

Examples:

async for chunk in agent.run_stream("Tell me a story"):
    if chunk.is_final:
        print(f"\nTotal cost: ${chunk.usage_data.cost_dollars:.6f}")
    else:
        print(chunk.content, end="", flush=True)

AgentStepWrapper

fastroai.agent.AgentStepWrapper

Pipeline step wrapper for FastroAgent.

Created via FastroAgent.as_step(). Wraps an agent as a pipeline step.

The wrapper uses ctx.run() for automatic tracer/deps forwarding and usage tracking, and returns the agent's typed output directly.

Note

Use FastroAgent.as_step() to create instances rather than instantiating directly.

agent property

Access the underlying FastroAgent.

Returns:

Type Description
FastroAgent[OutputT]

The wrapped FastroAgent instance.

__init__(agent, prompt)

Initialize the step wrapper.

Parameters:

Name Type Description Default
agent FastroAgent[OutputT]

The FastroAgent to wrap.

required
prompt Callable[[StepContext[DepsT]], str] | str

Static string or function that builds the prompt from context.

required

execute(context) async

Execute the agent with the configured prompt.

Parameters:

Name Type Description Default
context StepContext[DepsT]

Step execution context with inputs, deps, and config.

required

Returns:

Type Description
OutputT

The agent's typed output.


← API Overview Pipelines →