
Using FastCRUD for Enhanced CRUD Operations

FastCRUD is a versatile tool for handling CRUD (Create, Read, Update, Delete) operations in FastAPI applications with SQLAlchemy models. It leverages Pydantic schemas for data validation and serialization, offering a streamlined approach to database interactions.

Key Features

  • Simplified CRUD operations with SQLAlchemy models.
  • Data validation and serialization using Pydantic.
  • Support for complex queries including joins and pagination.

Getting Started

Step 1: Define Models and Schemas

Define your SQLAlchemy models and Pydantic schemas for data representation.

Models and Schemas Used Below
item/model.py
from sqlalchemy import Column, DateTime, Integer, Numeric, String, func
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    description = Column(String)
    category = Column(String)
    price = Column(Numeric)
    last_sold = Column(DateTime)
    created_at = Column(DateTime, default=func.now())
item/schemas.py
import datetime

from pydantic import BaseModel


class CreateItemSchema(BaseModel):
    name: str | None = None
    description: str | None = None
    category: str | None = None
    price: float | None = None
    last_sold: datetime.datetime | None = None


class UpdateItemSchema(BaseModel):
    name: str | None = None
    description: str | None = None
    category: str | None = None
    price: float | None = None
    last_sold: datetime.datetime | None = None


# Assumed read schema: the examples below import ReadItemSchema,
# but the original listing does not define it.
class ReadItemSchema(BaseModel):
    id: int
    name: str | None = None
    description: str | None = None
    category: str | None = None
    price: float | None = None
    last_sold: datetime.datetime | None = None
    created_at: datetime.datetime | None = None

customer/model.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class Customer(Base):
    __tablename__ = "customer"
    id = Column(Integer, primary_key=True)
    name = Column(String)
customer/schemas.py
from pydantic import BaseModel


class ReadCustomerSchema(BaseModel):
    id: int
    name: str | None = None
product/model.py
from sqlalchemy import Column, Integer, Numeric, String
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class Product(Base):
    __tablename__ = "product"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    price = Column(Numeric)
order/model.py
from sqlalchemy import Column, ForeignKey, Integer
from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class Order(Base):
    __tablename__ = "order"
    id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, ForeignKey("customer.id"))
    product_id = Column(Integer, ForeignKey("product.id"))
    quantity = Column(Integer)
order/schemas.py
from pydantic import BaseModel


class ReadOrderSchema(BaseModel):
    id: int
    customer_id: int | None = None
    product_id: int | None = None
    quantity: int | None = None

Step 2: Initialize FastCRUD

Create a FastCRUD instance for your model to handle CRUD operations.

from fastcrud import FastCRUD

# Creating a FastCRUD instance
item_crud = FastCRUD(Item)
order_crud = FastCRUD(Order)

Step 3: Pick Your Method

Call the method you need on the FastCRUD instance. For example, to create a record:

# Creating a new record (v0.20.0: returns None without schema_to_select)
result = await item_crud.create(db_session, create_schema_instance)
# result is None

# To get created data back as dict:
new_record = await item_crud.create(
    db_session, 
    create_schema_instance, 
    schema_to_select=YourReadSchema
)

# To get created data back as Pydantic model:
new_record = await item_crud.create(
    db_session, 
    create_schema_instance, 
    schema_to_select=YourReadSchema,
    return_as_model=True
)

More on available methods below.


Understanding FastCRUD Methods

FastCRUD offers a comprehensive suite of methods for CRUD operations, each designed to handle different aspects of database interactions efficiently.

1. Create

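create(
    db: AsyncSession,
    object: CreateSchemaType,
    commit: bool = True,
    schema_to_select: Optional[type[BaseModel]] = None,
    return_as_model: bool = False,
) -> Optional[Union[dict, BaseModel]]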

Purpose: To create a new record in the database.
Usage Example: Creates an item with name "New Item".

# v0.20.0: Returns None without schema_to_select
result = await item_crud.create(db, CreateItemSchema(name="New Item"))
# result is None

# To get the created item data back as a dict:
new_item_dict = await item_crud.create(
    db, 
    CreateItemSchema(name="New Item"), 
    schema_to_select=ReadItemSchema
)
# new_item_dict is a dict with the created item data

# To get the created item data back as a Pydantic model:
new_item_model = await item_crud.create(
    db, 
    CreateItemSchema(name="New Item"), 
    schema_to_select=ReadItemSchema,
    return_as_model=True
)
# new_item_model is a ReadItemSchema instance

v0.20.0 Behavior

Changed in v0.20.0: The create() method now behaves consistently with other CRUD methods such as update():

  • Without schema_to_select: Now returns None (was SQLAlchemy model)
  • With schema_to_select: Returns created data immediately - dict by default, Pydantic model if return_as_model=True

Recommended: Always use schema_to_select to get the created data back in one operation. This is more efficient than separate create + get calls.

Warning

Naive datetimes, such as the value returned by datetime.utcnow(), are not supported by FastCRUD; datetime.utcnow() itself has been deprecated since Python 3.12.

Use timezone-aware datetimes instead, such as datetime.now(UTC).
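
As a minimal sketch of the difference, using only the standard library (timezone.utc is used here rather than the UTC alias, on the assumption that the snippet should also run on Python versions before 3.11):

```python
from datetime import datetime, timedelta, timezone

# Timezone-aware "now": tzinfo is set, so the instant is unambiguous.
aware_now = datetime.now(timezone.utc)

# A naive datetime carries no timezone information at all.
naive = datetime(2024, 1, 1, 12, 0, 0)

print(aware_now.tzinfo is not None)  # aware values have a tzinfo
print(naive.tzinfo is None)          # naive values do not
```

Checking tzinfo before passing a value to a schema is a cheap way to catch naive datetimes early.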

2. Get

get(
    db: AsyncSession,
    schema_to_select: Optional[type[BaseModel]] = None,
    return_as_model: bool = False,
    one_or_none: bool = False,
    **kwargs: Any,
) -> Optional[Union[dict, BaseModel]]

Purpose: To fetch a single record based on filters, with an option to select specific columns using a Pydantic schema.
Return Types:

  • When return_as_model=True and schema_to_select is provided: Optional[SelectSchemaType]
  • When return_as_model=False: Optional[Dict[str, Any]]

Usage Examples:

Fetch item as dictionary:

item = await item_crud.get(db, id=item_id)
# Returns: Optional[Dict[str, Any]]

Fetch item as typed Pydantic model:

from .schemas import ReadItemSchema

item = await item_crud.get(
    db, 
    schema_to_select=ReadItemSchema,
    return_as_model=True,
    id=item_id
)
# Returns: Optional[ReadItemSchema]

3. Exists

exists(
    db: AsyncSession,
    **kwargs: Any,
) -> bool

Purpose: To check if a record exists based on provided filters.
Usage Example: Checks whether an item with name "Existing Item" exists.

exists = await item_crud.exists(db, name="Existing Item")

4. Count

count(
    db: AsyncSession,
    joins_config: Optional[list[JoinConfig]] = None,
    **kwargs: Any,
) -> int

Purpose: To count the number of records matching provided filters.
Usage Example: Counts the number of items with the "Books" category.

count = await item_crud.count(db, category="Books")

5. Get Multi

get_multi(
    db: AsyncSession,
    offset: int = 0,
    limit: Optional[int] = 100,
    schema_to_select: Optional[type[BaseModel]] = None,
    sort_columns: Optional[Union[str, list[str]]] = None,
    sort_orders: Optional[Union[str, list[str]]] = None,
    return_as_model: bool = False,
    return_total_count: bool = True,
    **kwargs: Any,
) -> dict[str, Any]

Purpose: To fetch multiple records with optional sorting, pagination, and model conversion.
Return Types:

  • When return_as_model=True and schema_to_select is provided: GetMultiResponseModel[SelectSchemaType], a TypedDict with data: list[SelectSchemaType] and total_count: int (when return_total_count=True)
  • When return_as_model=False: GetMultiResponseDict, a TypedDict with data: list[dict[str, Any]] and total_count: int (when return_total_count=True)

Usage Examples:

Fetch items as dictionaries:

items = await item_crud.get_multi(db, offset=10, limit=5)
# Returns: {"data": [Dict[str, Any], ...], "total_count": int}

Fetch items as typed Pydantic models:

from .schemas import ReadItemSchema

items = await item_crud.get_multi(
    db,
    offset=10,
    limit=5,
    schema_to_select=ReadItemSchema,
    return_as_model=True
)
# Returns: {"data": [ReadItemSchema, ...], "total_count": int}

Typing Your Functions:

When wrapping get_multi in your own functions, use the proper return types for full type safety:

from fastcrud import GetMultiResponseDict, GetMultiResponseModel
from sqlalchemy.ext.asyncio import AsyncSession

from .schemas import ReadItemSchema

# Option 1: Return as dict with proper typing
async def get_items(db: AsyncSession) -> GetMultiResponseDict:
    return await item_crud.get_multi(db)

# Option 2: Return as model with proper typing
async def get_items_typed(db: AsyncSession) -> GetMultiResponseModel[ReadItemSchema]:
    return await item_crud.get_multi(
        db,
        schema_to_select=ReadItemSchema,
        return_as_model=True
    )

# Option 3: Let the type be inferred (omit return annotation)
async def get_items_inferred(db: AsyncSession):
    return await item_crud.get_multi(db)

Avoid -> dict[str, Any]

Using -> dict[str, Any] as a return type annotation discards the type information that GetMultiResponseDict provides. If you explicitly annotate with dict[str, Any], you will need to cast the result, and type checkers no longer know that result["data"] is a list and result["total_count"] is an int.
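
To see what the more precise annotation buys you, here is a small standalone sketch. ItemsPage is a hypothetical local TypedDict that only mirrors the documented data / total_count shape; in real code you would use GetMultiResponseDict from fastcrud instead.

```python
from typing import Any, TypedDict


class ItemsPage(TypedDict):
    """Hypothetical stand-in mirroring the documented response shape."""

    data: list[dict[str, Any]]
    total_count: int


def has_more(page: ItemsPage, offset: int) -> bool:
    # A type checker knows page["data"] is a list and
    # page["total_count"] is an int, so no casts are needed here.
    return offset + len(page["data"]) < page["total_count"]


page: ItemsPage = {"data": [{"id": 1}, {"id": 2}], "total_count": 5}
print(has_more(page, offset=0))  # True: 2 of 5 rows fetched so far
```

With a plain dict[str, Any] annotation, both lookups inside has_more would be typed as Any and mistakes such as comparing the list itself to an int would go unnoticed.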

6. Update

update(
    db: AsyncSession, 
    object: Union[UpdateSchemaType, dict[str, Any]], 
    allow_multiple: bool = False,
    commit: bool = True,
    return_columns: Optional[list[str]] = None,
    schema_to_select: Optional[type[BaseModel]] = None,
    return_as_model: bool = False,
    one_or_none: bool = False,
    **kwargs: Any,
) -> Optional[Union[dict, BaseModel]]

Purpose: To update an existing record in the database.
Return Types:

  • When return_as_model=True and schema_to_select is provided: Optional[SelectSchemaType]
  • When return_as_model=False: Optional[Dict[str, Any]]

Usage Examples:

Update and return as dictionary:

updated_item = await item_crud.update(
    db,
    UpdateItemSchema(description="Updated"),
    id=item_id,
)
# Returns: Optional[Dict[str, Any]]

Update and return as typed Pydantic model:

from .schemas import ReadItemSchema, UpdateItemSchema

updated_item = await item_crud.update(
    db,
    UpdateItemSchema(description="Updated"),
    schema_to_select=ReadItemSchema,
    return_as_model=True,
    id=item_id,
)
# Returns: Optional[ReadItemSchema]

7. Delete

delete(
    db: AsyncSession, 
    db_row: Optional[Row] = None, 
    allow_multiple: bool = False,
    commit: bool = True,
    **kwargs: Any,
) -> None

Purpose: To delete a record from the database, with support for soft delete.
Usage Example: Deletes the item whose id is item_id. If the model has an is_deleted column, a soft delete is performed instead of removing the row.

await item_crud.delete(db, id=item_id)

8. Hard Delete

db_delete(
    db: AsyncSession, 
    allow_multiple: bool = False,
    commit: bool = True,
    **kwargs: Any,
) -> None

Purpose: To hard delete a record from the database.
Usage Example: Hard deletes the item with item_id as its id.

await item_crud.db_delete(db, id=item_id)

Advanced Methods for Complex Queries and Joins

FastCRUD extends its functionality with advanced methods tailored for complex query operations and handling joins. These methods cater to specific use cases where more sophisticated data retrieval and manipulation are required.

1. Get Multi

get_multi(
    db: AsyncSession,
    offset: int = 0,
    limit: Optional[int] = 100,
    schema_to_select: Optional[type[BaseModel]] = None,
    sort_columns: Optional[Union[str, list[str]]] = None,
    sort_orders: Optional[Union[str, list[str]]] = None,
    return_as_model: bool = False,
    return_total_count: bool = True,
    **kwargs: Any,
) -> dict[str, Any]

Purpose: To fetch multiple records based on specified filters, with options for sorting and pagination.
Usage Example: Gets the first 10 items sorted by name in ascending order.

items = await item_crud.get_multi(
    db,
    offset=0,
    limit=10,
    sort_columns=['name'],
    sort_orders=['asc'],
)
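
The offset and limit parameters map directly onto page-based pagination. The helper below is a hypothetical convenience function written for this example (it is not part of the API shown above) and assumes 1-based page numbers:

```python
def page_to_offset(page: int, items_per_page: int) -> int:
    """Convert a 1-based page number into a row offset."""
    if page < 1 or items_per_page < 1:
        raise ValueError("page and items_per_page must be >= 1")
    return (page - 1) * items_per_page


# Page 1 starts at row 0; page 3 with 10 rows per page starts at row 20.
print(page_to_offset(1, 10))  # 0
print(page_to_offset(3, 10))  # 20
```

The result can then be passed as offset, with items_per_page as limit, to get_multi.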

2. Get Joined

get_joined(
    db: AsyncSession,
    schema_to_select: Optional[type[BaseModel]] = None,
    join_model: Optional[ModelType] = None,
    join_on: Optional[Union[Join, BinaryExpression]] = None,
    join_prefix: Optional[str] = None,
    join_schema_to_select: Optional[type[BaseModel]] = None,
    join_type: str = "left",
    alias: Optional[AliasedClass] = None,
    join_filters: Optional[dict] = None,
    joins_config: Optional[list[JoinConfig]] = None,
    nest_joins: bool = False,
    relationship_type: Optional[str] = None,
    **kwargs: Any,
) -> Optional[dict[str, Any]]

Purpose: To fetch a single record with one or multiple joins on other models.
Usage Example: Fetches order details for a specific order by joining with the Customer table, selecting specific columns as defined in ReadOrderSchema and ReadCustomerSchema.

order_details = await order_crud.get_joined(
    db,
    schema_to_select=ReadOrderSchema,
    join_model=Customer,
    join_schema_to_select=ReadCustomerSchema,
    id=order_id,
)

3. Get Multi Joined

get_multi_joined(
    db: AsyncSession,
    schema_to_select: Optional[type[BaseModel]] = None,
    join_model: Optional[type[ModelType]] = None,
    join_on: Optional[Any] = None,
    join_prefix: Optional[str] = None,
    join_schema_to_select: Optional[type[BaseModel]] = None,
    join_type: str = "left",
    alias: Optional[AliasedClass[Any]] = None,
    join_filters: Optional[dict] = None,
    nest_joins: bool = False,
    offset: int = 0,
    limit: Optional[int] = 100,
    sort_columns: Optional[Union[str, list[str]]] = None,
    sort_orders: Optional[Union[str, list[str]]] = None,
    return_as_model: bool = False,
    joins_config: Optional[list[JoinConfig]] = None,
    return_total_count: bool = True,
    relationship_type: Optional[str] = None,
    **kwargs: Any,
) -> dict[str, Any]

Purpose: Similar to get_joined, but for fetching multiple records.
Return Types:

  • When return_as_model=True and schema_to_select is provided: GetMultiResponseModel[SelectSchemaType], a TypedDict with data: list[SelectSchemaType] and total_count: int (when return_total_count=True)
  • When return_as_model=False: GetMultiResponseDict, a TypedDict with data: list[dict[str, Any]] and total_count: int (when return_total_count=True)

Usage Examples:

Fetch joined records as dictionaries:

orders = await order_crud.get_multi_joined(
    db,
    schema_to_select=ReadOrderSchema,
    join_model=Customer,
    join_schema_to_select=ReadCustomerSchema,
    offset=0,
    limit=5,
)
# Returns: {"data": [Dict[str, Any], ...], "total_count": int}

Fetch joined records as typed Pydantic models:

orders = await order_crud.get_multi_joined(
    db,
    schema_to_select=ReadOrderSchema,
    join_model=Customer,
    join_schema_to_select=ReadCustomerSchema,
    return_as_model=True,
    offset=0,
    limit=5,
)
# Returns: {"data": [ReadOrderSchema, ...], "total_count": int}

4. Get Multi By Cursor

get_multi_by_cursor(
    db: AsyncSession,
    cursor: Any = None,
    limit: int = 100,
    schema_to_select: Optional[type[BaseModel]] = None,
    sort_column: str = "id",
    sort_order: str = "asc",
    return_as_model: bool = False,
    **kwargs: Any,
) -> dict[str, Any]

Purpose: Implements cursor-based pagination for efficient data retrieval in large datasets.
Return Types: A dictionary with data: list[...] and next_cursor: Any:

  • When return_as_model=True and schema_to_select is provided: {"data": list[SelectSchemaType], "next_cursor": Any}
  • When return_as_model=False: {"data": list[dict[str, Any]], "next_cursor": Any}

Usage Examples:

Cursor pagination with dictionaries:

paginated_items = await item_crud.get_multi_by_cursor(
    db,
    cursor=last_cursor,
    limit=10,
    sort_column='created_at',
    sort_order='desc',
)
# Returns: {"data": [Dict[str, Any], ...], "next_cursor": Any}

Cursor pagination with typed Pydantic models:

from .schemas import ReadItemSchema

paginated_items = await item_crud.get_multi_by_cursor(
    db,
    cursor=last_cursor,
    limit=10,
    schema_to_select=ReadItemSchema,
    return_as_model=True,
    sort_column='created_at',
    sort_order='desc',
)
# Returns: {"data": [ReadItemSchema, ...], "next_cursor": Any}
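
A typical consumer keeps requesting pages, feeding each next_cursor back in, until no cursor is returned. The sketch below shows that loop against an in-memory stand-in: fetch_page and ROWS are hypothetical and only imitate the documented {"data", "next_cursor"} shape, and the rule that a short page yields next_cursor=None is an assumption made for this example.

```python
import asyncio
from typing import Any, Optional

ROWS = [{"id": i} for i in range(1, 8)]  # stand-in for a database table


async def fetch_page(cursor: Optional[int], limit: int) -> dict[str, Any]:
    """Hypothetical stand-in for a get_multi_by_cursor call (ascending by id)."""
    remaining = [r for r in ROWS if cursor is None or r["id"] > cursor]
    data = remaining[:limit]
    # Assumption: a full page yields a cursor; a short page ends the walk.
    next_cursor = data[-1]["id"] if len(data) == limit else None
    return {"data": data, "next_cursor": next_cursor}


async def fetch_all(limit: int) -> list[dict[str, Any]]:
    """Walk every page by feeding next_cursor back into the next request."""
    collected: list[dict[str, Any]] = []
    cursor: Optional[int] = None
    while True:
        page = await fetch_page(cursor, limit)
        collected.extend(page["data"])
        cursor = page["next_cursor"]
        if cursor is None:
            break
    return collected


all_rows = asyncio.run(fetch_all(limit=3))
print(len(all_rows))  # 7
```

In an application, fetch_page would be replaced by the real call with cursor=cursor and limit=limit; the loop itself stays the same.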

5. Select

select(
    schema_to_select: Optional[type[BaseModel]] = None,
    sort_columns: Optional[Union[str, list[str]]] = None,
    sort_orders: Optional[Union[str, list[str]]] = None,
    **kwargs: Any,
) -> Select

Purpose: Constructs a SQLAlchemy Select statement with optional column selection, filtering, and sorting.
Usage Example: Builds a statement that selects items filtered by name and sorted by id. The method returns the Select statement without executing it.

stmt = await item_crud.select(
    schema_to_select=ReadItemSchema,
    sort_columns='id',
    name='John',
)
# Note: This method returns a SQLAlchemy Select object, not the actual query result.

6. Count for Joined Models

count(
    db: AsyncSession,
    joins_config: Optional[list[JoinConfig]] = None,
    **kwargs: Any,
) -> int

Purpose: To count records that match specified filters, especially useful in scenarios involving joins between models. This method supports counting unique entities across relationships, a common requirement in many-to-many or complex relationships.
Usage Example: Count the number of unique projects a participant is involved in, considering a many-to-many relationship between Project and Participant models.

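Models
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import DeclarativeBase, relationship


class Base(DeclarativeBase):
    pass


class Project(Base):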
    __tablename__ = "projects"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    description = Column(String(32))
    participants = relationship(
        "Participant",
        secondary="projects_participants_association",
        back_populates="projects",
    )


class Participant(Base):
    __tablename__ = "participants"
    id = Column(Integer, primary_key=True)
    name = Column(String(32), nullable=False)
    role = Column(String(32))
    projects = relationship(
        "Project",
        secondary="projects_participants_association",
        back_populates="participants",
    )


class ProjectsParticipantsAssociation(Base):
    __tablename__ = "projects_participants_association"
    project_id = Column(Integer, ForeignKey("projects.id"), primary_key=True)
    participant_id = Column(Integer, ForeignKey("participants.id"), primary_key=True)

from fastcrud import FastCRUD, JoinConfig

project_crud = FastCRUD(Project)
projects_count = await project_crud.count(
    db=session,
    joins_config=[
        JoinConfig(
            model=Participant,
            join_on=ProjectsParticipantsAssociation.project_id == Project.id,
            join_type="inner",
        ),
    ],
    participant_id=specific_participant_id,
)
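
To make the intended result concrete, the following plain-Python sketch computes the same kind of number by hand. It only illustrates the semantics (distinct projects linked to one participant through the association table); it is not how FastCRUD builds or runs the query, and association_rows is made-up sample data.

```python
# (project_id, participant_id) pairs, standing in for association-table rows.
association_rows = [
    (1, 10),
    (2, 10),
    (2, 11),
    (3, 12),
]


def count_projects_for(participant_id: int) -> int:
    """Count distinct projects linked to one participant."""
    project_ids = {
        project_id
        for project_id, linked_participant in association_rows
        if linked_participant == participant_id
    }
    return len(project_ids)


print(count_projects_for(10))  # 2
```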

Error Handling

FastCRUD provides mechanisms to handle common database errors, ensuring robust API behavior.