mirror of
https://github.com/zoriya/Kyoo.git
synced 2025-07-07 18:24:14 -04:00
wip: Reconnect to database on connection failure
This commit is contained in:
parent
df23cf54d7
commit
12d9ad3183
@ -21,7 +21,7 @@ logging.getLogger("rebulk").setLevel(logging.WARNING)
|
|||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def lifespan(_):
|
async def lifespan(_):
|
||||||
async with (
|
async with (
|
||||||
init_pool(),
|
init_pool() as pool,
|
||||||
get_db() as db,
|
get_db() as db,
|
||||||
KyooClient() as client,
|
KyooClient() as client,
|
||||||
TheMovieDatabase() as tmdb,
|
TheMovieDatabase() as tmdb,
|
||||||
@ -30,18 +30,17 @@ async def lifespan(_):
|
|||||||
is_master = await db.fetchval("select pg_try_advisory_lock(198347)")
|
is_master = await db.fetchval("select pg_try_advisory_lock(198347)")
|
||||||
if is_master:
|
if is_master:
|
||||||
await migrate()
|
await migrate()
|
||||||
async with get_db() as scanner_db:
|
processor = RequestProcessor(pool, client, tmdb)
|
||||||
processor = RequestProcessor(db, client, tmdb)
|
scanner = FsScanner(client, RequestCreator(db))
|
||||||
scanner = FsScanner(client, RequestCreator(scanner_db))
|
tasks = create_task(
|
||||||
tasks = create_task(
|
background_startup(
|
||||||
background_startup(
|
scanner,
|
||||||
scanner,
|
processor,
|
||||||
processor,
|
is_master,
|
||||||
is_master,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
yield
|
)
|
||||||
_ = tasks.cancel()
|
yield
|
||||||
|
_ = tasks.cancel()
|
||||||
|
|
||||||
|
|
||||||
async def background_startup(
|
async def background_startup(
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from asyncio import CancelledError, Future, TaskGroup, sleep
|
from asyncio import CancelledError, Event, Future, TaskGroup, sleep
|
||||||
from logging import getLogger
|
from logging import getLogger
|
||||||
from types import TracebackType
|
from types import TracebackType
|
||||||
from typing import Literal, cast
|
from typing import Literal, cast
|
||||||
@ -61,25 +61,45 @@ class RequestCreator:
|
|||||||
class RequestProcessor:
|
class RequestProcessor:
|
||||||
def __init__(
|
def __init__(
|
||||||
self,
|
self,
|
||||||
database: Connection,
|
pool: Pool,
|
||||||
client: KyooClient,
|
client: KyooClient,
|
||||||
providers: Provider,
|
providers: Provider,
|
||||||
):
|
):
|
||||||
self._database = database
|
self._pool = pool
|
||||||
|
self._database: Connection = None # type: ignore
|
||||||
self._client = client
|
self._client = client
|
||||||
self._providers = providers
|
self._providers = providers
|
||||||
|
|
||||||
async def listen(self, tg: TaskGroup):
|
async def listen(self, tg: TaskGroup):
|
||||||
|
closed = Event()
|
||||||
|
|
||||||
def process(*_):
|
def process(*_):
|
||||||
_ = tg.create_task(self.process_all())
|
_ = tg.create_task(self.process_all())
|
||||||
|
|
||||||
try:
|
def terminated(*_):
|
||||||
logger.info("Listening for requestes")
|
logger.info("terminated")
|
||||||
await self._database.add_listener("scanner_requests", process)
|
closed.set()
|
||||||
await Future()
|
|
||||||
except CancelledError as e:
|
while True:
|
||||||
logger.info("Stopped listening for requsets")
|
closed.clear()
|
||||||
await self._database.remove_listener("scanner_requests", process)
|
logger.info("aquire")
|
||||||
|
try:
|
||||||
|
async with self._pool.acquire(timeout=10) as db:
|
||||||
|
try:
|
||||||
|
self._database = cast(Connection, db)
|
||||||
|
self._database.add_termination_listener(terminated)
|
||||||
|
await self._database.add_listener("scanner_requests", process)
|
||||||
|
|
||||||
|
logger.info("Listening for requestes")
|
||||||
|
_ = await closed.wait()
|
||||||
|
logger.info("stopping...")
|
||||||
|
except CancelledError as e:
|
||||||
|
logger.info("Stopped listening for requsets")
|
||||||
|
await self._database.remove_listener("scanner_requests", process)
|
||||||
|
self._database.remove_termination_listener(terminated)
|
||||||
|
raise
|
||||||
|
except TimeoutError:
|
||||||
|
logger.info("temiout")
|
||||||
|
|
||||||
async def process_all(self):
|
async def process_all(self):
|
||||||
found = True
|
found = True
|
||||||
|
Loading…
x
Reference in New Issue
Block a user