feat: добавлен класс для работы с Redis

- Создан новый файл `redis_dependency.py` в каталоге `lkeep/core/core_dependency`.
- Добавлен класс `RedisDependency` для управления соединениями с Redis и взаимодействия с базой данных.
- Реализован метод `_init_pool` для инициализации пула соединений.
- Создан асинхронный контекст менеджер `get_client` для получения клиентской сессии Redis.
This commit is contained in:
proDream 2025-04-10 12:48:37 +04:00
parent c33e898218
commit e2d0669064

View File

@ -0,0 +1,55 @@
"""
Проект: Lkeep
Автор: Иван Ашихмин
Год: 2025
Специально для проекта "Код на салфетке"
https://pressanybutton.ru/category/servis-na-fastapi/
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from redis.asyncio import ConnectionPool, Redis
from lkeep.core.settings import settings
class RedisDependency:
"""
Класс, предоставляющий инструменты для работы с Redis через асинхронный клиент.
:ivar _url: URL подключения к Redis серверу.
:type _url: str
:ivar _pool: Пул соединений для управления соединениями с Redis.
:type _pool: ConnectionPool
"""
def __init__(self) -> None:
"""
Инициализирует экземпляр класса для работы с Redis.
"""
self._url = settings.redis_settings.redis_url
self._pool: ConnectionPool = self._init_pool()
def _init_pool(self) -> ConnectionPool:
"""
Инициализирует пул соединений Redis.
:returns: Пул соединений для работы с Redis.
:rtype: ConnectionPool
"""
return ConnectionPool.from_url(url=self._url, encoding="utf-8", decode_responses=True)
@asynccontextmanager
async def get_client(self) -> AsyncGenerator[Redis, None]:
"""
Получает клиентскую сессию Redis для взаимодействия с базой данных.
:returns: Асинхронный генератор клиента Redis.
:rtype: AsyncGenerator[Redis, None]
"""
redis_client = Redis(connection_pool=self._pool)
try:
yield redis_client
finally:
await redis_client.aclose()