Rate limiting & lockout

Per-endpoint throttling (auth.rate_limit(...)) and escalating login lockout, both over a pluggable backend. Use redis_rate_limiter(...) in production.

crudauth.ratelimit.RateLimit dataclass

RateLimit(times: int, seconds: int)

A fixed-window allowance: times events per seconds.

times=0 disables the limit. This is an explicit, documented off switch; a disabled limit is never the default.

Example
CRUDAuth(..., rate_limits={"password_reset_request": RateLimit(3, 1800)})

disabled property

disabled: bool

True when times == 0 (the limit is turned off).
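
The shape described above can be sketched as a small dataclass. This is an illustrative stand-in rather than crudauth's source: the name RateLimitSketch and the frozen=True choice are assumptions, and only the two fields and the disabled rule are taken from the reference above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RateLimitSketch:
    """Hypothetical stand-in mirroring the documented RateLimit fields."""

    times: int    # events allowed per window
    seconds: int  # window length in seconds

    @property
    def disabled(self) -> bool:
        # Documented rule: times == 0 turns the limit off.
        return self.times == 0


reset_limit = RateLimitSketch(3, 1800)  # 3 events per 30 minutes
off_switch = RateLimitSketch(0, 60)     # explicitly disabled
```

Here reset_limit matches the password_reset_request example above, while off_switch shows the explicit opt-out.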

crudauth.ratelimit.KeyBy

Bases: str, Enum

Which dimension a CRUDAuth.rate_limit dependency keys on.

crudauth.ratelimit.LockoutPolicy

LockoutPolicy(
    backend: RateLimiterBackend,
    *,
    max_attempts: int = DEFAULT_LOGIN_MAX_ATTEMPTS,
    attempt_window_seconds: int = DEFAULT_LOGIN_ATTEMPT_WINDOW_SECONDS,
    lockout_base_seconds: int = DEFAULT_LOGIN_LOCKOUT_BASE_SECONDS,
    lockout_max_seconds: int = DEFAULT_LOGIN_LOCKOUT_MAX_SECONDS,
    round_retention_seconds: int = DEFAULT_LOGIN_ROUND_RETENTION_SECONDS,
    on_login_success: Literal[
        "clear_all", "clear_user_only"
    ] = "clear_all",
    fail_open: bool = False,
)

Tracks failed logins and escalates lockout duration across rounds.

Fails closed by default: if the backend errors, a login attempt is blocked rather than silently allowed. Rationale: under fail-open, an attacker who can DoS the limiter backend could then brute-force logins with lockout effectively disabled.

Note

Fail-closed means that an outage of a networked backend blocks logins, so pair the Redis backend with a highly available Redis deployment. The in-memory backend has no outage mode.

Parameters:

Name Type Description Default
backend RateLimiterBackend

The shared rate-limiter backend (counters live here).

required
max_attempts int

Failures allowed within attempt_window_seconds before a lockout trips.

DEFAULT_LOGIN_MAX_ATTEMPTS
attempt_window_seconds int

Sliding window for counting failures.

DEFAULT_LOGIN_ATTEMPT_WINDOW_SECONDS
lockout_base_seconds int

First lockout duration; doubles each round.

DEFAULT_LOGIN_LOCKOUT_BASE_SECONDS
lockout_max_seconds int

Cap on the exponential lockout duration.

DEFAULT_LOGIN_LOCKOUT_MAX_SECONDS
round_retention_seconds int

How long a round counter persists, so repeat offenders resume escalating rather than resetting. The TTL is slid forward on every lockout (sliding window), so a slow, paced attack keeps climbing the escalation ladder instead of forgetting it.

DEFAULT_LOGIN_ROUND_RETENTION_SECONDS
on_login_success Literal['clear_all', 'clear_user_only']

What a successful login clears. "clear_all" (default) clears both the per-username and per-IP failure pressure. This is friendly to legitimate users behind a shared egress (corporate NAT or mobile CGNAT), at the cost of letting an unrelated user's success refresh a co-located attacker's per-IP budget. "clear_user_only" clears just the username dimension and keeps the per-IP pressure. This is tighter against a single-source brute force, but it is only safe when your per-IP key reliably identifies an individual client: you run behind a proxy with trusted_proxy_hops set, and your users are not predominantly behind shared egress. Otherwise it can lock out innocent co-located users. The lockout's primary key is ip + username together; this setting governs only the secondary per-IP pressure valve on success.

'clear_all'
fail_open bool

On a backend error, allow (True) or block (False).

False
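
The escalation described by lockout_base_seconds and lockout_max_seconds can be sketched as a capped doubling. This is a minimal sketch, assuming the first round uses the base duration exactly; lockout_seconds is a hypothetical helper, and the values 60 and 900 are illustrative only, since the package defaults are not listed in this reference.

```python
def lockout_seconds(round_number: int, base_seconds: int, max_seconds: int) -> int:
    """Lockout duration for a 1-based round: doubles each round, up to a cap."""
    if round_number < 1:
        raise ValueError("round_number starts at 1")
    return min(max_seconds, base_seconds * 2 ** (round_number - 1))


# Illustrative values only, not the package defaults.
ladder = [lockout_seconds(r, base_seconds=60, max_seconds=900) for r in range(1, 7)]
# ladder == [60, 120, 240, 480, 900, 900]
```

Once the doubled value exceeds the cap, every later round stays at lockout_max_seconds.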

check_and_record async

check_and_record(
    ip_address: str, username: str, success: bool = False
) -> tuple[bool, int | None, int]

Record an attempt and report whether it's allowed.

Parameters:

Name Type Description Default
ip_address str

Caller IP (one of the two keyed dimensions).

required
username str

Submitted username/email (the other dimension).

required
success bool

When True, clears all counters for this pair and allows the attempt.

False

Returns:

Type Description
tuple[bool, int | None, int]

(allowed, attempts_remaining, retry_after_seconds).

Note

The escalation branch issues several sequential backend ops (per-IP and per-username lock + round counters). It runs only on repeated failures (already past the attempt cap), so it's intentionally not pipelined - the hot success/under-cap paths stay at one or two ops.
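
One way a caller might consume the returned tuple is to turn a blocked attempt into an HTTP 429 with a Retry-After header. The helper below is a hypothetical sketch that works on a plain tuple; rejection_for is not part of crudauth, and how the package itself surfaces a lockout is not shown in this reference.

```python
from typing import Dict, Optional, Tuple

# (allowed, attempts_remaining, retry_after_seconds), as documented above.
LockoutResult = Tuple[bool, Optional[int], int]


def rejection_for(result: LockoutResult) -> Optional[Tuple[int, Dict[str, str]]]:
    """Return (status, headers) for a blocked attempt, or None when it is allowed."""
    allowed, _attempts_remaining, retry_after_seconds = result
    if allowed:
        return None  # proceed to the normal credential check
    # 429 Too Many Requests, with the standard Retry-After header in seconds.
    return 429, {"Retry-After": str(retry_after_seconds)}


blocked = rejection_for((False, 0, 120))
passed = rejection_for((True, 4, 0))
```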

crudauth.ratelimit.RateLimiterBackend

Bases: ABC

Async counter store backing rate limiting and login lockout.

A backend implements the dumb primitives; the package layers the fixed-window check and the escalating LockoutPolicy on top. The primitives raise on a backend error - the policy layer catches and decides fail-open vs fail-closed, so one backend can serve both fail-open window limits and fail-closed lockout.

Example
from crudauth.ratelimit import MemoryRateLimiterBackend
backend = MemoryRateLimiterBackend()
# Run inside a coroutine: count one event against a limit of 5 per 60 seconds.
count, limited, retry_after = await backend.increment_and_check("ip:1.2.3.4", 5, 60)
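
The split between primitives that raise and a policy layer that decides can be sketched as follows. This is a minimal illustration under stated assumptions, not the package's implementation: allowed_or_policy and broken_backend_check are hypothetical names, and the outage is simulated with a ConnectionError.

```python
import asyncio
from typing import Awaitable, Callable


async def allowed_or_policy(
    check: Callable[[], Awaitable[bool]], *, fail_open: bool
) -> bool:
    """Run a backend-backed check; on a backend error, fall back to the policy."""
    try:
        return await check()
    except Exception:
        # The primitive raised: fail_open=True allows, fail_open=False blocks.
        return fail_open


async def broken_backend_check() -> bool:
    raise ConnectionError("backend unreachable")  # simulated outage


# The same failing backend yields opposite outcomes depending on the policy.
window_limit_result = asyncio.run(allowed_or_policy(broken_backend_check, fail_open=True))
lockout_result = asyncio.run(allowed_or_policy(broken_backend_check, fail_open=False))
```

Because the decision lives in the caller, one backend instance can serve a fail-open window limit and a fail-closed lockout at the same time.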

increment abstractmethod async

increment(
    key: str, amount: int = 1, expiry: int | None = None
) -> int

Increment key by amount, return the new count.

Note

TTL is armed on first touch only. expiry must be applied when the key is created (the increment that brings it into existence) and NOT re-armed on subsequent increments - otherwise sustained load would push the TTL forward forever and a fixed window would never close. Both in-tree backends honor this (Redis: value == amount; memory: key absent from the deadline map); a new backend must too.
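
The first-touch contract can be illustrated with a small in-memory counter. This is a sketch, assuming a dict of counts plus a dict of deadlines; FirstTouchCounter and its injectable clock are hypothetical and exist only to make the behavior deterministic here.

```python
import asyncio
import time
from typing import Callable, Dict, List, Optional


class FirstTouchCounter:
    """Hypothetical counter whose TTL is armed only when the key is created."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._counts: Dict[str, int] = {}
        self._deadlines: Dict[str, float] = {}

    def _drop_if_expired(self, key: str) -> None:
        deadline = self._deadlines.get(key)
        if deadline is not None and self._clock() >= deadline:
            self._counts.pop(key, None)
            self._deadlines.pop(key, None)

    async def increment(self, key: str, amount: int = 1, expiry: Optional[int] = None) -> int:
        self._drop_if_expired(key)
        self._counts[key] = self._counts.get(key, 0) + amount
        # Arm the TTL only if no deadline exists yet; never push it forward.
        if expiry is not None and key not in self._deadlines:
            self._deadlines[key] = self._clock() + expiry
        return self._counts[key]


async def first_touch_demo() -> List[int]:
    now = [0.0]  # fake clock so the example is deterministic
    counter = FirstTouchCounter(clock=lambda: now[0])
    seen = [await counter.increment("k", expiry=60)]      # t=0: created, expires at t=60
    now[0] = 59.0
    seen.append(await counter.increment("k", expiry=60))  # t=59: TTL is not re-armed
    now[0] = 61.0
    seen.append(await counter.increment("k", expiry=60))  # t=61: window closed, fresh key
    return seen


first_touch_counts = asyncio.run(first_touch_demo())  # [1, 2, 1]
```

Had the second increment re-armed the TTL, the key would still be alive at t=61 and the window would never close under steady traffic.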

increment_and_refresh_ttl abstractmethod async

increment_and_refresh_ttl(
    key: str, amount: int = 1, expiry: int | None = None
) -> int

Increment key and (re-)arm its TTL on every call, atomically.

The counterpart to increment's first-touch-only TTL: this slides the expiry forward on each call, for a counter that must live as long as activity continues - the lockout escalation "rounds" counter, so a slow, paced attack can't let it expire and reset the backoff. The increment and the TTL re-arm MUST be atomic (one round trip on a networked backend) so a concurrent attempt can't interleave between them.
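
For contrast, a sliding-TTL counter can be sketched the same way. SlidingCounter is a hypothetical in-memory illustration; on a single event loop, the absence of an await between the increment and the re-arm is what stands in for the atomicity that a networked backend would need to get from one round trip.

```python
import asyncio
import time
from typing import Callable, Dict, List, Optional


class SlidingCounter:
    """Hypothetical counter whose TTL is re-armed on every increment."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._counts: Dict[str, int] = {}
        self._deadlines: Dict[str, float] = {}

    async def increment_and_refresh_ttl(
        self, key: str, amount: int = 1, expiry: Optional[int] = None
    ) -> int:
        deadline = self._deadlines.get(key)
        if deadline is not None and self._clock() >= deadline:
            self._counts.pop(key, None)
            self._deadlines.pop(key, None)
        # No await between these steps, so no other coroutine can interleave.
        self._counts[key] = self._counts.get(key, 0) + amount
        if expiry is not None:
            self._deadlines[key] = self._clock() + expiry  # slide forward every call
        return self._counts[key]


async def sliding_demo() -> List[int]:
    now = [0.0]  # fake clock so the example is deterministic
    counter = SlidingCounter(clock=lambda: now[0])
    seen = []
    for t in (0.0, 59.0, 118.0, 200.0):  # paced calls, then a long quiet gap
        now[0] = t
        seen.append(await counter.increment_and_refresh_ttl("rounds", expiry=60))
    return seen


sliding_counts = asyncio.run(sliding_demo())  # [1, 2, 3, 1]
```

The paced calls at t=59 and t=118 each arrive just before expiry and keep the counter climbing; only the gap before t=200 lets it reset.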

get_count abstractmethod async

get_count(key: str) -> int | None

Current counter value, or None if the key is absent.

reset abstractmethod async

reset(key: str) -> None

Reset key to absent (alias of delete that ignores the result).

delete abstractmethod async

delete(key: str) -> bool

Delete key; return whether it existed.

ping abstractmethod async

ping() -> bool

Liveness check for the backing store.

get_ttl async

get_ttl(key: str) -> int

Remaining TTL in seconds for key (0 if unknown/absent).

Overridable; the escalating lockout policy uses it. The default returns 0 for backends that can't report TTL.

Note

LockoutPolicy reads this to detect an active lockout. A backend that leaves the default 0 still counts attempts but can never hold a lockout open - so any backend used with lockout MUST implement get_ttl (both in-tree backends do).
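
The consequence of leaving the default in place can be shown with two toy backends. This is a sketch under stated assumptions: both classes, the is_locked helper, and the "lock:alice" key are hypothetical, and the rule "locked while TTL > 0" is an assumed reading of how an active lockout is detected.

```python
import asyncio
from typing import Dict


class NoTtlBackend:
    """Hypothetical backend that keeps the default get_ttl behavior."""

    async def get_ttl(self, key: str) -> int:
        return 0  # "unknown/absent" for every key


class TtlAwareBackend:
    """Hypothetical backend that reports remaining seconds per key."""

    def __init__(self, remaining: Dict[str, int]) -> None:
        self._remaining = dict(remaining)

    async def get_ttl(self, key: str) -> int:
        return self._remaining.get(key, 0)


async def is_locked(backend, lock_key: str) -> bool:
    # Assumed rule: a lock is active while its key still has time to live.
    return (await backend.get_ttl(lock_key)) > 0


locked_without_ttl = asyncio.run(is_locked(NoTtlBackend(), "lock:alice"))
locked_with_ttl = asyncio.run(is_locked(TtlAwareBackend({"lock:alice": 120}), "lock:alice"))
```

With the default implementation the lock never appears active, which is why a backend used for lockout needs a real get_ttl.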

increment_and_check async

increment_and_check(
    key: str,
    limit: int,
    period: int,
    *,
    fail_open: bool = True,
) -> tuple[int, bool, int]

Fixed-window limit check over the dumb counter.

Override this to provide a single-call atomic version, as the Redis backend does. The window-stamped key is what makes re-expiring on every increment correct: the key rolls each window, so old windows expire on their own rather than the TTL being pushed forever under sustained load.

Parameters:

Name Type Description Default
key str

Logical counter key (an action+identity namespace).

required
limit int

Max events allowed within period.

required
period int

Window length in seconds.

required
fail_open bool

On a backend error, allow (True) or block (False).

True

Returns:

Type Description
tuple[int, bool, int]

(count, limited, retry_after_seconds).
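
A fixed-window check over a window-stamped key can be sketched with a plain dict standing in for the backend. The function below is hypothetical: it assumes the window start is the current time rounded down to a multiple of period, and that the event numbered limit is still allowed (limited means count > limit). A real backend would also put an expiry on each stamped key.

```python
from typing import Dict, Tuple


def fixed_window_check(
    counts: Dict[str, int], key: str, limit: int, period: int, now: int
) -> Tuple[int, bool, int]:
    """Count one event in the current window; return (count, limited, retry_after)."""
    window_start = now - (now % period)
    stamped_key = f"{key}:{window_start}"  # rolls over when the window changes
    counts[stamped_key] = counts.get(stamped_key, 0) + 1
    count = counts[stamped_key]
    limited = count > limit  # assumption: the limit-th event is still allowed
    retry_after = (window_start + period - now) if limited else 0
    return count, limited, retry_after


store: Dict[str, int] = {}
# Three events at t=130 in the window [120, 180), with a limit of 2 per 60 seconds.
results = [fixed_window_check(store, "login:ip:1.2.3.4", 2, 60, now=130) for _ in range(3)]
# The next window starts at t=180 under a new stamped key.
next_window = fixed_window_check(store, "login:ip:1.2.3.4", 2, 60, now=180)
```

The third event is limited with 50 seconds left in the window, and the first event of the next window starts again from a count of 1.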

initialize async

initialize() -> None

Open connections / warm up. Default no-op.

close async

close() -> None

Release resources. Default no-op.

crudauth.ratelimit.MemoryRateLimiterBackend

MemoryRateLimiterBackend()

Bases: RateLimiterBackend

Dict-backed counters with monotonic TTLs. Not shared across processes.

Note

Window-stamped keys ({key}:{window_start}) roll every window, so an abandoned past window's key is never accessed again and lazy-on-access eviction alone would never reclaim it. Eviction is therefore lazy-on-access plus an occasional full sweep on increment, which keeps a high-cardinality keyspace (per-IP / per-email) from growing unbounded. The backend is still single-process; use the Redis backend in production.
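
The full-sweep half of that eviction strategy might look like the sketch below. sweep_expired and the key names are hypothetical, and how often the in-tree backend triggers its sweep is not specified in this reference.

```python
from typing import Dict


def sweep_expired(counts: Dict[str, int], deadlines: Dict[str, float], now: float) -> int:
    """Remove every key whose deadline has passed; return how many were evicted."""
    expired = [key for key, deadline in deadlines.items() if deadline <= now]
    for key in expired:
        counts.pop(key, None)
        del deadlines[key]
    return len(expired)


# Two abandoned window keys and one live key (key names are illustrative).
counts = {"ip:a:0": 3, "ip:b:0": 1, "ip:a:60": 2}
deadlines = {"ip:a:0": 60.0, "ip:b:0": 60.0, "ip:a:60": 120.0}
evicted = sweep_expired(counts, deadlines, now=75.0)
```

The two keys from the closed window are reclaimed even though nothing would ever read them again, while the live key survives.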

increment async

increment(
    key: str, amount: int = 1, expiry: int | None = None
) -> int

Increment; arm the TTL only on first touch (key absent from the deadline map) - the contract from RateLimiterBackend.increment.

increment_and_refresh_ttl async

increment_and_refresh_ttl(
    key: str, amount: int = 1, expiry: int | None = None
) -> int

Increment and slide the TTL forward on every call (see the base method).

crudauth.ratelimit.redis_rate_limiter

redis_rate_limiter(
    redis_url: str | None = None, client: Any = None
) -> RateLimiterBackend

Construct a Redis rate-limiter backend, guarding the optional dependency.

Parameters:

Name Type Description Default
redis_url str | None

Connection URL (defaults to localhost when omitted).

None
client Any

A pre-built redis.asyncio client to reuse instead of a URL.

None

Returns:

Type Description
RateLimiterBackend

A RedisBackend (crudauth.ratelimit.backends.redis.RedisBackend).

Raises:

Type Description
ImportError

If the redis extra isn't installed.

Example
CRUDAuth(..., rate_limiter=redis_rate_limiter(redis_url=settings.REDIS_URL))
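
Guarding an optional dependency usually means importing it lazily and re-raising with an install hint. The helper below is a generic sketch of that pattern, not the package's code: require_optional is a hypothetical name, and a deliberately nonexistent module is used so the failure path can be shown without Redis installed.

```python
import importlib
from types import ModuleType


def require_optional(module_name: str, extra: str) -> ModuleType:
    """Import an optional dependency or raise ImportError with an install hint."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name!r} is required for this backend; install the {extra!r} extra."
        ) from exc


try:
    require_optional("module_that_does_not_exist_xyz", extra="redis")
    guard_message = ""
except ImportError as exc:
    guard_message = str(exc)
```

Deferring the import to the factory call keeps the base install free of the dependency while still giving a clear error at the point of use.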