52 lines
1.5 KiB
Python
52 lines
1.5 KiB
Python
from jose import jwt, JOSEError
|
|
from pydantic import BaseModel
|
|
from datetime import timedelta, datetime
|
|
from typing import ClassVar, Literal
|
|
import logging
|
|
from fooder.settings import settings
|
|
from fooder.exc import Unauthorized
|
|
|
|
|
|
class Token(BaseModel):
|
|
exp: datetime
|
|
sub: int
|
|
|
|
secret_key: ClassVar[str]
|
|
expire_delta: ClassVar[timedelta]
|
|
|
|
@classmethod
|
|
def calculate_exp(cls, now: datetime) -> datetime:
|
|
return now + cls.expire_delta
|
|
|
|
@classmethod
|
|
def decode(cls, jwt_token: str | bytes) -> "Token":
|
|
try:
|
|
data = jwt.decode(
|
|
jwt_token, cls.secret_key, algorithms=[settings.ALGORITHM]
|
|
)
|
|
except JOSEError as e:
|
|
logging.error(e)
|
|
raise Unauthorized()
|
|
return cls(**data)
|
|
|
|
def encode(self) -> str:
|
|
data = self.model_dump()
|
|
data["sub"] = str(data["sub"])
|
|
return jwt.encode(data, self.secret_key, settings.ALGORITHM)
|
|
|
|
|
|
class AccessToken(Token):
|
|
secret_key = settings.SECRET_KEY
|
|
expire_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
|
|
|
|
class RefreshToken(Token):
|
|
secret_key = settings.REFRESH_SECRET_KEY
|
|
expire_delta = timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
|
|
|
|
|
|
def generate_token_pair(entity_id: int, now: datetime) -> tuple[AccessToken, RefreshToken]:
|
|
return (
|
|
AccessToken(exp=AccessToken.calculate_exp(now), sub=entity_id),
|
|
RefreshToken(exp=RefreshToken.calculate_exp(now), sub=entity_id),
|
|
)
|