55 lines
1.9 KiB
Python
55 lines
1.9 KiB
Python
from typing import AsyncIterator, Annotated
|
|
|
|
from fastapi import HTTPException, Depends
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
|
|
from ..model.token import Token, RefreshTokenPayload
|
|
from ..domain.user import User as DBUser
|
|
from .base import BaseController, AsyncSession
|
|
from ..auth import (
|
|
authenticate_user,
|
|
create_access_token,
|
|
create_refresh_token,
|
|
verify_refresh_token,
|
|
)
|
|
|
|
|
|
class CreateToken(BaseController):
|
|
async def call(self, content: OAuth2PasswordRequestForm) -> Token:
|
|
async with self.async_session.begin() as session:
|
|
user = await authenticate_user(session, content.username, content.password)
|
|
|
|
if user is None:
|
|
raise HTTPException(
|
|
status_code=401, detail="Invalid username or password"
|
|
)
|
|
|
|
refresh_token = await create_refresh_token(session, user)
|
|
access_token = create_access_token(user)
|
|
|
|
return Token(
|
|
access_token=access_token,
|
|
refresh_token=refresh_token.token,
|
|
token_type="bearer",
|
|
)
|
|
|
|
|
|
class RefreshToken(BaseController):
|
|
async def call(self, content: RefreshTokenPayload) -> Token:
|
|
async with self.async_session.begin() as session:
|
|
current_token = await verify_refresh_token(session, content.refresh_token)
|
|
|
|
if current_token is None:
|
|
raise HTTPException(status_code=401, detail="Invalid token")
|
|
|
|
user = await DBUser.get(session, current_token.user_id)
|
|
await current_token.delete(session)
|
|
|
|
refresh_token = await create_refresh_token(session, user)
|
|
access_token = create_access_token(user)
|
|
|
|
return Token(
|
|
access_token=access_token,
|
|
refresh_token=refresh_token.token,
|
|
token_type="bearer",
|
|
)
|