Я работаю над функцией для нашей базы кода, где
ASK: Нам нужна возможность переподключиться к базе данных в случае потери соединения с базой данных. Интервал повторного подключения может быть экспоненциальным, если попытка повторного подключения не удалась. В рамках изменений нам также необходимо решить, как обрабатывать данные, которые находятся в памяти, чтобы убедиться, что устаревшие данные не фиксируются в базе данных.
у нас есть существующий код, я слышал об обработке такой ситуации с пессимисти c отключиться. подходит ли такой подход для моей нынешней ситуации, и если бы кто-то мог направить меня как таковой, было бы здорово.
class AioDatabase(Inspectable):
def __init__(self, url, thread_count=5, loop=None, **options):
self._loop = loop
self.engine = self._build_engine(url, options)
self.executor = self._build_executor(thread_count)
def _build_engine(self, url, options):
return create_engine(url, _coerce_config=True, **options)
def _build_executor(self, thread_count):
return ThreadPoolExecutor(thread_count)
def execute(self, *args, **kwargs):
return self.run(self._execute, *args, **kwargs)
async def dispose(self):
await self.run(self._dispose)
self.executor.shutdown()
def run(self, func, *args, **kwargs):
loop = self._loop or get_event_loop()
func = func if not kwargs else partial(func, **kwargs)
return loop.run_in_executor(self.executor, func, self.engine, *args)
def _execute(self, engine, *args, **kwargs):
result = engine.execute(*args, **kwargs)
if result.returns_rows:
rows = result.fetchall()
return ResultSet(rows)
def _dispose(self, engine):
return engine.dispose()
async def is_healthy(self):
try:
await self.execute(PING_COMMAND)
except Exception as e:
return False
else:
return True
async def inspect(self):
return {
'engine': str(self.engine),
'connected': await self.is_healthy()
}