Transports

A transport authenticates a request and returns a Principal. Configure them via transports=[...]; the first credential present wins.

Transport (port)

crudauth.Transport

Bases: ABC

Base class for authentication transports.

Implement authenticate (the authn slice) and optionally contributes_routes. Everything else - identity resolution, authorization gates, the Principal shape - is shared by the facade.

Attributes:

Name Type Description
name str

Stable transport name, surfaced as Principal.transport and used for per-endpoint narrowing (current_user(transport="session")).

_cookie_override CookieConfig | None

Optional per-transport cookie policy; falls back to the app-wide policy when None.

Example
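from crudauth import Transport

class ApiKeyTransport(Transport):
    name = "apikey"

    async def authenticate(self, request, ctx):
        raw = request.headers.get("X-API-Key")
        if not raw:
            # Credential absent: let the facade try the next transport.
            return None
        # lookup_user_id is assumed to be an application-defined helper
        # (not part of crudauth) that maps an API key to a user id.
        user = await ctx.resolve_user(lookup_user_id(raw))
        return None if user is None else ctx.build_principal(
            user_id=ctx.repo.user_id(user), user=user, transport=self.name,
        )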

bind

bind(runtime: AuthRuntime) -> None

Called by CRUDAuth when the transport is registered.

cookie_config

cookie_config() -> CookieConfig

The effective cookie policy: this transport's override, else app-wide.

authenticate abstractmethod async

authenticate(
    request: Request, ctx: AuthContext
) -> Principal | None

Authenticate request.

Returns a Principal on success, or None if this transport's credentials are absent (so the facade can try the next transport). Raise an HTTP exception only for a present-but-invalid credential that should hard-fail the request (e.g. a session cookie that fails CSRF).
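The fall-through contract can be sketched without the library. This is a minimal model under stated assumptions, not crudauth's facade: first_principal, AlwaysAbsent, and HeaderKeyTransport are hypothetical names, a plain dict stands in for the request, and PermissionError stands in for an HTTP exception.

```python
import asyncio
from typing import Optional


class AlwaysAbsent:
    """Hypothetical transport whose credential is never present."""
    name = "absent"

    async def authenticate(self, request: dict, ctx: object) -> Optional[dict]:
        return None  # credential absent: let the next transport try


class HeaderKeyTransport:
    """Hypothetical transport that reads an API key from a header dict."""
    name = "apikey"

    async def authenticate(self, request: dict, ctx: object) -> Optional[dict]:
        raw = request.get("headers", {}).get("X-API-Key")
        if not raw:
            return None  # absent, not invalid
        if raw != "secret":
            # Present but invalid: hard-fail instead of falling through.
            raise PermissionError("invalid API key")
        return {"user_id": 1, "transport": self.name}


async def first_principal(transports, request, ctx=None):
    """Try transports in order; the first non-None result wins."""
    for transport in transports:
        principal = await transport.authenticate(request, ctx)
        if principal is not None:
            return principal
    return None  # no credential present on any transport


chain = [AlwaysAbsent(), HeaderKeyTransport()]
found = asyncio.run(first_principal(chain, {"headers": {"X-API-Key": "secret"}}))
anonymous = asyncio.run(first_principal(chain, {"headers": {}}))
```

Returning None keeps the chain moving, while raising stops it. That distinction is what lets a broken credential fail loudly instead of being silently downgraded to anonymous.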

contributes_routes

contributes_routes() -> APIRouter | None

Return an APIRouter of endpoints this transport adds, or None.

initialize async

initialize() -> None

Open connections / start background work. Called from auth.initialize().

shutdown async

shutdown() -> None

Release resources. Called from auth.shutdown().

SessionTransport

crudauth.SessionTransport

SessionTransport(
    *,
    backend: str = BACKEND_MEMORY,
    redis_url: str | None = None,
    csrf: bool = True,
    max_sessions_per_user: int = DEFAULT_MAX_SESSIONS_PER_USER,
    session_timeout_minutes: int = DEFAULT_SESSION_TIMEOUT_MINUTES,
    remember_me_days: int = DEFAULT_REMEMBER_ME_DAYS,
    cleanup_interval_minutes: int = DEFAULT_CLEANUP_INTERVAL_MINUTES,
    cookies: CookieConfig | None = None,
    login_max_attempts: int = DEFAULT_LOGIN_MAX_ATTEMPTS,
    login_attempt_window_seconds: int = DEFAULT_LOGIN_ATTEMPT_WINDOW_SECONDS,
    login_lockout_base_seconds: int = DEFAULT_LOGIN_LOCKOUT_BASE_SECONDS,
    login_lockout_max_seconds: int = DEFAULT_LOGIN_LOCKOUT_MAX_SECONDS,
    on_login_success: Literal[
        "clear_all", "clear_user_only"
    ] = "clear_all",
)

Bases: Transport

Cookie-based session auth - the default transport.

With no configuration you get cookie sessions, header-only synchronizer-token CSRF protection, login lockout, secure cookies, and the /login and /logout routes. CSRF is enforced inside authenticate on unsafe methods; the session cookie is never SameSite=None (rejected at construction).

Parameters:

Name Type Description Default
backend str

"memory" (default) or "redis" for shared/persistent state.

BACKEND_MEMORY
redis_url str | None

Connection URL when backend="redis".

None
csrf bool

Enforce the synchronizer-token header on unsafe methods (default True).

True
cookies CookieConfig | None

Per-transport CookieConfig override.

None
login_max_attempts int

Failed logins before the escalating lockout trips.

DEFAULT_LOGIN_MAX_ATTEMPTS
on_login_success Literal['clear_all', 'clear_user_only']

What a successful login clears - "clear_all" (default) or "clear_user_only" (keeps per-IP pressure; only safe when the per-IP key identifies an individual client, not a shared NAT/CGNAT egress). Governs the shared lockout (both /login and /token). See LockoutPolicy.

'clear_all'
Example
CRUDAuth(
    session=get_session, user_model=User, SECRET_KEY=...,
    transports=[SessionTransport(backend="redis", redis_url=..., csrf=True)],
)

bind

bind(runtime: AuthRuntime) -> None

Build the SessionManager from the bound runtime.

Note

Rejects SameSite=None for the session cookie at config time. SameSite is the backstop the header-only CSRF check leans on (the cookie rides along automatically on cross-origin requests; the header doesn't). SameSite=None removes that backstop and silently weakens CSRF protection. Bearer cookies may use SameSite=None (they have no CSRF surface), so this guard is specific to the session transport.
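A config-time guard of this kind might look like the sketch below. CookiePolicy and check_session_cookie_policy are hypothetical names used for illustration; they are not crudauth's CookieConfig or its actual validation code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CookiePolicy:
    """Hypothetical stand-in for a cookie policy object."""
    samesite: str = "lax"
    secure: bool = True


def check_session_cookie_policy(policy: CookiePolicy) -> CookiePolicy:
    """Reject SameSite=None for a session cookie when the app is configured."""
    if policy.samesite.lower() == "none":
        raise ValueError("session cookie must not use SameSite=None")
    return policy


accepted = check_session_cookie_policy(CookiePolicy(samesite="lax"))
```

Failing at construction surfaces the misconfiguration at startup rather than as a quietly weakened CSRF defense in production.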

Note

Login lockout is the shared runtime.lockout (the same policy the bearer /token route uses), so the two endpoints can't sidestep each other's counter.

initialize async

initialize() -> None

Open the session manager's storage connections.

shutdown async

shutdown() -> None

Close the session manager's storage connections.

authenticate async

authenticate(
    request: Request, ctx: AuthContext
) -> Principal | None

Authenticate via the session cookie.

Returns None when no session cookie is present or the session is invalid/idle-expired (try the next transport). On a present, valid session it enforces CSRF for unsafe methods (raising on failure) and returns the Principal.
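The method gate on the CSRF check can be sketched as follows. The set of safe methods and the csrf_ok helper are assumptions for illustration; the library's own check looks the token up in CSRF storage rather than receiving it as an argument.

```python
import hmac
from typing import Optional

# Assumed set of safe methods (those defined as safe in RFC 9110).
SAFE_METHODS = frozenset({"GET", "HEAD", "OPTIONS", "TRACE"})


def csrf_ok(method: str, header_token: Optional[str], stored_token: Optional[str]) -> bool:
    """Allow safe methods; require a matching header token on unsafe ones."""
    if method.upper() in SAFE_METHODS:
        return True
    if not header_token or not stored_token:
        return False
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(header_token.encode(), stored_token.encode())
```

In this model a False result on an unsafe method corresponds to the hard-fail case: the session is present and valid, but the request is rejected rather than passed to the next transport.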

BearerTransport

crudauth.BearerTransport

BearerTransport(
    *,
    access_ttl: int = DEFAULT_ACCESS_TTL_SECONDS,
    refresh_ttl_days: int = DEFAULT_REFRESH_TTL_DAYS,
    refresh: str = REFRESH_LOCATION_COOKIE,
    default_scopes: list[str] | None = None,
    grantable_scopes: list[str] | None = None,
    refresh_cookie_name: str = REFRESH_TOKEN_NAME,
    refresh_cookie_path: str | None = None,
    cookies: CookieConfig | None = None,
)

Bases: Transport

Stateless JWT auth.

Parameters:

Name Type Description Default
access_ttl int

Access token lifetime in seconds (default 15 min).

DEFAULT_ACCESS_TTL_SECONDS
refresh_ttl_days int

Refresh token lifetime in days (default 30).

DEFAULT_REFRESH_TTL_DAYS
refresh str

Where the refresh token lives - "cookie" (httpOnly, default) or "body" (returned in the JSON response).

REFRESH_LOCATION_COOKIE
default_scopes list[str] | None

Scopes granted to password-login tokens when the client doesn't request a narrower set.

None
grantable_scopes list[str] | None

The maximum set of scopes /token will ever issue. A client's requested scopes are intersected with this set, so a client cannot self-grant a scope it asks for. Defaults to default_scopes (clients may only narrow, never widen).

None
refresh_cookie_path str | None

Path for the refresh cookie. Defaults to the cookie policy's path (/); set it to the refresh endpoint's path (e.g. "/auth/refresh") to stop the cookie riding every request and narrow its exposure. Must match where /refresh is mounted.

None
Note

Issued scopes are clamped to grantable_scopes - the password grant silently drops anything outside the grantable set, so an endpoint gated by scopes=[...] can't be satisfied by a self-granted scope. The clamp is re-applied at /refresh, so tightening grantable_scopes also drops a removed scope from tokens minted off existing refresh tokens (rather than honoring it until the refresh token expires).
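The clamp amounts to a set intersection. The sketch below is an illustration under assumptions: clamp_scopes is a hypothetical helper, not the library's function, and it treats grantable_scopes=None as "fall back to default_scopes", as the parameter description states.

```python
from typing import List, Optional


def clamp_scopes(
    requested: Optional[List[str]],
    default_scopes: List[str],
    grantable_scopes: Optional[List[str]] = None,
) -> List[str]:
    """Intersect the requested scopes with the grantable set, keeping order."""
    grantable = default_scopes if grantable_scopes is None else grantable_scopes
    wanted = default_scopes if requested is None else requested
    allowed = set(grantable)
    # Anything outside the grantable set is dropped silently.
    return [scope for scope in wanted if scope in allowed]


narrowed = clamp_scopes(["read", "admin"], default_scopes=["read", "write"])
```

Running the same function again at refresh time, against the current grantable set, is what makes a tightened configuration take effect on existing refresh tokens.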

Note

Bearer tokens are stateless - there is no per-token revocation or rotation, so a stolen token is valid until it expires (refresh_ttl_days, default 30) unless the user's credential epoch is bumped. A password reset increments the user's token_version (embedded as the ver claim), which invalidates every outstanding access AND refresh token for that user at once. Per-token rotation is a planned future addition. (Epoch revocation requires a token_version column; AuthUserMixin supplies it - a model without it simply isn't epoch-revocable.)

Note

/token shares the escalating login lockout with the session /login route (same runtime.lockout, keyed by ip + username), so a client locked out on one endpoint can't brute-force via the other.

Example
BearerTransport(access_ttl=900, refresh="cookie")

authenticate async

authenticate(
    request: Request, ctx: AuthContext
) -> Principal | None

Authenticate an Authorization: Bearer access token.

Note

Absent or non-bearer header: returns None. This transport's credential isn't present, so the facade tries the next transport.

Expired access token: also returns None. Expiry is the normal steady state of a short-lived token, not an attack signal, so the request falls through to the next transport (e.g. a valid session cookie) and is treated as anonymous under optional=True.

Tampered token (bad signature, wrong type, missing sub): raises UnauthorizedException, mirroring the session transport's CSRF hard-fail.

Valid token whose user no longer exists or is inactive: returns None (the account vanished, so the request is treated as anonymous).

Token whose ver claim is below the user's current token_version (e.g. revoked by a password reset): also returns None. It is superseded, not tampered.
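These outcomes can be summarized as a small decision function. This is a sketch, not the library's code: Claims, User, Unauthorized, and bearer_outcome are hypothetical names, signature and expiry checks are assumed to have been computed already, and the "access" type value is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


class Unauthorized(Exception):
    """Hypothetical stand-in for the library's UnauthorizedException."""


@dataclass
class Claims:
    """Decoded token fields; signature_ok and expired are precomputed here."""
    sub: Optional[str]
    token_type: str
    ver: int
    signature_ok: bool = True
    expired: bool = False


@dataclass
class User:
    id: str
    is_active: bool
    token_version: int


def bearer_outcome(header: Optional[str], claims: Optional[Claims], user: Optional[User]) -> Optional[str]:
    """Return the user id on success, None to fall through, or raise."""
    if not header or not header.lower().startswith("bearer "):
        return None  # credential absent
    if claims is None or not claims.signature_ok or claims.token_type != "access" or not claims.sub:
        raise Unauthorized("tampered token")  # present but invalid: hard-fail
    if claims.expired:
        return None  # normal steady state of a short-lived token
    if user is None or not user.is_active:
        return None  # account vanished
    if claims.ver < user.token_version:
        return None  # superseded by a credential-epoch bump
    return user.id
```

Only the tampered branch raises; every other failure returns None so that another transport, or anonymous access under optional=True, can still apply.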

SessionManager

Server-side sessions, CSRF, device management, and login lockout. Reachable as auth.sessions when a SessionTransport is configured.

crudauth.transports.session.manager.SessionManager

SessionManager(
    session_storage: AbstractSessionStorage[SessionData],
    *,
    csrf_storage: AbstractSessionStorage[CSRFToken]
    | None = None,
    max_sessions_per_user: int = DEFAULT_MAX_SESSIONS_PER_USER,
    session_timeout_minutes: int = DEFAULT_SESSION_TIMEOUT_MINUTES,
    remember_me_days: int = DEFAULT_REMEMBER_ME_DAYS,
    cleanup_interval_minutes: int = DEFAULT_CLEANUP_INTERVAL_MINUTES,
    csrf_token_bytes: int = CSRF_TOKEN_BYTES,
    lockout: LockoutPolicy | None = None,
    cookie_secure: bool = True,
    cookie_samesite: SameSite = "lax",
    cookie_path: str = "/",
    session_cookie_name: str = SESSION_COOKIE_NAME,
    csrf_cookie_name: str = CSRF_COOKIE_NAME,
    trusted_proxy_hops: int = 0,
)

create_session async

create_session(
    request: Request,
    user_id: Any,
    metadata: dict[str, Any] | None = None,
    expiration_seconds: int | None = None,
) -> tuple[str, str]

Create a session + CSRF token. Returns (session_id, csrf_token).

Note

The CSRF token id is stored on the session so its TTL can slide forward alongside the session in validate_session.

validate_session async

validate_session(
    session_id: str, update_activity: bool = True
) -> SessionData | None

Return the live session for session_id, or None if invalid/idle-expired.

Note

On activity, the CSRF token's TTL is slid forward together with the session's - otherwise it would expire out from under a session kept alive by activity and 403 later mutations.
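The sliding behavior can be illustrated with a small in-memory store and an injectable clock. TTLStore, validate, and the key prefixes are hypothetical; the real storage backends and key layout may differ.

```python
from typing import Any, Callable, Dict, Optional, Tuple


class TTLStore:
    """Hypothetical in-memory key/value store with per-key expiry."""

    def __init__(self, clock: Callable[[], float]) -> None:
        self._clock = clock
        self._items: Dict[str, Tuple[Any, float]] = {}

    def set(self, key: str, value: Any, ttl: float) -> None:
        self._items[key] = (value, self._clock() + ttl)

    def get(self, key: str) -> Optional[Any]:
        item = self._items.get(key)
        if item is None:
            return None
        value, expires_at = item
        if self._clock() >= expires_at:
            del self._items[key]  # expired: evict on read
            return None
        return value

    def touch(self, key: str, ttl: float) -> bool:
        """Reset the key's expiry to now + ttl if it is still live."""
        value = self.get(key)
        if value is None:
            return False
        self.set(key, value, ttl)
        return True


def validate(store: TTLStore, session_id: str, idle_ttl: float) -> Optional[dict]:
    """Return the live session and slide both TTLs forward together."""
    session = store.get(f"session:{session_id}")
    if session is None:
        return None
    store.touch(f"session:{session_id}", idle_ttl)
    # The CSRF token id stored on the session lets its TTL slide too.
    store.touch(f"csrf:{session['csrf_id']}", idle_ttl)
    return session
```

Without the second touch, the CSRF record would keep its original expiry and vanish while the session stayed alive, which is the failure mode the note describes.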

terminate_session async

terminate_session(
    session_id: str,
    reason: str = "manual_termination",
    user_id: Any = None,
) -> bool

Hard-revoke a session: remove it from storage (and its user index).

Passing user_id (known on the indexed terminate paths) lets the backend skip re-reading the record just to update its user index.

terminate_all_user_sessions async

terminate_all_user_sessions(
    user_id: Any,
    reason: str = "logout_all",
    exclude: str | None = None,
) -> int

Terminate every active session for user_id (optionally keeping exclude).

Returns:

Type Description
int

The number of sessions terminated. The public-facing wrapper is

list_for_user async

list_for_user(
    user_id: Any, current_session_id: str | None = None
) -> list[dict[str, Any]]

List a user's active sessions for a "manage devices" UI.

Parameters:

Name Type Description Default
user_id Any

Whose sessions to list.

required
current_session_id str | None

If given, the matching entry is flagged "current": True.

None

Returns:

Type Description
list[dict[str, Any]]

One dict per active session with session_id, device, ip, created_at, last_activity, and current. Empty if the storage backend can't index by user.

Example
@app.get("/account/sessions")
async def sessions(user: Principal = Depends(auth.current_user())):
    return await auth.sessions.list_for_user(user.user_id)

revoke async

revoke(
    session_id: str, owner_id: Any | None = None
) -> bool

Revoke one session.

Parameters:

Name Type Description Default
session_id str

Session to revoke.

required
owner_id Any | None

If given, the session is only revoked when it belongs to this user (so a user can't revoke someone else's session).

None

Returns:

Type Description
bool

True if a session was revoked, False if it didn't exist or failed the ownership check.

revoke_all async

revoke_all(user_id: Any, exclude: str | None = None) -> int

Revoke all of a user's sessions ("sign out everywhere").

Parameters:

Name Type Description Default
user_id Any

Whose sessions to revoke.

required
exclude str | None

An optional session id to keep (e.g. the current one, for "sign out my other devices").

None

Returns:

Type Description
int

The number of sessions revoked.
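The ownership check in revoke and the exclude option in revoke_all can be modeled over a plain dict. The functions below are hypothetical, synchronous stand-ins that only mirror the documented return values; they are not the SessionManager implementation.

```python
from typing import Any, Dict, Optional


def revoke(sessions: Dict[str, dict], session_id: str, owner_id: Optional[Any] = None) -> bool:
    """Remove one session; refuse if owner_id is given and does not match."""
    record = sessions.get(session_id)
    if record is None:
        return False  # no such session
    if owner_id is not None and record["user_id"] != owner_id:
        return False  # failed the ownership check
    del sessions[session_id]
    return True


def revoke_all(sessions: Dict[str, dict], user_id: Any, exclude: Optional[str] = None) -> int:
    """Remove every session of user_id except the excluded one; return the count."""
    doomed = [
        sid for sid, record in sessions.items()
        if record["user_id"] == user_id and sid != exclude
    ]
    for sid in doomed:
        del sessions[sid]
    return len(doomed)


sessions = {
    "a1": {"user_id": 1},
    "a2": {"user_id": 1},
    "a3": {"user_id": 1},
    "b1": {"user_id": 2},
}
blocked = revoke(sessions, "b1", owner_id=1)    # user 1 cannot revoke user 2's session
removed = revoke_all(sessions, 1, exclude="a1")  # "sign out my other devices"
```

In an endpoint that takes a session id from the client, passing the authenticated user's id as owner_id is what keeps one user from revoking another's session.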

regenerate_csrf_token async

regenerate_csrf_token(
    user_id: Any,
    session_id: str,
    expiration_seconds: int | None = None,
) -> str

Rotate the session's CSRF token and return the new one.

Proper rotation, not just "issue another token": the new token is bound to the session (so validate_session slides its TTL, not the old one's), and the previous token is deleted so it can no longer pass validate_csrf_token.

Returns the new token, or "" when CSRF storage is disabled or the session no longer exists.
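Rotation as described involves three steps: mint a new token, bind it to the session, and delete the old one. The sketch below models this over plain dicts; rotate_csrf and csrf_valid are hypothetical helpers, and the storage layout is an assumption.

```python
import secrets
from typing import Dict


def rotate_csrf(sessions: Dict[str, dict], csrf_tokens: Dict[str, str], session_id: str, token_bytes: int = 32) -> str:
    """Replace the session's CSRF token; return "" if the session is gone."""
    session = sessions.get(session_id)
    if session is None:
        return ""
    old = session.get("csrf_token")
    new = secrets.token_urlsafe(token_bytes)
    csrf_tokens[new] = session_id   # bind the new token to the session
    session["csrf_token"] = new     # later TTL slides follow the new token
    if old is not None:
        csrf_tokens.pop(old, None)  # the previous token can no longer validate
    return new


def csrf_valid(csrf_tokens: Dict[str, str], session_id: str, token: str) -> bool:
    """True if the token is present and bound to this session."""
    return csrf_tokens.get(token) == session_id


sessions = {"s1": {"csrf_token": "old-token"}}
csrf_tokens = {"old-token": "s1"}
fresh = rotate_csrf(sessions, csrf_tokens, "s1")
```

Deleting the old record is the step that distinguishes rotation from merely issuing a second token.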

validate_csrf_token async

validate_csrf_token(
    session_id: str, csrf_token: str
) -> bool

True if csrf_token is a live token bound to session_id.

Note

Expiry is governed solely by the storage TTL (which slides forward with the session - see validate_session); a present record is by definition a live token.

set_session_cookies

set_session_cookies(
    response: Response,
    session_id: str,
    csrf_token: str,
    max_age: int | None = None,
) -> None

Write the session + CSRF cookies.

Note

max_age=None emits a session cookie (no Max-Age) - the authoritative expiry is the server-side sliding idle check in validate_session, so a fixed Max-Age would hard-expire a still-active session and defeat the slide. Remember-me passes an explicit long max_age for a persistent cookie whose lifetime matches its (equally long) server window.
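The difference between the two cookie shapes is visible in the Set-Cookie value itself. This sketch uses the standard library's http.cookies rather than the framework's Response object; the cookie name session_id and the helper session_cookie_header are assumptions for illustration.

```python
from http.cookies import SimpleCookie
from typing import Optional


def session_cookie_header(session_id: str, max_age: Optional[int] = None) -> str:
    """Build a Set-Cookie value; omit Max-Age when max_age is None."""
    cookie = SimpleCookie()
    cookie["session_id"] = session_id  # cookie name is an assumption
    morsel = cookie["session_id"]
    morsel["path"] = "/"
    morsel["httponly"] = True
    morsel["secure"] = True
    morsel["samesite"] = "Lax"
    if max_age is not None:
        morsel["max-age"] = max_age  # persistent cookie (remember-me)
    return morsel.OutputString()


browser_session = session_cookie_header("abc")
remember_me = session_cookie_header("abc", max_age=30 * 24 * 3600)
```

The first value carries no Max-Age, so the browser drops the cookie when it closes and the server-side idle check remains the only expiry that matters.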

Note

The session cookie is httponly but the CSRF cookie is NOT - it must be readable by JS so the SPA can echo it in the X-CSRF-Token header (the synchronizer-token check). Do not make it httponly.

clear_session_cookies

clear_session_cookies(response: Response) -> None

Delete the session + CSRF cookies.

Note

Deletes with the same secure/samesite as the set - some browsers only honor a deletion when those attributes match.

cleanup_expired_sessions async

cleanup_expired_sessions(force: bool = False) -> None

Proactively sweep idle-expired sessions (throttled by cleanup_interval).

Not called on the auth path: the session TTL equals the idle window, so the storage backend evicts idle sessions on its own and validate_session catches idle sessions on read. This sweep is therefore optional. Call it explicitly (e.g. with force=True from an ops job) if a bring-your-own backend without TTL support needs proactive pruning. It needs the storage's optional scan_keys capability; a backend without it simply gets no sweep.

Note

Login-lockout keys (login:*) are deliberately NOT swept - they carry their own TTLs (attempt window, lockout duration, round retention), and bulk-deleting them would clear live lockouts and reset the exponential-backoff escalation. Never pattern-delete the lockout keys here.

track_login_attempt async

track_login_attempt(
    ip_address: str, username: str, success: bool = False
) -> tuple[bool, int | None, int]

Record a login attempt and report whether it's allowed.

Delegates to the injected LockoutPolicy. Fails open (allows) only when no policy is configured at all.

Returns:

Type Description
tuple[bool, int | None, int]

(allowed, attempts_remaining, retry_after_seconds).
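The exact escalation schedule behind retry_after_seconds is not specified here. As an illustration only, the sketch below assumes a doubling schedule capped at a maximum; lockout_seconds is a hypothetical helper whose parameters mirror login_lockout_base_seconds and login_lockout_max_seconds.

```python
def lockout_seconds(round_number: int, base_seconds: int, max_seconds: int) -> int:
    """Assumed schedule: base, 2*base, 4*base, ... capped at max_seconds."""
    if round_number < 1:
        raise ValueError("round_number starts at 1")
    return min(base_seconds * 2 ** (round_number - 1), max_seconds)


schedule = [lockout_seconds(n, base_seconds=60, max_seconds=3600) for n in range(1, 9)]
```

Under a schedule like this, deleting the stored round counter resets the next lockout to the base duration, which is why the lockout keys must not be bulk-deleted.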

initialize async

initialize() -> None

Open the session and CSRF storage connections.

shutdown async

shutdown() -> None

Close the session and CSRF storage connections.