Using FastCRUD for Enhanced CRUD Operations¶
FastCRUD is a versatile tool for handling CRUD (Create, Read, Update, Delete) operations in FastAPI applications with SQLAlchemy models. It leverages Pydantic schemas for data validation and serialization, offering a streamlined approach to database interactions.
Key Features¶
- Simplified CRUD operations with SQLAlchemy models.
- Data validation and serialization using Pydantic.
- Support for complex queries including joins and pagination.
Getting Started¶
Step 1: Define Models and Schemas¶
Define your SQLAlchemy models and Pydantic schemas for data representation.
Models and Schemas Used Below
item/model.py
from sqlalchemy import Column, DateTime, Integer, Numeric, String, func
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class Item(Base):
__tablename__ = "items"
id = Column(Integer, primary_key=True)
name = Column(String)
description = Column(String)
category = Column(String)
price = Column(Numeric)
last_sold = Column(DateTime)
created_at = Column(DateTime, default=func.now())
item/schemas.py
import datetime
from pydantic import BaseModel
class CreateItemSchema(BaseModel):
name: str | None = None
description: str | None = None
category: str | None = None
price: float | None = None
last_sold: datetime.datetime | None = None
class UpdateItemSchema(BaseModel):
name: str | None = None
description: str | None = None
category: str | None = None
price: float | None = None
last_sold: datetime.datetime | None = None
customer/model.py
customer/schemas.py
product/model.py
order/model.py
from sqlalchemy import Column, ForeignKey, Integer
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class Order(Base):
__tablename__ = "order"
id = Column(Integer, primary_key=True)
customer_id = Column(Integer, ForeignKey("customer.id"))
product_id = Column(Integer, ForeignKey("product.id"))
quantity = Column(Integer)
Step 2: Initialize FastCRUD¶
Create a FastCRUD instance for your model to handle CRUD operations.
from fastcrud import FastCRUD
# Creating a FastCRUD instance
item_crud = FastCRUD(Item)
order_crud = FastCRUD(Order)
Step 3: Pick your Method¶
Then you just pick the method you need and use it like this:
# Creating a new record (v0.20.0: returns None without schema_to_select)
result = await item_crud.create(db_session, create_schema_instance)
# result is None
# To get created data back as dict:
new_record = await item_crud.create(
db_session,
create_schema_instance,
schema_to_select=YourReadSchema
)
# To get created data back as Pydantic model:
new_record = await item_crud.create(
db_session,
create_schema_instance,
schema_to_select=YourReadSchema,
return_as_model=True
)
More on available methods below.
Understanding FastCRUD Methods¶
FastCRUD offers a comprehensive suite of methods for CRUD operations, each designed to handle different aspects of database interactions efficiently.
1. Create¶
Purpose: To create a new record in the database.
Usage Example: Creates an item with name "New Item".
# v0.20.0: Returns None without schema_to_select
result = await item_crud.create(db, CreateItemSchema(name="New Item"))
# result is None
# To get the created item data back as a dict:
new_item_dict = await item_crud.create(
db,
CreateItemSchema(name="New Item"),
schema_to_select=ReadItemSchema
)
# new_item_dict is a dict with the created item data
# To get the created item data back as a Pydantic model:
new_item_model = await item_crud.create(
db,
CreateItemSchema(name="New Item"),
schema_to_select=ReadItemSchema,
return_as_model=True
)
# new_item_model is a ReadItemSchema instance
v0.20.0 Behavior
Changes Completed in v0.20.0: The create() method now behaves consistently with other CRUD methods like update(). Changes made:
- Without
schema_to_select: Now returnsNone(was SQLAlchemy model) - With
schema_to_select: Returns created data immediately - dict by default, Pydantic model ifreturn_as_model=True
Recommended: Always use schema_to_select to get the created data back in one operation. This is more efficient than separate create + get calls.
Warning
Note that naive datetime such as datetime.utcnow is not supported by FastCRUD as it was deprecated.
Use timezone aware datetime, such as datetime.now(UTC) instead.
2. Get¶
get(
db: AsyncSession,
schema_to_select: Optional[type[BaseModel]] = None,
return_as_model: bool = False,
one_or_none: bool = False,
**kwargs: Any,
) -> Optional[Union[dict, BaseModel]]
Purpose: To fetch a single record based on filters, with an option to select specific columns using a Pydantic schema.
Return Types:
- When return_as_model=True and schema_to_select is provided: Optional[SelectSchemaType]
- When return_as_model=False: Optional[Dict[str, Any]]
Usage Examples:
Fetch item as dictionary:
Fetch item as typed Pydantic model:
from .schemas import ReadItemSchema
item = await item_crud.get(
db,
schema_to_select=ReadItemSchema,
return_as_model=True,
id=item_id
)
# Returns: Optional[ReadItemSchema]
3. Exists¶
Purpose: To check if a record exists based on provided filters.
Usage Example: Checks whether an item with name "Existing Item" exists.
4. Count¶
Purpose: To count the number of records matching provided filters.
Usage Example: Counts the number of items with the "Books" category.
5. Get Multi¶
get_multi(
db: AsyncSession,
offset: int = 0,
limit: Optional[int] = 100,
schema_to_select: Optional[type[BaseModel]] = None,
sort_columns: Optional[Union[str, list[str]]] = None,
sort_orders: Optional[Union[str, list[str]]] = None,
return_as_model: bool = False,
return_total_count: bool = True,
**kwargs: Any,
) -> dict[str, Any]
Purpose: To fetch multiple records with optional sorting, pagination, and model conversion.
Return Types:
- When return_as_model=True and schema_to_select is provided: GetMultiResponseModel[SelectSchemaType] - a TypedDict with data: list[SelectSchemaType] and total_count: int (when return_total_count=True)
- When return_as_model=False: GetMultiResponseDict - a TypedDict with data: list[dict[str, Any]] and total_count: int (when return_total_count=True)
Usage Examples:
Fetch items as dictionaries:
items = await item_crud.get_multi(db, offset=10, limit=5)
# Returns: {"data": [Dict[str, Any], ...], "total_count": int}
Fetch items as typed Pydantic models:
from .schemas import ReadItemSchema
items = await item_crud.get_multi(
db,
offset=10,
limit=5,
schema_to_select=ReadItemSchema,
return_as_model=True
)
# Returns: {"data": [ReadItemSchema, ...], "total_count": int}
Typing Your Functions:
When wrapping get_multi in your own functions, use the proper return types for full type safety:
from fastcrud import GetMultiResponseDict, GetMultiResponseModel
from .schemas import ReadItemSchema
# Option 1: Return as dict with proper typing
async def get_items(db: AsyncSession) -> GetMultiResponseDict:
return await item_crud.get_multi(db)
# Option 2: Return as model with proper typing
async def get_items_typed(db: AsyncSession) -> GetMultiResponseModel[ReadItemSchema]:
return await item_crud.get_multi(
db,
schema_to_select=ReadItemSchema,
return_as_model=True
)
# Option 3: Let the type be inferred (omit return annotation)
async def get_items_inferred(db: AsyncSession):
return await item_crud.get_multi(db)
Avoid -> dict[str, Any]
Using -> dict[str, Any] as a return type annotation discards the type information that GetMultiResponseDict provides. If you explicitly annotate with dict[str, Any], you'll need to cast the result and lose the benefit of knowing that result["data"] is a list and result["total_count"] is an int.
6. Update¶
update(
db: AsyncSession,
object: Union[UpdateSchemaType, dict[str, Any]],
allow_multiple: bool = False,
commit: bool = True,
return_columns: Optional[list[str]] = None,
schema_to_select: Optional[type[BaseModel]] = None,
return_as_model: bool = False,
one_or_none: bool = False,
**kwargs: Any,
) -> Optional[Union[dict, BaseModel]]
Purpose: To update an existing record in the database.
Return Types:
- When return_as_model=True and schema_to_select is provided: Optional[SelectSchemaType]
- When return_as_model=False: Optional[Dict[str, Any]]
Usage Examples:
Update and return as dictionary:
updated_item = await item_crud.update(
db,
UpdateItemSchema(description="Updated"),
id=item_id,
)
# Returns: Optional[Dict[str, Any]]
Update and return as typed Pydantic model:
from .schemas import ReadItemSchema, UpdateItemSchema
updated_item = await item_crud.update(
db,
UpdateItemSchema(description="Updated"),
schema_to_select=ReadItemSchema,
return_as_model=True,
id=item_id,
)
# Returns: Optional[ReadItemSchema]
7. Delete¶
delete(
db: AsyncSession,
db_row: Optional[Row] = None,
allow_multiple: bool = False,
commit: bool = True,
**kwargs: Any,
) -> None
Purpose: To delete a record from the database, with support for soft delete.
Usage Example: Deletes the item with item_id as its id, performs a soft delete if the model has the is_deleted column.
8. Hard Delete¶
db_delete(
db: AsyncSession,
allow_multiple: bool = False,
commit: bool = True,
**kwargs: Any,
) -> None
Purpose: To hard delete a record from the database.
Usage Example: Hard deletes the item with item_id as its id.
Advanced Methods for Complex Queries and Joins¶
FastCRUD extends its functionality with advanced methods tailored for complex query operations and handling joins. These methods cater to specific use cases where more sophisticated data retrieval and manipulation are required.
1. Get Multi¶
get_multi(
db: AsyncSession,
offset: int = 0,
limit: Optional[int] = 100,
schema_to_select: Optional[type[BaseModel]] = None,
sort_columns: Optional[Union[str, list[str]]] = None,
sort_orders: Optional[Union[str, list[str]]] = None,
return_as_model: bool = False,
return_total_count: bool = True,
**kwargs: Any,
) -> dict[str, Any]
Purpose: To fetch multiple records based on specified filters, with options for sorting and pagination.
Usage Example: Gets the first 10 items sorted by name in ascending order.
items = await item_crud.get_multi(
db,
offset=0,
limit=10,
sort_columns=['name'],
sort_orders=['asc'],
)
2. Get Joined¶
get_joined(
db: AsyncSession,
schema_to_select: Optional[type[BaseModel]] = None,
join_model: Optional[ModelType] = None,
join_on: Optional[Union[Join, BinaryExpression]] = None,
join_prefix: Optional[str] = None,
join_schema_to_select: Optional[type[BaseModel]] = None,
join_type: str = "left",
alias: Optional[AliasedClass] = None,
join_filters: Optional[dict] = None,
joins_config: Optional[list[JoinConfig]] = None,
nest_joins: bool = False,
relationship_type: Optional[str] = None,
**kwargs: Any,
) -> Optional[dict[str, Any]]
Purpose: To fetch a single record with one or multiple joins on other models.
Usage Example: Fetches order details for a specific order by joining with the Customer table, selecting specific columns as defined in ReadOrderSchema and ReadCustomerSchema.
order_details = await order_crud.get_joined(
db,
schema_to_select=ReadOrderSchema,
join_model=Customer,
join_schema_to_select=ReadCustomerSchema,
id=order_id,
)
3. Get Multi Joined¶
get_multi_joined(
db: AsyncSession,
schema_to_select: Optional[type[BaseModel]] = None,
join_model: Optional[type[ModelType]] = None,
join_on: Optional[Any] = None,
join_prefix: Optional[str] = None,
join_schema_to_select: Optional[type[BaseModel]] = None,
join_type: str = "left",
alias: Optional[AliasedClass[Any]] = None,
join_filters: Optional[dict] = None,
nest_joins: bool = False,
offset: int = 0,
limit: Optional[int] = 100,
sort_columns: Optional[Union[str, list[str]]] = None,
sort_orders: Optional[Union[str, list[str]]] = None,
return_as_model: bool = False,
joins_config: Optional[list[JoinConfig]] = None,
return_total_count: bool = True,
relationship_type: Optional[str] = None,
**kwargs: Any,
) -> dict[str, Any]
Purpose: Similar to get_joined, but for fetching multiple records.
Return Types:
- When return_as_model=True and schema_to_select is provided: GetMultiResponseModel[SelectSchemaType] - a TypedDict with data: list[SelectSchemaType] and total_count: int (when return_total_count=True)
- When return_as_model=False: GetMultiResponseDict - a TypedDict with data: list[dict[str, Any]] and total_count: int (when return_total_count=True)
Usage Examples:
Fetch joined records as dictionaries:
orders = await order_crud.get_multi_joined(
db,
schema_to_select=ReadOrderSchema,
join_model=Customer,
join_schema_to_select=ReadCustomerSchema,
offset=0,
limit=5,
)
# Returns: {"data": [Dict[str, Any], ...], "total_count": int}
Fetch joined records as typed Pydantic models:
orders = await order_crud.get_multi_joined(
db,
schema_to_select=ReadOrderSchema,
join_model=Customer,
join_schema_to_select=ReadCustomerSchema,
return_as_model=True,
offset=0,
limit=5,
)
# Returns: {"data": [ReadOrderSchema, ...], "total_count": int}
4. Get Multi By Cursor¶
get_multi_by_cursor(
db: AsyncSession,
cursor: Any = None,
limit: int = 100,
schema_to_select: Optional[type[BaseModel]] = None,
sort_column: str = "id",
sort_order: str = "asc",
return_as_model: bool = False,
**kwargs: Any,
) -> dict[str, Any]
Purpose: Implements cursor-based pagination for efficient data retrieval in large datasets.
Return Types: A dictionary with data: list[...] and next_cursor: Any:
- When return_as_model=True and schema_to_select is provided: {"data": list[SelectSchemaType], "next_cursor": Any}
- When return_as_model=False: {"data": list[dict[str, Any]], "next_cursor": Any}
Usage Examples:
Cursor pagination with dictionaries:
paginated_items = await item_crud.get_multi_by_cursor(
db,
cursor=last_cursor,
limit=10,
sort_column='created_at',
sort_order='desc',
)
# Returns: {"data": [Dict[str, Any], ...], "next_cursor": Any}
Cursor pagination with typed Pydantic models:
from .schemas import ReadItemSchema
paginated_items = await item_crud.get_multi_by_cursor(
db,
cursor=last_cursor,
limit=10,
schema_to_select=ReadItemSchema,
return_as_model=True,
sort_column='created_at',
sort_order='desc',
)
# Returns: {"data": [ReadItemSchema, ...], "next_cursor": Any}
5. Select¶
async def select(
db: AsyncSession,
schema_to_select: Optional[type[BaseModel]] = None,
sort_columns: Optional[Union[str, list[str]]] = None,
sort_orders: Optional[Union[str, list[str]]] = None,
**kwargs: Any,
) -> Select
Purpose: Constructs a SQL Alchemy Select statement with optional column selection, filtering, and sorting.
Usage Example: Selects all items, filtering by name and sorting by id. Returns the Select statement.
stmt = await item_crud.select(
schema_to_select=ItemSchema,
sort_columns='id',
name='John',
)
# Note: This method returns a SQL Alchemy Select object, not the actual query result.
6. Count for Joined Models¶
Purpose: To count records that match specified filters, especially useful in scenarios involving joins between models. This method supports counting unique entities across relationships, a common requirement in many-to-many or complex relationships.
Usage Example: Count the number of unique projects a participant is involved in, considering a many-to-many relationship between Project and Participant models.
Models
class Project(Base):
__tablename__ = "projects"
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
description = Column(String(32))
participants = relationship(
"Participant",
secondary="projects_participants_association",
back_populates="projects",
)
class Participant(Base):
__tablename__ = "participants"
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
role = Column(String(32))
projects = relationship(
"Project",
secondary="projects_participants_association",
back_populates="participants",
)
class ProjectsParticipantsAssociation(Base):
__tablename__ = "projects_participants_association"
project_id = Column(Integer, ForeignKey("projects.id"), primary_key=True)
participant_id = Column(Integer, ForeignKey("participants.id"), primary_key=True)
project_crud = FastCRUD(Project)
projects_count = await project_crud.count(
db=session,
joins_config=[
JoinConfig(
model=Participant,
join_on=ProjectsParticipantsAssociation.project_id == Project.id,
join_type="inner",
),
],
participant_id=specific_participant_id,
)
Error Handling¶
FastCRUD provides mechanisms to handle common database errors, ensuring robust API behavior.