Manually instrument scanner

This commit is contained in:
Zoe Roux 2025-11-17 21:35:19 +01:00
parent 7124a3d3c6
commit 2194831d86
No known key found for this signature in database
2 changed files with 42 additions and 32 deletions

View File

@ -5,6 +5,7 @@ from logging import getLogger
from mimetypes import guess_file_type from mimetypes import guess_file_type
from os.path import dirname, exists, isdir, join from os.path import dirname, exists, isdir, join
from opentelemetry import trace
from watchfiles import Change, awatch from watchfiles import Change, awatch
from .client import KyooClient from .client import KyooClient
@ -16,6 +17,7 @@ from .models.videos import For, Video, VideoInfo
from .requests import RequestCreator from .requests import RequestCreator
logger = getLogger(__name__) logger = getLogger(__name__)
tracer = trace.get_tracer("kyoo.scanner")
@asynccontextmanager @asynccontextmanager
@ -36,6 +38,7 @@ class FsScanner:
except re.error as e: except re.error as e:
logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}") logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}")
@tracer.start_as_current_span("scan")
async def scan(self, path: str | None = None, remove_deleted=False): async def scan(self, path: str | None = None, remove_deleted=False):
if path is None: if path is None:
path = self._root_path path = self._root_path
@ -78,6 +81,7 @@ class FsScanner:
except Exception as e: except Exception as e:
logger.error("Unexpected error while running scan.", exc_info=e) logger.error("Unexpected error while running scan.", exc_info=e)
@tracer.start_as_current_span("monitor")
async def monitor(self): async def monitor(self):
logger.info(f"Watching for new files in {self._root_path}") logger.info(f"Watching for new files in {self._root_path}")
async for changes in awatch(self._root_path, ignore_permission_denied=True): async for changes in awatch(self._root_path, ignore_permission_denied=True):

View File

@ -3,6 +3,7 @@ from logging import getLogger
from typing import cast from typing import cast
from asyncpg import Connection, Pool from asyncpg import Connection, Pool
from opentelemetry import trace
from pydantic import TypeAdapter from pydantic import TypeAdapter
from .client import KyooClient from .client import KyooClient
@ -11,6 +12,7 @@ from .models.videos import Resource
from .providers.provider import Provider from .providers.provider import Provider
logger = getLogger(__name__) logger = getLogger(__name__)
tracer = trace.get_tracer("kyoo.scanner")
class RequestCreator: class RequestCreator:
@ -54,6 +56,7 @@ class RequestProcessor:
self._client = client self._client = client
self._providers = providers self._providers = providers
@tracer.start_as_current_span("listen_requests")
async def listen(self, tg: TaskGroup): async def listen(self, tg: TaskGroup):
closed = Event() closed = Event()
@ -118,40 +121,43 @@ class RequestProcessor:
return False return False
request = Request.model_validate(cur) request = Request.model_validate(cur)
logger.info(f"Starting to process {request.title}") with tracer.start_as_current_span(f"process {request.title}") as span:
try: logger.info(f"Starting to process {request.title}")
show = await self._run_request(request) try:
finished = await self._database.fetchrow( show = await self._run_request(request)
""" finished = await self._database.fetchrow(
delete from scanner.requests """
where pk = $1 delete from scanner.requests
returning where pk = $1
videos returning
""", videos
request.pk, """,
) request.pk,
if finished and finished["videos"] != request.videos:
videos = TypeAdapter(list[Request.Video]).validate_python(
finished["videos"]
) )
await self._client.link_videos( if finished and finished["videos"] != request.videos:
"movie" if request.kind == "movie" else "serie", videos = TypeAdapter(list[Request.Video]).validate_python(
show.slug, finished["videos"]
videos, )
await self._client.link_videos(
"movie" if request.kind == "movie" else "serie",
show.slug,
videos,
)
except Exception as e:
span.set_status(trace.Status(trace.StatusCode.ERROR))
span.record_exception(e)
logger.error("Couldn't process request", exc_info=e)
cur = await self._database.execute(
"""
update
scanner.requests
set
status = 'failed'
where
pk = $1
""",
request.pk,
) )
except Exception as e:
logger.error("Couldn't process request", exc_info=e)
cur = await self._database.execute(
"""
update
scanner.requests
set
status = 'failed'
where
pk = $1
""",
request.pk,
)
return True return True
async def _run_request(self, request: Request) -> Resource: async def _run_request(self, request: Request) -> Resource: