Skip to content

Event System API Reference

The CRUDAdmin event system provides comprehensive audit logging and event tracking for admin operations. This system automatically logs admin actions, authentication events, and security-related activities with full audit trails.

Core Components

Event Types and Status

Bases: str, Enum

Source code in crudadmin/event/schemas.py
class EventType(str, enum.Enum):
    """Kinds of admin events that can be recorded.

    Members also subclass ``str``, so each one compares equal to its value.
    """

    # Data-changing operations on a resource.
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    # Authentication events.
    LOGIN = "login"
    LOGOUT = "logout"
    FAILED_LOGIN = "failed_login"

Bases: str, Enum

Source code in crudadmin/event/schemas.py
class EventStatus(str, enum.Enum):
    """Outcome recorded alongside an event.

    Members also subclass ``str``, so each one compares equal to its value.
    """

    SUCCESS = "success"
    FAILURE = "failure"
    WARNING = "warning"

Model Creation Functions

Source code in crudadmin/event/models.py
def create_admin_event_log(base: type[DeclarativeBase]) -> type[DeclarativeBase]:
    """Return the ``AdminEventLog`` ORM model for ``base``, declaring it if needed.

    When ``base``'s registry already holds a class registered under the name
    ``"AdminEventLog"`` that subclasses ``base``, that class is reused instead
    of declaring a second model.

    Args:
        base: Declarative base the model inherits from.

    Returns:
        The ``AdminEventLog`` model class, mapped to table ``admin_event_log``.
    """
    registry_available = hasattr(base, "registry") and hasattr(
        base.registry, "_class_registry"
    )
    if registry_available:
        registered = base.registry._class_registry.get("AdminEventLog")
        # isinstance() already rejects None, so no separate None test is needed.
        if isinstance(registered, type) and issubclass(registered, base):
            return cast(type[DeclarativeBase], registered)

    class AdminEventLog(base):  # type: ignore
        __tablename__ = "admin_event_log"
        __table_args__ = {"extend_existing": True}

        id: Mapped[int] = mapped_column(
            "id", autoincrement=True, nullable=False, unique=True, primary_key=True
        )
        # Timezone-aware column whose default is the current UTC time.
        timestamp: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), default=lambda: datetime.now(UTC), nullable=False
        )
        event_type: Mapped[EventType] = mapped_column(
            SQLEnum(EventType), nullable=False
        )
        status: Mapped[EventStatus] = mapped_column(
            SQLEnum(EventStatus), nullable=False
        )
        user_id: Mapped[int] = mapped_column(index=True)
        session_id: Mapped[str] = mapped_column(String(36), index=True)
        ip_address: Mapped[str] = mapped_column(String(45))
        user_agent: Mapped[str] = mapped_column(String(512))
        resource_type: Mapped[Optional[str]] = mapped_column(String(128))
        resource_id: Mapped[Optional[str]] = mapped_column(String(128))
        details: Mapped[dict[str, Any]] = mapped_column(
            JSON, default=dict, nullable=False
        )

        def __repr__(self):
            shown = (
                f"id={self.id}, event_type={self.event_type}, user_id={self.user_id}"
            )
            return f"<AdminEventLog({shown})>"

    return AdminEventLog
Source code in crudadmin/event/models.py
def create_admin_audit_log(base: type[DeclarativeBase]) -> type[DeclarativeBase]:
    """Return the ``AdminAuditLog`` ORM model for ``base``, declaring it if needed.

    When ``base``'s registry already holds a class registered under the name
    ``"AdminAuditLog"`` that subclasses ``base``, that class is reused instead
    of declaring a second model.

    Args:
        base: Declarative base the model inherits from.

    Returns:
        The ``AdminAuditLog`` model class, mapped to table ``admin_audit_log``.
    """
    registry_available = hasattr(base, "registry") and hasattr(
        base.registry, "_class_registry"
    )
    if registry_available:
        registered = base.registry._class_registry.get("AdminAuditLog")
        # isinstance() already rejects None, so no separate None test is needed.
        if isinstance(registered, type) and issubclass(registered, base):
            return cast(type[DeclarativeBase], registered)

    class AdminAuditLog(base):  # type: ignore
        __tablename__ = "admin_audit_log"
        __table_args__ = {"extend_existing": True}

        id: Mapped[int] = mapped_column(
            "id", autoincrement=True, nullable=False, unique=True, primary_key=True
        )
        # Indexed plain integer column; no foreign-key constraint is declared.
        event_id: Mapped[int] = mapped_column(index=True)
        # Timezone-aware column whose default is the current UTC time.
        timestamp: Mapped[datetime] = mapped_column(
            DateTime(timezone=True), default=lambda: datetime.now(UTC), nullable=False
        )
        resource_type: Mapped[str] = mapped_column(String(128))
        resource_id: Mapped[str] = mapped_column(String(128))
        action: Mapped[str] = mapped_column(String(64))
        previous_state: Mapped[Optional[dict[str, Any]]] = mapped_column(
            JSON, nullable=True
        )
        new_state: Mapped[Optional[dict[str, Any]]] = mapped_column(
            JSON, nullable=True
        )
        changes: Mapped[dict[str, Any]] = mapped_column(
            JSON, default=dict, nullable=False
        )
        audit_metadata: Mapped[dict[str, Any]] = mapped_column(
            JSON, default=dict, nullable=False
        )

        def __repr__(self):
            shown = (
                f"id={self.id}, resource_type={self.resource_type}, "
                f"resource_id={self.resource_id}"
            )
            return f"<AdminAuditLog({shown})>"

    return AdminAuditLog

Event Service

The main service class for managing event logging and retrieval.

Source code in crudadmin/event/service.py
class EventService:
    """Create and query admin event-log and audit-log records.

    The service wraps two ``FastCRUD`` instances built from the
    ``AdminEventLog`` and ``AdminAuditLog`` models exposed by ``db_config``.
    """

    def __init__(self, db_config):
        """Set up the CRUD helpers and the JSON encoder.

        Args:
            db_config: Object exposing the ``AdminEventLog`` and
                ``AdminAuditLog`` model classes as attributes.
        """
        self.db_config = db_config
        self.crud_events = FastCRUD(db_config.AdminEventLog)
        self.crud_audits = FastCRUD(db_config.AdminAuditLog)
        self.json_encoder = CustomJSONEncoder()

    def _serialize_dict(self, data: Optional[dict]) -> dict:
        """Return ``data`` round-tripped through the JSON encoder.

        ``None`` and empty dicts both yield ``{}``.
        """
        if not data:
            return {}
        return cast(dict, json.loads(self.json_encoder.encode(data)))

    @staticmethod
    def _result_to_dict(result: Any) -> dict:
        """Convert a CRUD result into a plain dict of its public fields."""
        if hasattr(result, "__dict__"):
            # Names starting with "_" are dropped from the object's attributes.
            return {
                k: v for k, v in result.__dict__.items() if not k.startswith("_")
            }
        return cast(dict, dict(result))

    @staticmethod
    def _failed_login_username(login: dict) -> Any:
        """Extract the attempted username from a failed-login event row.

        A top-level ``details["username"]`` wins; otherwise the value nested
        under ``details["auth_details"]`` is used, which is where
        ``log_auth_action`` stores it. Falls back to ``"unknown"``.
        """
        # ``details`` may be present but None, so ``.get(..., {})`` is not enough.
        details = login.get("details") or {}
        username = details.get("username")
        if username is None:
            username = (details.get("auth_details") or {}).get("username")
        return "unknown" if username is None else username

    async def log_event(
        self,
        db: AsyncSession,
        event_type: EventType,
        status: EventStatus,
        user_id: int,
        session_id: str,
        request: Request,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        details: Optional[dict] = None,
    ) -> AdminEventLogRead:
        """Persist one event-log entry and commit it.

        Args:
            db: Session the entry is written through.
            event_type: Kind of event being recorded.
            status: Outcome of the event.
            user_id: Id of the acting user.
            session_id: Id of the user's session.
            request: Incoming request; supplies the client IP and user-agent.
            resource_type: Name of the affected resource type, if any.
            resource_id: Id of the affected resource, if any.
            details: Extra JSON-serializable context.

        Returns:
            The stored entry as an ``AdminEventLogRead``.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            ip_address = request.client.host if request.client else "unknown"

            event_data = AdminEventLogCreate(
                event_type=event_type,
                status=status,
                user_id=user_id,
                session_id=session_id,
                ip_address=ip_address,
                user_agent=request.headers.get("user-agent", ""),
                resource_type=resource_type,
                resource_id=resource_id,
                details=self._serialize_dict(details),
            )

            result = await self.crud_events.create(db=db, object=event_data)
            # Build the read model before committing, as the original code does.
            event_read = AdminEventLogRead(**self._result_to_dict(result))

            await db.commit()
            return event_read

        except Exception as e:
            logger.error(f"Error logging event: {str(e)}", exc_info=True)
            raise

    async def create_audit_log(
        self,
        db: AsyncSession,
        event_id: int,
        resource_type: str,
        resource_id: str,
        action: str,
        previous_state: Optional[dict] = None,
        new_state: Optional[dict] = None,
        metadata: Optional[dict] = None,
    ) -> AdminAuditLogRead:
        """Persist an audit record describing a change to a resource.

        Args:
            db: Session the record is written through.
            event_id: Id of the event-log entry this record belongs to.
            resource_type: Name of the affected resource type.
            resource_id: Id of the affected resource.
            action: Action label stored with the record.
            previous_state: Resource state before the action, if known.
            new_state: Resource state after the action, if known.
            metadata: Extra JSON-serializable context.

        Returns:
            The stored record as an ``AdminAuditLogRead``.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            audit_data = AdminAuditLogCreate(
                event_id=event_id,
                resource_type=resource_type,
                resource_id=resource_id,
                action=action,
                previous_state=self._serialize_dict(previous_state),
                new_state=self._serialize_dict(new_state),
                changes=self._serialize_dict(
                    self._compute_changes(previous_state, new_state)
                ),
                metadata=self._serialize_dict(metadata),
            )

            result = await self.crud_audits.create(db=db, object=audit_data)
            return AdminAuditLogRead(**self._result_to_dict(result))

        except Exception as e:
            logger.error(f"Error creating audit log: {str(e)}", exc_info=True)
            raise

    def _compute_changes(
        self,
        previous_state: Optional[dict],
        new_state: Optional[dict],
    ) -> dict:
        """Compute changes between previous and new states.

        Returns a mapping ``key -> {"old": ..., "new": ...}`` for every key
        whose value differs. When either state is missing or empty the result
        is empty.
        """
        if not previous_state or not new_state:
            return {}

        changes: dict = {}
        for key in set(previous_state) | set(new_state):
            old_value = previous_state.get(key)
            new_value = new_state.get(key)
            if old_value != new_value:
                changes[key] = {"old": old_value, "new": new_value}

        return changes

    async def get_user_activity(
        self,
        db: AsyncSession,
        user_id: int,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        limit: int = 50,
        offset: int = 0,
    ) -> dict:
        """Get user activity logs.

        Args:
            db: Session used for the query.
            user_id: User whose events are returned.
            start_time: Inclusive lower bound on the event timestamp.
            end_time: Inclusive upper bound on the event timestamp.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            The ``get_multi`` result, sorted by timestamp descending.
        """
        filters: dict = {"user_id": user_id}

        if start_time:
            filters["timestamp__gte"] = start_time
        if end_time:
            filters["timestamp__lte"] = end_time

        result = await self.crud_events.get_multi(
            db,
            offset=offset,
            limit=limit,
            sort_columns=["timestamp"],
            sort_orders=["desc"],
            **filters,
        )

        return cast(dict, result)

    async def get_resource_history(
        self,
        db: AsyncSession,
        resource_type: str,
        resource_id: str,
        limit: int = 50,
        offset: int = 0,
    ) -> dict:
        """Get audit history for a specific resource.

        Args:
            db: Session used for the query.
            resource_type: Resource type name to match.
            resource_id: Resource id to match.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip.

        Returns:
            The ``get_multi`` result, sorted by timestamp descending.
        """
        result = await self.crud_audits.get_multi(
            db,
            offset=offset,
            limit=limit,
            sort_columns=["timestamp"],
            sort_orders=["desc"],
            resource_type=resource_type,
            resource_id=resource_id,
        )

        return cast(dict[str, Any], result)

    async def get_security_alerts(
        self, db: AsyncSession, lookback_hours: int = 24
    ) -> list[dict[str, Any]]:
        """Get security alerts based on event patterns.

        Counts failed-login events per ``(ip_address, username)`` pair within
        the lookback window and emits one high-severity alert for each pair
        with five or more attempts.

        Args:
            db: Session used for the query.
            lookback_hours: Size of the window, in hours, counted back from now.

        Returns:
            A list of alert dicts with ``type``, ``severity`` and ``details``.
        """
        lookback_time = datetime.now(UTC) - timedelta(hours=lookback_hours)

        # NOTE(review): no explicit ``limit`` is passed, so any default page
        # size applied by ``get_multi`` also bounds this scan — confirm.
        failed_logins = await self.crud_events.get_multi(
            db,
            event_type=EventType.FAILED_LOGIN,
            status=EventStatus.FAILURE,
            timestamp__gte=lookback_time,
        )

        failed_login_patterns: dict[tuple, int] = {}
        for login in failed_logins.get("data", []):
            key = (
                login.get("ip_address", "unknown"),
                self._failed_login_username(login),
            )
            failed_login_patterns[key] = failed_login_patterns.get(key, 0) + 1

        return [
            {
                "type": "multiple_failed_logins",
                "severity": "high",
                "details": {
                    "ip_address": ip,
                    "username": username,
                    "attempts": count,
                },
            }
            for (ip, username), count in failed_login_patterns.items()
            if count >= 5
        ]

    async def cleanup_old_logs(
        self, db: AsyncSession, retention_days: int = 90
    ) -> None:
        """Clean up old logs based on retention policy.

        Deletes event-log rows, then audit-log rows, whose timestamp is older
        than ``retention_days`` days before now.

        Args:
            db: Session used for the deletions.
            retention_days: Age in days beyond which rows are deleted.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)

            await self.crud_events.delete(
                db, allow_multiple=True, timestamp__lt=cutoff_date
            )

            await self.crud_audits.delete(
                db, allow_multiple=True, timestamp__lt=cutoff_date
            )

        except Exception as e:
            logger.error(f"Error cleaning up old logs: {str(e)}", exc_info=True)
            raise

cleanup_old_logs(db, retention_days=90) async

Clean up old logs based on retention policy.

Source code in crudadmin/event/service.py
async def cleanup_old_logs(
    self, db: AsyncSession, retention_days: int = 90
) -> None:
    """Delete event-log and audit-log rows older than the retention window.

    Event logs are purged first, then audit logs. Any failure is logged and
    re-raised unchanged.

    Args:
        db: Session used for the deletions.
        retention_days: Age in days beyond which rows are deleted.
    """
    try:
        oldest_kept = datetime.now(UTC) - timedelta(days=retention_days)

        # Same predicate for both tables; events go first, audits second.
        for crud in (self.crud_events, self.crud_audits):
            await crud.delete(db, allow_multiple=True, timestamp__lt=oldest_kept)

    except Exception as e:
        logger.error(f"Error cleaning up old logs: {str(e)}", exc_info=True)
        raise

get_resource_history(db, resource_type, resource_id, limit=50, offset=0) async

Get audit history for a specific resource.

Source code in crudadmin/event/service.py
async def get_resource_history(
    self,
    db: AsyncSession,
    resource_type: str,
    resource_id: str,
    limit: int = 50,
    offset: int = 0,
) -> dict:
    """Return the audit records of one resource, newest first.

    Args:
        db: Session used for the query.
        resource_type: Resource type name to match.
        resource_id: Resource id to match.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        The ``get_multi`` result for the audit-log model.
    """
    query_options = {
        "offset": offset,
        "limit": limit,
        "sort_columns": ["timestamp"],
        "sort_orders": ["desc"],
        "resource_type": resource_type,
        "resource_id": resource_id,
    }
    return cast(dict[str, Any], await self.crud_audits.get_multi(db, **query_options))

get_security_alerts(db, lookback_hours=24) async

Get security alerts based on event patterns.

Source code in crudadmin/event/service.py
async def get_security_alerts(
    self, db: AsyncSession, lookback_hours: int = 24
) -> list[dict[str, Any]]:
    """Get security alerts based on event patterns.

    Counts failed-login events per ``(ip_address, username)`` pair within the
    lookback window and emits one high-severity alert for each pair with five
    or more attempts. The username is read from ``details["username"]`` or,
    failing that, from ``details["auth_details"]["username"]`` (the layout
    written by ``log_auth_action``); rows without either count as
    ``"unknown"``.

    Args:
        db: Session used for the query.
        lookback_hours: Size of the window, in hours, counted back from now.

    Returns:
        A list of alert dicts with ``type``, ``severity`` and ``details``.
    """
    lookback_time = datetime.now(UTC) - timedelta(hours=lookback_hours)

    # NOTE(review): no explicit ``limit`` is passed, so any default page size
    # applied by ``get_multi`` also bounds this scan — confirm.
    failed_logins = await self.crud_events.get_multi(
        db,
        event_type=EventType.FAILED_LOGIN,
        status=EventStatus.FAILURE,
        timestamp__gte=lookback_time,
    )

    failed_login_patterns: dict[tuple, int] = {}

    for login in failed_logins.get("data", []):
        # ``details`` may be present but None, so ``.get(..., {})`` is not enough.
        details = login.get("details") or {}
        username = details.get("username")
        if username is None:
            username = (details.get("auth_details") or {}).get("username")
        if username is None:
            username = "unknown"

        key = (login.get("ip_address", "unknown"), username)
        failed_login_patterns[key] = failed_login_patterns.get(key, 0) + 1

    return [
        {
            "type": "multiple_failed_logins",
            "severity": "high",
            "details": {
                "ip_address": ip,
                "username": username,
                "attempts": count,
            },
        }
        for (ip, username), count in failed_login_patterns.items()
        if count >= 5
    ]

get_user_activity(db, user_id, start_time=None, end_time=None, limit=50, offset=0) async

Get user activity logs.

Source code in crudadmin/event/service.py
async def get_user_activity(
    self,
    db: AsyncSession,
    user_id: int,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    limit: int = 50,
    offset: int = 0,
) -> dict:
    """Return the event-log entries of one user, newest first.

    Args:
        db: Session used for the query.
        user_id: User whose events are returned.
        start_time: Inclusive lower bound on the event timestamp.
        end_time: Inclusive upper bound on the event timestamp.
        limit: Maximum number of rows to return.
        offset: Number of rows to skip.

    Returns:
        The ``get_multi`` result for the event-log model.
    """
    time_bounds = (("timestamp__gte", start_time), ("timestamp__lte", end_time))

    filters: dict = {"user_id": user_id}
    # Only bounds that were actually supplied become filters.
    filters.update({name: bound for name, bound in time_bounds if bound})

    return cast(
        dict,
        await self.crud_events.get_multi(
            db,
            offset=offset,
            limit=limit,
            sort_columns=["timestamp"],
            sort_orders=["desc"],
            **filters,
        ),
    )

Event System Integration

High-level integration class for simplified event logging.

Source code in crudadmin/event/integration.py
class EventSystemIntegration:
    """High-level helpers that record model, auth and security events.

    Every method delegates to the wrapped ``EventService``.
    """

    def __init__(self, event_service: EventService):
        """Store the service used to persist events and audit records."""
        self.event_service = event_service

    async def log_model_event(
        self,
        db: AsyncSession,
        event_type: EventType,
        model: Type[DeclarativeBase],
        user_id: int,
        session_id: str,
        request: Request,
        resource_id: Optional[str] = None,
        previous_state: Optional[Dict[str, Any]] = None,
        new_state: Optional[Dict[str, Any]] = None,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Record a successful model operation and, when applicable, its audit log.

        An audit record is written only for CREATE, UPDATE and DELETE events
        that carry a truthy ``resource_id``.

        Args:
            db: Session the records are written through.
            event_type: Kind of event being recorded.
            model: Model class; its ``__name__`` is stored as the resource type.
            user_id: Id of the acting user.
            session_id: Id of the user's session.
            request: Incoming request passed on to the event service.
            resource_id: Id of the affected resource, if any.
            previous_state: Resource state before the operation.
            new_state: Resource state after the operation.
            details: Extra context stored on the event and as audit metadata.

        Returns:
            The created event.

        Raises:
            Exception: Any failure is logged, the session is rolled back, and
                the exception is re-raised.
        """
        try:
            event = await self.event_service.log_event(
                db=db,
                event_type=event_type,
                status=EventStatus.SUCCESS,
                user_id=user_id,
                session_id=session_id,
                request=request,
                resource_type=model.__name__,
                resource_id=str(resource_id) if resource_id else None,
                details=details,
            )

            if (
                event
                and event_type in [EventType.CREATE, EventType.UPDATE, EventType.DELETE]
                and resource_id
            ):
                await self.event_service.create_audit_log(
                    db=db,
                    event_id=event.id,
                    resource_type=model.__name__,
                    resource_id=str(resource_id),
                    action=event_type.value,
                    previous_state=previous_state,
                    new_state=new_state,
                    metadata=details,
                )

            await db.commit()
            return event

        except Exception as e:
            # exc_info keeps the traceback in the log, matching the other handlers.
            logger.error(f"Error in event logging: {str(e)}", exc_info=True)
            await db.rollback()
            raise

    async def log_auth_event(
        self,
        db: AsyncSession,
        event_type: EventType,
        user_id: int,
        session_id: str,
        request: Request,
        success: bool,
        details: Optional[Dict[str, Any]] = None,
    ):
        """Log authentication-related events.

        Best-effort: a failure is logged and swallowed so that it does not
        propagate to the caller.

        Args:
            db: Session the event is written through.
            event_type: Kind of authentication event.
            user_id: Id of the user concerned.
            session_id: Id of the session concerned.
            request: Incoming request passed on to the event service.
            success: Whether the attempt succeeded; selects SUCCESS or FAILURE.
            details: Extra context stored on the event.

        Returns:
            The created event, or ``None`` when logging failed.
        """
        try:
            status = EventStatus.SUCCESS if success else EventStatus.FAILURE

            return await self.event_service.log_event(
                db=db,
                event_type=event_type,
                status=status,
                user_id=user_id,
                session_id=session_id,
                request=request,
                details=details,
            )

        except Exception as e:
            logger.error(f"Error logging auth event: {str(e)}", exc_info=True)
            return None

    async def log_security_event(
        self,
        db: AsyncSession,
        event_type: EventType,
        user_id: int,
        session_id: str,
        request: Request,
        details: Dict[str, Any],
    ):
        """Log security-related events with high priority.

        The event is stored with WARNING status and ``details`` extended with
        ``priority="high"`` and ``requires_attention=True``.

        Args:
            db: Session the event is written through.
            event_type: Kind of event being recorded.
            user_id: Id of the user concerned.
            session_id: Id of the session concerned.
            request: Incoming request passed on to the event service.
            details: Context to store; the two flags above override same-named keys.

        Returns:
            The created event.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            event = await self.event_service.log_event(
                db=db,
                event_type=event_type,
                status=EventStatus.WARNING,
                user_id=user_id,
                session_id=session_id,
                request=request,
                details={**details, "priority": "high", "requires_attention": True},
            )

            return event

        except Exception as e:
            logger.error(f"Error logging security event: {str(e)}", exc_info=True)
            raise

log_auth_event(db, event_type, user_id, session_id, request, success, details=None) async

Log authentication-related events.

Source code in crudadmin/event/integration.py
async def log_auth_event(
    self,
    db: AsyncSession,
    event_type: EventType,
    user_id: int,
    session_id: str,
    request: Request,
    success: bool,
    details: Optional[Dict[str, Any]] = None,
):
    """Log authentication-related events.

    Best-effort: a failure is logged and swallowed so that it does not
    propagate to the caller.

    Args:
        db: Session the event is written through.
        event_type: Kind of authentication event.
        user_id: Id of the user concerned.
        session_id: Id of the session concerned.
        request: Incoming request passed on to the event service.
        success: Whether the attempt succeeded; selects SUCCESS or FAILURE.
        details: Extra context stored on the event.

    Returns:
        The created event, or ``None`` when logging failed.
    """
    try:
        status = EventStatus.SUCCESS if success else EventStatus.FAILURE

        return await self.event_service.log_event(
            db=db,
            event_type=event_type,
            status=status,
            user_id=user_id,
            session_id=session_id,
            request=request,
            details=details,
        )

    except Exception as e:
        logger.error(f"Error logging auth event: {str(e)}", exc_info=True)
        return None

log_security_event(db, event_type, user_id, session_id, request, details) async

Log security-related events with high priority.

Source code in crudadmin/event/integration.py
async def log_security_event(
    self,
    db: AsyncSession,
    event_type: EventType,
    user_id: int,
    session_id: str,
    request: Request,
    details: Dict[str, Any],
):
    """Record a security event with WARNING status and high-priority flags.

    ``details`` is copied and extended with ``priority="high"`` and
    ``requires_attention=True`` before being stored. Any failure is logged
    and re-raised unchanged.

    Args:
        db: Session the event is written through.
        event_type: Kind of event being recorded.
        user_id: Id of the user concerned.
        session_id: Id of the session concerned.
        request: Incoming request passed on to the event service.
        details: Context to store; the two flags override same-named keys.

    Returns:
        The created event.
    """
    try:
        flagged_details = {
            **details,
            "priority": "high",
            "requires_attention": True,
        }
        return await self.event_service.log_event(
            db=db,
            event_type=event_type,
            status=EventStatus.WARNING,
            user_id=user_id,
            session_id=session_id,
            request=request,
            details=flagged_details,
        )

    except Exception as e:
        logger.error(f"Error logging security event: {str(e)}", exc_info=True)
        raise

Decorators

Convenient decorators for automatic event logging.

Source code in crudadmin/event/decorators.py
def log_admin_action(
    event_type: EventType, model: Optional[Type[DeclarativeBase]] = None
):
    """Decorate an async admin endpoint so that its outcome is recorded as an event.

    The wrapped endpoint must be called with the keyword arguments
    ``request``, ``db``, ``admin_db`` and ``current_user``.
    ``event_integration`` is consumed by the wrapper and is not forwarded to
    the endpoint. An event is logged only when both ``event_integration`` and
    ``current_user`` are provided. Failures while logging are reported through
    ``logger`` and never replace the endpoint's result.

    Args:
        event_type: Kind of event to record.
        model: Model class the endpoint operates on. For UPDATE and DELETE a
            ``ValueError`` is raised at call time when it is ``None``.

    Returns:
        A decorator for async endpoint functions.
    """

    def decorator(func: Callable):
        @functools.wraps(func)
        async def wrapper(
            *args,
            request: Request,
            db: AsyncSession,
            admin_db: AsyncSession,
            current_user: Any,
            event_integration=None,
            **kwargs,
        ):
            user_dict = convert_user_to_dict(current_user) if current_user else None

            previous_state = None
            crud: Optional[FastCRUD] = None

            # Snapshot the record before the endpoint changes or removes it.
            # Failures here abort the call: they are logged and re-raised.
            if event_type in [EventType.UPDATE, EventType.DELETE]:
                try:
                    if model is not None:
                        crud = FastCRUD(model)
                    else:
                        logger.error("Model is None. Cannot initialize FastCRUD.")
                        raise ValueError("Model must not be None.")

                    if "id" in kwargs:
                        assert crud is not None, "CRUD instance should be initialized."
                        item = await crud.get(db=db, id=kwargs["id"])
                        if item:
                            previous_state = {
                                k: v for k, v in item.items() if not k.startswith("_")
                            }
                except Exception as e:
                    logger.error(f"Error fetching previous state: {str(e)}")
                    raise

            result = await func(
                *args,
                request=request,
                db=db,
                admin_db=admin_db,
                current_user=current_user,
                **kwargs,
            )

            # From here on everything is best-effort: the endpoint has already
            # run, so logging problems are recorded but never raised.
            try:
                if event_integration and user_dict:
                    session_id = request.cookies.get("session_id", "unknown")

                    new_state = None
                    resource_id = kwargs.get("id")

                    if event_type == EventType.UPDATE:
                        # Re-read the record to capture its state after the update.
                        try:
                            if model is not None:
                                crud = FastCRUD(model)
                            assert crud is not None, (
                                "CRUD instance should be initialized."
                            )
                            updated_item = await crud.get(db=db, id=kwargs["id"])
                            if updated_item:
                                new_state = {
                                    k: v
                                    for k, v in updated_item.items()
                                    if not k.startswith("_")
                                }
                                new_state = get_model_changes(new_state)
                        except Exception as e:
                            logger.error(
                                f"Error fetching updated state: {str(e)}",
                                exc_info=True,
                            )

                    elif hasattr(request.state, "crud_result"):
                        # The endpoint may expose its result on request.state.
                        crud_result = request.state.crud_result
                        if hasattr(crud_result, "__dict__"):
                            model_dict = {
                                k: v
                                for k, v in crud_result.__dict__.items()
                                if not k.startswith("_")
                            }
                        else:
                            model_dict = dict(crud_result)

                        resource_id = str(model_dict.get("id", resource_id))
                        new_state = get_model_changes(model_dict)

                    if event_type == EventType.DELETE:
                        try:
                            body = await request.json()
                            ids = body.get("ids", [])
                            logger.info(f"Delete request received for ids: {ids}")

                            deleted_records = []
                            if hasattr(request.state, "deleted_records"):
                                deleted_records = [
                                    {
                                        k: v
                                        for k, v in record.items()
                                        if not k.startswith("_")
                                    }
                                    for record in request.state.deleted_records
                                ]

                            new_state = {
                                "action": "delete",
                                "deleted_at": datetime.now(UTC).isoformat(),
                                "deleted_records": deleted_records,
                                "deletion_details": {
                                    "deleted_by": user_dict.get("username"),
                                    "trigger_path": request.url.path,
                                    "deletion_type": "bulk"
                                    if "bulk-delete" in request.url.path
                                    else "single",
                                    "records_count": len(deleted_records),
                                    "requested_ids": ids,
                                },
                            }
                        except Exception as e:
                            logger.error(
                                f"Error in bulk delete process: {str(e)}",
                                exc_info=True,
                            )

                    elif event_type == EventType.UPDATE:
                        changes = compare_states(previous_state, new_state)
                        new_state = {
                            "action": "update",
                            "updated_at": datetime.now(UTC).isoformat(),
                            "previous_state": previous_state,
                            "new_state": new_state,
                            "changes": changes,
                            "update_details": {
                                "updated_by": user_dict.get("username"),
                                "update_path": request.url.path,
                                "modified_fields": list(changes.keys()),
                            },
                        }

                    details = {
                        "resource_details": {
                            "model": model.__name__ if model else None,
                            "id": resource_id,
                            # Both branches of the former conditional were new_state.
                            "changes": new_state,
                        },
                        "request_details": {
                            "method": request.method,
                            "path": str(request.url.path),
                            "user_agent": request.headers.get("user-agent"),
                        },
                    }

                    await event_integration.log_model_event(
                        db=admin_db,
                        event_type=event_type,
                        model=model,
                        user_id=user_dict["id"],
                        session_id=session_id,
                        request=request,
                        resource_id=resource_id,
                        previous_state=previous_state,
                        new_state=new_state,
                        details=details,
                    )
                    await admin_db.commit()

            except Exception as e:
                # Swallowed on purpose; exc_info keeps the traceback in the log.
                logger.error(f"Error logging event: {str(e)}", exc_info=True)

            return result

        return wrapper

    return decorator
Source code in crudadmin/event/decorators.py
def log_auth_action(event_type: EventType) -> Callable:
    """Decorate an async auth endpoint so that each call is recorded as an auth event.

    The wrapped endpoint must be called with the keyword arguments ``request``
    and ``db``. ``event_integration`` is consumed by the wrapper and is not
    forwarded; when it is falsy the endpoint is simply called. The endpoint
    runs outside the logging ``try`` block, so its own exceptions propagate
    without being reported as logging errors. A failure while logging is
    logged and re-raised.

    Args:
        event_type: Kind of event to record. Only LOGIN and LOGOUT get
            user and session details filled in.

    Returns:
        A decorator for async endpoint functions.
    """

    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def wrapper(
            *args, request: Request, db: AsyncSession, event_integration=None, **kwargs
        ):
            # Without an integration there is nothing to record.
            if not event_integration:
                return await func(*args, request=request, db=db, **kwargs)

            form_data = kwargs.get("form_data")
            result = await func(*args, request=request, db=db, **kwargs)

            try:
                user_id = None
                username = None
                session_id = None
                success = False

                # The attempt counts as successful only when the endpoint
                # left a user on request.state.
                has_user = (
                    hasattr(request.state, "user") and request.state.user is not None
                )

                if event_type == EventType.LOGIN:
                    if has_user:
                        user_id = request.state.user.get("id")
                        username = request.state.user.get("username")
                        success = True
                        # A fresh login has no session cookie on the request
                        # yet, so read it from the response's set-cookie header.
                        if hasattr(result, "headers"):
                            for header in result.raw_headers:
                                if (
                                    header[0].decode() == "set-cookie"
                                    and b"session_id=" in header[1]
                                ):
                                    session_id = (
                                        header[1]
                                        .decode()
                                        .split("session_id=")[1]
                                        .split(";")[0]
                                    )
                                    break
                elif event_type == EventType.LOGOUT:
                    session_id = request.cookies.get("session_id", "unknown")
                    if has_user:
                        user_id = request.state.user.get("id")
                        username = request.state.user.get("username")
                        success = True

                if not session_id:
                    session_id = "unknown"

                details = {
                    "auth_details": {
                        "event_type": event_type.value,
                        "username": username
                        or (form_data.username if form_data else "unknown"),
                        "success": success,
                        "timestamp": datetime.now(UTC).isoformat(),
                    },
                    "request_details": {
                        "method": request.method,
                        "path": str(request.url.path),
                        "ip_address": request.client.host
                        if request.client
                        else "unknown",
                        "user_agent": request.headers.get("user-agent", "Unknown"),
                    },
                    "session_details": {
                        "session_id": session_id,
                        "browser": request.headers.get("user-agent", "Unknown"),
                    },
                }

                await event_integration.log_auth_event(
                    db=db,
                    event_type=event_type,
                    # 0 stands in for "no known user", e.g. a failed login.
                    user_id=user_id or 0,
                    session_id=session_id,
                    request=request,
                    success=success,
                    details=details,
                )

                await db.commit()

            except Exception as e:
                logger.error(f"Error logging auth event: {str(e)}")
                raise

            return result

        return wrapper

    return decorator

Event Schemas

Event Log Schemas

Bases: BaseModel

Source code in crudadmin/event/schemas.py
class AdminEventLogBase(BaseModel):
    """Fields shared by ``AdminEventLogCreate`` and ``AdminEventLogRead``."""

    event_type: EventType  # an EventType enum member
    status: EventStatus  # an EventStatus enum member
    user_id: int
    session_id: str
    ip_address: str
    user_agent: str
    # The resource fields are optional and default to None.
    resource_type: Optional[str] = None
    resource_id: Optional[str] = None
    # Free-form, event-specific details; defaults to an empty dict.
    details: dict = {}

Bases: AdminEventLogBase

Source code in crudadmin/event/schemas.py
class AdminEventLogCreate(AdminEventLogBase):
    """Event log schema that adds no fields to ``AdminEventLogBase``.

    Presumably the payload used when inserting a new entry (inferred from
    the name; confirm against the caller).
    """

    pass

Bases: AdminEventLogBase

Source code in crudadmin/event/schemas.py
class AdminEventLogRead(AdminEventLogBase):
    """Event log schema extended with the ``id`` and ``timestamp`` fields."""

    # from_attributes=True allows validating from an object's attributes
    # (e.g. an ORM instance) instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    id: int
    timestamp: datetime

Audit Log Schemas

Bases: BaseModel

Source code in crudadmin/event/schemas.py
class AdminAuditLogBase(BaseModel):
    """Fields shared by ``AdminAuditLogCreate`` and ``AdminAuditLogRead``."""

    event_id: int
    resource_type: str
    resource_id: str
    action: str
    # Both state snapshots are optional and default to None.
    previous_state: Optional[dict] = None
    new_state: Optional[dict] = None
    changes: dict = {}
    # NOTE(review): the "AdminAuditLog Fields" table documents this field as
    # `audit_metadata`; confirm how the two names are mapped.
    metadata: dict = {}

Bases: AdminAuditLogBase

Source code in crudadmin/event/schemas.py
class AdminAuditLogCreate(AdminAuditLogBase):
    """Audit log schema that adds no fields to ``AdminAuditLogBase``.

    Presumably the payload used when inserting a new audit record (inferred
    from the name; confirm against the caller).
    """

    pass

Bases: AdminAuditLogBase

Source code in crudadmin/event/schemas.py
class AdminAuditLogRead(AdminAuditLogBase):
    """Audit log schema extended with the ``id`` and ``timestamp`` fields."""

    # from_attributes=True allows validating from an object's attributes
    # (e.g. an ORM instance) instead of only from a mapping.
    model_config = ConfigDict(from_attributes=True)

    id: int
    timestamp: datetime

Usage Examples

Basic Event Logging

from crudadmin.event import EventService, EventType, EventStatus

# Initialize event service
event_service = EventService(db_config)

# Log a successful login
# NOTE: `db`, `user`, `session_id` and `request` are not defined in this
# snippet; they are assumed to come from the surrounding request handler.
# Because of the `await`, this call must run inside an async function.
await event_service.log_event(
    db=db,
    event_type=EventType.LOGIN,
    status=EventStatus.SUCCESS,
    user_id=user.id,
    session_id=session_id,
    request=request,
    details={"login_method": "password"}
)

Automatic Event Logging with Decorators

from crudadmin.event import log_admin_action, EventType

@log_admin_action(EventType.CREATE, model=User)
async def create_user_endpoint(
    request: Request,
    db: AsyncSession,
    admin_db: AsyncSession,
    current_user: User,
    user_data: UserCreate,
    event_integration=None,
):
    """Create a user; the decorator is applied with ``EventType.CREATE``.

    ``user_data`` is declared before ``event_integration`` because Python
    does not allow a parameter without a default to follow one that has a
    default.

    Args:
        request: The incoming request.
        db: Session passed to ``create_user``.
        admin_db: Admin session; not used in the body (presumably consumed
            by the decorator — confirm).
        current_user: The acting user; not used in the body (presumably
            consumed by the decorator — confirm).
        user_data: Payload for the new user.
        event_integration: Optional integration object, ``None`` by default;
            not used in the body.

    Returns:
        The object returned by ``create_user``.
    """
    # Create user logic here
    user = await create_user(db, user_data)
    return user

Model Event Integration

from crudadmin.event import EventSystemIntegration

# Initialize integration
event_integration = EventSystemIntegration(event_service)

# Log model changes
await event_integration.log_model_event(
    db=admin_db,
    event_type=EventType.UPDATE,
    model=Product,
    user_id=current_user.id,
    session_id=session_id,
    request=request,
    resource_id=str(product.id),
    previous_state={"name": "Old Name", "price": 10.00},
    new_state={"name": "New Name", "price": 15.00},
    details={"field_changed": "name, price"}
)

Querying Event History

# Get user activity
activity = await event_service.get_user_activity(
    db=admin_db,
    user_id=user.id,
    start_time=datetime.now() - timedelta(days=7),
    limit=100
)

# Get resource audit history
history = await event_service.get_resource_history(
    db=admin_db,
    resource_type="Product",
    resource_id="123",
    limit=50
)

Event Configuration

Enabling Event Tracking

from crudadmin import CRUDAdmin

# Enable event tracking
# NOTE(review): "your-secret-key" is a placeholder; presumably a real
# deployment loads the secret from configuration rather than source code.
# Confirm the keyword names against the CRUDAdmin constructor signature.
crud_admin = CRUDAdmin(
    track_events=True,
    session_backend="database",  # Required for event storage
    secret_key="your-secret-key"
)

Custom Event Details

Events can include custom details for additional context:

# Log a CREATE event with resource identifiers and extra context.
await event_service.log_event(
    db=admin_db,
    event_type=EventType.CREATE,
    status=EventStatus.SUCCESS,
    user_id=user.id,
    session_id=session_id,
    request=request,
    resource_type="Product",
    resource_id="123",
    # `details` is a plain dict; the keys below are illustrative examples.
    details={
        "category": "electronics",
        "bulk_operation": True,
        "import_batch_id": "batch_001",
        "validation_warnings": ["price_below_cost"]
    }
)

Event Model Fields

AdminEventLog Fields

| Field | Type | Description |
|-------|------|-------------|
| id | int | Primary key |
| timestamp | datetime | When the event occurred |
| event_type | EventType | Type of event (CREATE, UPDATE, DELETE, LOGIN, etc.) |
| status | EventStatus | Event status (SUCCESS, FAILURE, WARNING) |
| user_id | int | ID of the user who performed the action |
| session_id | str | Session ID for the request |
| ip_address | str | IP address of the client |
| user_agent | str | User agent string from the request |
| resource_type | str | Type of resource affected (model name) |
| resource_id | str | ID of the specific resource |
| details | dict | Additional event-specific details |

AdminAuditLog Fields

| Field | Type | Description |
|-------|------|-------------|
| id | int | Primary key |
| event_id | int | Reference to AdminEventLog |
| timestamp | datetime | When the audit record was created |
| resource_type | str | Type of resource (model name) |
| resource_id | str | ID of the specific resource |
| action | str | Action performed (create, update, delete) |
| previous_state | dict | State before the change |
| new_state | dict | State after the change |
| changes | dict | Computed differences between states |
| audit_metadata | dict | Additional audit metadata |

Security Considerations

Event Integrity

  • Events are stored in append-only fashion
  • Original event records are never modified
  • All changes maintain full audit trails
  • Events include request context for security analysis

Data Privacy

  • Sensitive fields can be excluded from audit logs
  • Password fields are automatically excluded
  • PII can be masked or hashed in event details
  • Event retention policies can be implemented

Performance Impact

  • Event logging is asynchronous where possible
  • Failed event logging doesn't interrupt main operations
  • Database indexes optimize event querying
  • Cleanup routines prevent unbounded growth

Error Handling

The event system includes robust error handling:

# Wrap the logging call so that a failure is logged instead of propagating.
try:
    await event_service.log_event(...)
except Exception as e:
    # Event logging failure doesn't interrupt main operation
    logger.error(f"Event logging failed: {e}")
    # Continue with main business logic

Monitoring and Alerting

High-Priority Events

# Record a failed login as a security event with extra triage context.
# NOTE: `admin_db`, `user`, `session_id` and `request` are assumed to come
# from the surrounding request handler.
await event_integration.log_security_event(
    db=admin_db,
    event_type=EventType.FAILED_LOGIN,
    user_id=user.id,
    session_id=session_id,
    request=request,
    # The keys below are caller-chosen examples for downstream alerting.
    details={
        "priority": "high",
        "requires_attention": True,
        "failed_attempts": 5,
        "suspicious_activity": True
    }
)

Event Metrics

Use event data for monitoring:

  • Failed login attempt patterns
  • Admin activity volume
  • Resource modification frequency
  • User behavior analysis

The event system provides a complete audit trail for compliance, security monitoring, and operational insights into your CRUDAdmin instance.