Eliminates XSS token theft by storing JWT in httpOnly Secure cookie instead of localStorage. Backend sets cookie on login and clears on logout. Token extraction uses cookie-first with Authorization header fallback for backward compatibility with existing tests. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
91 lines
2.6 KiB
Python
91 lines
2.6 KiB
Python
"""
|
|
Authentication API endpoints.
|
|
"""
|
|
from datetime import timedelta
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.responses import JSONResponse
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.database import get_db
|
|
from app.core.security import (
|
|
verify_password,
|
|
get_password_hash,
|
|
create_access_token,
|
|
)
|
|
from app.core.config import get_settings
|
|
from app.models.user import User
|
|
from app.schemas import Token, UserCreate, UserResponse, LoginRequest
|
|
from app.api.deps import CurrentUser
|
|
|
|
router = APIRouter(prefix="/api/auth", tags=["auth"])
|
|
settings = get_settings()
|
|
|
|
|
|
@router.post("/login")
|
|
async def login(
|
|
login_data: LoginRequest,
|
|
db: Annotated[Session, Depends(get_db)],
|
|
):
|
|
"""Login and get access token. Expects SHA-256 hashed password."""
|
|
user = db.query(User).filter(User.username == login_data.username).first()
|
|
|
|
if not user or not verify_password(login_data.password, user.hashed_password):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Incorrect username or password",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
access_token = create_access_token(
|
|
data={"sub": user.username},
|
|
expires_delta=timedelta(minutes=settings.access_token_expire_minutes),
|
|
)
|
|
|
|
response = JSONResponse(
|
|
content={"access_token": access_token, "token_type": "bearer"},
|
|
)
|
|
response.set_cookie(
|
|
key="access_token",
|
|
value=access_token,
|
|
httponly=True,
|
|
samesite="lax",
|
|
secure=False, # Set True in production behind HTTPS
|
|
path="/",
|
|
max_age=settings.access_token_expire_minutes * 60,
|
|
)
|
|
return response
|
|
|
|
|
|
@router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
|
|
async def register(
|
|
user_data: UserCreate,
|
|
db: Annotated[Session, Depends(get_db)],
|
|
):
|
|
"""Register a new user. Disabled for production."""
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Registration is disabled",
|
|
)
|
|
|
|
|
|
@router.get("/me", response_model=UserResponse)
|
|
async def get_current_user_info(current_user: CurrentUser):
|
|
"""Get current user information."""
|
|
return current_user
|
|
|
|
|
|
@router.post("/logout")
|
|
async def logout():
|
|
"""Logout by clearing the access_token cookie."""
|
|
response = JSONResponse(content={"message": "Successfully logged out"})
|
|
response.delete_cookie(
|
|
key="access_token",
|
|
httponly=True,
|
|
samesite="lax",
|
|
secure=False,
|
|
path="/",
|
|
)
|
|
return response
|