""" Authentication API endpoints. """ from datetime import timedelta from typing import Annotated from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session from app.core.database import get_db from app.core.security import ( verify_password, get_password_hash, create_access_token, ) from app.core.config import get_settings from app.models.user import User from app.schemas import Token, UserCreate, UserResponse, LoginRequest from app.api.deps import CurrentUser router = APIRouter(prefix="/api/auth", tags=["auth"]) settings = get_settings() @router.post("/login", response_model=Token) async def login( form_data: Annotated[OAuth2PasswordRequestForm, Depends()], db: Annotated[Session, Depends(get_db)], ): """Login and get access token.""" user = db.query(User).filter(User.username == form_data.username).first() if not user or not verify_password(form_data.password, user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token = create_access_token( data={"sub": user.username}, expires_delta=timedelta(minutes=settings.access_token_expire_minutes), ) return Token(access_token=access_token) @router.post("/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED) async def register( user_data: UserCreate, db: Annotated[Session, Depends(get_db)], ): """Register a new user.""" if db.query(User).filter(User.username == user_data.username).first(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Username already registered", ) if db.query(User).filter(User.email == user_data.email).first(): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered", ) user = User( username=user_data.username, email=user_data.email, hashed_password=get_password_hash(user_data.password), ) db.add(user) db.commit() db.refresh(user) return user @router.get("/me", response_model=UserResponse) async def get_current_user_info(current_user: CurrentUser): """Get current user information.""" return current_user @router.post("/logout") async def logout(): """Logout (client should discard token).""" return {"message": "Successfully logged out"}