Session Management API Reference

The CRUDAdmin session management system provides secure, scalable session handling with multiple backend options and comprehensive security features including CSRF protection, session expiration, and device tracking.

Core Components

Session Manager

The main session management class that handles all session operations.

Session manager for handling secure authentication sessions in crudadmin.

This class implements a comprehensive session-based authentication system with the following features:

  • Secure session creation and validation
  • CSRF protection with token generation and validation
  • Session expiration and automatic cleanup
  • Device fingerprinting and user agent tracking
  • Multi-device support with configurable session limits per user
  • IP address tracking for security monitoring
  • Session metadata for storing additional authentication context
  • Rate limiting for login attempts with IP and username tracking

Authentication Flow:

  1. When a user logs in successfully, create_session() generates a new session and CSRF token
  2. Session cookies are set via set_session_cookies(): an httpOnly session_id and a non-httpOnly csrf_token
  3. On subsequent requests, validate_session() confirms the session is valid and not expired
  4. For state-changing operations, validate_csrf_token() provides protection against CSRF attacks
  5. Sessions automatically expire after inactivity, or can be manually terminated
  6. Periodic cleanup_expired_sessions() removes stale sessions
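
The flow above can be illustrated with a small in-memory sketch. It is not the crudadmin implementation: ToySession and ToySessionStore are hypothetical names, the clock is passed in explicitly so the timeout is easy to follow, and cookies, storage backends, and session limits are left out.

```python
import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class ToySession:
    """Hypothetical stand-in for the library's SessionData."""

    user_id: int
    last_activity: datetime
    is_active: bool = True


class ToySessionStore:
    """In-memory illustration of steps 1, 3, 4 and 5 (not the crudadmin API)."""

    def __init__(self, timeout_minutes: int = 30) -> None:
        self.timeout = timedelta(minutes=timeout_minutes)
        self.sessions: dict[str, ToySession] = {}
        self.csrf: dict[str, str] = {}  # csrf_token -> session_id

    def create_session(self, user_id: int, now: datetime) -> tuple[str, str]:
        # Step 1: a new session and a CSRF token bound to it.
        session_id = secrets.token_hex(16)
        csrf_token = secrets.token_hex(32)
        self.sessions[session_id] = ToySession(user_id, last_activity=now)
        self.csrf[csrf_token] = session_id
        return session_id, csrf_token

    def validate_session(self, session_id: str, now: datetime) -> Optional[ToySession]:
        # Step 3: reject unknown, inactive, or timed-out sessions.
        session = self.sessions.get(session_id)
        if session is None or not session.is_active:
            return None
        if now - session.last_activity > self.timeout:
            session.is_active = False  # step 5: expired through inactivity
            return None
        session.last_activity = now  # each valid request extends the session
        return session

    def validate_csrf_token(self, session_id: str, csrf_token: str) -> bool:
        # Step 4: the token must exist and belong to this session.
        return self.csrf.get(csrf_token) == session_id


store = ToySessionStore(timeout_minutes=30)
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
sid, token = store.create_session(user_id=1, now=t0)
still_valid = store.validate_session(sid, t0 + timedelta(minutes=10)) is not None
expired = store.validate_session(sid, t0 + timedelta(minutes=41)) is None
```

In this sketch the second check fails because 31 minutes have passed since the last recorded activity at the 10-minute mark.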

Security Features:

  • Sessions are stored server-side with only the ID transmitted to clients
  • CSRF protection through synchronized tokens
  • Session hijacking protection via IP and user agent tracking
  • Automatic session expiration after configurable timeout
  • Forced logout of oldest sessions when session limit is reached
  • Different SameSite cookie settings for development and production
  • Rate limiting for login attempts to prevent brute force attacks
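
As an illustration of the "forced logout of oldest sessions" behavior, the sketch below mirrors the arithmetic used in the class source (sort by last activity, then evict enough sessions to leave room for the new one). The function name sessions_to_evict and its dictionary input are assumptions made for this example, not part of crudadmin.

```python
from datetime import datetime, timedelta, timezone


def sessions_to_evict(
    last_activity_by_id: dict[str, datetime], max_sessions: int
) -> list[str]:
    """Return the session IDs to terminate before one new session is added."""
    if len(last_activity_by_id) < max_sessions:
        return []  # still room for the new session
    # Least recently active sessions come first.
    oldest_first = sorted(last_activity_by_id, key=last_activity_by_id.__getitem__)
    # The +1 frees a slot for the session about to be created.
    excess = len(last_activity_by_id) - max_sessions + 1
    return oldest_first[:excess]


t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
active = {
    "a": t0,
    "b": t0 + timedelta(minutes=5),
    "c": t0 + timedelta(minutes=10),
}
evicted = sessions_to_evict(active, max_sessions=3)
```

With three active sessions and a limit of three, only the least recently used session ("a") is selected.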

Usage: Sessions should be validated on each authenticated request, with CSRF tokens validated for any state-changing operations. The cleanup method should be called periodically to remove expired sessions.
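
One way to call the cleanup method periodically is a background asyncio task. The sketch below is an assumption about how an application might schedule it, not something crudadmin provides: run_periodic_cleanup and StubManager are hypothetical, and the stub only counts calls so the example runs without the library.

```python
import asyncio


class StubManager:
    """Hypothetical stand-in exposing only cleanup_expired_sessions()."""

    def __init__(self) -> None:
        self.calls = 0

    async def cleanup_expired_sessions(self) -> None:
        self.calls += 1


async def run_periodic_cleanup(
    manager, interval_seconds: float, stop: asyncio.Event
) -> None:
    """Call manager.cleanup_expired_sessions() repeatedly until stop is set."""
    while not stop.is_set():
        await manager.cleanup_expired_sessions()
        try:
            # Sleep for the interval, but wake up early if asked to stop.
            await asyncio.wait_for(stop.wait(), timeout=interval_seconds)
        except asyncio.TimeoutError:
            pass  # interval elapsed; run the next cleanup


async def demo() -> int:
    manager = StubManager()
    stop = asyncio.Event()
    task = asyncio.create_task(run_periodic_cleanup(manager, 0.01, stop))
    await asyncio.sleep(0.05)
    stop.set()
    await task
    return manager.calls
```

In a real application the loop would typically be started at startup and stopped at shutdown, with the real SessionManager passed in place of the stub.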

Source code in crudadmin/session/manager.py
class SessionManager:
    """Session manager for handling secure authentication sessions in crudadmin.

    This class implements a comprehensive session-based authentication system with the following features:

    - Secure session creation and validation
    - CSRF protection with token generation and validation
    - Session expiration and automatic cleanup
    - Device fingerprinting and user agent tracking
    - Multi-device support with configurable session limits per user
    - IP address tracking for security monitoring
    - Session metadata for storing additional authentication context
    - Rate limiting for login attempts with IP and username tracking

    Authentication Flow:
    1. When a user logs in successfully, create_session() generates a new session and CSRF token
    2. Session cookies are set via set_session_cookies() - a httpOnly session_id and a non-httpOnly csrf_token
    3. On subsequent requests, validate_session() confirms the session is valid and not expired
    4. For state-changing operations, validate_csrf_token() provides protection against CSRF attacks
    5. Sessions automatically expire after inactivity, or can be manually terminated
    6. Periodic cleanup_expired_sessions() removes stale sessions

    Security Features:
    - Sessions are stored server-side with only the ID transmitted to clients
    - CSRF protection through synchronized tokens
    - Session hijacking protection via IP and user agent tracking
    - Automatic session expiration after configurable timeout
    - Forced logout of oldest sessions when session limit is reached
    - Different SameSite cookie settings for development and production
    - Rate limiting for login attempts to prevent brute force attacks

    Usage:
    Sessions should be validated on each authenticated request, with CSRF tokens validated
    for any state-changing operations. The cleanup method should be called periodically
    to remove expired sessions.
    """

    def __init__(
        self,
        session_storage: Optional[AbstractSessionStorage[SessionData]] = None,
        max_sessions_per_user: int = 5,
        session_timeout_minutes: int = 30,
        cleanup_interval_minutes: int = 15,
        csrf_token_bytes: int = 32,
        rate_limiter: Optional[SimpleRateLimiter] = None,
        login_max_attempts: int = 5,
        login_window_minutes: int = 15,
        session_backend: str = "memory",
        **backend_kwargs: Any,
    ):
        """Initialize the session manager.

        Args:
            session_storage: Storage backend for sessions (if None, will be created)
            max_sessions_per_user: Maximum number of active sessions per user
            session_timeout_minutes: Session timeout in minutes
            cleanup_interval_minutes: Interval for cleaning up expired sessions
            csrf_token_bytes: Number of bytes to use for CSRF tokens
            rate_limiter: Optional rate limiter implementation for login attempts
            login_max_attempts: Maximum failed login attempts before rate limiting
            login_window_minutes: Time window for tracking failed login attempts
            session_backend: Backend type if creating storage automatically
            **backend_kwargs: Additional arguments for backend creation
        """
        self.max_sessions = max_sessions_per_user
        self.session_timeout = timedelta(minutes=session_timeout_minutes)
        self.cleanup_interval = timedelta(minutes=cleanup_interval_minutes)
        self.last_cleanup = datetime.now(UTC)
        self.csrf_token_bytes = csrf_token_bytes
        self.rate_limiter = rate_limiter
        self.login_max_attempts = login_max_attempts
        self.login_window = timedelta(minutes=login_window_minutes)

        if session_storage is None:
            storage_settings = {
                "prefix": "session:",
                "expiration": session_timeout_minutes * 60,
                **backend_kwargs,
            }
            self.storage: AbstractSessionStorage[SessionData] = get_session_storage(
                backend=session_backend, model_type=SessionData, **storage_settings
            )
        else:
            self.storage = session_storage

        csrf_storage_settings = {
            "prefix": "csrf:",
            "expiration": session_timeout_minutes * 60,
        }
        csrf_storage_settings.update(backend_kwargs)
        self.csrf_storage: AbstractSessionStorage[CSRFToken] = get_session_storage(
            backend=session_backend, model_type=CSRFToken, **csrf_storage_settings
        )

    def parse_user_agent(self, user_agent_string: str) -> UserAgentInfo:
        """Parse User-Agent string into structured information.

        Args:
            user_agent_string: Raw User-Agent header

        Returns:
            Structured UserAgentInfo
        """
        ua_parser = parse(user_agent_string)
        return UserAgentInfo(
            browser=ua_parser.browser.family,
            browser_version=ua_parser.browser.version_string,
            os=ua_parser.os.family,
            device=ua_parser.device.family,
            is_mobile=ua_parser.is_mobile,
            is_tablet=ua_parser.is_tablet,
            is_pc=ua_parser.is_pc,
        )

    async def create_session(
        self, request: Request, user_id: int, metadata: Optional[dict[str, Any]] = None
    ) -> tuple[str, str]:
        """Create a new session for a user and generate a CSRF token.

        Args:
            request: The request object
            user_id: The user ID
            metadata: Optional session metadata

        Returns:
            Tuple of (session_id, csrf_token)

        Raises:
            ValueError: If the request client is invalid
        """
        logger.info(f"Creating new session for user_id: {user_id}")

        try:
            user_agent = request.headers.get("user-agent", "")
            current_time = datetime.now(UTC)

            client = request.client
            if client is None:
                logger.error("Request client is None. Cannot retrieve IP address.")
                raise ValueError("Invalid request client.")

            device_info = self.parse_user_agent(user_agent).model_dump()

            ip_address = (
                request.headers.get("x-forwarded-for", client.host)
                .split(",")[0]
                .strip()
            )

            await self._enforce_session_limit(user_id)

            session_data = SessionCreate(
                user_id=user_id,
                ip_address=ip_address,
                user_agent=user_agent,
                device_info=device_info,
                last_activity=current_time,
                is_active=True,
                metadata=metadata or {},
            )

            session_id = await self.storage.create(session_data)
            csrf_token = await self._generate_csrf_token(user_id, session_id)

            logger.info(f"Session {session_id} created successfully")
            return session_id, csrf_token

        except Exception as e:
            logger.error(f"Error creating session: {str(e)}", exc_info=True)
            raise

    async def validate_session(
        self, session_id: str, update_activity: bool = True
    ) -> Optional[SessionData]:
        """Validate if a session is active and not timed out.

        Args:
            session_id: The session ID
            update_activity: Whether to update the last activity timestamp

        Returns:
            The session data if valid, None otherwise
        """
        if not session_id:
            return None

        try:
            session_data = await self.storage.get(session_id, SessionData)
            if session_data is None:
                logger.warning(f"Session not found: {session_id}")
                return None

            if not session_data.is_active:
                logger.warning(f"Session is not active: {session_id}")
                return None

            current_time = datetime.now(UTC)
            session_age = current_time - session_data.last_activity

            if session_age > self.session_timeout:
                logger.warning(f"Session timed out: {session_id}")
                await self.terminate_session(session_id)
                return None

            if update_activity:
                session_data.last_activity = current_time
                await self.storage.update(session_id, session_data)

            return session_data

        except Exception as e:
            logger.error(f"Error validating session: {str(e)}", exc_info=True)
            return None

    async def validate_csrf_token(
        self,
        session_id: str,
        csrf_token: str,
    ) -> bool:
        """Validate a CSRF token for a session.

        Args:
            session_id: The session ID
            csrf_token: The CSRF token to validate

        Returns:
            True if valid, False otherwise
        """
        if not session_id or not csrf_token:
            logger.warning(
                f"Missing session_id or csrf_token: session_id={session_id}, csrf_token={csrf_token}"
            )
            return False

        try:
            token_data = await self.csrf_storage.get(csrf_token, CSRFToken)
            if token_data is None:
                logger.warning(f"CSRF token not found in storage: {csrf_token}")
                return False

            if token_data.session_id != session_id:
                logger.warning(
                    f"CSRF token session mismatch: {csrf_token} should be for session {session_id}, "
                    f"but is for session {token_data.session_id}"
                )
                return False

            current_time = datetime.now(UTC)
            if token_data.expires_at < current_time:
                logger.warning(
                    f"CSRF token expired: {csrf_token}, expired at {token_data.expires_at}, current time is {current_time}"
                )
                await self.csrf_storage.delete(csrf_token)
                return False

            return True

        except Exception as e:
            logger.error(f"Error validating CSRF token: {str(e)}", exc_info=True)
            return False

    async def regenerate_csrf_token(
        self,
        user_id: int,
        session_id: str,
    ) -> str:
        """Regenerate a CSRF token for an existing session.

        Args:
            user_id: The user ID
            session_id: The session ID

        Returns:
            The new CSRF token
        """
        try:
            if hasattr(self.csrf_storage, "_scan_iter"):
                keys = await self.csrf_storage._scan_iter(
                    match=f"{self.csrf_storage.prefix}*"
                )
                for key in keys:
                    try:
                        token_id = key[len(self.csrf_storage.prefix) :]
                        csrf_data = await self.csrf_storage.get(token_id, CSRFToken)
                        if csrf_data and csrf_data.session_id == session_id:
                            await self.csrf_storage.delete(token_id)
                    except Exception as e:
                        logger.warning(f"Error cleaning up old CSRF token: {e}")
        except Exception as e:
            logger.warning(f"Error scanning for old CSRF tokens: {e}")

        return await self._generate_csrf_token(user_id, session_id)

    async def _generate_csrf_token(
        self,
        user_id: int,
        session_id: str,
    ) -> str:
        """Generate a new CSRF token for a session.

        Args:
            user_id: The user ID
            session_id: The session ID

        Returns:
            The CSRF token
        """
        token = secrets.token_hex(self.csrf_token_bytes)
        expires_at = datetime.now(UTC) + self.session_timeout

        csrf_data = CSRFToken(
            token=token,
            user_id=user_id,
            session_id=session_id,
            expires_at=expires_at,
        )

        await self.csrf_storage.create(csrf_data, session_id=token)
        return token

    async def terminate_session(self, session_id: str) -> bool:
        """Terminate a specific session.

        Args:
            session_id: The session ID

        Returns:
            True if the session was terminated, False otherwise
        """
        try:
            session_data = await self.storage.get(session_id, SessionData)
            if session_data is None:
                return False

            session_data.is_active = False
            session_data.metadata = {
                **session_data.metadata,
                "terminated_at": datetime.now(UTC).isoformat(),
                "termination_reason": "manual_termination",
            }

            return await self.storage.update(session_id, session_data)

        except Exception as e:
            logger.error(f"Error terminating session: {str(e)}", exc_info=True)
            return False

    async def _enforce_session_limit(self, user_id: int) -> None:
        """Enforce the maximum number of sessions per user.

        Terminates the oldest sessions if the limit is exceeded.

        Args:
            user_id: The user ID
        """
        try:
            active_sessions = []

            if hasattr(self.storage, "get_user_sessions"):
                try:
                    session_ids = await self.storage.get_user_sessions(user_id)
                    for session_id in session_ids:
                        try:
                            session_data = await self.storage.get(
                                session_id, SessionData
                            )
                            if session_data and session_data.is_active:
                                active_sessions.append(session_data)
                        except Exception as e:
                            logger.warning(
                                f"Error processing session {session_id}: {e}"
                            )
                            continue
                except Exception as e:
                    logger.warning(f"Error getting user sessions: {e}")
                    active_sessions = await self._get_active_sessions_by_scan(user_id)
            else:
                active_sessions = await self._get_active_sessions_by_scan(user_id)

            if len(active_sessions) >= self.max_sessions:
                active_sessions.sort(key=lambda s: s.last_activity)

                excess_count = len(active_sessions) - self.max_sessions + 1
                for i in range(excess_count):
                    if i < len(active_sessions):
                        await self.terminate_session(active_sessions[i].session_id)

        except Exception as e:
            logger.error(f"Error enforcing session limit: {e}", exc_info=True)

    async def _get_active_sessions_by_scan(self, user_id: int) -> list[SessionData]:
        """Get active sessions for a user by scanning all keys.

        This is a fallback method when indexed groups are not available.

        Args:
            user_id: The user ID

        Returns:
            List of active sessions for the user
        """
        active_sessions = []

        if hasattr(self.storage, "_scan_iter"):
            keys = await self.storage._scan_iter(match=f"{self.storage.prefix}*")
            for key in keys:
                try:
                    session_data_bytes = await self.storage.get(
                        session_id=key[len(self.storage.prefix) :],
                        model_class=SessionData,
                    )
                    if (
                        session_data_bytes
                        and session_data_bytes.user_id == user_id
                        and session_data_bytes.is_active
                    ):
                        active_sessions.append(session_data_bytes)
                except Exception as e:
                    logger.warning(f"Error processing session during cleanup: {e}")
                    continue
        elif hasattr(self.storage, "client") and hasattr(
            self.storage.client, "scan_iter"
        ):
            async for key in self.storage.client.scan_iter(
                match=f"{self.storage.prefix}*"
            ):
                try:
                    if isinstance(key, bytes):
                        key = key.decode("utf-8")
                    session_id = key[len(self.storage.prefix) :]

                    session_data = await self.storage.get(session_id, SessionData)
                    if (
                        session_data
                        and session_data.user_id == user_id
                        and session_data.is_active
                    ):
                        active_sessions.append(session_data)
                except Exception as e:
                    logger.warning(f"Error processing session during cleanup: {e}")
                    continue

        return active_sessions

    async def cleanup_expired_sessions(self) -> None:
        """Cleanup expired and inactive sessions.

        This should be called periodically.
        """
        now = datetime.now(UTC)

        if now - self.last_cleanup < self.cleanup_interval:
            return

        timeout_threshold = now - self.session_timeout

        try:
            if hasattr(self.storage, "_scan_iter"):
                keys = await self.storage._scan_iter(match=f"{self.storage.prefix}*")
                for key in keys:
                    try:
                        session_id = key[len(self.storage.prefix) :]
                        session_data = await self.storage.get(session_id, SessionData)
                        if (
                            session_data
                            and session_data.is_active
                            and session_data.last_activity < timeout_threshold
                        ):
                            session_data.is_active = False
                            session_data.metadata = {
                                **session_data.metadata,
                                "terminated_at": now.isoformat(),
                                "termination_reason": "session_timeout",
                            }
                            await self.storage.update(session_id, session_data)
                    except Exception as e:
                        logger.warning(f"Error processing session during cleanup: {e}")
                        continue
            elif hasattr(self.storage, "client") and hasattr(
                self.storage.client, "scan_iter"
            ):
                async for key in self.storage.client.scan_iter(
                    match=f"{self.storage.prefix}*"
                ):
                    try:
                        if isinstance(key, bytes):
                            key = key.decode("utf-8")
                        session_id = key[len(self.storage.prefix) :]

                        session_data = await self.storage.get(session_id, SessionData)
                        if (
                            session_data
                            and session_data.is_active
                            and session_data.last_activity < timeout_threshold
                        ):
                            session_data.is_active = False
                            session_data.metadata = {
                                **session_data.metadata,
                                "terminated_at": now.isoformat(),
                                "termination_reason": "session_timeout",
                            }
                            await self.storage.update(
                                session_data.session_id, session_data
                            )
                    except Exception as e:
                        logger.warning(f"Error processing session during cleanup: {e}")
                        continue

            if self.rate_limiter:
                try:
                    await self.cleanup_rate_limits()
                except Exception as e:
                    logger.error(f"Error cleaning up rate limits: {e}")

            self.last_cleanup = now

        except Exception as e:
            logger.error(f"Error during session cleanup: {e}", exc_info=True)

    def set_session_cookies(
        self,
        response: Response,
        session_id: str,
        csrf_token: str,
        max_age: Optional[int] = None,
        path: str = "/",
        secure: bool = True,
    ) -> None:
        """Set session cookies in the response.

        Args:
            response: The response object
            session_id: The session ID
            csrf_token: The CSRF token
            max_age: Cookie max age in seconds
            path: Cookie path
            secure: Whether to set the Secure flag
        """
        settings = get_settings()
        samesite: SamesiteType = DEV_SAMESITE if settings.DEBUG else PROD_SAMESITE
        cookie_max_age = (
            max_age if max_age is not None else settings.SESSION_COOKIE_MAX_AGE
        )

        response.set_cookie(
            key="session_id",
            value=session_id,
            httponly=True,
            secure=secure,
            samesite=samesite,
            path=path,
            max_age=cookie_max_age,
        )

        response.set_cookie(
            key="csrf_token",
            value=csrf_token,
            httponly=False,
            secure=secure,
            samesite=samesite,
            path=path,
            max_age=cookie_max_age,
        )

    def clear_session_cookies(
        self,
        response: Response,
        path: str = "/",
    ) -> None:
        """Clear session cookies from the response.

        Args:
            response: The response object
            path: Cookie path
        """
        response.delete_cookie(key="session_id", path=path)
        response.delete_cookie(key="csrf_token", path=path)

    async def track_login_attempt(
        self, ip_address: str, username: str, success: bool = False
    ) -> tuple[bool, Optional[int]]:
        """Track login attempts and apply rate limiting.

        Args:
            ip_address: Client IP address
            username: Username being used for login
            success: Whether the login attempt was successful

        Returns:
            Tuple of (is_allowed, attempts_remaining)

        If rate limiting is not configured, this will always return (True, None)
        but log a warning about missing rate limiting.
        """
        if not self.rate_limiter:
            logger.warning(
                "No rate limiter configured for login attempts. "
                "It is strongly recommended to configure rate limiting for security."
            )
            return True, None

        try:
            ip_key = f"login:ip:{ip_address}"
            username_key = f"login:user:{username}"

            if success:
                try:
                    await self.rate_limiter.delete(ip_key)
                    await self.rate_limiter.delete(username_key)
                    return True, None
                except Exception as e:
                    logger.warning(
                        f"Error clearing rate limit after successful login: {e}"
                    )
                    return True, None

            try:
                expiry_seconds = int(self.login_window.total_seconds())
                ip_count = await self.rate_limiter.increment(ip_key, 1, expiry_seconds)
                username_count = await self.rate_limiter.increment(
                    username_key, 1, expiry_seconds
                )
            except Exception as e:
                logger.warning(f"Error tracking login attempt rate limits: {e}")
                return True, None

            attempt_count = max(ip_count, username_count)
            remaining = max(0, self.login_max_attempts - attempt_count)

            is_allowed = attempt_count <= self.login_max_attempts

            if not is_allowed:
                logger.warning(
                    f"Rate limit exceeded for login: {ip_address}, username: {username}, attempts: {attempt_count}"
                )

            return is_allowed, remaining

        except Exception as e:
            logger.error(f"Unexpected error in login rate limiting: {e}", exc_info=True)
            return True, None

    async def cleanup_rate_limits(self) -> None:
        """Clean up expired rate limit records.

        This should be called periodically along with session cleanup.
        """
        if not self.rate_limiter:
            return

        try:
            if hasattr(self.rate_limiter, "delete_pattern"):
                await self.rate_limiter.delete_pattern("login:*")
            else:
                logger.debug("Rate limiter does not support pattern-based cleanup")
        except Exception as e:
            logger.error(f"Error cleaning up rate limit records: {e}", exc_info=True)
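
The counting rule in track_login_attempt() above can be tried in isolation with a small sketch. ToyLoginTracker is a hypothetical in-memory stand-in for the rate limiter; it keeps the same key names and the same comparison as the source but ignores the expiry window.

```python
class ToyLoginTracker:
    """Illustrative counter for failed logins (no expiry window)."""

    def __init__(self, max_attempts: int = 5) -> None:
        self.max_attempts = max_attempts
        self.counts: dict[str, int] = {}

    def _increment(self, key: str) -> int:
        self.counts[key] = self.counts.get(key, 0) + 1
        return self.counts[key]

    def record_failure(self, ip_address: str, username: str) -> tuple[bool, int]:
        """Return (is_allowed, attempts_remaining) after a failed login."""
        ip_count = self._increment(f"login:ip:{ip_address}")
        username_count = self._increment(f"login:user:{username}")
        # The stricter of the two counters decides the outcome.
        attempt_count = max(ip_count, username_count)
        remaining = max(0, self.max_attempts - attempt_count)
        return attempt_count <= self.max_attempts, remaining

    def record_success(self, ip_address: str, username: str) -> None:
        """A successful login clears both counters."""
        self.counts.pop(f"login:ip:{ip_address}", None)
        self.counts.pop(f"login:user:{username}", None)


tracker = ToyLoginTracker(max_attempts=5)
results = [tracker.record_failure("203.0.113.7", "alice") for _ in range(6)]
```

Because the comparison is `<=`, the fifth failure is still allowed with zero attempts remaining, and the sixth is the first to be rejected.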

__init__(session_storage=None, max_sessions_per_user=5, session_timeout_minutes=30, cleanup_interval_minutes=15, csrf_token_bytes=32, rate_limiter=None, login_max_attempts=5, login_window_minutes=15, session_backend='memory', **backend_kwargs)

Initialize the session manager.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| session_storage | Optional[AbstractSessionStorage[SessionData]] | Storage backend for sessions (if None, will be created) | None |
| max_sessions_per_user | int | Maximum number of active sessions per user | 5 |
| session_timeout_minutes | int | Session timeout in minutes | 30 |
| cleanup_interval_minutes | int | Interval for cleaning up expired sessions | 15 |
| csrf_token_bytes | int | Number of bytes to use for CSRF tokens | 32 |
| rate_limiter | Optional[SimpleRateLimiter] | Optional rate limiter implementation for login attempts | None |
| login_max_attempts | int | Maximum failed login attempts before rate limiting | 5 |
| login_window_minutes | int | Time window for tracking failed login attempts | 15 |
| session_backend | str | Backend type if creating storage automatically | 'memory' |
| **backend_kwargs | Any | Additional arguments for backend creation | {} |

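
To make the units concrete, the following sketch reproduces the conversions the constructor performs on its default values, using only the standard library. The variable names are chosen for this example; only the arithmetic and the use of secrets.token_hex come from the source.

```python
import secrets
from datetime import timedelta

session_timeout_minutes = 30
csrf_token_bytes = 32

# The manager keeps the timeout as a timedelta for comparisons.
session_timeout = timedelta(minutes=session_timeout_minutes)

# Storage backends receive the expiration in seconds.
storage_expiration_seconds = session_timeout_minutes * 60

# token_hex(n) encodes n random bytes as 2 * n hexadecimal characters.
token = secrets.token_hex(csrf_token_bytes)
token_length = len(token)
```

With the defaults, storage entries expire after 1800 seconds and each CSRF token is a 64-character hex string.
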
Source code in crudadmin/session/manager.py
def __init__(
    self,
    session_storage: Optional[AbstractSessionStorage[SessionData]] = None,
    max_sessions_per_user: int = 5,
    session_timeout_minutes: int = 30,
    cleanup_interval_minutes: int = 15,
    csrf_token_bytes: int = 32,
    rate_limiter: Optional[SimpleRateLimiter] = None,
    login_max_attempts: int = 5,
    login_window_minutes: int = 15,
    session_backend: str = "memory",
    **backend_kwargs: Any,
):
    """Initialize the session manager.

    Args:
        session_storage: Storage backend for sessions (if None, will be created)
        max_sessions_per_user: Maximum number of active sessions per user
        session_timeout_minutes: Session timeout in minutes
        cleanup_interval_minutes: Interval for cleaning up expired sessions
        csrf_token_bytes: Number of bytes to use for CSRF tokens
        rate_limiter: Optional rate limiter implementation for login attempts
        login_max_attempts: Maximum failed login attempts before rate limiting
        login_window_minutes: Time window for tracking failed login attempts
        session_backend: Backend type if creating storage automatically
        **backend_kwargs: Additional arguments for backend creation
    """
    self.max_sessions = max_sessions_per_user
    self.session_timeout = timedelta(minutes=session_timeout_minutes)
    self.cleanup_interval = timedelta(minutes=cleanup_interval_minutes)
    self.last_cleanup = datetime.now(UTC)
    self.csrf_token_bytes = csrf_token_bytes
    self.rate_limiter = rate_limiter
    self.login_max_attempts = login_max_attempts
    self.login_window = timedelta(minutes=login_window_minutes)

    if session_storage is None:
        storage_settings = {
            "prefix": "session:",
            "expiration": session_timeout_minutes * 60,
            **backend_kwargs,
        }
        self.storage: AbstractSessionStorage[SessionData] = get_session_storage(
            backend=session_backend, model_type=SessionData, **storage_settings
        )
    else:
        self.storage = session_storage

    csrf_storage_settings = {
        "prefix": "csrf:",
        "expiration": session_timeout_minutes * 60,
    }
    csrf_storage_settings.update(backend_kwargs)
    self.csrf_storage: AbstractSessionStorage[CSRFToken] = get_session_storage(
        backend=session_backend, model_type=CSRFToken, **csrf_storage_settings
    )
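
The way the constructor derives its two storage configurations can be sketched in isolation. The helper below is hypothetical (it is not part of crudadmin) and only reproduces the dictionary handling shown in the source above: the timeout is converted from minutes to seconds, and `backend_kwargs` is merged last into both dictionaries.

```python
def build_storage_settings(session_timeout_minutes=30, **backend_kwargs):
    """Mirror the two settings dicts built in __init__ (illustrative helper)."""
    expiration = session_timeout_minutes * 60  # minutes -> seconds
    session_settings = {
        "prefix": "session:",
        "expiration": expiration,
        **backend_kwargs,  # merged last, so it overrides the defaults above
    }
    csrf_settings = {"prefix": "csrf:", "expiration": expiration}
    csrf_settings.update(backend_kwargs)  # same override for the CSRF store
    return session_settings, csrf_settings


default_session, default_csrf = build_storage_settings()
custom_session, custom_csrf = build_storage_settings(45, prefix="admin:")
```

Because `backend_kwargs` is applied after the defaults, passing `prefix` through it gives the session store and the CSRF store the same prefix. Note also that, per the source above, the CSRF store is always created through `get_session_storage` with `session_backend`, even when a `session_storage` instance is supplied.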

cleanup_expired_sessions() async

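Mark expired sessions as inactive.

This should be called periodically. The call returns immediately if less than cleanup_interval has elapsed since the last run, so calling it frequently is cheap. Sessions whose last_activity is older than the session timeout are flagged inactive and annotated with termination metadata rather than deleted. When a rate limiter is configured, cleanup_rate_limits() runs as part of the same pass.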

Source code in crudadmin/session/manager.py
async def cleanup_expired_sessions(self) -> None:
    """Cleanup expired and inactive sessions.

    This should be called periodically.
    """
    now = datetime.now(UTC)

    if now - self.last_cleanup < self.cleanup_interval:
        return

    timeout_threshold = now - self.session_timeout

    try:
        if hasattr(self.storage, "_scan_iter"):
            keys = await self.storage._scan_iter(match=f"{self.storage.prefix}*")
            for key in keys:
                try:
                    session_id = key[len(self.storage.prefix) :]
                    session_data = await self.storage.get(session_id, SessionData)
                    if (
                        session_data
                        and session_data.is_active
                        and session_data.last_activity < timeout_threshold
                    ):
                        session_data.is_active = False
                        session_data.metadata = {
                            **session_data.metadata,
                            "terminated_at": now.isoformat(),
                            "termination_reason": "session_timeout",
                        }
                        await self.storage.update(session_id, session_data)
                except Exception as e:
                    logger.warning(f"Error processing session during cleanup: {e}")
                    continue
        elif hasattr(self.storage, "client") and hasattr(
            self.storage.client, "scan_iter"
        ):
            async for key in self.storage.client.scan_iter(
                match=f"{self.storage.prefix}*"
            ):
                try:
                    if isinstance(key, bytes):
                        key = key.decode("utf-8")
                    session_id = key[len(self.storage.prefix) :]

                    session_data = await self.storage.get(session_id, SessionData)
                    if (
                        session_data
                        and session_data.is_active
                        and session_data.last_activity < timeout_threshold
                    ):
                        session_data.is_active = False
                        session_data.metadata = {
                            **session_data.metadata,
                            "terminated_at": now.isoformat(),
                            "termination_reason": "session_timeout",
                        }
                        await self.storage.update(
                            session_data.session_id, session_data
                        )
                except Exception as e:
                    logger.warning(f"Error processing session during cleanup: {e}")
                    continue

        if self.rate_limiter:
            try:
                await self.cleanup_rate_limits()
            except Exception as e:
                logger.error(f"Error cleaning up rate limits: {e}")

        self.last_cleanup = now

    except Exception as e:
        logger.error(f"Error during session cleanup: {e}", exc_info=True)
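
The core of the cleanup pass is a threshold comparison followed by a soft expiry. The sketch below reproduces that step with a hypothetical `FakeSession` dataclass standing in for `SessionData` and a plain dict standing in for the storage backend; neither name comes from crudadmin.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class FakeSession:
    """Hypothetical stand-in for SessionData with only the fields cleanup reads."""

    last_activity: datetime
    is_active: bool = True
    metadata: dict = field(default_factory=dict)


def expire_stale(sessions, now, session_timeout):
    """Soft-expire sessions idle longer than session_timeout; return the count."""
    threshold = now - session_timeout
    changed = 0
    for session in sessions.values():
        if session.is_active and session.last_activity < threshold:
            session.is_active = False
            # Existing metadata is kept; termination details are merged in.
            session.metadata = {
                **session.metadata,
                "terminated_at": now.isoformat(),
                "termination_reason": "session_timeout",
            }
            changed += 1
    return changed


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
sessions = {
    "fresh": FakeSession(last_activity=now - timedelta(minutes=5)),
    "stale": FakeSession(last_activity=now - timedelta(minutes=45)),
}
expired_count = expire_stale(sessions, now, timedelta(minutes=30))
```

Keeping the record and flipping `is_active` preserves an audit trail of why and when a session ended, at the cost of leaving removal of the record to the storage backend's own expiration.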

cleanup_rate_limits() async

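Clean up rate limit records.

If the rate limiter exposes delete_pattern, it is called with the pattern login:*; otherwise the method only logs a debug message. It does nothing when no rate limiter is configured.

This should be called periodically along with session cleanup.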

Source code in crudadmin/session/manager.py
async def cleanup_rate_limits(self) -> None:
    """Clean up expired rate limit records.

    This should be called periodically along with session cleanup.
    """
    if not self.rate_limiter:
        return

    try:
        if hasattr(self.rate_limiter, "delete_pattern"):
            await self.rate_limiter.delete_pattern("login:*")
        else:
            logger.debug("Rate limiter does not support pattern-based cleanup")
    except Exception as e:
        logger.error(f"Error cleaning up rate limit records: {e}", exc_info=True)

clear_session_cookies(response, path='/')

Clear session cookies from the response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `response` | `Response` | The response object | required |
| `path` | `str` | Cookie path | `'/'` |

Source code in crudadmin/session/manager.py
def clear_session_cookies(
    self,
    response: Response,
    path: str = "/",
) -> None:
    """Clear session cookies from the response.

    Args:
        response: The response object
        path: Cookie path
    """
    response.delete_cookie(key="session_id", path=path)
    response.delete_cookie(key="csrf_token", path=path)

create_session(request, user_id, metadata=None) async

Create a new session for a user and generate a CSRF token.

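Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `request` | `Request` | The request object | required |
| `user_id` | `int` | The user ID | required |
| `metadata` | `Optional[dict[str, Any]]` | Optional session metadata | `None` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[str, str]` | Tuple of (session_id, csrf_token) |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the request has no client information (request.client is None) |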

Source code in crudadmin/session/manager.py
async def create_session(
    self, request: Request, user_id: int, metadata: Optional[dict[str, Any]] = None
) -> tuple[str, str]:
    """Create a new session for a user and generate a CSRF token.

    Args:
        request: The request object
        user_id: The user ID
        metadata: Optional session metadata

    Returns:
        Tuple of (session_id, csrf_token)

    Raises:
        ValueError: If the request client is invalid
    """
    logger.info(f"Creating new session for user_id: {user_id}")

    try:
        user_agent = request.headers.get("user-agent", "")
        current_time = datetime.now(UTC)

        client = request.client
        if client is None:
            logger.error("Request client is None. Cannot retrieve IP address.")
            raise ValueError("Invalid request client.")

        device_info = self.parse_user_agent(user_agent).model_dump()

        ip_address = (
            request.headers.get("x-forwarded-for", client.host)
            .split(",")[0]
            .strip()
        )

        await self._enforce_session_limit(user_id)

        session_data = SessionCreate(
            user_id=user_id,
            ip_address=ip_address,
            user_agent=user_agent,
            device_info=device_info,
            last_activity=current_time,
            is_active=True,
            metadata=metadata or {},
        )

        session_id = await self.storage.create(session_data)
        csrf_token = await self._generate_csrf_token(user_id, session_id)

        logger.info(f"Session {session_id} created successfully")
        return session_id, csrf_token

    except Exception as e:
        logger.error(f"Error creating session: {str(e)}", exc_info=True)
        raise
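
The IP address stored with the session is taken from the first entry of the `X-Forwarded-For` header, falling back to the direct peer address. The standalone function below is a hypothetical helper that reproduces only that expression, using a plain dict in place of the request headers.

```python
def resolve_client_ip(headers: dict, client_host: str) -> str:
    """Return the first X-Forwarded-For entry, or the peer address if absent."""
    forwarded = headers.get("x-forwarded-for", client_host)
    # The header may list several hops: "client, proxy1, proxy2".
    return forwarded.split(",")[0].strip()


proxied = resolve_client_ip(
    {"x-forwarded-for": "203.0.113.7, 10.0.0.2"}, "10.0.0.2"
)
direct = resolve_client_ip({}, "198.51.100.4")
```

Since `X-Forwarded-For` is supplied by the client unless a trusted proxy overwrites it, the recorded address is only as reliable as the proxy setup in front of the application.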

parse_user_agent(user_agent_string)

Parse User-Agent string into structured information.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_agent_string` | `str` | Raw User-Agent header | required |

Returns:

| Type | Description |
| --- | --- |
| `UserAgentInfo` | Structured UserAgentInfo |

Source code in crudadmin/session/manager.py
def parse_user_agent(self, user_agent_string: str) -> UserAgentInfo:
    """Parse User-Agent string into structured information.

    Args:
        user_agent_string: Raw User-Agent header

    Returns:
        Structured UserAgentInfo
    """
    ua_parser = parse(user_agent_string)
    return UserAgentInfo(
        browser=ua_parser.browser.family,
        browser_version=ua_parser.browser.version_string,
        os=ua_parser.os.family,
        device=ua_parser.device.family,
        is_mobile=ua_parser.is_mobile,
        is_tablet=ua_parser.is_tablet,
        is_pc=ua_parser.is_pc,
    )

regenerate_csrf_token(user_id, session_id) async

Regenerate a CSRF token for an existing session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `user_id` | `int` | The user ID | required |
| `session_id` | `str` | The session ID | required |

Returns:

| Type | Description |
| --- | --- |
| `str` | The new CSRF token |

Source code in crudadmin/session/manager.py
async def regenerate_csrf_token(
    self,
    user_id: int,
    session_id: str,
) -> str:
    """Regenerate a CSRF token for an existing session.

    Args:
        user_id: The user ID
        session_id: The session ID

    Returns:
        The new CSRF token
    """
    try:
        if hasattr(self.csrf_storage, "_scan_iter"):
            keys = await self.csrf_storage._scan_iter(
                match=f"{self.csrf_storage.prefix}*"
            )
            for key in keys:
                try:
                    token_id = key[len(self.csrf_storage.prefix) :]
                    csrf_data = await self.csrf_storage.get(token_id, CSRFToken)
                    if csrf_data and csrf_data.session_id == session_id:
                        await self.csrf_storage.delete(token_id)
                except Exception as e:
                    logger.warning(f"Error cleaning up old CSRF token: {e}")
    except Exception as e:
        logger.warning(f"Error scanning for old CSRF tokens: {e}")

    return await self._generate_csrf_token(user_id, session_id)
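
Token rotation amounts to dropping every token bound to the session and issuing a fresh one. The sketch below shows that idea with a plain dict mapping token to session ID. How `_generate_csrf_token` actually builds its token is not shown on this page, so the use of `secrets.token_urlsafe` here is an assumption, as is the `rotate_csrf_token` name.

```python
import secrets


def rotate_csrf_token(store: dict, session_id: str, csrf_token_bytes: int = 32) -> str:
    """Delete tokens bound to session_id, then issue and record a new one."""
    stale = [token for token, sid in store.items() if sid == session_id]
    for token in stale:
        del store[token]
    # Assumption: a URL-safe random token sized by csrf_token_bytes.
    new_token = secrets.token_urlsafe(csrf_token_bytes)
    store[new_token] = session_id
    return new_token


store = {"old-a": "s1", "old-b": "s1", "other": "s2"}
new_token = rotate_csrf_token(store, "s1")
```

In the source above, the removal of old tokens only runs when the CSRF storage exposes `_scan_iter`; on other backends the previous tokens stay valid until they expire.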

set_session_cookies(response, session_id, csrf_token, max_age=None, path='/', secure=True)

Set session cookies in the response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `response` | `Response` | The response object | required |
| `session_id` | `str` | The session ID | required |
| `csrf_token` | `str` | The CSRF token | required |
| `max_age` | `Optional[int]` | Cookie max age in seconds | `None` |
| `path` | `str` | Cookie path | `'/'` |
| `secure` | `bool` | Whether to set the Secure flag | `True` |

Source code in crudadmin/session/manager.py
def set_session_cookies(
    self,
    response: Response,
    session_id: str,
    csrf_token: str,
    max_age: Optional[int] = None,
    path: str = "/",
    secure: bool = True,
) -> None:
    """Set session cookies in the response.

    Args:
        response: The response object
        session_id: The session ID
        csrf_token: The CSRF token
        max_age: Cookie max age in seconds
        path: Cookie path
        secure: Whether to set the Secure flag
    """
    settings = get_settings()
    samesite: SamesiteType = DEV_SAMESITE if settings.DEBUG else PROD_SAMESITE
    cookie_max_age = (
        max_age if max_age is not None else settings.SESSION_COOKIE_MAX_AGE
    )

    response.set_cookie(
        key="session_id",
        value=session_id,
        httponly=True,
        secure=secure,
        samesite=samesite,
        path=path,
        max_age=cookie_max_age,
    )

    response.set_cookie(
        key="csrf_token",
        value=csrf_token,
        httponly=False,
        secure=secure,
        samesite=samesite,
        path=path,
        max_age=cookie_max_age,
    )
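
The two cookies differ only in their HttpOnly flag: the session ID is hidden from JavaScript, while the CSRF token must stay readable so client code can echo it back. The sketch below illustrates the resulting headers with the standard library's `http.cookies` instead of a framework response. The `build_cookies` helper is hypothetical, and `"lax"` is an assumed SameSite value, since the contents of `DEV_SAMESITE` and `PROD_SAMESITE` are not shown on this page.

```python
from http.cookies import SimpleCookie


def build_cookies(session_id, csrf_token, max_age, secure=True, samesite="lax", path="/"):
    """Build both cookies with identical attributes except HttpOnly."""
    jar = SimpleCookie()
    for name, value, httponly in (
        ("session_id", session_id, True),   # not readable from JavaScript
        ("csrf_token", csrf_token, False),  # readable so the client can resend it
    ):
        jar[name] = value
        jar[name]["path"] = path
        jar[name]["max-age"] = max_age
        jar[name]["samesite"] = samesite
        jar[name]["secure"] = secure
        jar[name]["httponly"] = httponly
    return jar


cookies = build_cookies("abc123", "tok456", max_age=1800)
session_header = cookies["session_id"].OutputString()
csrf_header = cookies["csrf_token"].OutputString()
```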

terminate_session(session_id) async

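Terminate a specific session.

The session record is kept in storage but marked inactive, and its metadata is annotated with terminated_at and a termination_reason of manual_termination.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The session ID | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the session was terminated, False if it does not exist or an error occurred |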

Source code in crudadmin/session/manager.py
async def terminate_session(self, session_id: str) -> bool:
    """Terminate a specific session.

    Args:
        session_id: The session ID

    Returns:
        True if the session was terminated, False otherwise
    """
    try:
        session_data = await self.storage.get(session_id, SessionData)
        if session_data is None:
            return False

        session_data.is_active = False
        session_data.metadata = {
            **session_data.metadata,
            "terminated_at": datetime.now(UTC).isoformat(),
            "termination_reason": "manual_termination",
        }

        return await self.storage.update(session_id, session_data)

    except Exception as e:
        logger.error(f"Error terminating session: {str(e)}", exc_info=True)
        return False

track_login_attempt(ip_address, username, success=False) async

Track login attempts and apply rate limiting.

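Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ip_address` | `str` | Client IP address | required |
| `username` | `str` | Username being used for login | required |
| `success` | `bool` | Whether the login attempt was successful | `False` |

Returns:

| Type | Description |
| --- | --- |
| `tuple[bool, Optional[int]]` | Tuple of (is_allowed, attempts_remaining) |

If no rate limiter is configured, the method always returns (True, None) and logs a warning about the missing rate limiting. A successful attempt clears the IP and username counters and also returns (True, None). If the rate limiter raises an error, the attempt is allowed and (True, None) is returned, so rate limiting fails open.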

Source code in crudadmin/session/manager.py
async def track_login_attempt(
    self, ip_address: str, username: str, success: bool = False
) -> tuple[bool, Optional[int]]:
    """Track login attempts and apply rate limiting.

    Args:
        ip_address: Client IP address
        username: Username being used for login
        success: Whether the login attempt was successful

    Returns:
        Tuple of (is_allowed, attempts_remaining)

    If rate limiting is not configured, this will always return (True, None)
    but log a warning about missing rate limiting.
    """
    if not self.rate_limiter:
        logger.warning(
            "No rate limiter configured for login attempts. "
            "It is strongly recommended to configure rate limiting for security."
        )
        return True, None

    try:
        ip_key = f"login:ip:{ip_address}"
        username_key = f"login:user:{username}"

        if success:
            try:
                await self.rate_limiter.delete(ip_key)
                await self.rate_limiter.delete(username_key)
                return True, None
            except Exception as e:
                logger.warning(
                    f"Error clearing rate limit after successful login: {e}"
                )
                return True, None

        try:
            expiry_seconds = int(self.login_window.total_seconds())
            ip_count = await self.rate_limiter.increment(ip_key, 1, expiry_seconds)
            username_count = await self.rate_limiter.increment(
                username_key, 1, expiry_seconds
            )
        except Exception as e:
            logger.warning(f"Error tracking login attempt rate limits: {e}")
            return True, None

        attempt_count = max(ip_count, username_count)
        remaining = max(0, self.login_max_attempts - attempt_count)

        is_allowed = attempt_count <= self.login_max_attempts

        if not is_allowed:
            logger.warning(
                f"Rate limit exceeded for login: {ip_address}, username: {username}, attempts: {attempt_count}"
            )

        return is_allowed, remaining

    except Exception as e:
        logger.error(f"Unexpected error in login rate limiting: {e}", exc_info=True)
        return True, None
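
The decision for a failed attempt depends only on the two counters and the configured maximum. The function below is a hypothetical extraction of that arithmetic, without the rate limiter calls, to make the boundary behaviour easy to check.

```python
def evaluate_attempt(ip_count: int, username_count: int, login_max_attempts: int = 5):
    """Return (is_allowed, remaining) from the per-IP and per-username counters."""
    # The stricter of the two counters decides the outcome.
    attempt_count = max(ip_count, username_count)
    remaining = max(0, login_max_attempts - attempt_count)
    is_allowed = attempt_count <= login_max_attempts
    return is_allowed, remaining


first = evaluate_attempt(1, 1)
fifth = evaluate_attempt(5, 3)
sixth = evaluate_attempt(6, 3)
```

With the default of five attempts, the fifth failure is still allowed with zero attempts remaining, and the sixth is the first one rejected. Callers that want to block as soon as `remaining` reaches zero need to check that value themselves.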

validate_csrf_token(session_id, csrf_token) async

Validate a CSRF token for a session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The session ID | required |
| `csrf_token` | `str` | The CSRF token to validate | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if valid, False otherwise |

Source code in crudadmin/session/manager.py
async def validate_csrf_token(
    self,
    session_id: str,
    csrf_token: str,
) -> bool:
    """Validate a CSRF token for a session.

    Args:
        session_id: The session ID
        csrf_token: The CSRF token to validate

    Returns:
        True if valid, False otherwise
    """
    if not session_id or not csrf_token:
        logger.warning(
            f"Missing session_id or csrf_token: session_id={session_id}, csrf_token={csrf_token}"
        )
        return False

    try:
        token_data = await self.csrf_storage.get(csrf_token, CSRFToken)
        if token_data is None:
            logger.warning(f"CSRF token not found in storage: {csrf_token}")
            return False

        if token_data.session_id != session_id:
            logger.warning(
                f"CSRF token session mismatch: {csrf_token} should be for session {session_id}, "
                f"but is for session {token_data.session_id}"
            )
            return False

        current_time = datetime.now(UTC)
        if token_data.expires_at < current_time:
            logger.warning(
                f"CSRF token expired: {csrf_token}, expired at {token_data.expires_at}, current time is {current_time}"
            )
            await self.csrf_storage.delete(csrf_token)
            return False

        return True

    except Exception as e:
        logger.error(f"Error validating CSRF token: {str(e)}", exc_info=True)
        return False
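
Validation runs its checks in a fixed order: presence of both values, token lookup, session binding, then expiry. The sketch below follows that order with a dict in place of the CSRF storage and a hypothetical `FakeCSRFToken` dataclass carrying only the two fields the checks read.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class FakeCSRFToken:
    """Hypothetical stand-in for CSRFToken."""

    session_id: str
    expires_at: datetime


def check_csrf(store: dict, session_id: str, csrf_token: str, now: datetime) -> bool:
    """Apply the same checks, in the same order, as validate_csrf_token."""
    if not session_id or not csrf_token:
        return False
    token = store.get(csrf_token)
    if token is None:
        return False
    if token.session_id != session_id:
        return False
    if token.expires_at < now:
        del store[csrf_token]  # expired tokens are removed on discovery
        return False
    return True


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
store = {
    "good": FakeCSRFToken("s1", now + timedelta(minutes=10)),
    "old": FakeCSRFToken("s1", now - timedelta(minutes=1)),
}
ok = check_csrf(store, "s1", "good", now)
wrong_session = check_csrf(store, "s2", "good", now)
expired = check_csrf(store, "s1", "old", now)
```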

validate_session(session_id, update_activity=True) async

Validate if a session is active and not timed out.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The session ID | required |
| `update_activity` | `bool` | Whether to update the last activity timestamp | `True` |

Returns:

| Type | Description |
| --- | --- |
| `Optional[SessionData]` | The session data if valid, None otherwise |

Source code in crudadmin/session/manager.py
async def validate_session(
    self, session_id: str, update_activity: bool = True
) -> Optional[SessionData]:
    """Validate if a session is active and not timed out.

    Args:
        session_id: The session ID
        update_activity: Whether to update the last activity timestamp

    Returns:
        The session data if valid, None otherwise
    """
    if not session_id:
        return None

    try:
        session_data = await self.storage.get(session_id, SessionData)
        if session_data is None:
            logger.warning(f"Session not found: {session_id}")
            return None

        if not session_data.is_active:
            logger.warning(f"Session is not active: {session_id}")
            return None

        current_time = datetime.now(UTC)
        session_age = current_time - session_data.last_activity

        if session_age > self.session_timeout:
            logger.warning(f"Session timed out: {session_id}")
            await self.terminate_session(session_id)
            return None

        if update_activity:
            session_data.last_activity = current_time
            await self.storage.update(session_id, session_data)

        return session_data

    except Exception as e:
        logger.error(f"Error validating session: {str(e)}", exc_info=True)
        return None
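
Because the timeout is measured from `last_activity`, the `update_activity` flag decides whether the timeout slides with each request or stays anchored to the last recorded activity. The simulation below is a hypothetical illustration of that difference; it models only the timestamp comparison, not storage or termination.

```python
from datetime import datetime, timedelta, timezone

START = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)


def simulate(request_offsets, update_activity, timeout=timedelta(minutes=30)):
    """Return the validation outcome for requests at the given minute offsets."""
    last_activity = START
    results = []
    for offset in request_offsets:
        now = START + timedelta(minutes=offset)
        valid = (now - last_activity) <= timeout
        results.append(valid)
        if not valid:
            break  # a timed-out session is terminated
        if update_activity:
            last_activity = now  # sliding window: each request renews the session
    return results


sliding = simulate([20, 40], update_activity=True)
fixed = simulate([20, 40], update_activity=False)
```

Passing `update_activity=False` can suit background polling that should not keep an otherwise idle session alive.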

Session Storage Backends

Abstract Base Class

Bases: Generic[T], ABC

Abstract base class for session storage implementations.

Source code in crudadmin/session/storage.py
class AbstractSessionStorage(Generic[T], ABC):
    """Abstract base class for session storage implementations."""

    def __init__(
        self,
        prefix: str = "session:",
        expiration: int = 1800,
    ):
        """Initialize the session storage.

        Args:
            prefix: Prefix for all session keys
            expiration: Default session expiration in seconds
        """
        self.prefix = prefix
        self.expiration = expiration

    def generate_session_id(self) -> str:
        """Generate a unique session ID.

        Returns:
            A unique session ID string
        """
        return str(uuid4())

    def get_key(self, session_id: str) -> str:
        """Generate the full key for a session ID.

        Args:
            session_id: The session ID

        Returns:
            The full storage key
        """
        return f"{self.prefix}{session_id}"

    @abstractmethod
    async def create(
        self,
        data: T,
        session_id: Optional[str] = None,
        expiration: Optional[int] = None,
    ) -> str:
        """Create a new session.

        Args:
            data: Session data (must be a Pydantic model)
            session_id: Optional session ID. If not provided, one will be generated
            expiration: Optional custom expiration in seconds

        Returns:
            The session ID
        """
        pass

    @abstractmethod
    async def get(self, session_id: str, model_class: Type[T]) -> Optional[T]:
        """Get session data.

        Args:
            session_id: The session ID
            model_class: The Pydantic model class to decode the data into

        Returns:
            The session data or None if session doesn't exist
        """
        pass

    @abstractmethod
    async def update(
        self,
        session_id: str,
        data: T,
        reset_expiration: bool = True,
        expiration: Optional[int] = None,
    ) -> bool:
        """Update session data.

        Args:
            session_id: The session ID
            data: New session data
            reset_expiration: Whether to reset the expiration
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was updated, False if it didn't exist
        """
        pass

    @abstractmethod
    async def delete(self, session_id: str) -> bool:
        """Delete a session.

        Args:
            session_id: The session ID

        Returns:
            True if the session was deleted, False if it didn't exist
        """
        pass

    @abstractmethod
    async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
        """Extend the expiration of a session.

        Args:
            session_id: The session ID
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was extended, False if it didn't exist
        """
        pass

    @abstractmethod
    async def exists(self, session_id: str) -> bool:
        """Check if a session exists.

        Args:
            session_id: The session ID

        Returns:
            True if the session exists, False otherwise
        """
        pass

    @abstractmethod
    async def close(self) -> None:
        """Close the storage connection."""
        pass

__init__(prefix='session:', expiration=1800)

Initialize the session storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `prefix` | `str` | Prefix for all session keys | `'session:'` |
| `expiration` | `int` | Default session expiration in seconds | `1800` |

Source code in crudadmin/session/storage.py
def __init__(
    self,
    prefix: str = "session:",
    expiration: int = 1800,
):
    """Initialize the session storage.

    Args:
        prefix: Prefix for all session keys
        expiration: Default session expiration in seconds
    """
    self.prefix = prefix
    self.expiration = expiration

close() abstractmethod async

Close the storage connection.

Source code in crudadmin/session/storage.py
@abstractmethod
async def close(self) -> None:
    """Close the storage connection."""
    pass

create(data, session_id=None, expiration=None) abstractmethod async

Create a new session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `T` | Session data (must be a Pydantic model) | required |
| `session_id` | `Optional[str]` | Optional session ID. If not provided, one will be generated | `None` |
| `expiration` | `Optional[int]` | Optional custom expiration in seconds | `None` |

Returns:

| Type | Description |
| --- | --- |
| `str` | The session ID |

Source code in crudadmin/session/storage.py
@abstractmethod
async def create(
    self,
    data: T,
    session_id: Optional[str] = None,
    expiration: Optional[int] = None,
) -> str:
    """Create a new session.

    Args:
        data: Session data (must be a Pydantic model)
        session_id: Optional session ID. If not provided, one will be generated
        expiration: Optional custom expiration in seconds

    Returns:
        The session ID
    """
    pass

delete(session_id) abstractmethod async

Delete a session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The session ID | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the session was deleted, False if it didn't exist |

Source code in crudadmin/session/storage.py
@abstractmethod
async def delete(self, session_id: str) -> bool:
    """Delete a session.

    Args:
        session_id: The session ID

    Returns:
        True if the session was deleted, False if it didn't exist
    """
    pass

exists(session_id) abstractmethod async

Check if a session exists.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The session ID | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the session exists, False otherwise |

Source code in crudadmin/session/storage.py
@abstractmethod
async def exists(self, session_id: str) -> bool:
    """Check if a session exists.

    Args:
        session_id: The session ID

    Returns:
        True if the session exists, False otherwise
    """
    pass

extend(session_id, expiration=None) abstractmethod async

Extend the expiration of a session.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The session ID | required |
| `expiration` | `Optional[int]` | Optional custom expiration in seconds | `None` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the session was extended, False if it didn't exist |

Source code in crudadmin/session/storage.py
@abstractmethod
async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
    """Extend the expiration of a session.

    Args:
        session_id: The session ID
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was extended, False if it didn't exist
    """
    pass

generate_session_id()

Generate a unique session ID.

Returns:

| Type | Description |
| --- | --- |
| `str` | A unique session ID string |

Source code in crudadmin/session/storage.py
def generate_session_id(self) -> str:
    """Generate a unique session ID.

    Returns:
        A unique session ID string
    """
    return str(uuid4())

get(session_id, model_class) abstractmethod async

Get session data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The session ID | required |
| `model_class` | `Type[T]` | The Pydantic model class to decode the data into | required |

Returns:

| Type | Description |
| --- | --- |
| `Optional[T]` | The session data or None if session doesn't exist |

Source code in crudadmin/session/storage.py
@abstractmethod
async def get(self, session_id: str, model_class: Type[T]) -> Optional[T]:
    """Get session data.

    Args:
        session_id: The session ID
        model_class: The Pydantic model class to decode the data into

    Returns:
        The session data or None if session doesn't exist
    """
    pass

get_key(session_id)

Generate the full key for a session ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The session ID | required |

Returns:

| Type | Description |
| --- | --- |
| `str` | The full storage key |

Source code in crudadmin/session/storage.py
def get_key(self, session_id: str) -> str:
    """Generate the full key for a session ID.

    Args:
        session_id: The session ID

    Returns:
        The full storage key
    """
    return f"{self.prefix}{session_id}"
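
Storage keys are simply the prefix followed by the session ID, and the session manager's cleanup code reverses the mapping by slicing the prefix off again. The two functions below are hypothetical standalone versions of that round trip, including the bytes decoding that the cleanup code applies to scanned keys.

```python
PREFIX = "session:"


def get_key(session_id: str) -> str:
    """Build the full storage key for a session ID."""
    return f"{PREFIX}{session_id}"


def session_id_from_key(key) -> str:
    """Recover the session ID from a scanned key, which may arrive as bytes."""
    if isinstance(key, bytes):
        key = key.decode("utf-8")
    return key[len(PREFIX):]


key = get_key("1b4e28ba")
from_str = session_id_from_key(key)
from_bytes = session_id_from_key(key.encode("utf-8"))
```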

update(session_id, data, reset_expiration=True, expiration=None) abstractmethod async

Update session data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `session_id` | `str` | The session ID | required |
| `data` | `T` | New session data | required |
| `reset_expiration` | `bool` | Whether to reset the expiration | `True` |
| `expiration` | `Optional[int]` | Optional custom expiration in seconds | `None` |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the session was updated, False if it didn't exist |

Source code in crudadmin/session/storage.py
@abstractmethod
async def update(
    self,
    session_id: str,
    data: T,
    reset_expiration: bool = True,
    expiration: Optional[int] = None,
) -> bool:
    """Update session data.

    Args:
        session_id: The session ID
        data: New session data
        reset_expiration: Whether to reset the expiration
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was updated, False if it didn't exist
    """
    pass

Storage Factory

Get the appropriate session storage backend.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `backend` | `str` | The backend to use ("redis", "memcached", "memory", "database", "hybrid") | required |
| `model_type` | `Type[BaseModel]` | The pydantic model type for type checking | required |
| `**kwargs` | `Any` | Additional arguments to pass to the backend | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `AbstractSessionStorage[T]` | An initialized storage backend |

Source code in crudadmin/session/storage.py
def get_session_storage(
    backend: str, model_type: Type[BaseModel], **kwargs: Any
) -> AbstractSessionStorage[T]:
    """Get the appropriate session storage backend.

    Args:
        backend: The backend to use ("redis", "memcached", "memory", "database", "hybrid")
        model_type: The pydantic model type for type checking
        **kwargs: Additional arguments to pass to the backend

    Returns:
        An initialized storage backend
    """
    if backend == "redis":
        from .backends.redis import RedisSessionStorage

        return RedisSessionStorage(**kwargs)
    elif backend == "memcached":
        from .backends.memcached import MemcachedSessionStorage

        return MemcachedSessionStorage(**kwargs)
    elif backend == "memory":
        from .backends.memory import MemorySessionStorage

        return MemorySessionStorage(**kwargs)
    elif backend == "database":
        from .backends.database import DatabaseSessionStorage

        return DatabaseSessionStorage(**kwargs)
    elif backend == "hybrid":
        from .backends.hybrid import HybridSessionStorage

        db_config = kwargs.pop("db_config", None)
        if not db_config:
            raise ValueError("db_config is required for hybrid backend")

        redis_kwargs = {k: v for k, v in kwargs.items() if k not in ["db_config"]}
        from .backends.redis import RedisSessionStorage

        redis_storage: RedisSessionStorage[T] = RedisSessionStorage(**redis_kwargs)

        from .backends.database import DatabaseSessionStorage

        database_storage: DatabaseSessionStorage[T] = DatabaseSessionStorage(
            db_config=db_config, **kwargs
        )

        return HybridSessionStorage(
            redis_storage=redis_storage, database_storage=database_storage, **kwargs
        )
    else:
        raise ValueError(f"Unknown backend: {backend}")
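
The factory rejects two kinds of request before any backend is built: an unrecognised backend name, and a hybrid backend without `db_config`. The function below is a hypothetical pre-check that mirrors only those two conditions and their error messages; it does not import or construct any backend.

```python
KNOWN_BACKENDS = ("redis", "memcached", "memory", "database", "hybrid")


def check_backend_request(backend: str, **kwargs) -> str:
    """Raise ValueError for the same inputs get_session_storage rejects."""
    if backend not in KNOWN_BACKENDS:
        raise ValueError(f"Unknown backend: {backend}")
    if backend == "hybrid" and not kwargs.get("db_config"):
        raise ValueError("db_config is required for hybrid backend")
    return backend


errors = []
for backend, kwargs in (("sqlite", {}), ("hybrid", {})):
    try:
        check_backend_request(backend, **kwargs)
    except ValueError as exc:
        errors.append(str(exc))

accepted = check_backend_request("memory")
```

In the factory itself each backend module is imported inside its branch, so a backend's dependencies are only needed when that backend is actually selected.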

Session Storage Implementations

Memory Storage

Bases: AbstractSessionStorage[T]

In-memory implementation of session storage for testing.

Source code in crudadmin/session/backends/memory.py
class MemorySessionStorage(AbstractSessionStorage[T]):
    """In-memory implementation of session storage for testing."""

    def __init__(
        self,
        prefix: str = "session:",
        expiration: int = 1800,
    ):
        """Initialize the in-memory session storage.

        Args:
            prefix: Prefix for all session keys
            expiration: Default session expiration in seconds
        """
        super().__init__(prefix=prefix, expiration=expiration)
        self.data: dict[str, bytes] = {}
        self.expiry: dict[str, datetime] = {}

    async def create(
        self,
        data: T,
        session_id: Optional[str] = None,
        expiration: Optional[int] = None,
    ) -> str:
        """Create a new session in memory.

        Args:
            data: Session data (must be a Pydantic model)
            session_id: Optional session ID. If not provided, one will be generated
            expiration: Optional custom expiration in seconds

        Returns:
            The session ID
        """
        if session_id is None:
            session_id = self.generate_session_id()

        key = self.get_key(session_id)
        exp = expiration if expiration is not None else self.expiration

        json_data = data.model_dump_json()

        value_bytes = (
            json_data.encode("utf-8") if isinstance(json_data, str) else json_data
        )

        self.data[key] = value_bytes
        self.expiry[key] = datetime.now(UTC) + timedelta(seconds=exp)

        logger.debug(f"Created session {session_id} with expiration {exp}s")
        return session_id

    async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
        """Get session data from memory.

        Args:
            session_id: The session ID
            model_class: The Pydantic model class to decode the data into

        Returns:
            The session data or None if session doesn't exist
        """
        key = self.get_key(session_id)

        if self._check_expiry(key):
            return None

        data_bytes = self.data.get(key)
        if data_bytes is None:
            return None

        try:
            data_str = (
                data_bytes.decode("utf-8")
                if isinstance(data_bytes, bytes)
                else data_bytes
            )
            json_data = json.loads(data_str)
            return model_class.model_validate(json_data)
        except (json.JSONDecodeError, ValueError) as e:
            logger.error(f"Error parsing session data: {e}")
            return None

    async def update(
        self,
        session_id: str,
        data: T,
        reset_expiration: bool = True,
        expiration: Optional[int] = None,
    ) -> bool:
        """Update session data in memory.

        Args:
            session_id: The session ID
            data: New session data
            reset_expiration: Whether to reset the expiration
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was updated, False if it didn't exist
        """
        key = self.get_key(session_id)

        if key not in self.data or self._check_expiry(key):
            return False

        json_data = data.model_dump_json()
        value_bytes = (
            json_data.encode("utf-8") if isinstance(json_data, str) else json_data
        )

        self.data[key] = value_bytes

        if reset_expiration:
            exp = expiration if expiration is not None else self.expiration
            self.expiry[key] = datetime.now(UTC) + timedelta(seconds=exp)

        return True

    async def delete(self, session_id: str) -> bool:
        """Delete a session from memory.

        Args:
            session_id: The session ID

        Returns:
            True if the session was deleted, False if it didn't exist
        """
        key = self.get_key(session_id)

        if key in self.data:
            del self.data[key]
            if key in self.expiry:
                del self.expiry[key]
            return True
        return False

    async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
        """Extend the expiration of a session in memory.

        Args:
            session_id: The session ID
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was extended, False if it didn't exist
        """
        key = self.get_key(session_id)
        exp = expiration if expiration is not None else self.expiration

        if key in self.data and not self._check_expiry(key):
            self.expiry[key] = datetime.now(UTC) + timedelta(seconds=exp)
            return True
        return False

    async def exists(self, session_id: str) -> bool:
        """Check if a session exists in memory.

        Args:
            session_id: The session ID

        Returns:
            True if the session exists, False otherwise
        """
        key = self.get_key(session_id)
        return key in self.data and not self._check_expiry(key)

    async def _scan_iter(self, match: Optional[str] = None) -> list[str]:
        """Scan for keys matching a pattern.

        Args:
            match: Pattern to match

        Returns:
            List of matching keys
        """
        if match:
            pattern = match.replace("*", ".*").replace("?", ".")
            pattern = f"^{pattern}$"
            regex: Pattern = re.compile(pattern)

            matching_keys = []
            for key in list(self.data.keys()):
                if self._check_expiry(key):
                    continue

                if regex.match(key):
                    matching_keys.append(key)
            return matching_keys
        else:
            return [
                key for key in list(self.data.keys()) if not self._check_expiry(key)
            ]

    def _check_expiry(self, key: str) -> bool:
        """Check if a key has expired and remove it if so.

        Args:
            key: The key to check

        Returns:
            True if expired (and removed), False otherwise
        """
        if key in self.expiry and datetime.now(UTC) > self.expiry[key]:
            del self.data[key]
            del self.expiry[key]
            return True
        return False

    async def close(self) -> None:
        """Clear all data."""
        self.data.clear()
        self.expiry.clear()

    async def delete_pattern(self, pattern: str) -> int:
        """Delete all keys matching a pattern.

        Args:
            pattern: The pattern to match keys (e.g., "login:*")

        Returns:
            Number of keys deleted
        """
        matching_keys = await self._scan_iter(match=pattern)

        deleted_count = 0
        for key in matching_keys:
            if key in self.data:
                del self.data[key]
                if key in self.expiry:
                    del self.expiry[key]
                deleted_count += 1

        logger.debug(f"Deleted {deleted_count} keys matching pattern '{pattern}'")
        return deleted_count
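
The class above never runs a background sweeper: _check_expiry() removes an entry only when some method touches its key. A minimal standalone sketch of that lazy-expiry pattern follows. LazyExpiringDict is a hypothetical toy class, and it uses timezone.utc where the source uses a UTC constant.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


class LazyExpiringDict:
    """Toy store that evicts a key only when it is next looked up."""

    def __init__(self) -> None:
        self.data: dict[str, bytes] = {}
        self.expiry: dict[str, datetime] = {}

    def set(self, key: str, value: bytes, ttl_seconds: float) -> None:
        self.data[key] = value
        self.expiry[key] = datetime.now(timezone.utc) + timedelta(seconds=ttl_seconds)

    def _check_expiry(self, key: str) -> bool:
        # Remove the entry on access if its deadline has passed.
        if key in self.expiry and datetime.now(timezone.utc) > self.expiry[key]:
            self.data.pop(key, None)
            del self.expiry[key]
            return True
        return False

    def get(self, key: str) -> Optional[bytes]:
        if self._check_expiry(key):
            return None
        return self.data.get(key)
```

One consequence of this design is that expired entries nobody asks for stay in memory until a scan or lookup reaches them.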

__init__(prefix='session:', expiration=1800)

Initialize the in-memory session storage.

Parameters:

Name | Type | Description | Default
prefix | str | Prefix for all session keys | 'session:'
expiration | int | Default session expiration in seconds | 1800

Source code in crudadmin/session/backends/memory.py
def __init__(
    self,
    prefix: str = "session:",
    expiration: int = 1800,
):
    """Initialize the in-memory session storage.

    Args:
        prefix: Prefix for all session keys
        expiration: Default session expiration in seconds
    """
    super().__init__(prefix=prefix, expiration=expiration)
    self.data: dict[str, bytes] = {}
    self.expiry: dict[str, datetime] = {}

close() async

Clear all data.

Source code in crudadmin/session/backends/memory.py
async def close(self) -> None:
    """Clear all data."""
    self.data.clear()
    self.expiry.clear()

create(data, session_id=None, expiration=None) async

Create a new session in memory.

Parameters:

Name | Type | Description | Default
data | T | Session data (must be a Pydantic model) | required
session_id | Optional[str] | Optional session ID. If not provided, one will be generated | None
expiration | Optional[int] | Optional custom expiration in seconds | None

Returns:

Type | Description
str | The session ID

Source code in crudadmin/session/backends/memory.py
async def create(
    self,
    data: T,
    session_id: Optional[str] = None,
    expiration: Optional[int] = None,
) -> str:
    """Create a new session in memory.

    Args:
        data: Session data (must be a Pydantic model)
        session_id: Optional session ID. If not provided, one will be generated
        expiration: Optional custom expiration in seconds

    Returns:
        The session ID
    """
    if session_id is None:
        session_id = self.generate_session_id()

    key = self.get_key(session_id)
    exp = expiration if expiration is not None else self.expiration

    json_data = data.model_dump_json()

    value_bytes = (
        json_data.encode("utf-8") if isinstance(json_data, str) else json_data
    )

    self.data[key] = value_bytes
    self.expiry[key] = datetime.now(UTC) + timedelta(seconds=exp)

    logger.debug(f"Created session {session_id} with expiration {exp}s")
    return session_id
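
create() falls back to the storage-wide default only when expiration is None, so an explicit 0 is kept rather than replaced. A small sketch of that rule (resolve_expiration is a hypothetical helper name):

```python
from typing import Optional

DEFAULT_EXPIRATION = 1800  # same default as the constructor above


def resolve_expiration(
    expiration: Optional[int], default: int = DEFAULT_EXPIRATION
) -> int:
    # "is not None" keeps an explicit 0; "expiration or default" would not.
    return expiration if expiration is not None else default
```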

delete(session_id) async

Delete a session from memory.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required

Returns:

Type | Description
bool | True if the session was deleted, False if it didn't exist

Source code in crudadmin/session/backends/memory.py
async def delete(self, session_id: str) -> bool:
    """Delete a session from memory.

    Args:
        session_id: The session ID

    Returns:
        True if the session was deleted, False if it didn't exist
    """
    key = self.get_key(session_id)

    if key in self.data:
        del self.data[key]
        if key in self.expiry:
            del self.expiry[key]
        return True
    return False

delete_pattern(pattern) async

Delete all keys matching a pattern.

Parameters:

Name | Type | Description | Default
pattern | str | The pattern to match keys (e.g., "login:*") | required

Returns:

Type | Description
int | Number of keys deleted

Source code in crudadmin/session/backends/memory.py
async def delete_pattern(self, pattern: str) -> int:
    """Delete all keys matching a pattern.

    Args:
        pattern: The pattern to match keys (e.g., "login:*")

    Returns:
        Number of keys deleted
    """
    matching_keys = await self._scan_iter(match=pattern)

    deleted_count = 0
    for key in matching_keys:
        if key in self.data:
            del self.data[key]
            if key in self.expiry:
                del self.expiry[key]
            deleted_count += 1

    logger.debug(f"Deleted {deleted_count} keys matching pattern '{pattern}'")
    return deleted_count
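
delete_pattern() relies on _scan_iter(), which turns the glob pattern into a regular expression by rewriting only "*" and "?". The sketch below reproduces that translation and contrasts it with fnmatch.translate from the standard library. The second function is an alternative shown for comparison, not the library's code, and both function names are hypothetical.

```python
import fnmatch
import re


def naive_glob_to_regex(pattern: str) -> "re.Pattern[str]":
    # Same translation as _scan_iter: only "*" and "?" are rewritten.
    return re.compile("^" + pattern.replace("*", ".*").replace("?", ".") + "$")


def escaped_glob_to_regex(pattern: str) -> "re.Pattern[str]":
    # Alternative: fnmatch escapes regex metacharacters such as "."
    # while still expanding "*" and "?".
    return re.compile(fnmatch.translate(pattern))
```

With the naive translation, other regex metacharacters in a pattern keep their regex meaning, so a literal "." in a prefix matches any character.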

exists(session_id) async

Check if a session exists in memory.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required

Returns:

Type | Description
bool | True if the session exists, False otherwise

Source code in crudadmin/session/backends/memory.py
async def exists(self, session_id: str) -> bool:
    """Check if a session exists in memory.

    Args:
        session_id: The session ID

    Returns:
        True if the session exists, False otherwise
    """
    key = self.get_key(session_id)
    return key in self.data and not self._check_expiry(key)

extend(session_id, expiration=None) async

Extend the expiration of a session in memory.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required
expiration | Optional[int] | Optional custom expiration in seconds | None

Returns:

Type | Description
bool | True if the session was extended, False if it didn't exist

Source code in crudadmin/session/backends/memory.py
async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
    """Extend the expiration of a session in memory.

    Args:
        session_id: The session ID
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was extended, False if it didn't exist
    """
    key = self.get_key(session_id)
    exp = expiration if expiration is not None else self.expiration

    if key in self.data and not self._check_expiry(key):
        self.expiry[key] = datetime.now(UTC) + timedelta(seconds=exp)
        return True
    return False
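
extend() replaces the deadline with the current time plus the expiration. It does not add to the time the session had left. A short sketch of the arithmetic (extended_deadline is a hypothetical helper, and timezone.utc stands in for the UTC constant used in the source):

```python
from datetime import datetime, timedelta, timezone


def extended_deadline(now: datetime, seconds: int) -> datetime:
    # The new deadline is measured from "now", regardless of how much
    # time the session had left before the call.
    return now + timedelta(seconds=seconds)


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
old_deadline = now + timedelta(seconds=1800)  # session created just now
new_deadline = extended_deadline(now, 600)    # "extended" by 600 seconds
```

In this example new_deadline is earlier than old_deadline, so calling extend() with an expiration shorter than the remaining lifetime shortens the session.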

get(session_id, model_class) async

Get session data from memory.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required
model_class | type[T] | The Pydantic model class to decode the data into | required

Returns:

Type | Description
Optional[T] | The session data or None if session doesn't exist

Source code in crudadmin/session/backends/memory.py
async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
    """Get session data from memory.

    Args:
        session_id: The session ID
        model_class: The Pydantic model class to decode the data into

    Returns:
        The session data or None if session doesn't exist
    """
    key = self.get_key(session_id)

    if self._check_expiry(key):
        return None

    data_bytes = self.data.get(key)
    if data_bytes is None:
        return None

    try:
        data_str = (
            data_bytes.decode("utf-8")
            if isinstance(data_bytes, bytes)
            else data_bytes
        )
        json_data = json.loads(data_str)
        return model_class.model_validate(json_data)
    except (json.JSONDecodeError, ValueError) as e:
        logger.error(f"Error parsing session data: {e}")
        return None
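
get() reverses what create() stored: bytes are decoded to text, parsed as JSON, and validated into the model class, and any parsing failure is reported as None rather than raised. The sketch below shows the same round trip with a standard-library dataclass as a stand-in for the Pydantic model. DemoSession, encode, and decode are hypothetical names, and the real code uses model_dump_json() and model_validate().

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class DemoSession:  # hypothetical stand-in for a Pydantic session model
    user_id: int
    username: str


def encode(session: DemoSession) -> bytes:
    return json.dumps(asdict(session)).encode("utf-8")


def decode(raw: bytes) -> Optional[DemoSession]:
    try:
        return DemoSession(**json.loads(raw.decode("utf-8")))
    except (ValueError, TypeError):
        # Corrupt or mismatched payloads are reported as "no session".
        return None
```

Callers therefore cannot distinguish a missing session from a corrupt one by the return value alone; the logged error is the only trace of the latter.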

update(session_id, data, reset_expiration=True, expiration=None) async

Update session data in memory.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required
data | T | New session data | required
reset_expiration | bool | Whether to reset the expiration | True
expiration | Optional[int] | Optional custom expiration in seconds | None

Returns:

Type | Description
bool | True if the session was updated, False if it didn't exist

Source code in crudadmin/session/backends/memory.py
async def update(
    self,
    session_id: str,
    data: T,
    reset_expiration: bool = True,
    expiration: Optional[int] = None,
) -> bool:
    """Update session data in memory.

    Args:
        session_id: The session ID
        data: New session data
        reset_expiration: Whether to reset the expiration
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was updated, False if it didn't exist
    """
    key = self.get_key(session_id)

    if key not in self.data or self._check_expiry(key):
        return False

    json_data = data.model_dump_json()
    value_bytes = (
        json_data.encode("utf-8") if isinstance(json_data, str) else json_data
    )

    self.data[key] = value_bytes

    if reset_expiration:
        exp = expiration if expiration is not None else self.expiration
        self.expiry[key] = datetime.now(UTC) + timedelta(seconds=exp)

    return True

Redis Storage

Bases: AbstractSessionStorage[T]

Redis implementation of session storage.

Source code in crudadmin/session/backends/redis.py
class RedisSessionStorage(AbstractSessionStorage[T]):
    """Redis implementation of session storage."""

    def __init__(
        self,
        prefix: str = "session:",
        expiration: int = 1800,
        host: str = "localhost",
        port: int = 6379,
        db: int = 0,
        password: Optional[str] = None,
        pool_size: int = 10,
        connect_timeout: int = 10,
    ):
        """Initialize the Redis session storage.

        Args:
            prefix: Prefix for all session keys
            expiration: Default session expiration in seconds
            host: Redis host
            port: Redis port
            db: Redis database number
            password: Redis password
            pool_size: Redis connection pool size
            connect_timeout: Redis connection timeout
        """
        super().__init__(prefix=prefix, expiration=expiration)

        self.client = Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            socket_timeout=connect_timeout,
            socket_connect_timeout=connect_timeout,
            socket_keepalive=True,
            decode_responses=False,
            max_connections=pool_size,
        )

        self.user_sessions_prefix = f"{prefix}user:"

    def get_user_sessions_key(self, user_id: int) -> str:
        """Get the key for a user's sessions set.

        Args:
            user_id: The user ID

        Returns:
            The Redis key for the user's sessions set
        """
        return f"{self.user_sessions_prefix}{user_id}"

    async def create(
        self,
        data: T,
        session_id: Optional[str] = None,
        expiration: Optional[int] = None,
    ) -> str:
        """Create a new session in Redis.

        Args:
            data: Session data (must be a Pydantic model)
            session_id: Optional session ID. If not provided, one will be generated
            expiration: Optional custom expiration in seconds

        Returns:
            The session ID

        Raises:
            RedisError: If there is an error with Redis
        """
        if session_id is None:
            session_id = self.generate_session_id()

        key = self.get_key(session_id)
        exp = expiration if expiration is not None else self.expiration

        json_data = data.model_dump_json()

        try:
            pipeline = self.client.pipeline()
            pipeline.set(key, json_data, ex=exp)

            if hasattr(data, "user_id"):
                user_id = data.user_id
                user_sessions_key = self.get_user_sessions_key(user_id)

                pipeline.sadd(user_sessions_key, session_id)

                pipeline.expire(user_sessions_key, exp + 3600)

            await pipeline.execute()
            logger.debug(f"Created session {session_id} with expiration {exp}s")
            return session_id
        except RedisError as e:
            logger.error(f"Error creating session: {e}")
            raise

    async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
        """Get session data from Redis.

        Args:
            session_id: The session ID
            model_class: The Pydantic model class to decode the data into

        Returns:
            The session data or None if session doesn't exist

        Raises:
            RedisError: If there is an error with Redis
            ValueError: If the data cannot be parsed
        """
        key = self.get_key(session_id)

        try:
            data = await self.client.get(key)
            if data is None:
                return None

            try:
                json_data = json.loads(data)
                return model_class.model_validate(json_data)
            except (json.JSONDecodeError, ValueError) as e:
                logger.error(f"Error parsing session data: {e}")
                return None

        except RedisError as e:
            logger.error(f"Error getting session: {e}")
            raise

    async def update(
        self,
        session_id: str,
        data: T,
        reset_expiration: bool = True,
        expiration: Optional[int] = None,
    ) -> bool:
        """Update session data in Redis.

        Args:
            session_id: The session ID
            data: New session data
            reset_expiration: Whether to reset the expiration
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was updated, False if it didn't exist

        Raises:
            RedisError: If there is an error with Redis
        """
        key = self.get_key(session_id)

        try:
            if not await self.client.exists(key):
                return False

            json_data = data.model_dump_json()
            pipeline = self.client.pipeline()

            if reset_expiration:
                exp = expiration if expiration is not None else self.expiration
                pipeline.set(key, json_data, ex=exp)

                if hasattr(data, "user_id"):
                    user_id = data.user_id
                    user_sessions_key = self.get_user_sessions_key(user_id)
                    pipeline.expire(user_sessions_key, exp + 3600)
            else:
                ttl = await self.client.ttl(key)
                if ttl > 0:
                    pipeline.set(key, json_data, ex=ttl)
                else:
                    exp = expiration if expiration is not None else self.expiration
                    pipeline.set(key, json_data, ex=exp)

                    if hasattr(data, "user_id"):
                        user_id = data.user_id
                        user_sessions_key = self.get_user_sessions_key(user_id)
                        pipeline.expire(user_sessions_key, exp + 3600)

            await pipeline.execute()
            return True

        except RedisError as e:
            logger.error(f"Error updating session: {e}")
            raise

    async def delete(self, session_id: str) -> bool:
        """Delete a session from Redis.

        Args:
            session_id: The session ID

        Returns:
            True if the session was deleted, False if it didn't exist

        Raises:
            RedisError: If there is an error with Redis
        """
        key = self.get_key(session_id)

        try:
            data = await self.client.get(key)
            if data is None:
                return False

            pipeline = self.client.pipeline()

            pipeline.delete(key)

            try:
                json_data = json.loads(data)
                if "user_id" in json_data:
                    user_id = json_data["user_id"]
                    user_sessions_key = self.get_user_sessions_key(user_id)
                    pipeline.srem(user_sessions_key, session_id)
            except (json.JSONDecodeError, ValueError):
                pass

            result = await pipeline.execute()
            return bool(result[0] > 0)
        except RedisError as e:
            logger.error(f"Error deleting session: {e}")
            raise

    async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
        """Extend the expiration of a session in Redis.

        Args:
            session_id: The session ID
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was extended, False if it didn't exist

        Raises:
            RedisError: If there is an error with Redis
        """
        key = self.get_key(session_id)
        exp = expiration if expiration is not None else self.expiration

        try:
            data = await self.client.get(key)
            if data is None:
                return False

            pipeline = self.client.pipeline()

            pipeline.expire(key, exp)

            try:
                json_data = json.loads(data)
                if "user_id" in json_data:
                    user_id = json_data["user_id"]
                    user_sessions_key = self.get_user_sessions_key(user_id)
                    pipeline.expire(user_sessions_key, exp + 3600)
            except (json.JSONDecodeError, ValueError):
                pass

            results = await pipeline.execute()
            return bool(results[0])

        except RedisError as e:
            logger.error(f"Error extending session: {e}")
            raise

    async def exists(self, session_id: str) -> bool:
        """Check if a session exists in Redis.

        Args:
            session_id: The session ID

        Returns:
            True if the session exists, False otherwise

        Raises:
            RedisError: If there is an error with Redis
        """
        key = self.get_key(session_id)

        try:
            exists_result = await self.client.exists(key)
            return bool(exists_result)
        except RedisError as e:
            logger.error(f"Error checking session existence: {e}")
            raise

    async def get_user_sessions(self, user_id: int) -> list[str]:
        """Get all session IDs for a user.

        Args:
            user_id: The user ID

        Returns:
            List of session IDs for the user

        Raises:
            RedisError: If there is an error with Redis
        """
        user_sessions_key = self.get_user_sessions_key(user_id)

        try:
            members = await self.client.smembers(user_sessions_key)
            return [m.decode("utf-8") if isinstance(m, bytes) else m for m in members]
        except RedisError as e:
            logger.error(f"Error getting user sessions: {e}")
            raise

    async def close(self) -> None:
        """Close the Redis connection."""
        import warnings

        if hasattr(self.client, "aclose"):
            await self.client.aclose()
        else:
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore",
                    category=DeprecationWarning,
                    message=".*Call to deprecated close.*",
                )
                await self.client.close()

    async def delete_pattern(self, pattern: str) -> int:
        """Delete all Redis keys matching a pattern.

        This method is useful for bulk cleanup operations like clearing
        expired rate limiting keys or other grouped data.

        Args:
            pattern: The pattern to match keys (e.g., "login:*")

        Returns:
            Number of keys deleted

        Raises:
            RedisError: If there is an error with Redis
        """
        try:
            matched_keys = []
            async for key in self.client.scan_iter(match=pattern):
                matched_keys.append(key)

            if not matched_keys:
                return 0

            pipeline = self.client.pipeline()
            for key in matched_keys:
                pipeline.delete(key)

            results = await pipeline.execute()
            deleted_count = sum(1 for result in results if result > 0)

            logger.debug(f"Deleted {deleted_count} keys matching pattern '{pattern}'")
            return deleted_count

        except RedisError as e:
            logger.error(f"Error deleting keys with pattern '{pattern}': {e}")
            raise
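
The update() method above has to choose a TTL for the rewritten key: a fresh one when reset_expiration is true, otherwise the remaining TTL if Redis reports a positive value. The decision can be sketched as a pure function (ttl_for_update is a hypothetical helper, not part of the class):

```python
from typing import Optional


def ttl_for_update(
    reset_expiration: bool,
    current_ttl: int,
    expiration: Optional[int],
    default_expiration: int = 1800,
) -> int:
    """Return the TTL in seconds that the rewritten key would receive."""
    fallback = expiration if expiration is not None else default_expiration
    if reset_expiration:
        return fallback
    # The Redis TTL command returns -1 for a key without an expiry and -2
    # for a missing key, so only a positive value can be carried over.
    return current_ttl if current_ttl > 0 else fallback
```

Because SET replaces any existing expiry, the remaining TTL has to be read first and passed back explicitly, which is why the non-reset path costs one extra round trip.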

__init__(prefix='session:', expiration=1800, host='localhost', port=6379, db=0, password=None, pool_size=10, connect_timeout=10)

Initialize the Redis session storage.

Parameters:

Name | Type | Description | Default
prefix | str | Prefix for all session keys | 'session:'
expiration | int | Default session expiration in seconds | 1800
host | str | Redis host | 'localhost'
port | int | Redis port | 6379
db | int | Redis database number | 0
password | Optional[str] | Redis password | None
pool_size | int | Redis connection pool size | 10
connect_timeout | int | Redis connection timeout | 10

Source code in crudadmin/session/backends/redis.py
def __init__(
    self,
    prefix: str = "session:",
    expiration: int = 1800,
    host: str = "localhost",
    port: int = 6379,
    db: int = 0,
    password: Optional[str] = None,
    pool_size: int = 10,
    connect_timeout: int = 10,
):
    """Initialize the Redis session storage.

    Args:
        prefix: Prefix for all session keys
        expiration: Default session expiration in seconds
        host: Redis host
        port: Redis port
        db: Redis database number
        password: Redis password
        pool_size: Redis connection pool size
        connect_timeout: Redis connection timeout
    """
    super().__init__(prefix=prefix, expiration=expiration)

    self.client = Redis(
        host=host,
        port=port,
        db=db,
        password=password,
        socket_timeout=connect_timeout,
        socket_connect_timeout=connect_timeout,
        socket_keepalive=True,
        decode_responses=False,
        max_connections=pool_size,
    )

    self.user_sessions_prefix = f"{prefix}user:"
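
The constructor derives user_sessions_prefix from the session prefix, and create() later keeps each per-user set alive for the session TTL plus 3600 seconds. A sketch of the resulting key layout follows. The three helper names are hypothetical, and session_key assumes that get_key() simply prepends the prefix, which this section does not show.

```python
def session_key(prefix: str, session_id: str) -> str:
    # Assumption: get_key() prepends the prefix to the session ID.
    return f"{prefix}{session_id}"


def user_sessions_key(prefix: str, user_id: int) -> str:
    # Matches user_sessions_prefix = f"{prefix}user:" from __init__.
    return f"{prefix}user:{user_id}"


def index_ttl(session_ttl: int) -> int:
    # The per-user set is kept one hour longer than the session itself.
    return session_ttl + 3600
```

Since both kinds of key share the same prefix, a pattern such as "session:*" matches the per-user sets as well as the sessions.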

close() async

Close the Redis connection.

Source code in crudadmin/session/backends/redis.py
async def close(self) -> None:
    """Close the Redis connection."""
    import warnings

    if hasattr(self.client, "aclose"):
        await self.client.aclose()
    else:
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                category=DeprecationWarning,
                message=".*Call to deprecated close.*",
            )
            await self.client.close()
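
close() picks the shutdown method by feature detection: it prefers aclose() when the client object has one and falls back to close() otherwise. The sketch below isolates that dispatch with two hypothetical dummy clients. It omits the warning filter and does not use redis-py.

```python
import asyncio
from typing import Any, Optional


class NewClient:  # hypothetical client exposing the newer aclose()
    def __init__(self) -> None:
        self.closed_via: Optional[str] = None

    async def aclose(self) -> None:
        self.closed_via = "aclose"


class OldClient:  # hypothetical client exposing only close()
    def __init__(self) -> None:
        self.closed_via: Optional[str] = None

    async def close(self) -> None:
        self.closed_via = "close"


async def close_client(client: Any) -> None:
    # Prefer aclose() when present; otherwise fall back to close().
    if hasattr(client, "aclose"):
        await client.aclose()
    else:
        await client.close()
```

Checking for the attribute instead of the installed version keeps the code working across client releases without pinning one.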

create(data, session_id=None, expiration=None) async

Create a new session in Redis.

Parameters:

Name | Type | Description | Default
data | T | Session data (must be a Pydantic model) | required
session_id | Optional[str] | Optional session ID. If not provided, one will be generated | None
expiration | Optional[int] | Optional custom expiration in seconds | None

Returns:

Type | Description
str | The session ID

Raises:

Type | Description
RedisError | If there is an error with Redis

Source code in crudadmin/session/backends/redis.py
async def create(
    self,
    data: T,
    session_id: Optional[str] = None,
    expiration: Optional[int] = None,
) -> str:
    """Create a new session in Redis.

    Args:
        data: Session data (must be a Pydantic model)
        session_id: Optional session ID. If not provided, one will be generated
        expiration: Optional custom expiration in seconds

    Returns:
        The session ID

    Raises:
        RedisError: If there is an error with Redis
    """
    if session_id is None:
        session_id = self.generate_session_id()

    key = self.get_key(session_id)
    exp = expiration if expiration is not None else self.expiration

    json_data = data.model_dump_json()

    try:
        pipeline = self.client.pipeline()
        pipeline.set(key, json_data, ex=exp)

        if hasattr(data, "user_id"):
            user_id = data.user_id
            user_sessions_key = self.get_user_sessions_key(user_id)

            pipeline.sadd(user_sessions_key, session_id)

            pipeline.expire(user_sessions_key, exp + 3600)

        await pipeline.execute()
        logger.debug(f"Created session {session_id} with expiration {exp}s")
        return session_id
    except RedisError as e:
        logger.error(f"Error creating session: {e}")
        raise

delete(session_id) async

Delete a session from Redis.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required

Returns:

Type | Description
bool | True if the session was deleted, False if it didn't exist

Raises:

Type | Description
RedisError | If there is an error with Redis

Source code in crudadmin/session/backends/redis.py
async def delete(self, session_id: str) -> bool:
    """Delete a session from Redis.

    Args:
        session_id: The session ID

    Returns:
        True if the session was deleted, False if it didn't exist

    Raises:
        RedisError: If there is an error with Redis
    """
    key = self.get_key(session_id)

    try:
        data = await self.client.get(key)
        if data is None:
            return False

        pipeline = self.client.pipeline()

        pipeline.delete(key)

        try:
            json_data = json.loads(data)
            if "user_id" in json_data:
                user_id = json_data["user_id"]
                user_sessions_key = self.get_user_sessions_key(user_id)
                pipeline.srem(user_sessions_key, session_id)
        except (json.JSONDecodeError, ValueError):
            pass

        result = await pipeline.execute()
        return bool(result[0] > 0)
    except RedisError as e:
        logger.error(f"Error deleting session: {e}")
        raise
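
delete() reads the stored payload before removing it because the payload is the only place the owning user_id is recorded, and that ID is needed to find the per-user set to clean up. A sketch of that lookup (index_key_from_payload is a hypothetical helper, and the isinstance check is an extra guard that the method above does not have):

```python
import json
from typing import Optional


def index_key_from_payload(raw: bytes, prefix: str = "session:") -> Optional[str]:
    """Return the per-user set key for a stored payload, or None if unknown."""
    try:
        payload = json.loads(raw)  # json.loads accepts bytes as well as str
    except ValueError:
        return None
    if isinstance(payload, dict) and "user_id" in payload:
        return f"{prefix}user:{payload['user_id']}"
    return None
```

When the payload cannot be parsed, the session key is still deleted and only the set membership is left behind until the set expires.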

delete_pattern(pattern) async

Delete all Redis keys matching a pattern.

This method is useful for bulk cleanup operations like clearing expired rate limiting keys or other grouped data.

Parameters:

Name | Type | Description | Default
pattern | str | The pattern to match keys (e.g., "login:*") | required

Returns:

Type | Description
int | Number of keys deleted

Raises:

Type | Description
RedisError | If there is an error with Redis

Source code in crudadmin/session/backends/redis.py
async def delete_pattern(self, pattern: str) -> int:
    """Delete all Redis keys matching a pattern.

    This method is useful for bulk cleanup operations like clearing
    expired rate limiting keys or other grouped data.

    Args:
        pattern: The pattern to match keys (e.g., "login:*")

    Returns:
        Number of keys deleted

    Raises:
        RedisError: If there is an error with Redis
    """
    try:
        matched_keys = []
        async for key in self.client.scan_iter(match=pattern):
            matched_keys.append(key)

        if not matched_keys:
            return 0

        pipeline = self.client.pipeline()
        for key in matched_keys:
            pipeline.delete(key)

        results = await pipeline.execute()
        deleted_count = sum(1 for result in results if result > 0)

        logger.debug(f"Deleted {deleted_count} keys matching pattern '{pattern}'")
        return deleted_count

    except RedisError as e:
        logger.error(f"Error deleting keys with pattern '{pattern}': {e}")
        raise

exists(session_id) async

Check if a session exists in Redis.

Parameters:

Name Type Description Default
session_id str

The session ID

required

Returns:

Type Description
bool

True if the session exists, False otherwise

Raises:

Type Description
RedisError

If there is an error with Redis

Source code in crudadmin/session/backends/redis.py
async def exists(self, session_id: str) -> bool:
    """Check if a session exists in Redis.

    Args:
        session_id: The session ID

    Returns:
        True if the session exists, False otherwise

    Raises:
        RedisError: If there is an error with Redis
    """
    key = self.get_key(session_id)

    try:
        exists_result = await self.client.exists(key)
        return bool(exists_result)
    except RedisError as e:
        logger.error(f"Error checking session existence: {e}")
        raise

extend(session_id, expiration=None) async

Extend the expiration of a session in Redis.

Parameters:

Name Type Description Default
session_id str

The session ID

required
expiration Optional[int]

Optional custom expiration in seconds

None

Returns:

Type Description
bool

True if the session was extended, False if it didn't exist

Raises:

Type Description
RedisError

If there is an error with Redis

Source code in crudadmin/session/backends/redis.py
async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
    """Extend the expiration of a session in Redis.

    Args:
        session_id: The session ID
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was extended, False if it didn't exist

    Raises:
        RedisError: If there is an error with Redis
    """
    key = self.get_key(session_id)
    exp = expiration if expiration is not None else self.expiration

    try:
        data = await self.client.get(key)
        if data is None:
            return False

        pipeline = self.client.pipeline()

        pipeline.expire(key, exp)

        try:
            json_data = json.loads(data)
            if "user_id" in json_data:
                user_id = json_data["user_id"]
                user_sessions_key = self.get_user_sessions_key(user_id)
                pipeline.expire(user_sessions_key, exp + 3600)
        except (json.JSONDecodeError, ValueError):
            pass

        results = await pipeline.execute()
        return bool(results[0])

    except RedisError as e:
        logger.error(f"Error extending session: {e}")
        raise

get(session_id, model_class) async

Get session data from Redis.

Parameters:

Name Type Description Default
session_id str

The session ID

required
model_class type[T]

The Pydantic model class to decode the data into

required

Returns:

Type Description
Optional[T]

The session data, or None if the session doesn't exist

Raises:

Type Description
RedisError

If there is an error with Redis

ValueError

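If the data cannot be parsed. Note that the implementation shown below catches parse errors, logs them, and returns None instead of raising.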

Source code in crudadmin/session/backends/redis.py
async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
    """Get session data from Redis.

    Args:
        session_id: The session ID
        model_class: The Pydantic model class to decode the data into

    Returns:
        The session data or None if session doesn't exist

    Raises:
        RedisError: If there is an error with Redis
        ValueError: If the data cannot be parsed
    """
    key = self.get_key(session_id)

    try:
        data = await self.client.get(key)
        if data is None:
            return None

        try:
            json_data = json.loads(data)
            return model_class.model_validate(json_data)
        except (json.JSONDecodeError, ValueError) as e:
            logger.error(f"Error parsing session data: {e}")
            return None

    except RedisError as e:
        logger.error(f"Error getting session: {e}")
        raise

get_user_sessions(user_id) async

Get all session IDs for a user.

Parameters:

Name Type Description Default
user_id int

The user ID

required

Returns:

Type Description
list[str]

List of session IDs for the user

Raises:

Type Description
RedisError

If there is an error with Redis
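
Depending on how the Redis client is configured (for example, redis-py's `decode_responses` option), set members may arrive as `bytes` or as `str`. The sketch below isolates the normalization step used by this method; `normalize_members` is a hypothetical helper name, not part of the library.

```python
def normalize_members(members):
    """Return session IDs as str whether the client yields bytes or str."""
    return [m.decode("utf-8") if isinstance(m, bytes) else m for m in members]


# Mixed input, as might be seen across differently configured clients.
session_ids = sorted(normalize_members([b"abc123", "def456"]))
```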

Source code in crudadmin/session/backends/redis.py
async def get_user_sessions(self, user_id: int) -> list[str]:
    """Get all session IDs for a user.

    Args:
        user_id: The user ID

    Returns:
        List of session IDs for the user

    Raises:
        RedisError: If there is an error with Redis
    """
    user_sessions_key = self.get_user_sessions_key(user_id)

    try:
        members = await self.client.smembers(user_sessions_key)
        return [m.decode("utf-8") if isinstance(m, bytes) else m for m in members]
    except RedisError as e:
        logger.error(f"Error getting user sessions: {e}")
        raise

get_user_sessions_key(user_id)

Get the key for a user's sessions set.

Parameters:

Name Type Description Default
user_id int

The user ID

required

Returns:

Type Description
str

The Redis key for the user's sessions set

Source code in crudadmin/session/backends/redis.py
def get_user_sessions_key(self, user_id: int) -> str:
    """Get the key for a user's sessions set.

    Args:
        user_id: The user ID

    Returns:
        The Redis key for the user's sessions set
    """
    return f"{self.user_sessions_prefix}{user_id}"

update(session_id, data, reset_expiration=True, expiration=None) async

Update session data in Redis.

Parameters:

Name Type Description Default
session_id str

The session ID

required
data T

New session data

required
reset_expiration bool

Whether to reset the expiration

True
expiration Optional[int]

Optional custom expiration in seconds

None

Returns:

Type Description
bool

True if the session was updated, False if it didn't exist

Raises:

Type Description
RedisError

If there is an error with Redis
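
The expiration that update() ends up applying can be summarized as a small pure function. This is a sketch of the decision only, not the library's code: `choose_ttl` and its parameter names are hypothetical, and the value 1800 used below is just an example default.

```python
from typing import Optional


def choose_ttl(
    reset_expiration: bool,
    remaining_ttl: int,
    expiration: Optional[int],
    default_expiration: int,
) -> int:
    """Pick the TTL (in seconds) to write alongside the updated session."""
    fallback = expiration if expiration is not None else default_expiration
    if reset_expiration:
        # A reset always starts a fresh expiration window.
        return fallback
    # Otherwise keep the time that is left. Redis TTL reports -1 (no expiry)
    # or -2 (missing key), so non-positive values fall back to a fresh window.
    return remaining_ttl if remaining_ttl > 0 else fallback
```

For example, with a default of 1800 seconds, an update with reset_expiration=False on a session that has 120 seconds left keeps those 120 seconds, while the same update on a key without a TTL falls back to the custom or default expiration.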

Source code in crudadmin/session/backends/redis.py
async def update(
    self,
    session_id: str,
    data: T,
    reset_expiration: bool = True,
    expiration: Optional[int] = None,
) -> bool:
    """Update session data in Redis.

    Args:
        session_id: The session ID
        data: New session data
        reset_expiration: Whether to reset the expiration
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was updated, False if it didn't exist

    Raises:
        RedisError: If there is an error with Redis
    """
    key = self.get_key(session_id)

    try:
        if not await self.client.exists(key):
            return False

        json_data = data.model_dump_json()
        pipeline = self.client.pipeline()

        if reset_expiration:
            exp = expiration if expiration is not None else self.expiration
            pipeline.set(key, json_data, ex=exp)

            if hasattr(data, "user_id"):
                user_id = data.user_id
                user_sessions_key = self.get_user_sessions_key(user_id)
                pipeline.expire(user_sessions_key, exp + 3600)
        else:
            ttl = await self.client.ttl(key)
            if ttl > 0:
                pipeline.set(key, json_data, ex=ttl)
            else:
                exp = expiration if expiration is not None else self.expiration
                pipeline.set(key, json_data, ex=exp)

                if hasattr(data, "user_id"):
                    user_id = data.user_id
                    user_sessions_key = self.get_user_sessions_key(user_id)
                    pipeline.expire(user_sessions_key, exp + 3600)

        await pipeline.execute()
        return True

    except RedisError as e:
        logger.error(f"Error updating session: {e}")
        raise

Memcached Storage

Bases: AbstractSessionStorage[T]

Memcached implementation of session storage.

Source code in crudadmin/session/backends/memcached.py
class MemcachedSessionStorage(AbstractSessionStorage[T]):
    """Memcached implementation of session storage."""

    def __init__(
        self,
        prefix: str = "session:",
        expiration: int = 1800,
        host: str = "localhost",
        port: int = 11211,
        pool_size: int = 10,
    ):
        """Initialize the Memcached session storage.

        Args:
            prefix: Prefix for all session keys
            expiration: Default session expiration in seconds
            host: Memcached host
            port: Memcached port
            pool_size: Memcached connection pool size
        """
        super().__init__(prefix=prefix, expiration=expiration)

        self.client = MemcachedClient(
            host=host,
            port=port,
            pool_size=pool_size,
        )

        self.user_sessions_prefix = f"{prefix}user:"

    def _encode_key(self, key: str) -> bytes:
        """Encode a key for Memcached.

        Memcached has a 250 byte key limit, so we hash long keys.

        Args:
            key: The key to encode

        Returns:
            The encoded key as bytes
        """
        if len(key) > 240:
            key_hash = hashlib.md5(key.encode()).hexdigest()
            key = f"{key[:200]}:{key_hash}"
        return key.encode("utf-8")

    def get_user_sessions_key(self, user_id: int) -> str:
        """Get the key for a user's sessions.

        Args:
            user_id: The user ID

        Returns:
            The Memcached key for the user's sessions
        """
        return f"{self.user_sessions_prefix}{user_id}"

    async def create(
        self,
        data: T,
        session_id: Optional[str] = None,
        expiration: Optional[int] = None,
    ) -> str:
        """Create a new session in Memcached.

        Args:
            data: Session data (must be a Pydantic model)
            session_id: Optional session ID. If not provided, one will be generated
            expiration: Optional custom expiration in seconds

        Returns:
            The session ID
        """
        if session_id is None:
            session_id = self.generate_session_id()

        key = self.get_key(session_id)
        exp = expiration if expiration is not None else self.expiration

        json_data = data.model_dump_json().encode("utf-8")

        try:
            await self.client.set(self._encode_key(key), json_data, exptime=exp)

            if hasattr(data, "user_id"):
                user_id = data.user_id
                user_sessions_key = self.get_user_sessions_key(user_id)

                user_sessions_data = await self.client.get(
                    self._encode_key(user_sessions_key)
                )

                if user_sessions_data:
                    try:
                        user_sessions = json.loads(user_sessions_data.decode("utf-8"))
                        if session_id not in user_sessions:
                            user_sessions.append(session_id)
                    except (json.JSONDecodeError, UnicodeDecodeError):
                        user_sessions = [session_id]
                else:
                    user_sessions = [session_id]

                user_sessions_json = json.dumps(user_sessions).encode("utf-8")
                await self.client.set(
                    self._encode_key(user_sessions_key),
                    user_sessions_json,
                    exptime=exp + 3600,
                )

            logger.debug(f"Created session {session_id} with expiration {exp}s")
            return session_id
        except Exception as e:
            logger.error(f"Error creating session: {e}")
            raise

    async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
        """Get session data from Memcached.

        Args:
            session_id: The session ID
            model_class: The Pydantic model class to decode the data into

        Returns:
            The session data or None if session doesn't exist
        """
        key = self.get_key(session_id)

        try:
            data = await self.client.get(self._encode_key(key))
            if data is None:
                return None

            try:
                json_data = json.loads(data.decode("utf-8"))
                return model_class.model_validate(json_data)
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.error(f"Error parsing session data: {e}")
                return None

        except Exception as e:
            logger.error(f"Error getting session: {e}")
            raise

    async def update(
        self,
        session_id: str,
        data: T,
        reset_expiration: bool = True,
        expiration: Optional[int] = None,
    ) -> bool:
        """Update session data in Memcached.

        Args:
            session_id: The session ID
            data: New session data
            reset_expiration: Whether to reset the expiration
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was updated, False if it didn't exist
        """
        key = self.get_key(session_id)

        try:
            if not await self.client.get(self._encode_key(key)):
                return False

            json_data = data.model_dump_json().encode("utf-8")
            exp = expiration if expiration is not None else self.expiration

            await self.client.set(self._encode_key(key), json_data, exptime=exp)

            if reset_expiration and hasattr(data, "user_id"):
                user_id = data.user_id
                user_sessions_key = self.get_user_sessions_key(user_id)

                user_sessions_data = await self.client.get(
                    self._encode_key(user_sessions_key)
                )

                if user_sessions_data:
                    try:
                        user_sessions = json.loads(user_sessions_data.decode("utf-8"))
                        user_sessions_json = json.dumps(user_sessions).encode("utf-8")
                        await self.client.set(
                            self._encode_key(user_sessions_key),
                            user_sessions_json,
                            exptime=exp + 3600,
                        )
                    except (json.JSONDecodeError, UnicodeDecodeError):
                        pass

            return True

        except Exception as e:
            logger.error(f"Error updating session: {e}")
            raise

    async def delete(self, session_id: str) -> bool:
        """Delete a session from Memcached.

        Args:
            session_id: The session ID

        Returns:
            True if the session was deleted, False if it didn't exist
        """
        key = self.get_key(session_id)

        try:
            session_data = await self.client.get(self._encode_key(key))
            if session_data is None:
                return False

            await self.client.delete(self._encode_key(key))

            try:
                json_data = json.loads(session_data.decode("utf-8"))
                if "user_id" in json_data:
                    user_id = json_data["user_id"]
                    user_sessions_key = self.get_user_sessions_key(user_id)

                    user_sessions_data = await self.client.get(
                        self._encode_key(user_sessions_key)
                    )

                    if user_sessions_data:
                        try:
                            user_sessions = json.loads(
                                user_sessions_data.decode("utf-8")
                            )
                            if session_id in user_sessions:
                                user_sessions.remove(session_id)
                                user_sessions_json = json.dumps(user_sessions).encode(
                                    "utf-8"
                                )
                                await self.client.set(
                                    self._encode_key(user_sessions_key),
                                    user_sessions_json,
                                    exptime=3600 * 24,
                                )
                        except (json.JSONDecodeError, UnicodeDecodeError):
                            pass
            except (json.JSONDecodeError, UnicodeDecodeError):
                pass

            return True
        except Exception as e:
            logger.error(f"Error deleting session: {e}")
            raise

    async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
        """Extend the expiration of a session in Memcached.

        Args:
            session_id: The session ID
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was extended, False if it didn't exist

        Note:
            Memcached doesn't allow extending expiration without updating the value.
            We need to get, then set the value again with a new expiration.
        """
        key = self.get_key(session_id)
        exp = expiration if expiration is not None else self.expiration

        try:
            session_data = await self.client.get(self._encode_key(key))
            if session_data is None:
                return False

            await self.client.set(self._encode_key(key), session_data, exptime=exp)

            try:
                json_data = json.loads(session_data.decode("utf-8"))
                if "user_id" in json_data:
                    user_id = json_data["user_id"]
                    user_sessions_key = self.get_user_sessions_key(user_id)

                    user_sessions_data = await self.client.get(
                        self._encode_key(user_sessions_key)
                    )

                    if user_sessions_data:
                        await self.client.set(
                            self._encode_key(user_sessions_key),
                            user_sessions_data,
                            exptime=exp + 3600,
                        )
            except (json.JSONDecodeError, UnicodeDecodeError):
                pass

            return True
        except Exception as e:
            logger.error(f"Error extending session: {e}")
            raise

    async def exists(self, session_id: str) -> bool:
        """Check if a session exists in Memcached.

        Args:
            session_id: The session ID

        Returns:
            True if the session exists, False otherwise
        """
        key = self.get_key(session_id)

        try:
            data = await self.client.get(self._encode_key(key))
            return data is not None
        except Exception as e:
            logger.error(f"Error checking session existence: {e}")
            raise

    async def get_user_sessions(self, user_id: int) -> list[str]:
        """Get all session IDs for a user.

        Args:
            user_id: The user ID

        Returns:
            List of session IDs for the user
        """
        user_sessions_key = self.get_user_sessions_key(user_id)

        try:
            data = await self.client.get(self._encode_key(user_sessions_key))
            if data is None:
                return []

            try:
                user_sessions = json.loads(data.decode("utf-8"))
                if isinstance(user_sessions, list):
                    return [str(session_id) for session_id in user_sessions]
                else:
                    logger.error(f"User sessions data is not a list: {user_sessions}")
                    return []
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                logger.error(f"Error parsing user sessions data: {e}")
                return []
        except Exception as e:
            logger.error(f"Error getting user sessions: {e}")
            raise

    async def close(self) -> None:
        """Close the Memcached connection."""
        await self.client.close()
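
The key-shortening rule in `_encode_key` can be tried in isolation. The standalone function below is a sketch that copies that rule for illustration; `encode_key` is a hypothetical name. Note that the length check counts characters, not bytes, so a key containing non-ASCII characters could still encode to more bytes than expected.

```python
import hashlib


def encode_key(key: str) -> bytes:
    """Shorten over-long keys by keeping a 200-character head plus an MD5 digest."""
    if len(key) > 240:
        digest = hashlib.md5(key.encode()).hexdigest()  # 32 hex characters
        key = f"{key[:200]}:{digest}"
    return key.encode("utf-8")


short_key = encode_key("session:abc")             # left unchanged
long_key = encode_key("session:" + "x" * 300)      # 200 + 1 + 32 = 233 bytes
```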

__init__(prefix='session:', expiration=1800, host='localhost', port=11211, pool_size=10)

Initialize the Memcached session storage.

Parameters:

Name Type Description Default
prefix str

Prefix for all session keys

'session:'
expiration int

Default session expiration in seconds

1800
host str

Memcached host

'localhost'
port int

Memcached port

11211
pool_size int

Memcached connection pool size

10

Source code in crudadmin/session/backends/memcached.py
def __init__(
    self,
    prefix: str = "session:",
    expiration: int = 1800,
    host: str = "localhost",
    port: int = 11211,
    pool_size: int = 10,
):
    """Initialize the Memcached session storage.

    Args:
        prefix: Prefix for all session keys
        expiration: Default session expiration in seconds
        host: Memcached host
        port: Memcached port
        pool_size: Memcached connection pool size
    """
    super().__init__(prefix=prefix, expiration=expiration)

    self.client = MemcachedClient(
        host=host,
        port=port,
        pool_size=pool_size,
    )

    self.user_sessions_prefix = f"{prefix}user:"

close() async

Close the Memcached connection.

Source code in crudadmin/session/backends/memcached.py
async def close(self) -> None:
    """Close the Memcached connection."""
    await self.client.close()

create(data, session_id=None, expiration=None) async

Create a new session in Memcached.

Parameters:

Name Type Description Default
data T

Session data (must be a Pydantic model)

required
session_id Optional[str]

Optional session ID. If not provided, one will be generated

None
expiration Optional[int]

Optional custom expiration in seconds

None

Returns:

Type Description
str

The session ID
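
Because this backend keeps each user's session IDs as a JSON list, creating a session involves a read-modify-write of that list. The sketch below shows only the list update; `add_session` is a hypothetical helper, and the extra `isinstance` guard is an addition for the example rather than something the source does.

```python
import json
from typing import Optional


def add_session(raw: Optional[bytes], session_id: str) -> bytes:
    """Return the encoded session list with session_id included exactly once."""
    try:
        sessions = json.loads(raw.decode("utf-8")) if raw else []
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Unreadable data is discarded and the list starts over.
        sessions = []
    if not isinstance(sessions, list):
        sessions = []  # assumption: treat non-list payloads as corrupt
    if session_id not in sessions:
        sessions.append(session_id)
    return json.dumps(sessions).encode("utf-8")
```

Since the read and the write are separate operations, two concurrent logins for the same user could in principle overwrite each other's list update; the sketch does not attempt to address that.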

Source code in crudadmin/session/backends/memcached.py
async def create(
    self,
    data: T,
    session_id: Optional[str] = None,
    expiration: Optional[int] = None,
) -> str:
    """Create a new session in Memcached.

    Args:
        data: Session data (must be a Pydantic model)
        session_id: Optional session ID. If not provided, one will be generated
        expiration: Optional custom expiration in seconds

    Returns:
        The session ID
    """
    if session_id is None:
        session_id = self.generate_session_id()

    key = self.get_key(session_id)
    exp = expiration if expiration is not None else self.expiration

    json_data = data.model_dump_json().encode("utf-8")

    try:
        await self.client.set(self._encode_key(key), json_data, exptime=exp)

        if hasattr(data, "user_id"):
            user_id = data.user_id
            user_sessions_key = self.get_user_sessions_key(user_id)

            user_sessions_data = await self.client.get(
                self._encode_key(user_sessions_key)
            )

            if user_sessions_data:
                try:
                    user_sessions = json.loads(user_sessions_data.decode("utf-8"))
                    if session_id not in user_sessions:
                        user_sessions.append(session_id)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    user_sessions = [session_id]
            else:
                user_sessions = [session_id]

            user_sessions_json = json.dumps(user_sessions).encode("utf-8")
            await self.client.set(
                self._encode_key(user_sessions_key),
                user_sessions_json,
                exptime=exp + 3600,
            )

        logger.debug(f"Created session {session_id} with expiration {exp}s")
        return session_id
    except Exception as e:
        logger.error(f"Error creating session: {e}")
        raise

delete(session_id) async

Delete a session from Memcached.

Parameters:

Name Type Description Default
session_id str

The session ID

required

Returns:

Type Description
bool

True if the session was deleted, False if it didn't exist

Source code in crudadmin/session/backends/memcached.py
async def delete(self, session_id: str) -> bool:
    """Delete a session from Memcached.

    Args:
        session_id: The session ID

    Returns:
        True if the session was deleted, False if it didn't exist
    """
    key = self.get_key(session_id)

    try:
        session_data = await self.client.get(self._encode_key(key))
        if session_data is None:
            return False

        await self.client.delete(self._encode_key(key))

        try:
            json_data = json.loads(session_data.decode("utf-8"))
            if "user_id" in json_data:
                user_id = json_data["user_id"]
                user_sessions_key = self.get_user_sessions_key(user_id)

                user_sessions_data = await self.client.get(
                    self._encode_key(user_sessions_key)
                )

                if user_sessions_data:
                    try:
                        user_sessions = json.loads(
                            user_sessions_data.decode("utf-8")
                        )
                        if session_id in user_sessions:
                            user_sessions.remove(session_id)
                            user_sessions_json = json.dumps(user_sessions).encode(
                                "utf-8"
                            )
                            await self.client.set(
                                self._encode_key(user_sessions_key),
                                user_sessions_json,
                                exptime=3600 * 24,
                            )
                    except (json.JSONDecodeError, UnicodeDecodeError):
                        pass
        except (json.JSONDecodeError, UnicodeDecodeError):
            pass

        return True
    except Exception as e:
        logger.error(f"Error deleting session: {e}")
        raise

exists(session_id) async

Check if a session exists in Memcached.

Parameters:

Name Type Description Default
session_id str

The session ID

required

Returns:

Type Description
bool

True if the session exists, False otherwise

Source code in crudadmin/session/backends/memcached.py
async def exists(self, session_id: str) -> bool:
    """Check if a session exists in Memcached.

    Args:
        session_id: The session ID

    Returns:
        True if the session exists, False otherwise
    """
    key = self.get_key(session_id)

    try:
        data = await self.client.get(self._encode_key(key))
        return data is not None
    except Exception as e:
        logger.error(f"Error checking session existence: {e}")
        raise

extend(session_id, expiration=None) async

Extend the expiration of a session in Memcached.

Parameters:

Name Type Description Default
session_id str

The session ID

required
expiration Optional[int]

Optional custom expiration in seconds

None

Returns:

Type Description
bool

True if the session was extended, False if it didn't exist

Note

This implementation extends a session by reading its value and writing it back with the new expiration. Memcached's touch command can update an expiration without rewriting the value, but it is not used here.
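
The get-then-set approach can be sketched against a hypothetical in-memory cache with a manually advanced clock. `FakeMemcached` and the free function `extend` are assumptions made for this example and only mimic the `get`/`set(..., exptime=...)` calls seen in the source.

```python
import asyncio
from typing import Optional


class FakeMemcached:
    """Hypothetical in-memory cache; `now` is advanced by hand in the demo."""

    def __init__(self):
        self.now = 0
        self.items = {}

    async def get(self, key: bytes) -> Optional[bytes]:
        entry = self.items.get(key)
        if entry is None or entry[1] <= self.now:
            return None  # missing or already expired
        return entry[0]

    async def set(self, key: bytes, value: bytes, exptime: int) -> None:
        self.items[key] = (value, self.now + exptime)


async def extend(client, key: bytes, exptime: int) -> bool:
    value = await client.get(key)
    if value is None:
        return False
    # Writing the same value back restarts the expiration window.
    await client.set(key, value, exptime=exptime)
    return True


async def demo_extend():
    client = FakeMemcached()
    await client.set(b"session:abc", b"{}", exptime=10)  # expires at t=10
    client.now = 8
    extended = await extend(client, b"session:abc", 10)  # now expires at t=18
    client.now = 15
    still_there = (await client.get(b"session:abc")) is not None
    missing = await extend(client, b"session:nope", 10)
    return extended, still_there, missing
```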

Source code in crudadmin/session/backends/memcached.py
async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
    """Extend the expiration of a session in Memcached.

    Args:
        session_id: The session ID
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was extended, False if it didn't exist

    Note:
        Memcached doesn't allow extending expiration without updating the value.
        We need to get, then set the value again with a new expiration.
    """
    key = self.get_key(session_id)
    exp = expiration if expiration is not None else self.expiration

    try:
        session_data = await self.client.get(self._encode_key(key))
        if session_data is None:
            return False

        await self.client.set(self._encode_key(key), session_data, exptime=exp)

        try:
            json_data = json.loads(session_data.decode("utf-8"))
            if "user_id" in json_data:
                user_id = json_data["user_id"]
                user_sessions_key = self.get_user_sessions_key(user_id)

                user_sessions_data = await self.client.get(
                    self._encode_key(user_sessions_key)
                )

                if user_sessions_data:
                    await self.client.set(
                        self._encode_key(user_sessions_key),
                        user_sessions_data,
                        exptime=exp + 3600,
                    )
        except (json.JSONDecodeError, UnicodeDecodeError):
            pass

        return True
    except Exception as e:
        logger.error(f"Error extending session: {e}")
        raise

get(session_id, model_class) async

Get session data from Memcached.

Parameters:

Name Type Description Default
session_id str

The session ID

required
model_class type[T]

The Pydantic model class to decode the data into

required

Returns:

Type Description
Optional[T]

The session data, or None if the session doesn't exist

Source code in crudadmin/session/backends/memcached.py
async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
    """Get session data from Memcached.

    Args:
        session_id: The session ID
        model_class: The Pydantic model class to decode the data into

    Returns:
        The session data or None if session doesn't exist
    """
    key = self.get_key(session_id)

    try:
        data = await self.client.get(self._encode_key(key))
        if data is None:
            return None

        try:
            json_data = json.loads(data.decode("utf-8"))
            return model_class.model_validate(json_data)
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error(f"Error parsing session data: {e}")
            return None

    except Exception as e:
        logger.error(f"Error getting session: {e}")
        raise

get_user_sessions(user_id) async

Get all session IDs for a user.

Parameters:

Name Type Description Default
user_id int

The user ID

required

Returns:

Type Description
list[str]

List of session IDs for the user
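
The defensive parsing behind this method can be illustrated with a small standalone function. This is a sketch under the assumption that the stored value is a UTF-8 encoded JSON list; `parse_user_sessions` is a hypothetical name.

```python
import json
from typing import Optional


def parse_user_sessions(data: Optional[bytes]) -> list:
    """Decode a stored session list, returning [] for anything unusable."""
    if data is None:
        return []
    try:
        value = json.loads(data.decode("utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return []
    # Only a JSON list is accepted; every item is coerced to str.
    return [str(item) for item in value] if isinstance(value, list) else []
```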

Source code in crudadmin/session/backends/memcached.py
async def get_user_sessions(self, user_id: int) -> list[str]:
    """Get all session IDs for a user.

    Args:
        user_id: The user ID

    Returns:
        List of session IDs for the user
    """
    user_sessions_key = self.get_user_sessions_key(user_id)

    try:
        data = await self.client.get(self._encode_key(user_sessions_key))
        if data is None:
            return []

        try:
            user_sessions = json.loads(data.decode("utf-8"))
            if isinstance(user_sessions, list):
                return [str(session_id) for session_id in user_sessions]
            else:
                logger.error(f"User sessions data is not a list: {user_sessions}")
                return []
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            logger.error(f"Error parsing user sessions data: {e}")
            return []
    except Exception as e:
        logger.error(f"Error getting user sessions: {e}")
        raise

get_user_sessions_key(user_id)

Get the key for a user's sessions.

Parameters:

Name | Type | Description | Default
user_id | int | The user ID | required

Returns:

Type | Description
str | The Memcached key for the user's sessions

Source code in crudadmin/session/backends/memcached.py
def get_user_sessions_key(self, user_id: int) -> str:
    """Get the key for a user's sessions.

    Args:
        user_id: The user ID

    Returns:
        The Memcached key for the user's sessions
    """
    return f"{self.user_sessions_prefix}{user_id}"
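
As a rough illustration of the key scheme, the sketch below builds a session key and a per-user index key by prefix concatenation. Both prefix values and both function names are assumptions made for this example; the actual value of `user_sessions_prefix` is not shown in this section.

```python
# Hypothetical prefixes for illustration; the backend's real values may differ.
SESSION_PREFIX = "session:"
USER_SESSIONS_PREFIX = "user_sessions:"


def session_key(session_id: str) -> str:
    """Key under which one session's JSON payload would be stored."""
    return f"{SESSION_PREFIX}{session_id}"


def user_sessions_key(user_id: int) -> str:
    """Key under which the list of a user's session IDs would be stored."""
    return f"{USER_SESSIONS_PREFIX}{user_id}"


print(session_key("abc123"))
print(user_sessions_key(42))
```

Keeping the two prefixes distinct means a session ID can never collide with a user index key.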

update(session_id, data, reset_expiration=True, expiration=None) async

Update session data in Memcached.

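Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required
data | T | New session data | required
reset_expiration | bool | Whether to also refresh the TTL of the user's session index. The session key itself is rewritten with a fresh TTL on every successful update. | True
expiration | Optional[int] | Optional custom expiration in seconds; falls back to the storage default when None | None

Returns:

Type | Description
bool | True if the session was updated, False if it didn't exist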

Source code in crudadmin/session/backends/memcached.py
async def update(
    self,
    session_id: str,
    data: T,
    reset_expiration: bool = True,
    expiration: Optional[int] = None,
) -> bool:
    """Update session data in Memcached.

    Args:
        session_id: The session ID
        data: New session data
        reset_expiration: Whether to reset the expiration
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was updated, False if it didn't exist
    """
    key = self.get_key(session_id)

    try:
        if not await self.client.get(self._encode_key(key)):
            return False

        json_data = data.model_dump_json().encode("utf-8")
        exp = expiration if expiration is not None else self.expiration

        await self.client.set(self._encode_key(key), json_data, exptime=exp)

        if reset_expiration and hasattr(data, "user_id"):
            user_id = data.user_id
            user_sessions_key = self.get_user_sessions_key(user_id)

            user_sessions_data = await self.client.get(
                self._encode_key(user_sessions_key)
            )

            if user_sessions_data:
                try:
                    user_sessions = json.loads(user_sessions_data.decode("utf-8"))
                    user_sessions_json = json.dumps(user_sessions).encode("utf-8")
                    await self.client.set(
                        self._encode_key(user_sessions_key),
                        user_sessions_json,
                        exptime=exp + 3600,
                    )
                except (json.JSONDecodeError, UnicodeDecodeError):
                    pass

        return True

    except Exception as e:
        logger.error(f"Error updating session: {e}")
        raise
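
The TTL arithmetic in the listing above can be shown on its own. This is a sketch only: `resolve_ttls` is a hypothetical helper, and the 1800-second default is an assumption borrowed from the database backend's constructor, since the Memcached default is not shown in this section.

```python
from typing import Optional

# Extra lifetime given to the per-user index, matching `exp + 3600` above.
INDEX_GRACE_SECONDS = 3600


def resolve_ttls(
    expiration: Optional[int], default_expiration: int = 1800
) -> tuple[int, int]:
    """Return (session_ttl, index_ttl) for one update call."""
    # An explicit value wins, including 0, because the check is `is not None`.
    session_ttl = expiration if expiration is not None else default_expiration
    return session_ttl, session_ttl + INDEX_GRACE_SECONDS


print(resolve_ttls(None))
print(resolve_ttls(600))
```

Giving the index a longer lifetime than the sessions it lists means the index should not expire before the session entries it points to.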

Database Storage

Bases: AbstractSessionStorage[T]

Database implementation of session storage using AdminSession table.

Source code in crudadmin/session/backends/database.py
class DatabaseSessionStorage(AbstractSessionStorage[T]):
    """Database implementation of session storage using AdminSession table."""

    def __init__(
        self,
        db_config: DatabaseConfig,
        prefix: str = "session:",
        expiration: int = 1800,
    ):
        """Initialize the Database session storage.

        Args:
            db_config: Database configuration instance
            prefix: Prefix for all session keys (kept for compatibility)
            expiration: Default session expiration in seconds (used for cleanup)
        """
        super().__init__(prefix=prefix, expiration=expiration)
        self.db_config = db_config

    async def _get_db(self) -> AsyncSession:
        """Get database session."""
        return self.db_config.get_admin_session()

    async def create(
        self,
        data: T,
        session_id: Optional[str] = None,
        expiration: Optional[int] = None,
    ) -> str:
        """Create a new session in the database.

        Args:
            data: Session data (must be a SessionData-compatible model)
            session_id: Optional session ID. If not provided, one will be generated
            expiration: Optional custom expiration in seconds (stored but not enforced)

        Returns:
            The session ID
        """
        if session_id is None:
            session_id = self.generate_session_id()

        db = await self._get_db()

        try:
            if hasattr(data, "model_dump"):
                data_dict = data.model_dump()
            else:
                data_dict = data.__dict__

            session_create = AdminSessionCreate(
                user_id=data_dict.get("user_id") or 0,
                session_id=session_id,
                ip_address=data_dict.get("ip_address", ""),
                user_agent=data_dict.get("user_agent", ""),
                device_info=data_dict.get("device_info", {}),
                session_metadata=data_dict.get("metadata", {}),
                is_active=data_dict.get("is_active", True),
                created_at=data_dict.get("created_at", datetime.now(UTC)),
                last_activity=data_dict.get("last_activity", datetime.now(UTC)),
            )

            await self.db_config.crud_sessions.create(db=db, object=session_create)
            await db.commit()

            logger.debug(f"Created session {session_id} in database")
            return session_id

        except Exception as e:
            await db.rollback()
            logger.error(f"Error creating session in database: {e}")
            raise

    async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
        """Get session data from the database.

        Args:
            session_id: The session ID
            model_class: The Pydantic model class to decode the data into

        Returns:
            The session data or None if session doesn't exist
        """
        db = await self._get_db()

        try:
            session_record = await self.db_config.crud_sessions.get(
                db=db, session_id=session_id
            )

            if not session_record:
                return None

            session_dict: dict[str, Any]
            if hasattr(session_record, "user_id"):
                assert not isinstance(session_record, dict), (
                    "Expected AdminSessionRead object"
                )
                session_dict = {
                    "user_id": session_record.user_id,
                    "session_id": session_record.session_id,
                    "ip_address": session_record.ip_address,
                    "user_agent": session_record.user_agent,
                    "device_info": session_record.device_info,
                    "created_at": session_record.created_at.replace(tzinfo=UTC)
                    if session_record.created_at.tzinfo is None
                    else session_record.created_at,
                    "last_activity": session_record.last_activity.replace(tzinfo=UTC)
                    if session_record.last_activity.tzinfo is None
                    else session_record.last_activity,
                    "is_active": session_record.is_active,
                    "metadata": session_record.session_metadata,
                }
            elif isinstance(session_record, dict):
                created_at = session_record.get("created_at")
                last_activity = session_record.get("last_activity")

                if (
                    created_at
                    and hasattr(created_at, "tzinfo")
                    and created_at.tzinfo is None
                ):
                    created_at = created_at.replace(tzinfo=UTC)
                if (
                    last_activity
                    and hasattr(last_activity, "tzinfo")
                    and last_activity.tzinfo is None
                ):
                    last_activity = last_activity.replace(tzinfo=UTC)

                session_dict = {
                    "user_id": session_record.get("user_id"),
                    "session_id": session_record.get("session_id"),
                    "ip_address": session_record.get("ip_address", ""),
                    "user_agent": session_record.get("user_agent", ""),
                    "device_info": session_record.get("device_info", {}),
                    "created_at": created_at,
                    "last_activity": last_activity,
                    "is_active": session_record.get("is_active", True),
                    "metadata": session_record.get("session_metadata", {}),
                }
            else:
                return None

            return model_class.model_validate(session_dict)

        except Exception as e:
            logger.error(f"Error getting session from database: {e}")
            return None

    async def update(
        self,
        session_id: str,
        data: T,
        reset_expiration: bool = True,
        expiration: Optional[int] = None,
    ) -> bool:
        """Update session data in the database.

        Args:
            session_id: The session ID
            data: New session data
            reset_expiration: Whether to reset the expiration (updates last_activity)
            expiration: Optional custom expiration in seconds (ignored for database)

        Returns:
            True if the session was updated, False if it didn't exist
        """
        db = await self._get_db()

        try:
            existing = await self.db_config.crud_sessions.get(
                db=db, session_id=session_id
            )
            if not existing:
                return False

            if hasattr(data, "model_dump"):
                data_dict = data.model_dump()
            else:
                data_dict = data.__dict__

            update_data = AdminSessionUpdate(
                last_activity=data_dict.get("last_activity", datetime.now(UTC))
                if reset_expiration
                else None,
                is_active=data_dict.get("is_active"),
                session_metadata=data_dict.get("metadata"),
            )

            update_dict = {
                k: v for k, v in update_data.model_dump().items() if v is not None
            }

            if update_dict:
                await self.db_config.crud_sessions.update(
                    db=db, object=update_dict, session_id=session_id
                )
                await db.commit()

            return True

        except Exception as e:
            await db.rollback()
            logger.error(f"Error updating session in database: {e}")
            return False

    async def delete(self, session_id: str) -> bool:
        """Delete a session from the database.

        Args:
            session_id: The session ID

        Returns:
            True if the session was deleted, False if it didn't exist
        """
        db = await self._get_db()

        try:
            existing = await self.db_config.crud_sessions.get(
                db=db, session_id=session_id
            )
            if not existing:
                return False

            update_data = AdminSessionUpdate(
                is_active=False,
                last_activity=datetime.now(UTC),
            )

            await self.db_config.crud_sessions.update(
                db=db, object=update_data.model_dump(), session_id=session_id
            )
            await db.commit()

            logger.debug(f"Marked session {session_id} as inactive in database")
            return True

        except Exception as e:
            await db.rollback()
            logger.error(f"Error deleting session from database: {e}")
            return False

    async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
        """Extend the expiration of a session.

        Args:
            session_id: The session ID
            expiration: Optional custom expiration in seconds (ignored for database)

        Returns:
            True if the session was extended, False if it didn't exist
        """
        db = await self._get_db()

        try:
            existing = await self.db_config.crud_sessions.get(
                db=db, session_id=session_id
            )
            if not existing:
                return False

            update_data = AdminSessionUpdate(
                last_activity=datetime.now(UTC),
            )

            await self.db_config.crud_sessions.update(
                db=db, object=update_data.model_dump(), session_id=session_id
            )
            await db.commit()

            return True

        except Exception as e:
            await db.rollback()
            logger.error(f"Error extending session in database: {e}")
            return False

    async def exists(self, session_id: str) -> bool:
        """Check if a session exists in the database.

        Args:
            session_id: The session ID

        Returns:
            True if the session exists, False otherwise
        """
        db = await self._get_db()

        try:
            session_record = await self.db_config.crud_sessions.get(
                db=db, session_id=session_id
            )
            return session_record is not None

        except Exception as e:
            logger.error(f"Error checking session existence in database: {e}")
            return False

    async def get_user_sessions(self, user_id: int) -> list[str]:
        """Get all active session IDs for a user.

        Args:
            user_id: The user ID

        Returns:
            List of session IDs for the user
        """
        db = await self._get_db()

        try:
            sessions = await self.db_config.crud_sessions.get_multi(
                db=db, user_id=user_id, is_active=True
            )

            session_data: list[Any] = []
            if isinstance(sessions, dict) and "data" in sessions:
                session_data = sessions["data"]  # type: ignore[assignment]
            elif isinstance(sessions, list):
                session_data = sessions  # type: ignore[assignment]
            elif isinstance(sessions, int):
                return []
            elif hasattr(sessions, "__iter__") and not isinstance(
                sessions, (str, bytes)
            ):
                try:
                    session_data = list(sessions)
                except (TypeError, ValueError):
                    logger.warning(
                        f"Could not convert sessions to list: {type(sessions)}"
                    )
                    return []
            else:
                logger.warning(f"Unexpected sessions format: {type(sessions)}")
                return []

            session_ids = []
            for session in session_data:
                if hasattr(session, "session_id"):
                    session_ids.append(session.session_id)
                elif isinstance(session, dict) and "session_id" in session:
                    session_ids.append(session["session_id"])

            return session_ids

        except Exception as e:
            logger.error(f"Error getting user sessions from database: {e}")
            return []

    async def close(self) -> None:
        """Close the database connection (no-op for database storage)."""
        pass

__init__(db_config, prefix='session:', expiration=1800)

Initialize the Database session storage.

Parameters:

Name | Type | Description | Default
db_config | DatabaseConfig | Database configuration instance | required
prefix | str | Prefix for all session keys (kept for compatibility) | 'session:'
expiration | int | Default session expiration in seconds (used for cleanup) | 1800

Source code in crudadmin/session/backends/database.py
def __init__(
    self,
    db_config: DatabaseConfig,
    prefix: str = "session:",
    expiration: int = 1800,
):
    """Initialize the Database session storage.

    Args:
        db_config: Database configuration instance
        prefix: Prefix for all session keys (kept for compatibility)
        expiration: Default session expiration in seconds (used for cleanup)
    """
    super().__init__(prefix=prefix, expiration=expiration)
    self.db_config = db_config

close() async

Close the database connection (no-op for database storage).

Source code in crudadmin/session/backends/database.py
async def close(self) -> None:
    """Close the database connection (no-op for database storage)."""
    pass

create(data, session_id=None, expiration=None) async

Create a new session in the database.

Parameters:

Name | Type | Description | Default
data | T | Session data (must be a SessionData-compatible model) | required
session_id | Optional[str] | Optional session ID. If not provided, one will be generated | None
expiration | Optional[int] | Optional custom expiration in seconds. Accepted for interface compatibility; the method body never reads it, so it is neither stored nor enforced. | None

Returns:

Type | Description
str | The session ID

Source code in crudadmin/session/backends/database.py
async def create(
    self,
    data: T,
    session_id: Optional[str] = None,
    expiration: Optional[int] = None,
) -> str:
    """Create a new session in the database.

    Args:
        data: Session data (must be a SessionData-compatible model)
        session_id: Optional session ID. If not provided, one will be generated
        expiration: Optional custom expiration in seconds (stored but not enforced)

    Returns:
        The session ID
    """
    if session_id is None:
        session_id = self.generate_session_id()

    db = await self._get_db()

    try:
        if hasattr(data, "model_dump"):
            data_dict = data.model_dump()
        else:
            data_dict = data.__dict__

        session_create = AdminSessionCreate(
            user_id=data_dict.get("user_id") or 0,
            session_id=session_id,
            ip_address=data_dict.get("ip_address", ""),
            user_agent=data_dict.get("user_agent", ""),
            device_info=data_dict.get("device_info", {}),
            session_metadata=data_dict.get("metadata", {}),
            is_active=data_dict.get("is_active", True),
            created_at=data_dict.get("created_at", datetime.now(UTC)),
            last_activity=data_dict.get("last_activity", datetime.now(UTC)),
        )

        await self.db_config.crud_sessions.create(db=db, object=session_create)
        await db.commit()

        logger.debug(f"Created session {session_id} in database")
        return session_id

    except Exception as e:
        await db.rollback()
        logger.error(f"Error creating session in database: {e}")
        raise
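
The field mapping performed by `create()` can be sketched with plain dictionaries. This is an illustration under assumptions: `to_row` is a hypothetical helper, and a dict stands in for the `AdminSessionCreate` schema.

```python
from datetime import datetime, timezone
from typing import Any


def to_row(session_id: str, data: dict[str, Any]) -> dict[str, Any]:
    """Map session data onto the column names used by the session table."""
    now = datetime.now(timezone.utc)
    return {
        # A missing or None user_id falls back to 0, as in the listing.
        "user_id": data.get("user_id") or 0,
        "session_id": session_id,
        "ip_address": data.get("ip_address", ""),
        "user_agent": data.get("user_agent", ""),
        "device_info": data.get("device_info", {}),
        # The model field `metadata` is stored as `session_metadata`.
        "session_metadata": data.get("metadata", {}),
        "is_active": data.get("is_active", True),
        "created_at": data.get("created_at", now),
        "last_activity": data.get("last_activity", now),
    }


row = to_row("s1", {"user_id": None, "metadata": {"theme": "dark"}})
print(row["user_id"], row["session_metadata"])
```

Note the rename from `metadata` to `session_metadata`; `get()` reverses it when reading a record back.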

delete(session_id) async

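Soft-delete a session: the record is kept in the database and marked inactive (is_active=False), with last_activity set to the current UTC time.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required

Returns:

Type | Description
bool | True if the session was marked inactive, False if it didn't exist or a database error occurred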

Source code in crudadmin/session/backends/database.py
async def delete(self, session_id: str) -> bool:
    """Delete a session from the database.

    Args:
        session_id: The session ID

    Returns:
        True if the session was deleted, False if it didn't exist
    """
    db = await self._get_db()

    try:
        existing = await self.db_config.crud_sessions.get(
            db=db, session_id=session_id
        )
        if not existing:
            return False

        update_data = AdminSessionUpdate(
            is_active=False,
            last_activity=datetime.now(UTC),
        )

        await self.db_config.crud_sessions.update(
            db=db, object=update_data.model_dump(), session_id=session_id
        )
        await db.commit()

        logger.debug(f"Marked session {session_id} as inactive in database")
        return True

    except Exception as e:
        await db.rollback()
        logger.error(f"Error deleting session from database: {e}")
        return False
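
The soft-delete behaviour can be illustrated with an in-memory table. This is a sketch, not the library's code: `soft_delete` and the dict-based table are hypothetical stand-ins for the CRUD layer.

```python
from datetime import datetime, timezone


def soft_delete(table: dict[str, dict], session_id: str) -> bool:
    """Mark a session inactive instead of removing its row."""
    record = table.get(session_id)
    if record is None:
        # Nothing to deactivate.
        return False
    record["is_active"] = False
    record["last_activity"] = datetime.now(timezone.utc)
    return True


table = {"s1": {"is_active": True, "last_activity": None}}
print(soft_delete(table, "s1"), "s1" in table, table["s1"]["is_active"])
```

Because the row survives, it remains available as an audit record after logout.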

exists(session_id) async

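Check whether a session record exists in the database. The lookup does not filter on is_active, so a session marked inactive by delete() may still be reported as existing.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required

Returns:

Type | Description
bool | True if the session exists, False otherwise (including when a database error occurs)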

Source code in crudadmin/session/backends/database.py
async def exists(self, session_id: str) -> bool:
    """Check if a session exists in the database.

    Args:
        session_id: The session ID

    Returns:
        True if the session exists, False otherwise
    """
    db = await self._get_db()

    try:
        session_record = await self.db_config.crud_sessions.get(
            db=db, session_id=session_id
        )
        return session_record is not None

    except Exception as e:
        logger.error(f"Error checking session existence in database: {e}")
        return False

extend(session_id, expiration=None) async

Extend a session by setting its last_activity timestamp to the current UTC time.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required
expiration | Optional[int] | Optional custom expiration in seconds (ignored for database) | None

Returns:

Type | Description
bool | True if the session was extended, False if it didn't exist or a database error occurred

Source code in crudadmin/session/backends/database.py
async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
    """Extend the expiration of a session.

    Args:
        session_id: The session ID
        expiration: Optional custom expiration in seconds (ignored for database)

    Returns:
        True if the session was extended, False if it didn't exist
    """
    db = await self._get_db()

    try:
        existing = await self.db_config.crud_sessions.get(
            db=db, session_id=session_id
        )
        if not existing:
            return False

        update_data = AdminSessionUpdate(
            last_activity=datetime.now(UTC),
        )

        await self.db_config.crud_sessions.update(
            db=db, object=update_data.model_dump(), session_id=session_id
        )
        await db.commit()

        return True

    except Exception as e:
        await db.rollback()
        logger.error(f"Error extending session in database: {e}")
        return False

get(session_id, model_class) async

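Get session data from the database. Naive created_at and last_activity timestamps are interpreted as UTC before validation.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required
model_class | type[T] | The Pydantic model class to decode the data into | required

Returns:

Type | Description
Optional[T] | The session data, or None if the session doesn't exist or an error occurs while loading it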

Source code in crudadmin/session/backends/database.py
async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
    """Get session data from the database.

    Args:
        session_id: The session ID
        model_class: The Pydantic model class to decode the data into

    Returns:
        The session data or None if session doesn't exist
    """
    db = await self._get_db()

    try:
        session_record = await self.db_config.crud_sessions.get(
            db=db, session_id=session_id
        )

        if not session_record:
            return None

        session_dict: dict[str, Any]
        if hasattr(session_record, "user_id"):
            assert not isinstance(session_record, dict), (
                "Expected AdminSessionRead object"
            )
            session_dict = {
                "user_id": session_record.user_id,
                "session_id": session_record.session_id,
                "ip_address": session_record.ip_address,
                "user_agent": session_record.user_agent,
                "device_info": session_record.device_info,
                "created_at": session_record.created_at.replace(tzinfo=UTC)
                if session_record.created_at.tzinfo is None
                else session_record.created_at,
                "last_activity": session_record.last_activity.replace(tzinfo=UTC)
                if session_record.last_activity.tzinfo is None
                else session_record.last_activity,
                "is_active": session_record.is_active,
                "metadata": session_record.session_metadata,
            }
        elif isinstance(session_record, dict):
            created_at = session_record.get("created_at")
            last_activity = session_record.get("last_activity")

            if (
                created_at
                and hasattr(created_at, "tzinfo")
                and created_at.tzinfo is None
            ):
                created_at = created_at.replace(tzinfo=UTC)
            if (
                last_activity
                and hasattr(last_activity, "tzinfo")
                and last_activity.tzinfo is None
            ):
                last_activity = last_activity.replace(tzinfo=UTC)

            session_dict = {
                "user_id": session_record.get("user_id"),
                "session_id": session_record.get("session_id"),
                "ip_address": session_record.get("ip_address", ""),
                "user_agent": session_record.get("user_agent", ""),
                "device_info": session_record.get("device_info", {}),
                "created_at": created_at,
                "last_activity": last_activity,
                "is_active": session_record.get("is_active", True),
                "metadata": session_record.get("session_metadata", {}),
            }
        else:
            return None

        return model_class.model_validate(session_dict)

    except Exception as e:
        logger.error(f"Error getting session from database: {e}")
        return None
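
The timestamp handling in `get()` comes down to one rule: a naive datetime is labelled as UTC and an aware datetime is left alone. A minimal sketch follows; `ensure_utc` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def ensure_utc(value: datetime) -> datetime:
    """Label a naive datetime as UTC; leave aware datetimes untouched."""
    if value.tzinfo is None:
        # replace() attaches tzinfo without shifting the clock time.
        return value.replace(tzinfo=timezone.utc)
    return value


naive = datetime(2024, 1, 1, 12, 0)
aware_plus_two = datetime(2024, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=2)))

print(ensure_utc(naive).isoformat())
print(ensure_utc(aware_plus_two).isoformat())
```

This relies on naive values read from the database having been written in UTC, which holds for rows created through `create()` because it defaults to `datetime.now(UTC)`.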

get_user_sessions(user_id) async

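Get all active session IDs for a user. Only records with is_active=True are queried.

Parameters:

Name | Type | Description | Default
user_id | int | The user ID | required

Returns:

Type | Description
list[str] | List of session IDs for the user; an empty list if none are found or a database error occurs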

Source code in crudadmin/session/backends/database.py
async def get_user_sessions(self, user_id: int) -> list[str]:
    """Get all active session IDs for a user.

    Args:
        user_id: The user ID

    Returns:
        List of session IDs for the user
    """
    db = await self._get_db()

    try:
        sessions = await self.db_config.crud_sessions.get_multi(
            db=db, user_id=user_id, is_active=True
        )

        session_data: list[Any] = []
        if isinstance(sessions, dict) and "data" in sessions:
            session_data = sessions["data"]  # type: ignore[assignment]
        elif isinstance(sessions, list):
            session_data = sessions  # type: ignore[assignment]
        elif isinstance(sessions, int):
            return []
        elif hasattr(sessions, "__iter__") and not isinstance(
            sessions, (str, bytes)
        ):
            try:
                session_data = list(sessions)
            except (TypeError, ValueError):
                logger.warning(
                    f"Could not convert sessions to list: {type(sessions)}"
                )
                return []
        else:
            logger.warning(f"Unexpected sessions format: {type(sessions)}")
            return []

        session_ids = []
        for session in session_data:
            if hasattr(session, "session_id"):
                session_ids.append(session.session_id)
            elif isinstance(session, dict) and "session_id" in session:
                session_ids.append(session["session_id"])

        return session_ids

    except Exception as e:
        logger.error(f"Error getting user sessions from database: {e}")
        return []
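
The result normalisation above accepts several shapes from the CRUD layer. The sketch below covers the two most common ones, a dict with a `data` key and a plain list, with rows that are either dicts or objects. `extract_session_ids` is a hypothetical helper, and `SimpleNamespace` stands in for a row object.

```python
from types import SimpleNamespace
from typing import Any


def extract_session_ids(result: Any) -> list[str]:
    """Pull session IDs out of a query result of varying shape."""
    if isinstance(result, dict) and "data" in result:
        rows = result["data"]
    elif isinstance(result, list):
        rows = result
    else:
        # Unknown shapes yield no sessions rather than an error.
        return []

    ids = []
    for row in rows:
        if isinstance(row, dict) and "session_id" in row:
            ids.append(row["session_id"])
        elif hasattr(row, "session_id"):
            ids.append(row.session_id)
    return ids


mixed = {"data": [{"session_id": "a"}, SimpleNamespace(session_id="b")]}
print(extract_session_ids(mixed))
```

Rows without a `session_id` are skipped silently, as in the listing.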

update(session_id, data, reset_expiration=True, expiration=None) async

Update session data in the database. Only last_activity, is_active, and session_metadata are written; other fields of data are not persisted by this method.

Parameters:

Name | Type | Description | Default
session_id | str | The session ID | required
data | T | New session data | required
reset_expiration | bool | Whether to reset the expiration (updates last_activity) | True
expiration | Optional[int] | Optional custom expiration in seconds (ignored for database) | None

Returns:

Type | Description
bool | True if the session was updated, False if it didn't exist or a database error occurred

Source code in crudadmin/session/backends/database.py
async def update(
    self,
    session_id: str,
    data: T,
    reset_expiration: bool = True,
    expiration: Optional[int] = None,
) -> bool:
    """Update session data in the database.

    Args:
        session_id: The session ID
        data: New session data
        reset_expiration: Whether to reset the expiration (updates last_activity)
        expiration: Optional custom expiration in seconds (ignored for database)

    Returns:
        True if the session was updated, False if it didn't exist
    """
    db = await self._get_db()

    try:
        existing = await self.db_config.crud_sessions.get(
            db=db, session_id=session_id
        )
        if not existing:
            return False

        if hasattr(data, "model_dump"):
            data_dict = data.model_dump()
        else:
            data_dict = data.__dict__

        update_data = AdminSessionUpdate(
            last_activity=data_dict.get("last_activity", datetime.now(UTC))
            if reset_expiration
            else None,
            is_active=data_dict.get("is_active"),
            session_metadata=data_dict.get("metadata"),
        )

        update_dict = {
            k: v for k, v in update_data.model_dump().items() if v is not None
        }

        if update_dict:
            await self.db_config.crud_sessions.update(
                db=db, object=update_dict, session_id=session_id
            )
            await db.commit()

        return True

    except Exception as e:
        await db.rollback()
        logger.error(f"Error updating session in database: {e}")
        return False
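
The partial-update step drops every field whose value is None before writing. A small sketch of that filter follows; `build_update_payload` is a hypothetical helper, and a dict stands in for `AdminSessionUpdate`.

```python
from datetime import datetime
from typing import Any, Optional


def build_update_payload(
    last_activity: Optional[datetime],
    is_active: Optional[bool],
    metadata: Optional[dict],
) -> dict[str, Any]:
    """Keep only the fields that carry a value to write."""
    candidate = {
        "last_activity": last_activity,
        "is_active": is_active,
        "session_metadata": metadata,
    }
    # Filtering on `is not None` keeps falsy values such as False and {}.
    return {key: value for key, value in candidate.items() if value is not None}


print(build_update_payload(None, False, {}))
print(build_update_payload(None, None, None))
```

Testing against None rather than truthiness matters here: `is_active=False` and an empty metadata dict are both legitimate values to persist.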

Hybrid Storage

Bases: AbstractSessionStorage[T]

Hybrid storage: Redis for active sessions + Database for audit trail.
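
The write pattern can be sketched with two in-memory stores: the primary write must succeed, while the audit write is best effort. Everything here is a stand-in for illustration; `InMemoryStore` and `HybridSketch` are hypothetical classes, not crudadmin APIs.

```python
import asyncio
from typing import Optional


class InMemoryStore:
    """Stand-in for a storage backend; optionally fails on write."""

    def __init__(self, fail: bool = False) -> None:
        self.rows: dict[str, dict] = {}
        self.fail = fail

    async def create(self, session_id: str, data: dict) -> str:
        if self.fail:
            raise RuntimeError("store unavailable")
        self.rows[session_id] = data
        return session_id


class HybridSketch:
    """Fast store for active sessions plus a best-effort audit store."""

    def __init__(self, cache: InMemoryStore, audit: InMemoryStore) -> None:
        self.cache = cache
        self.audit = audit
        self.warnings: list[str] = []

    async def create(self, session_id: str, data: dict) -> str:
        # Primary write: an error here propagates to the caller.
        await self.cache.create(session_id, data)
        try:
            # Secondary write: a failure is recorded, not raised.
            await self.audit.create(session_id, data)
        except Exception as exc:
            self.warnings.append(str(exc))
        return session_id

    async def get(self, session_id: str) -> Optional[dict]:
        # Reads consult only the fast store.
        return self.cache.rows.get(session_id)


hybrid = HybridSketch(InMemoryStore(), InMemoryStore(fail=True))
created_id = asyncio.run(hybrid.create("s1", {"user_id": 1}))
print(created_id, hybrid.warnings)
```

With this split, an audit-store outage does not block logins, at the cost of possible gaps in the audit trail.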

Source code in crudadmin/session/backends/hybrid.py
class HybridSessionStorage(AbstractSessionStorage[T]):
    """Hybrid storage: Redis for active sessions + Database for audit trail."""

    def __init__(
        self,
        redis_storage: AbstractSessionStorage[T],
        database_storage: AbstractSessionStorage[T],
        prefix: str = "session:",
        expiration: int = 1800,
    ):
        """Initialize the Hybrid session storage.

        Args:
            redis_storage: Redis storage instance for active sessions
            database_storage: Database storage instance for audit trail
            prefix: Prefix for all session keys (inherited from redis_storage)
            expiration: Default session expiration in seconds
        """
        super().__init__(prefix=prefix, expiration=expiration)
        self.redis_storage = redis_storage
        self.database_storage = database_storage

    async def create(
        self,
        data: T,
        session_id: Optional[str] = None,
        expiration: Optional[int] = None,
    ) -> str:
        """Create a new session in both Redis and Database.

        Args:
            data: Session data (must be a Pydantic model)
            session_id: Optional session ID. If not provided, one will be generated
            expiration: Optional custom expiration in seconds

        Returns:
            The session ID
        """
        session_id = await self.redis_storage.create(data, session_id, expiration)

        try:
            await self.database_storage.create(data, session_id, None)
            logger.debug(f"Session {session_id} stored in both Redis and Database")
        except Exception as e:
            logger.warning(f"Failed to store session audit trail in database: {e}")

        return session_id

    async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
        """Get session data from Redis (active sessions only).

        Args:
            session_id: The session ID
            model_class: The Pydantic model class to decode the data into

        Returns:
            The session data or None if session doesn't exist or expired
        """
        return await self.redis_storage.get(session_id, model_class)

    async def update(
        self,
        session_id: str,
        data: T,
        reset_expiration: bool = True,
        expiration: Optional[int] = None,
    ) -> bool:
        """Update session data in both Redis and Database.

        Args:
            session_id: The session ID
            data: New session data
            reset_expiration: Whether to reset the expiration
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was updated in Redis, False if it didn't exist
        """
        result = await self.redis_storage.update(
            session_id, data, reset_expiration, expiration
        )

        try:
            await self.database_storage.update(session_id, data, reset_expiration, None)
            logger.debug(f"Session {session_id} updated in both Redis and Database")
        except Exception as e:
            logger.warning(f"Failed to update session audit trail in database: {e}")

        return result

    async def delete(self, session_id: str) -> bool:
        """Delete session from Redis and mark as inactive in Database.

        Args:
            session_id: The session ID

        Returns:
            True if the session was deleted from Redis, False if it didn't exist
        """
        result = await self.redis_storage.delete(session_id)

        try:
            session_data = None

            if hasattr(self.database_storage, "get_raw"):
                session_data = await self.database_storage.get_raw(session_id)
            else:
                try:
                    session_data = await self.database_storage.get(
                        session_id, cast("type[T]", BaseModel)
                    )
                except TypeError:
                    await self.database_storage.delete(session_id)
                    logger.debug(
                        f"Session {session_id} deleted from Redis and hard-deleted from Database"
                    )
                    return result

            if session_data:
                if hasattr(session_data, "is_active"):
                    session_data.is_active = False
                elif isinstance(session_data, dict):
                    session_data["is_active"] = False

                await self.database_storage.update(
                    session_id, session_data, False, None
                )
                logger.debug(
                    f"Session {session_id} deleted from Redis and marked inactive in Database"
                )
            else:
                await self.database_storage.delete(session_id)
                logger.debug(
                    f"Session {session_id} deleted from both Redis and Database"
                )
        except Exception as e:
            logger.warning(f"Failed to update session audit trail on delete: {e}")

        return result

    async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
        """Extend the expiration of a session in Redis.

        Args:
            session_id: The session ID
            expiration: Optional custom expiration in seconds

        Returns:
            True if the session was extended in Redis, False if it didn't exist
        """
        result = await self.redis_storage.extend(session_id, expiration)

        try:
            await self.database_storage.extend(session_id, None)
            logger.debug(
                f"Session {session_id} extended in Redis and last_activity updated in Database"
            )
        except Exception as e:
            logger.warning(f"Failed to update last_activity in database: {e}")

        return result

    async def exists(self, session_id: str) -> bool:
        """Check if a session exists in Redis (active sessions only).

        Args:
            session_id: The session ID

        Returns:
            True if the session exists in Redis, False otherwise
        """
        return await self.redis_storage.exists(session_id)

    async def get_user_sessions(self, user_id: int) -> list[str]:
        """Get all active session IDs for a user from Redis.

        Args:
            user_id: The user ID

        Returns:
            List of active session IDs for the user
        """
        try:
            if hasattr(self.redis_storage, "get_user_sessions"):
                result = await self.redis_storage.get_user_sessions(user_id)
                return result if isinstance(result, list) else []
            else:
                return []
        except Exception as e:
            logger.warning(f"Failed to get user sessions from Redis: {e}")
            try:
                if hasattr(self.database_storage, "get_user_sessions"):
                    result = await self.database_storage.get_user_sessions(user_id)
                    return result if isinstance(result, list) else []
                else:
                    return []
            except Exception as db_e:
                logger.error(
                    f"Failed to get user sessions from Database fallback: {db_e}"
                )
                return []

    async def close(self) -> None:
        """Close both storage connections."""
        await self.redis_storage.close()
        await self.database_storage.close()

__init__(redis_storage, database_storage, prefix='session:', expiration=1800)

Initialize the Hybrid session storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| redis_storage | AbstractSessionStorage[T] | Redis storage instance for active sessions | required |
| database_storage | AbstractSessionStorage[T] | Database storage instance for audit trail | required |
| prefix | str | Prefix for all session keys (inherited from redis_storage) | 'session:' |
| expiration | int | Default session expiration in seconds | 1800 |

Source code in crudadmin/session/backends/hybrid.py
def __init__(
    self,
    redis_storage: AbstractSessionStorage[T],
    database_storage: AbstractSessionStorage[T],
    prefix: str = "session:",
    expiration: int = 1800,
):
    """Initialize the Hybrid session storage.

    Args:
        redis_storage: Redis storage instance for active sessions
        database_storage: Database storage instance for audit trail
        prefix: Prefix for all session keys (inherited from redis_storage)
        expiration: Default session expiration in seconds
    """
    super().__init__(prefix=prefix, expiration=expiration)
    self.redis_storage = redis_storage
    self.database_storage = database_storage

close() async

Close both storage connections.

Source code in crudadmin/session/backends/hybrid.py
async def close(self) -> None:
    """Close both storage connections."""
    await self.redis_storage.close()
    await self.database_storage.close()

create(data, session_id=None, expiration=None) async

Create a new session in both Redis and Database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| data | T | Session data (must be a Pydantic model) | required |
| session_id | Optional[str] | Optional session ID. If not provided, one will be generated | None |
| expiration | Optional[int] | Optional custom expiration in seconds | None |

Returns:

| Type | Description |
| --- | --- |
| str | The session ID |

Source code in crudadmin/session/backends/hybrid.py
async def create(
    self,
    data: T,
    session_id: Optional[str] = None,
    expiration: Optional[int] = None,
) -> str:
    """Create a new session in both Redis and Database.

    Args:
        data: Session data (must be a Pydantic model)
        session_id: Optional session ID. If not provided, one will be generated
        expiration: Optional custom expiration in seconds

    Returns:
        The session ID
    """
    session_id = await self.redis_storage.create(data, session_id, expiration)

    try:
        await self.database_storage.create(data, session_id, None)
        logger.debug(f"Session {session_id} stored in both Redis and Database")
    except Exception as e:
        logger.warning(f"Failed to store session audit trail in database: {e}")

    return session_id
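
The write ordering in create() can be illustrated in isolation. The following is a minimal sketch, not crudadmin's implementation: `InMemoryStorage` and `create_in_both` are hypothetical names, and plain dicts stand in for Pydantic models. It shows the primary write failing loudly while the audit write is best effort.

```python
import asyncio
import logging
import uuid
from typing import Optional

logger = logging.getLogger("hybrid_sketch")


class InMemoryStorage:
    """Hypothetical stand-in for a storage backend; not part of crudadmin."""

    def __init__(self, fail_on_create: bool = False) -> None:
        self.items: dict[str, dict] = {}
        self.fail_on_create = fail_on_create

    async def create(self, data: dict, session_id: Optional[str] = None) -> str:
        if self.fail_on_create:
            raise RuntimeError("storage unavailable")
        session_id = session_id or uuid.uuid4().hex
        self.items[session_id] = data
        return session_id


async def create_in_both(
    primary: InMemoryStorage, audit: InMemoryStorage, data: dict
) -> str:
    # The primary write must succeed; any exception propagates to the caller.
    session_id = await primary.create(data)
    try:
        # The audit write reuses the ID generated by the primary store.
        await audit.create(data, session_id)
    except Exception as exc:
        # Best effort: log and continue so the session stays usable.
        logger.warning("Audit write failed: %s", exc)
    return session_id


async def demo() -> tuple[bool, bool]:
    primary = InMemoryStorage()
    audit = InMemoryStorage(fail_on_create=True)  # simulate a database outage
    session_id = await create_in_both(primary, audit, {"user_id": 1})
    return session_id in primary.items, session_id in audit.items
```

With this ordering the audit store can lag behind the active store after an outage, and the warning log is the only signal that an audit record is missing.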

delete(session_id) async

Delete session from Redis and mark as inactive in Database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session_id | str | The session ID | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the session was deleted from Redis, False if it didn't exist |

Source code in crudadmin/session/backends/hybrid.py
async def delete(self, session_id: str) -> bool:
    """Delete session from Redis and mark as inactive in Database.

    Args:
        session_id: The session ID

    Returns:
        True if the session was deleted from Redis, False if it didn't exist
    """
    result = await self.redis_storage.delete(session_id)

    try:
        session_data = None

        if hasattr(self.database_storage, "get_raw"):
            session_data = await self.database_storage.get_raw(session_id)
        else:
            try:
                session_data = await self.database_storage.get(
                    session_id, cast("type[T]", BaseModel)
                )
            except TypeError:
                await self.database_storage.delete(session_id)
                logger.debug(
                    f"Session {session_id} deleted from Redis and hard-deleted from Database"
                )
                return result

        if session_data:
            if hasattr(session_data, "is_active"):
                session_data.is_active = False
            elif isinstance(session_data, dict):
                session_data["is_active"] = False

            await self.database_storage.update(
                session_id, session_data, False, None
            )
            logger.debug(
                f"Session {session_id} deleted from Redis and marked inactive in Database"
            )
        else:
            await self.database_storage.delete(session_id)
            logger.debug(
                f"Session {session_id} deleted from both Redis and Database"
            )
    except Exception as e:
        logger.warning(f"Failed to update session audit trail on delete: {e}")

    return result
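
The soft-delete behaviour described above (remove the live session, keep the audit row but flag it inactive) can be sketched with two plain dicts. `AuditRecord` and `soft_delete` are hypothetical names used only for illustration; the real method works through the two storage backends and also handles the hard-delete fallback.

```python
from dataclasses import dataclass


@dataclass
class AuditRecord:
    """Hypothetical stand-in for an audit row."""

    user_id: int
    is_active: bool = True


def soft_delete(
    active: dict[str, dict], audit: dict[str, AuditRecord], session_id: str
) -> bool:
    # Remove the live session; the return value reflects only this store.
    existed = active.pop(session_id, None) is not None

    # Keep the audit record, but mark it as no longer active.
    record = audit.get(session_id)
    if record is not None:
        record.is_active = False
    return existed


active_sessions = {"abc": {"user_id": 1}}
audit_rows = {"abc": AuditRecord(user_id=1)}
first_call = soft_delete(active_sessions, audit_rows, "abc")
second_call = soft_delete(active_sessions, audit_rows, "abc")
```

As in delete(), the boolean result says nothing about the audit store: a second call returns False even though the audit row still exists.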

exists(session_id) async

Check if a session exists in Redis (active sessions only).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session_id | str | The session ID | required |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the session exists in Redis, False otherwise |

Source code in crudadmin/session/backends/hybrid.py
async def exists(self, session_id: str) -> bool:
    """Check if a session exists in Redis (active sessions only).

    Args:
        session_id: The session ID

    Returns:
        True if the session exists in Redis, False otherwise
    """
    return await self.redis_storage.exists(session_id)

extend(session_id, expiration=None) async

Extend the expiration of a session in Redis.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session_id | str | The session ID | required |
| expiration | Optional[int] | Optional custom expiration in seconds | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the session was extended in Redis, False if it didn't exist |

Source code in crudadmin/session/backends/hybrid.py
async def extend(self, session_id: str, expiration: Optional[int] = None) -> bool:
    """Extend the expiration of a session in Redis.

    Args:
        session_id: The session ID
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was extended in Redis, False if it didn't exist
    """
    result = await self.redis_storage.extend(session_id, expiration)

    try:
        await self.database_storage.extend(session_id, None)
        logger.debug(
            f"Session {session_id} extended in Redis and last_activity updated in Database"
        )
    except Exception as e:
        logger.warning(f"Failed to update last_activity in database: {e}")

    return result

get(session_id, model_class) async

Get session data from Redis (active sessions only).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session_id | str | The session ID | required |
| model_class | type[T] | The Pydantic model class to decode the data into | required |

Returns:

| Type | Description |
| --- | --- |
| Optional[T] | The session data or None if session doesn't exist or expired |

Source code in crudadmin/session/backends/hybrid.py
async def get(self, session_id: str, model_class: type[T]) -> Optional[T]:
    """Get session data from Redis (active sessions only).

    Args:
        session_id: The session ID
        model_class: The Pydantic model class to decode the data into

    Returns:
        The session data or None if session doesn't exist or expired
    """
    return await self.redis_storage.get(session_id, model_class)

get_user_sessions(user_id) async

Get all active session IDs for a user from Redis.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| user_id | int | The user ID | required |

Returns:

| Type | Description |
| --- | --- |
| list[str] | List of active session IDs for the user |

Source code in crudadmin/session/backends/hybrid.py
async def get_user_sessions(self, user_id: int) -> list[str]:
    """Get all active session IDs for a user from Redis.

    Args:
        user_id: The user ID

    Returns:
        List of active session IDs for the user
    """
    try:
        if hasattr(self.redis_storage, "get_user_sessions"):
            result = await self.redis_storage.get_user_sessions(user_id)
            return result if isinstance(result, list) else []
        else:
            return []
    except Exception as e:
        logger.warning(f"Failed to get user sessions from Redis: {e}")
        try:
            if hasattr(self.database_storage, "get_user_sessions"):
                result = await self.database_storage.get_user_sessions(user_id)
                return result if isinstance(result, list) else []
            else:
                return []
        except Exception as db_e:
            logger.error(
                f"Failed to get user sessions from Database fallback: {db_e}"
            )
            return []
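
Note that in the source above the database is consulted only when the Redis call raises; an empty list returned by Redis is passed through unchanged. A minimal sketch of that fallback shape, using hypothetical coroutine names (`sessions_with_fallback`, `broken_redis`, `database`) in place of the real backends:

```python
import asyncio
from typing import Awaitable, Callable

Fetcher = Callable[[int], Awaitable[list[str]]]


async def sessions_with_fallback(
    user_id: int, primary: Fetcher, fallback: Fetcher
) -> list[str]:
    try:
        # A successful primary call wins, even when it returns an empty list.
        return await primary(user_id)
    except Exception:
        pass  # fall through to the secondary store
    try:
        return await fallback(user_id)
    except Exception:
        # Both stores failed: report "no sessions" rather than raising.
        return []


async def broken_redis(user_id: int) -> list[str]:
    raise ConnectionError("redis down")


async def database(user_id: int) -> list[str]:
    return [f"session-for-{user_id}"]
```

Returning an empty list when both stores fail keeps callers simple, at the cost of making an outage look like "no active sessions"; callers that enforce session limits may want to treat that case differently.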

update(session_id, data, reset_expiration=True, expiration=None) async

Update session data in both Redis and Database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session_id | str | The session ID | required |
| data | T | New session data | required |
| reset_expiration | bool | Whether to reset the expiration | True |
| expiration | Optional[int] | Optional custom expiration in seconds | None |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the session was updated in Redis, False if it didn't exist |

Source code in crudadmin/session/backends/hybrid.py
async def update(
    self,
    session_id: str,
    data: T,
    reset_expiration: bool = True,
    expiration: Optional[int] = None,
) -> bool:
    """Update session data in both Redis and Database.

    Args:
        session_id: The session ID
        data: New session data
        reset_expiration: Whether to reset the expiration
        expiration: Optional custom expiration in seconds

    Returns:
        True if the session was updated in Redis, False if it didn't exist
    """
    result = await self.redis_storage.update(
        session_id, data, reset_expiration, expiration
    )

    try:
        await self.database_storage.update(session_id, data, reset_expiration, None)
        logger.debug(f"Session {session_id} updated in both Redis and Database")
    except Exception as e:
        logger.warning(f"Failed to update session audit trail in database: {e}")

    return result

Session Schemas

Core Session Data

Bases: BaseSession

Common session data for any user session.

Source code in crudadmin/session/schemas.py
class SessionData(BaseSession):
    """Common session data for any user session."""

    metadata: dict[str, Any] = Field(
        default_factory=dict, description="Additional session metadata"
    )
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(UTC),
        description="Session creation timestamp",
    )
    last_activity: datetime = Field(
        default_factory=lambda: datetime.now(UTC), description="Last activity timestamp"
    )
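
The `default_factory` arguments matter here: each new session gets its own metadata dict and its own timezone-aware timestamps, computed at instantiation time. The sketch below uses the standard library's dataclasses as a stand-in for Pydantic so that it runs without extra dependencies; `SessionDataSketch` is a hypothetical name and omits the fields inherited from BaseSession.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass
class SessionDataSketch:
    user_id: int
    # A factory is called once per instance, so dicts are never shared.
    metadata: dict[str, Any] = field(default_factory=dict)
    # Timestamps are evaluated at creation time, in UTC.
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    last_activity: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


first = SessionDataSketch(user_id=1)
second = SessionDataSketch(user_id=2)
first.metadata["role"] = "admin"  # does not leak into `second`
```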

Bases: SessionData

Schema for creating a new session.

Source code in crudadmin/session/schemas.py
class SessionCreate(SessionData):
    """Schema for creating a new session."""

    pass

CSRF Protection

Bases: BaseModel

CSRF token data.

Source code in crudadmin/session/schemas.py
class CSRFToken(BaseModel):
    """CSRF token data."""

    token: str
    user_id: int
    session_id: str
    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    expires_at: datetime
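
The fields of CSRFToken suggest the checks a validator has to make: the token must not be expired, must belong to the same user and session, and must match the presented value. The following sketch illustrates those checks under stated assumptions; `CSRFTokenSketch`, `issue_token`, and `token_is_valid` are hypothetical names, a dataclass stands in for the Pydantic model, and this is not the logic of `validate_csrf_token()` itself.

```python
import hmac
import secrets
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


@dataclass
class CSRFTokenSketch:
    token: str
    user_id: int
    session_id: str
    expires_at: datetime
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


def issue_token(
    user_id: int, session_id: str, ttl_minutes: int = 30, token_bytes: int = 32
) -> CSRFTokenSketch:
    return CSRFTokenSketch(
        token=secrets.token_urlsafe(token_bytes),  # cryptographically strong
        user_id=user_id,
        session_id=session_id,
        expires_at=datetime.now(timezone.utc) + timedelta(minutes=ttl_minutes),
    )


def token_is_valid(
    stored: CSRFTokenSketch, presented: str, user_id: int, session_id: str
) -> bool:
    if datetime.now(timezone.utc) >= stored.expires_at:
        return False
    if stored.user_id != user_id or stored.session_id != session_id:
        return False
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(stored.token.encode(), presented.encode())


issued = issue_token(user_id=1, session_id="s1")
```

Encoding both values to bytes before `hmac.compare_digest` keeps the comparison safe when a client sends non-ASCII input in the header.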

User Agent Information

Bases: BaseModel

User agent information parsed from the User-Agent header.

Source code in crudadmin/session/schemas.py
class UserAgentInfo(BaseModel):
    """User agent information parsed from the User-Agent header."""

    browser: str = Field(..., description="Browser name")
    browser_version: str = Field(..., description="Browser version")
    os: str = Field(..., description="Operating System")
    device: str
    is_mobile: bool
    is_tablet: bool
    is_pc: bool

Database Session Models

Bases: BaseSession

Schema for creating AdminSession in database.

Source code in crudadmin/session/schemas.py
class AdminSessionCreate(BaseSession):
    """Schema for creating AdminSession in database."""

    created_at: datetime = Field(default_factory=lambda: datetime.now(UTC))
    last_activity: datetime = Field(default_factory=lambda: datetime.now(UTC))
    session_metadata: dict[str, Any] = Field(default_factory=dict)

Bases: BaseSession

Schema for reading AdminSession data.

Source code in crudadmin/session/schemas.py
class AdminSessionRead(BaseSession):
    """Schema for reading AdminSession data."""

    id: int
    session_metadata: dict[str, Any]
    created_at: datetime
    last_activity: datetime
    is_active: bool

Bases: BaseModel

Schema for updating AdminSession.

Source code in crudadmin/session/schemas.py
class AdminSessionUpdate(BaseModel):
    """Schema for updating AdminSession."""

    last_activity: Optional[datetime] = None
    is_active: Optional[bool] = None
    session_metadata: Optional[dict[str, Any]] = None

Usage Examples

Basic Session Management

from crudadmin.session.manager import SessionManager
from crudadmin.session.storage import get_session_storage

# Create session manager with memory backend
session_manager = SessionManager(
    session_backend="memory",
    max_sessions_per_user=5,
    session_timeout_minutes=30
)

# Create a new session
session_id, csrf_token = await session_manager.create_session(
    request=request,
    user_id=user.id,
    metadata={"role": "admin", "permissions": ["read", "write"]}
)

# Validate session
session_data = await session_manager.validate_session(session_id)
if session_data:
    print(f"Valid session for user {session_data.user_id}")

Redis Backend Configuration

# Configure with Redis for production
session_manager = SessionManager(
    session_backend="redis",
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    redis_password="your-redis-password",
    max_sessions_per_user=10,
    session_timeout_minutes=60
)

Database Backend for Audit Trail

from your_app.database import DatabaseConfig

# Configure with database backend for admin visibility
db_config = DatabaseConfig(...)  # Your database configuration

session_manager = SessionManager(
    session_backend="database",
    db_config=db_config,
    max_sessions_per_user=5,
    session_timeout_minutes=30
)

Hybrid Backend (Best of Both Worlds)

# Hybrid: Redis for performance + Database for audit
session_manager = SessionManager(
    session_backend="hybrid",
    db_config=db_config,
    redis_host="localhost",
    redis_port=6379,
    max_sessions_per_user=10,
    session_timeout_minutes=60
)

CSRF Protection

# Validate CSRF token for state-changing operations
is_valid = await session_manager.validate_csrf_token(
    csrf_token=request.headers.get("X-CSRF-Token"),
    session_id=session_id,
    user_id=current_user.id
)

if not is_valid:
    raise HTTPException(status_code=403, detail="Invalid CSRF token")

Session Cleanup

# Cleanup expired sessions (should be called periodically)
await session_manager.cleanup_expired_sessions()

# Terminate specific session
await session_manager.terminate_session(session_id)

# Terminate all user sessions
await session_manager.terminate_user_sessions(user_id)

Backend Comparison

| Backend | Performance | Scalability | Persistence | Admin Visibility | Use Case |
| --- | --- | --- | --- | --- | --- |
| Memory | Excellent | Single node | No | No | Development, testing |
| Redis | Excellent | Horizontal | Yes* | No | Production, high traffic |
| Memcached | Excellent | Horizontal | No | No | High performance caching |
| Database | Good | Vertical | Yes | Yes | Audit requirements |
| Hybrid | Excellent | Horizontal | Yes | Yes | Best of all worlds |

*Redis persistence depends on configuration

Security Features

Session Security

# Session manager provides multiple security layers
session_manager = SessionManager(
    # Limit concurrent sessions per user
    max_sessions_per_user=5,

    # Automatic session expiration
    session_timeout_minutes=30,

    # CSRF protection
    csrf_token_bytes=32,

    # Login rate limiting
    login_max_attempts=5,
    login_window_minutes=15
)
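
To make the `login_max_attempts` and `login_window_minutes` settings concrete, here is a minimal sliding-window limiter. It is an illustrative sketch only: `LoginRateLimiter` is a hypothetical class, it keeps state in process memory, and it counts every attempt, whereas the library's own limiter may count only failures and track IP and username separately.

```python
import time
from collections import defaultdict, deque
from typing import Optional


class LoginRateLimiter:
    """Hypothetical in-memory sliding-window limiter keyed by IP or username."""

    def __init__(self, max_attempts: int = 5, window_seconds: float = 15 * 60) -> None:
        self.max_attempts = max_attempts
        self.window_seconds = window_seconds
        self._attempts: dict[str, deque] = defaultdict(deque)

    def allow(self, key: str, now: Optional[float] = None) -> bool:
        now = time.monotonic() if now is None else now
        attempts = self._attempts[key]
        # Drop attempts that have fallen out of the window.
        while attempts and now - attempts[0] >= self.window_seconds:
            attempts.popleft()
        if len(attempts) >= self.max_attempts:
            return False
        attempts.append(now)
        return True


limiter = LoginRateLimiter(max_attempts=2, window_seconds=10)
results = [
    limiter.allow("203.0.113.5", now=0.0),   # first attempt
    limiter.allow("203.0.113.5", now=1.0),   # second attempt
    limiter.allow("203.0.113.5", now=2.0),   # over the limit
    limiter.allow("203.0.113.5", now=11.0),  # window has moved on
]
```

With several worker processes, per-process state like this undercounts attempts, which is one reason to keep rate-limit counters in a shared store.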

Device Tracking

Sessions automatically track device information:

# Device info is automatically parsed and stored
session_data = await session_manager.validate_session(session_id)
if session_data:  # validate_session returns a falsy value for invalid sessions
    device_info = session_data.device_info

    print(f"Browser: {device_info['browser']}")
    print(f"OS: {device_info['os']}")
    print(f"Mobile: {device_info['is_mobile']}")

IP Address Monitoring

# Sessions track IP addresses for security monitoring
session_data = await session_manager.validate_session(session_id)
if session_data:
    print(f"Session from IP: {session_data.ip_address}")

    # Detect IP changes (potential session hijacking).
    # request.client can be None, so guard before reading .host.
    current_ip = request.client.host if request.client else None
    if session_data.ip_address != current_ip:
        # Handle potential security issue
        await session_manager.terminate_session(session_id)
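
A plain string comparison treats equivalent spellings of one address as different (for example `::1` and `0:0:0:0:0:0:0:1`) and fails when no client address is available. The helper below is a sketch of a more tolerant check; `ip_changed` is a hypothetical function, and treating a missing address as a mismatch is a policy assumption, not documented library behaviour.

```python
import ipaddress
from typing import Optional


def ip_changed(stored_ip: str, current_ip: Optional[str]) -> bool:
    """Return True when the current address differs from the stored one."""
    if current_ip is None:
        # No client address available: treat as a mismatch (a policy choice).
        return True
    try:
        # Parsing normalizes different textual forms of the same address.
        return ipaddress.ip_address(stored_ip) != ipaddress.ip_address(current_ip)
    except ValueError:
        # Not parseable as IP addresses: fall back to plain string comparison.
        return stored_ip != current_ip
```

Behind a reverse proxy the connecting address is usually the proxy's own, so a check like this is only meaningful once the application is configured to see the real client address. Mobile clients also change IP legitimately, so terminating on every change may be too strict for some deployments.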

Configuration Options

Session Manager Settings

session_manager = SessionManager(
    # Storage configuration
    session_backend="redis",
    redis_host="localhost",
    redis_port=6379,
    redis_db=0,
    redis_password=None,

    # Session limits
    max_sessions_per_user=5,
    session_timeout_minutes=30,

    # Cleanup
    cleanup_interval_minutes=15,

    # CSRF
    csrf_token_bytes=32,

    # Rate limiting
    login_max_attempts=5,
    login_window_minutes=15
)

Backend-Specific Options

Redis Configuration

from crudadmin.session.schemas import SessionData
from crudadmin.session.storage import get_session_storage

redis_storage = get_session_storage(
    backend="redis",
    model_type=SessionData,
    host="localhost",
    port=6379,
    db=0,
    password="your-password",
    pool_size=10,
    connect_timeout=10,
    prefix="session:",
    expiration=1800  # 30 minutes
)

Database Configuration

database_storage = get_session_storage(
    backend="database",
    model_type=SessionData,
    db_config=your_db_config,
    prefix="session:",
    expiration=1800
)

Integration with CRUDAdmin

Automatic Session Management

from crudadmin import CRUDAdmin

# CRUDAdmin automatically creates and manages sessions
crud_admin = CRUDAdmin(
    # Session backend configuration
    session_backend="redis",
    redis_url="redis://localhost:6379",

    # Session settings
    session_timeout=30,  # minutes
    max_sessions_per_user=5,

    # Security
    secret_key="your-secret-key",
    csrf_protection=True
)

Custom Session Storage

# Use custom session storage
custom_storage = YourCustomSessionStorage()

crud_admin = CRUDAdmin(
    session_storage=custom_storage,
    secret_key="your-secret-key"
)

Session Data Structure

SessionData Fields

| Field | Type | Description |
| --- | --- | --- |
| user_id | int | ID of the authenticated user |
| session_id | str | Unique session identifier |
| ip_address | str | IP address when session was created |
| user_agent | str | User agent string from browser |
| device_info | dict | Parsed device/browser information |
| created_at | datetime | When the session was created |
| last_activity | datetime | Last time session was validated |
| is_active | bool | Whether the session is active |
| metadata | dict | Additional session-specific data |

Device Information

device_info = {
    "browser": "Chrome",
    "browser_version": "120.0.0.0",
    "os": "Windows",
    "device": "PC",
    "is_mobile": False,
    "is_tablet": False,
    "is_pc": True
}

Error Handling

Session Validation Errors

try:
    session_data = await session_manager.validate_session(session_id)
except Exception as e:
    # Unexpected failure, e.g. the session backend is unreachable
    logger.error(f"Session validation error: {e}")
    raise HTTPException(status_code=500, detail="Session validation failed")

if not session_data:
    # Session not found, expired, or inactive. Raised outside the try block
    # so the 401 is not caught and turned into a 500.
    raise HTTPException(status_code=401, detail="Invalid session")

Backend Connection Errors

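try:
    await session_manager.create_session(request, user_id)
except ConnectionError as e:
    # Backend (Redis/Memcached) unavailable.
    # Fall back to memory storage here, or surface the error to the caller;
    # silently continuing would leave the user without a session.
    logger.error(f"Session backend unavailable: {e}")
    raise
except Exception as e:
    logger.error(f"Session creation failed: {e}")
    raise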

Performance Considerations

Session Cleanup

import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler

# Schedule periodic cleanup
scheduler = AsyncIOScheduler()
scheduler.add_job(
    session_manager.cleanup_expired_sessions,
    'interval',
    minutes=15,
    id='session_cleanup'
)
scheduler.start()
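
If adding a scheduler dependency is not desirable, the same periodic call can be made with a plain asyncio task. This is a sketch under assumptions: `run_periodically` is a hypothetical helper, and `fake_cleanup` stands in for `session_manager.cleanup_expired_sessions`.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger("cleanup_sketch")


async def run_periodically(
    job: Callable[[], Awaitable[None]], interval_seconds: float, stop: asyncio.Event
) -> None:
    while not stop.is_set():
        try:
            await job()
        except Exception as exc:
            # One failed run should not end the loop.
            logger.warning("Cleanup run failed: %s", exc)
        try:
            # Sleep for the interval, but wake up early when asked to stop.
            await asyncio.wait_for(stop.wait(), timeout=interval_seconds)
        except asyncio.TimeoutError:
            pass


async def demo() -> int:
    calls = 0

    async def fake_cleanup() -> None:  # stand-in for the real cleanup coroutine
        nonlocal calls
        calls += 1

    stop = asyncio.Event()
    task = asyncio.create_task(run_periodically(fake_cleanup, 0.01, stop))
    await asyncio.sleep(0.05)
    stop.set()  # request shutdown, e.g. from an application shutdown hook
    await task
    return calls
```

In an application, the task would typically be created at startup and stopped at shutdown so that no cleanup run is cut off mid-way.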

Connection Pooling

# Redis with connection pooling
redis_storage = get_session_storage(
    backend="redis",
    model_type=SessionData,
    host="localhost",
    port=6379,
    pool_size=20,  # Increase pool size for high traffic
    connect_timeout=10
)

Session Limits

# Prevent memory exhaustion
session_manager = SessionManager(
    max_sessions_per_user=10,  # Limit per user
    session_timeout_minutes=30,  # Auto-expire
    cleanup_interval_minutes=15  # Regular cleanup
)
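
When a user reaches `max_sessions_per_user`, the oldest sessions are logged out to make room. The selection step can be sketched as follows; `sessions_to_evict` is a hypothetical helper, and ordering by creation time is an assumption (the library may order by another timestamp).

```python
from datetime import datetime, timedelta, timezone


def sessions_to_evict(sessions: dict[str, datetime], max_sessions: int) -> list[str]:
    """Return the oldest session IDs to remove so one new session fits.

    `sessions` maps session ID to creation time; `max_sessions` must be >= 1.
    """
    excess = len(sessions) - max_sessions + 1  # +1 leaves room for the new one
    if excess <= 0:
        return []
    oldest_first = sorted(sessions, key=sessions.__getitem__)
    return oldest_first[:excess]


now = datetime.now(timezone.utc)
existing = {
    "a": now - timedelta(hours=3),
    "b": now - timedelta(hours=2),
    "c": now - timedelta(hours=1),
}
```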

Monitoring and Debugging

Session Metrics

from datetime import UTC, datetime

# Get active session count for user
user_sessions = await session_manager.get_user_sessions(user_id)
print(f"User has {len(user_sessions)} active sessions")

# Monitor session activity
session_data = await session_manager.validate_session(session_id)
if session_data:
    idle_time = datetime.now(UTC) - session_data.last_activity
    print(f"Session last active {idle_time} ago")
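
Subtracting a naive datetime from a timezone-aware one raises `TypeError`, which can happen if a backend returns timestamps without timezone information. The helper below is a defensive sketch; `idle_time` and `is_idle_expired` are hypothetical names, and treating naive timestamps as UTC is an assumption.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def idle_time(last_activity: datetime, now: Optional[datetime] = None) -> timedelta:
    now = now or datetime.now(timezone.utc)
    if last_activity.tzinfo is None:
        # Assumption: naive timestamps were recorded in UTC.
        last_activity = last_activity.replace(tzinfo=timezone.utc)
    return now - last_activity


def is_idle_expired(
    last_activity: datetime, timeout_minutes: int, now: Optional[datetime] = None
) -> bool:
    return idle_time(last_activity, now) >= timedelta(minutes=timeout_minutes)


reference = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
aware = datetime(2024, 1, 1, 11, 30, tzinfo=timezone.utc)
naive = datetime(2024, 1, 1, 11, 30)  # no tzinfo, interpreted as UTC
```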

Debug Information

# Enable detailed logging
import logging
logging.getLogger('crudadmin.session').setLevel(logging.DEBUG)

# Session data includes debug information
print(f"Session metadata: {session_data.metadata}")
print(f"Device info: {session_data.device_info}")

The session management system provides a robust, secure foundation for authentication in CRUDAdmin with flexibility to scale from development to production environments.