feat: добавлены новые функции и схемы в модуль auth
All checks were successful
Lint project / lint (push) Successful in 1m41s

- Добавлен класс UserVerifySchema для валидации данных пользователя, наследующий GetUserByID и GetUserByEmail.
- Обновлена модель AuthUser для регистрации пользователя.
- Добавлены роуты logout и get-user для работы с аутентификацией.
- Создан новый модуль utils с функцией get_token_from_cookies для извлечения токена из куки.
- Добавлена логика для выхода пользователя и получения информации об авторизованном пользователе в сервисах и роутах.
- Обновлены схемы и зависимости в модуле handlers и managers для работы с аутентификацией.
This commit is contained in:
proDream 2025-04-29 22:51:00 +04:00
parent 40d45e8379
commit 78cc7e3f54
8 changed files with 225 additions and 4 deletions

View File

@ -0,0 +1,52 @@
"""
Проект: Lkeep
Автор: Иван Ашихмин
Год: 2025
Специально для проекта "Код на салфетке"
https://pressanybutton.ru/category/servis-na-fastapi/
"""
import uuid
from typing import Annotated
from fastapi import Depends, HTTPException
from starlette import status
from lkeep.apps.auth.handlers import AuthHandler
from lkeep.apps.auth.managers import UserManager
from lkeep.apps.auth.schemas import UserVerifySchema
from lkeep.apps.auth.utils import get_token_from_cookies
async def get_current_user(
token: Annotated[str, Depends(get_token_from_cookies)],
handler: AuthHandler = Depends(AuthHandler),
manager: UserManager = Depends(UserManager),
) -> UserVerifySchema:
"""
Получает текущего пользователя из токена аутентификации.
:param token: Токен аутентификации, полученный из куки.
:type token: str
:param handler: Обработчик аутентификации, использующийся для декодирования токена.
:type handler: AuthHandler
:param manager: Менеджер пользователей, используется для проверки и получения данных о пользователе.
:type manager: UserManager
:returns: Схема с информацией о текущем пользователе.
:rtype: UserVerifySchema
:raises HTTPException: Если токен невалиден или пользователь не найден.
"""
decoded_token = await handler.decode_access_token(token=token)
user_id = str(decoded_token.get("user_id"))
session_id = str(decoded_token.get("session_id"))
if not await manager.get_access_token(user_id=user_id, session_id=session_id):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token is invalid")
user = await manager.get_user_by_id(user_id=uuid.UUID(user_id))
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
user.session_id = session_id
return user

View File

@ -10,7 +10,9 @@ import datetime
import uuid
import jwt
from fastapi import HTTPException
from passlib.context import CryptContext
from starlette import status
from lkeep.apps.auth.named_tuples import CreateTokenTuple
from lkeep.core.settings import settings
@ -70,3 +72,22 @@ class AuthHandler:
encoded_jwt = jwt.encode(payload=data, key=self.secret, algorithm="HS256")
return CreateTokenTuple(encoded_jwt=encoded_jwt, session_id=session_id)
async def decode_access_token(self, token: str) -> dict:
"""
Декодирует JWT-токен и возвращает его содержимое.
:param token: Строка с JWT-токеном, который нужно декодировать.
:type token: str
:returns: Данные, содержащиеся в декодированном токене.
:rtype: dict
:raises HTTPException: При ошибке декодирования токена (например, токен просрочен или невалиден).
Статус-код ответа 401 UNAUTHORIZED, детализация "Token has expired" при просрочке,
и "Invalid token" при недопустимости токена.
"""
try:
return jwt.decode(jwt=token, key=self.secret, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired")
except jwt.InvalidTokenError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

View File

@ -12,7 +12,12 @@ from fastapi import Depends, HTTPException
from sqlalchemy import insert, select, update
from sqlalchemy.exc import IntegrityError
from lkeep.apps.auth.schemas import CreateUser, GetUserWithIDAndEmail, UserReturnData
from lkeep.apps.auth.schemas import (
CreateUser,
GetUserWithIDAndEmail,
UserReturnData,
UserVerifySchema,
)
from lkeep.core.core_dependency.db_dependency import DBDependency
from lkeep.core.core_dependency.redis_dependency import RedisDependency
from lkeep.database.models import User
@ -93,6 +98,27 @@ class UserManager:
return None
async def get_user_by_id(self, user_id: uuid.UUID | str) -> UserVerifySchema | None:
"""
Возвращает информацию о пользователе по его идентификатору.
:param user_id: Идентификатор пользователя, для которого нужно получить информацию.
Может быть представлен в виде UUID или строки.
:type user_id: uuid.UUID | str
:returns: Схема данных пользователя, если пользователь найден; None, если пользователь не найден.
:rtype: UserVerifySchema | None
"""
async with self.db.db_session() as session:
query = select(self.model.id, self.model.email).where(self.model.id == user_id)
result = await session.execute(query)
user = result.mappings().one_or_none()
if user:
return UserVerifySchema(**user)
return None
async def store_access_token(self, token: str, user_id: uuid.UUID | str, session_id: str) -> None:
"""
Сохраняет токен доступа в хранилище (Redis).
@ -106,3 +132,30 @@ class UserManager:
"""
async with self.redis.get_client() as client:
await client.set(f"{user_id}:{session_id}", token)
async def get_access_token(self, user_id: uuid.UUID | str, session_id: str) -> str | None:
"""
Получает токен доступа из кэша по идентификаторам пользователя и сессии.
:param user_id: Идентификатор пользователя, может быть в формате UUID или строка.
:type user_id: uuid.UUID | str
:param session_id: Идентификатор сессии, строка.
:type session_id: str
:returns: Токен доступа из кэша, если он существует; иначе None.
:rtype: str | None
"""
async with self.redis.get_client() as client:
return await client.get(f"{user_id}:{session_id}")
async def revoke_access_token(self, user_id: uuid.UUID | str, session_id: str | uuid.UUID | None) -> None:
"""
Отзывает доступный токен доступа пользователя.
:param user_id: Идентификатор пользователя, которому принадлежит токен.
:type user_id: uuid.UUID | str
:param session_id: Идентификатор сессии, для которой должен быть отозван токен.
Если None, то все сессии пользователя будут отозваны.
:type session_id: str | uuid.UUID | None
"""
async with self.redis.get_client() as client:
await client.delete(f"{user_id}:{session_id}")

View File

@ -6,11 +6,14 @@
https://pressanybutton.ru/category/servis-na-fastapi/
"""
from typing import Annotated
from fastapi import APIRouter, Depends
from starlette import status
from starlette.responses import JSONResponse
from lkeep.apps.auth.schemas import AuthUser, UserReturnData
from lkeep.apps.auth.depends import get_current_user
from lkeep.apps.auth.schemas import AuthUser, UserReturnData, UserVerifySchema
from lkeep.apps.auth.services import UserService
auth_router = APIRouter(prefix="/auth", tags=["auth"])
@ -63,3 +66,35 @@ async def login(user: AuthUser, service: UserService = Depends(UserService)) ->
:raises HTTPException: Если учетные данные не верны или произошла другая ошибка при входе.
"""
return await service.login_user(user=user)
@auth_router.get(path="/logout", status_code=status.HTTP_200_OK)
async def logout(
user: Annotated[UserVerifySchema, Depends(get_current_user)], service: UserService = Depends(UserService)
) -> JSONResponse:
"""
Описание функции logout.
:param user: Текущий авторизованный пользователь.
:type user: UserVerifySchema
:param service: Сервис для управления пользователями.
:type service: UserService
:returns: JSON-ответ, содержащий результат логаута.
:rtype: JSONResponse
"""
return await service.logout_user(user=user)
@auth_router.get(path="/get-user", status_code=status.HTTP_200_OK, response_model=UserVerifySchema)
async def get_auth_user(user: Annotated[UserVerifySchema, Depends(get_current_user)]) -> UserVerifySchema:
"""
Возвращает информацию об авторизованном пользователе.
:param user: Информация о пользователе, полученная с помощью механизма аутентификации.
:type user: UserVerifySchema
:return: Схема данных пользователя, содержащая необходимую информацию для работы системы.
:rtype: UserVerifySchema
:raises HTTPException 401: Если пользователь не авторизован и попытка получить доступ к защищенному ресурсу.
"""
return user

View File

@ -34,6 +34,17 @@ class GetUserByEmail(BaseModel):
email: EmailStr
class UserVerifySchema(GetUserByID, GetUserByEmail):
"""
Класс для валидации данных пользователя.
Данный класс наследует методы из классов GetUserByID и GetUserByEmail,
что позволяет использовать их функциональность для проверки данных пользователя.
"""
session_id: uuid.UUID | str | None = None
class AuthUser(GetUserByEmail):
"""
Класс для регистрации пользователя, наследующий класс GetUserByEmail.

View File

@ -13,7 +13,12 @@ from starlette.responses import JSONResponse
from lkeep.apps.auth.handlers import AuthHandler
from lkeep.apps.auth.managers import UserManager
from lkeep.apps.auth.schemas import AuthUser, CreateUser, UserReturnData
from lkeep.apps.auth.schemas import (
AuthUser,
CreateUser,
UserReturnData,
UserVerifySchema,
)
from lkeep.apps.auth.tasks import send_confirmation_email
from lkeep.core.settings import settings
@ -85,7 +90,7 @@ class UserService:
"""
exist_user = await self.manager.get_user_by_email(email=user.email)
if exist_user is None or not self.handler.verify_password(
if exist_user is None or not await self.handler.verify_password(
hashed_password=exist_user.hashed_password, raw_password=user.password
):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Wrong email or password")
@ -103,3 +108,20 @@ class UserService:
)
return response
async def logout_user(self, user: UserVerifySchema) -> JSONResponse:
"""
Отправляет запрос на выход пользователя из системы.
:param user: Схема, содержащая информацию о пользователе для аутентификации.
:type user: UserVerifySchema
:returns: Ответ сервера с сообщением об успешном выходе пользователя.
:rtype: JSONResponse
:raises Exception: Если произошла ошибка при отмене токена доступа.
"""
await self.manager.revoke_access_token(user_id=user.id, session_id=user.session_id)
response = JSONResponse(content={"message": "Logged out"})
response.delete_cookie(key="Authorization")
return response

27
lkeep/apps/auth/utils.py Normal file
View File

@ -0,0 +1,27 @@
"""
Проект: Lkeep
Автор: Иван Ашихмин
Год: 2025
Специально для проекта "Код на салфетке"
https://pressanybutton.ru/category/servis-na-fastapi/
"""
from fastapi import HTTPException
from starlette import status
from starlette.requests import Request
async def get_token_from_cookies(request: Request) -> str:
"""
Получает токен из куки запроса.
:param request: Объект HTTP-запроса.
:type request: Request
:return: Токен из cookies.
:rtype: str
:raises HTTPException: Если в запросе отсутствует cookie с ключом "Authorization".
"""
token = request.cookies.get("Authorization")
if token is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token is missing")
return token

0
tests/__init__.py Normal file
View File