JWT Tokens¶
JSON Web Tokens (JWT) form the backbone of modern web authentication. This comprehensive guide explains how the boilerplate implements a secure, stateless authentication system using access and refresh tokens.
Understanding JWT Authentication¶
JWT tokens are self-contained, digitally signed packages of information that can be safely transmitted between parties. Unlike traditional session-based authentication that requires server-side storage, JWT tokens are stateless - all the information needed to verify a user's identity is contained within the token itself.
Why Use JWT?¶
Stateless Design: No need to store session data on the server, making it perfect for distributed systems and microservices.
Scalability: Since tokens contain all necessary information, they work seamlessly across multiple servers without shared session storage.
Security: Digital signatures ensure tokens can't be tampered with, and expiration times limit exposure if compromised.
Cross-Domain Support: Unlike cookies, JWT tokens work across different domains and can be used in mobile applications.
Token Types¶
The authentication system uses a dual-token approach for maximum security and user experience:
Access Tokens¶
Access tokens are short-lived credentials that prove a user's identity for API requests. Think of them as temporary keys that grant access to protected resources.
- Purpose: Authenticate API requests and authorize actions
- Lifetime: 30 minutes (configurable) - short enough to limit damage if compromised
- Storage: Authorization header (
Bearer <token>
) - sent with each API request - Usage: Include in every call to protected endpoints
Why Short-Lived? If an access token is stolen (e.g., through XSS), the damage window is limited to 30 minutes before it expires naturally.
Refresh Tokens¶
Refresh tokens are longer-lived credentials used solely to generate new access tokens. They provide a balance between security and user convenience.
- Purpose: Generate new access tokens without requiring re-login
- Lifetime: 7 days (configurable) - long enough for good UX, short enough for security
- Storage: Secure HTTP-only cookie - inaccessible to JavaScript, preventing XSS attacks
- Usage: Automatically used by the browser when access tokens need refreshing
Why HTTP-Only Cookies? This prevents malicious JavaScript from accessing refresh tokens, providing protection against XSS attacks while allowing automatic renewal.
Token Creation¶
Understanding how tokens are created helps you customize the authentication system for your specific needs.
Creating Access Tokens¶
Access tokens are generated during login and token refresh operations. The process involves encoding user information with an expiration time and signing it with your secret key.
from datetime import timedelta
from app.core.security import create_access_token, ACCESS_TOKEN_EXPIRE_MINUTES
# Basic access token with default expiration
access_token = await create_access_token(data={"sub": username})
# Custom expiration for special cases (e.g., admin sessions)
custom_expires = timedelta(minutes=60)
access_token = await create_access_token(
data={"sub": username},
expires_delta=custom_expires
)
When to Customize Expiration: - High-security environments: Shorter expiration (15 minutes) - Development/testing: Longer expiration for convenience - Admin operations: Variable expiration based on sensitivity
Creating Refresh Tokens¶
Refresh tokens follow the same creation pattern but with longer expiration times. They're typically created only during login.
from app.core.security import create_refresh_token, REFRESH_TOKEN_EXPIRE_DAYS
# Standard refresh token
refresh_token = await create_refresh_token(data={"sub": username})
# Extended refresh token for "remember me" functionality
extended_expires = timedelta(days=30)
refresh_token = await create_refresh_token(
data={"sub": username},
expires_delta=extended_expires
)
Token Structure¶
JWT tokens consist of three parts separated by dots: header.payload.signature
. The payload contains the actual user information and metadata.
# Access token payload structure
{
"sub": "username", # Subject (user identifier)
"exp": 1234567890, # Expiration timestamp (Unix)
"token_type": "access", # Distinguishes from refresh tokens
"iat": 1234567890 # Issued at (automatic)
}
# Refresh token payload structure
{
"sub": "username", # Same user identifier
"exp": 1234567890, # Longer expiration time
"token_type": "refresh", # Prevents confusion/misuse
"iat": 1234567890 # Issue timestamp
}
Key Fields Explained:
- sub
(Subject): Identifies the user - can be username, email, or user ID
- exp
(Expiration): Unix timestamp when token becomes invalid
- token_type
: Custom field preventing tokens from being used incorrectly
- iat
(Issued At): Useful for token rotation and audit trails
Token Verification¶
Token verification is a multi-step process that ensures both the token's authenticity and the user's current authorization status.
Verifying Access Tokens¶
Every protected endpoint must verify the access token before processing the request. This involves checking the signature, expiration, and blacklist status.
from app.core.security import verify_token, TokenType
# Verify access token in endpoint
token_data = await verify_token(token, TokenType.ACCESS, db)
if token_data:
username = token_data.username_or_email
# Token is valid, proceed with request processing
else:
# Token is invalid, expired, or blacklisted
raise UnauthorizedException("Invalid or expired token")
Verifying Refresh Tokens¶
Refresh token verification follows the same process but with different validation rules and outcomes.
# Verify refresh token for renewal
token_data = await verify_token(token, TokenType.REFRESH, db)
if token_data:
# Generate new access token
new_access_token = await create_access_token(
data={"sub": token_data.username_or_email}
)
return {"access_token": new_access_token, "token_type": "bearer"}
else:
# Refresh token invalid - user must log in again
raise UnauthorizedException("Invalid refresh token")
Token Verification Process¶
The verification process includes several security checks to prevent various attack vectors:
async def verify_token(token: str, expected_token_type: TokenType, db: AsyncSession) -> TokenData | None:
# 1. Check blacklist first (prevents use of logged-out tokens)
is_blacklisted = await crud_token_blacklist.exists(db, token=token)
if is_blacklisted:
return None
try:
# 2. Verify signature and decode payload
payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM])
# 3. Extract and validate claims
username_or_email: str | None = payload.get("sub")
token_type: str | None = payload.get("token_type")
# 4. Ensure token type matches expectation
if username_or_email is None or token_type != expected_token_type:
return None
# 5. Return validated data
return TokenData(username_or_email=username_or_email)
except JWTError:
# Token is malformed, expired, or signature invalid
return None
Security Checks Explained:
- Blacklist Check: Prevents use of tokens from logged-out users
- Signature Verification: Ensures token hasn't been tampered with
- Expiration Check: Automatically handled by JWT library
- Type Validation: Prevents refresh tokens from being used as access tokens
- Subject Validation: Ensures token contains valid user identifier
Client-Side Authentication Flow¶
Understanding the complete authentication flow helps frontend developers integrate properly with the API.
Recommended Client Flow¶
1. Login Process
// Send credentials to login endpoint
const response = await fetch('/api/v1/login', {
method: 'POST',
headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
body: 'username=user&password=pass',
credentials: 'include' // Important: includes cookies
});
const { access_token, token_type } = await response.json();
// Store access token in memory (not localStorage)
sessionStorage.setItem('access_token', access_token);
2. Making Authenticated Requests
// Include access token in Authorization header
const response = await fetch('/api/v1/protected-endpoint', {
headers: {
'Authorization': `Bearer ${sessionStorage.getItem('access_token')}`
},
credentials: 'include'
});
3. Handling Token Expiration
// Automatic token refresh on 401 errors
async function apiCall(url, options = {}) {
let response = await fetch(url, {
...options,
headers: {
...options.headers,
'Authorization': `Bearer ${sessionStorage.getItem('access_token')}`
},
credentials: 'include'
});
// If token expired, try to refresh
if (response.status === 401) {
const refreshResponse = await fetch('/api/v1/refresh', {
method: 'POST',
credentials: 'include' // Sends refresh token cookie
});
if (refreshResponse.ok) {
const { access_token } = await refreshResponse.json();
sessionStorage.setItem('access_token', access_token);
// Retry original request
response = await fetch(url, {
...options,
headers: {
...options.headers,
'Authorization': `Bearer ${access_token}`
},
credentials: 'include'
});
} else {
// Refresh failed - redirect to login
window.location.href = '/login';
}
}
return response;
}
4. Logout Process
// Clear tokens and call logout endpoint
await fetch('/api/v1/logout', {
method: 'POST',
credentials: 'include'
});
sessionStorage.removeItem('access_token');
// Refresh token cookie is cleared by server
Cookie Configuration¶
The refresh token cookie is configured for maximum security:
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True, # Prevents JavaScript access (XSS protection)
secure=True, # HTTPS only in production
samesite="Lax", # CSRF protection with good usability
max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
)
SameSite Options:
Lax
(Recommended): Cookies sent on top-level navigation but not cross-site requestsStrict
: Maximum security but may break some user flowsNone
: Required for cross-origin requests (must use with Secure)
Token Blacklisting¶
Token blacklisting solves a fundamental problem with JWT tokens: once issued, they remain valid until expiration, even if the user logs out. Blacklisting provides immediate token revocation.
Why Blacklisting Matters¶
Without blacklisting, logged-out users could continue accessing your API until their tokens naturally expire. This creates security risks, especially on shared computers or if tokens are compromised.
Blacklisting Implementation¶
The system uses a database table to track invalidated tokens:
# models/token_blacklist.py
class TokenBlacklist(Base):
__tablename__ = "token_blacklist"
id: Mapped[int] = mapped_column(primary_key=True)
token: Mapped[str] = mapped_column(unique=True, index=True) # Full token string
expires_at: Mapped[datetime] = mapped_column() # When to clean up
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
Design Considerations: - Unique constraint: Prevents duplicate entries - Index on token: Fast lookup during verification - Expires_at field: Enables automatic cleanup of old entries
Blacklisting Tokens¶
The system provides functions for both single token and dual token blacklisting:
from app.core.security import blacklist_token, blacklist_tokens
# Single token blacklisting (for specific scenarios)
await blacklist_token(token, db)
# Dual token blacklisting (standard logout)
await blacklist_tokens(access_token, refresh_token, db)
Blacklisting Process¶
The blacklisting process extracts the expiration time from the token to set an appropriate cleanup schedule:
async def blacklist_token(token: str, db: AsyncSession) -> None:
# 1. Decode token to extract expiration (no verification needed)
payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM])
exp_timestamp = payload.get("exp")
if exp_timestamp is not None:
# 2. Convert Unix timestamp to datetime
expires_at = datetime.fromtimestamp(exp_timestamp)
# 3. Store in blacklist with expiration
await crud_token_blacklist.create(
db,
object=TokenBlacklistCreate(token=token, expires_at=expires_at)
)
Cleanup Strategy: Blacklisted tokens can be automatically removed from the database after their natural expiration time, preventing unlimited database growth.
Login Flow Implementation¶
Complete Login Endpoint¶
@router.post("/login", response_model=Token)
async def login_for_access_token(
response: Response,
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: Annotated[AsyncSession, Depends(async_get_db)],
) -> dict[str, str]:
# 1. Authenticate user
user = await authenticate_user(
username_or_email=form_data.username,
password=form_data.password,
db=db
)
if not user:
raise HTTPException(
status_code=401,
detail="Incorrect username or password"
)
# 2. Create access token
access_token = await create_access_token(data={"sub": user["username"]})
# 3. Create refresh token
refresh_token = await create_refresh_token(data={"sub": user["username"]})
# 4. Set refresh token as HTTP-only cookie
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
)
return {"access_token": access_token, "token_type": "bearer"}
Token Refresh Endpoint¶
@router.post("/refresh", response_model=Token)
async def refresh_access_token(
response: Response,
db: Annotated[AsyncSession, Depends(async_get_db)],
refresh_token: str = Cookie(None)
) -> dict[str, str]:
if not refresh_token:
raise HTTPException(status_code=401, detail="Refresh token missing")
# 1. Verify refresh token
token_data = await verify_token(refresh_token, TokenType.REFRESH, db)
if not token_data:
raise HTTPException(status_code=401, detail="Invalid refresh token")
# 2. Create new access token
new_access_token = await create_access_token(
data={"sub": token_data.username_or_email}
)
# 3. Optionally create new refresh token (token rotation)
new_refresh_token = await create_refresh_token(
data={"sub": token_data.username_or_email}
)
# 4. Blacklist old refresh token
await blacklist_token(refresh_token, db)
# 5. Set new refresh token cookie
response.set_cookie(
key="refresh_token",
value=new_refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
)
return {"access_token": new_access_token, "token_type": "bearer"}
Logout Implementation¶
@router.post("/logout")
async def logout(
response: Response,
db: Annotated[AsyncSession, Depends(async_get_db)],
current_user: dict = Depends(get_current_user),
token: str = Depends(oauth2_scheme),
refresh_token: str = Cookie(None)
) -> dict[str, str]:
# 1. Blacklist access token
await blacklist_token(token, db)
# 2. Blacklist refresh token if present
if refresh_token:
await blacklist_token(refresh_token, db)
# 3. Clear refresh token cookie
response.delete_cookie(
key="refresh_token",
httponly=True,
secure=True,
samesite="strict"
)
return {"message": "Successfully logged out"}
Authentication Dependencies¶
get_current_user¶
async def get_current_user(
db: AsyncSession = Depends(async_get_db),
token: str = Depends(oauth2_scheme)
) -> dict:
# 1. Verify token
token_data = await verify_token(token, TokenType.ACCESS, db)
if not token_data:
raise HTTPException(status_code=401, detail="Invalid token")
# 2. Get user from database
user = await crud_users.get(
db=db,
username=token_data.username_or_email,
schema_to_select=UserRead
)
if user is None:
raise HTTPException(status_code=401, detail="User not found")
return user
get_optional_user¶
async def get_optional_user(
db: AsyncSession = Depends(async_get_db),
token: str = Depends(optional_oauth2_scheme)
) -> dict | None:
if not token:
return None
try:
return await get_current_user(db=db, token=token)
except HTTPException:
return None
get_current_superuser¶
async def get_current_superuser(
current_user: dict = Depends(get_current_user)
) -> dict:
if not current_user.get("is_superuser", False):
raise HTTPException(
status_code=403,
detail="Not enough permissions"
)
return current_user
Configuration¶
Environment Variables¶
# JWT Configuration
SECRET_KEY=your-secret-key-here
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
# Security Headers
SECURE_COOKIES=true
CORS_ORIGINS=["http://localhost:3000", "https://yourapp.com"]
Security Configuration¶
# app/core/config.py
class Settings(BaseSettings):
SECRET_KEY: SecretStr
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# Cookie settings
SECURE_COOKIES: bool = True
COOKIE_DOMAIN: str | None = None
COOKIE_SAMESITE: str = "strict"
Security Best Practices¶
Token Security¶
- Use strong secrets: Generate cryptographically secure SECRET_KEY
- Rotate secrets: Regularly change SECRET_KEY in production
- Environment separation: Different secrets for dev/staging/production
- Secure transmission: Always use HTTPS in production
Cookie Security¶
- HttpOnly flag: Prevents JavaScript access to refresh tokens
- Secure flag: Ensures cookies only sent over HTTPS
- SameSite attribute: Prevents CSRF attacks
- Domain restrictions: Set cookie domain appropriately
Implementation Security¶
- Input validation: Validate all token inputs
- Rate limiting: Implement login attempt limits
- Audit logging: Log authentication events
- Token rotation: Regularly refresh tokens
Common Patterns¶
API Key Authentication¶
For service-to-service communication:
async def get_api_key_user(
api_key: str = Header(None),
db: AsyncSession = Depends(async_get_db)
) -> dict:
if not api_key:
raise HTTPException(status_code=401, detail="API key required")
# Verify API key
user = await crud_users.get(db=db, api_key=api_key)
if not user:
raise HTTPException(status_code=401, detail="Invalid API key")
return user
Multiple Authentication Methods¶
async def get_authenticated_user(
db: AsyncSession = Depends(async_get_db),
token: str = Depends(optional_oauth2_scheme),
api_key: str = Header(None)
) -> dict:
# Try JWT token first
if token:
try:
return await get_current_user(db=db, token=token)
except HTTPException:
pass
# Fall back to API key
if api_key:
return await get_api_key_user(api_key=api_key, db=db)
raise HTTPException(status_code=401, detail="Authentication required")
Troubleshooting¶
Common Issues¶
Token Expired: Implement automatic refresh using refresh tokens Invalid Signature: Check SECRET_KEY consistency across environments Blacklisted Token: User logged out - redirect to login Missing Token: Ensure Authorization header is properly set
Debugging Tips¶
# Enable debug logging
import logging
logging.getLogger("app.core.security").setLevel(logging.DEBUG)
# Test token validation
async def debug_token(token: str, db: AsyncSession):
try:
payload = jwt.decode(token, SECRET_KEY.get_secret_value(), algorithms=[ALGORITHM])
print(f"Token payload: {payload}")
is_blacklisted = await crud_token_blacklist.exists(db, token=token)
print(f"Is blacklisted: {is_blacklisted}")
except JWTError as e:
print(f"JWT Error: {e}")
This comprehensive JWT implementation provides secure, scalable authentication for your FastAPI application.