From 2194831d869ed7970f9b25b24ac28d6b04f669df Mon Sep 17 00:00:00 2001 From: Zoe Roux Date: Mon, 17 Nov 2025 21:35:19 +0100 Subject: [PATCH] Manually instrument scanner --- scanner/scanner/fsscan.py | 4 +++ scanner/scanner/requests.py | 70 ++++++++++++++++++++----------------- 2 files changed, 42 insertions(+), 32 deletions(-) diff --git a/scanner/scanner/fsscan.py b/scanner/scanner/fsscan.py index 0fe6aef7..018d33a3 100644 --- a/scanner/scanner/fsscan.py +++ b/scanner/scanner/fsscan.py @@ -5,6 +5,7 @@ from logging import getLogger from mimetypes import guess_file_type from os.path import dirname, exists, isdir, join +from opentelemetry import trace from watchfiles import Change, awatch from .client import KyooClient @@ -16,6 +17,7 @@ from .models.videos import For, Video, VideoInfo from .requests import RequestCreator logger = getLogger(__name__) +tracer = trace.get_tracer("kyoo.scanner") @asynccontextmanager @@ -36,6 +38,7 @@ class FsScanner: except re.error as e: logger.error(f"Invalid ignore pattern. Ignoring. Error: {e}") + @tracer.start_as_current_span("scan") async def scan(self, path: str | None = None, remove_deleted=False): if path is None: path = self._root_path @@ -78,6 +81,7 @@ class FsScanner: except Exception as e: logger.error("Unexpected error while running scan.", exc_info=e) + @tracer.start_as_current_span("monitor") async def monitor(self): logger.info(f"Watching for new files in {self._root_path}") async for changes in awatch(self._root_path, ignore_permission_denied=True): diff --git a/scanner/scanner/requests.py b/scanner/scanner/requests.py index 5217b2b7..50ad5bff 100644 --- a/scanner/scanner/requests.py +++ b/scanner/scanner/requests.py @@ -3,6 +3,7 @@ from logging import getLogger from typing import cast from asyncpg import Connection, Pool +from opentelemetry import trace from pydantic import TypeAdapter from .client import KyooClient @@ -11,6 +12,7 @@ from .models.videos import Resource from .providers.provider import Provider logger = getLogger(__name__) +tracer = trace.get_tracer("kyoo.scanner") class RequestCreator: @@ -54,6 +56,7 @@ class RequestProcessor: self._client = client self._providers = providers + @tracer.start_as_current_span("listen_requests") async def listen(self, tg: TaskGroup): closed = Event() @@ -118,40 +121,43 @@ class RequestProcessor: return False request = Request.model_validate(cur) - logger.info(f"Starting to process {request.title}") - try: - show = await self._run_request(request) - finished = await self._database.fetchrow( - """ - delete from scanner.requests - where pk = $1 - returning - videos - """, - request.pk, - ) - if finished and finished["videos"] != request.videos: - videos = TypeAdapter(list[Request.Video]).validate_python( - finished["videos"] + with tracer.start_as_current_span(f"process {request.title}") as span: + logger.info(f"Starting to process {request.title}") + try: + show = await self._run_request(request) + finished = await self._database.fetchrow( + """ + delete from scanner.requests + where pk = $1 + returning + videos + """, + request.pk, ) - await self._client.link_videos( - "movie" if request.kind == "movie" else "serie", - show.slug, - videos, + if finished and finished["videos"] != request.videos: + videos = TypeAdapter(list[Request.Video]).validate_python( + finished["videos"] + ) + await self._client.link_videos( + "movie" if request.kind == "movie" else "serie", + show.slug, + videos, + ) + except Exception as e: + span.set_status(trace.Status(trace.StatusCode.ERROR)) + span.record_exception(e) + logger.error("Couldn't process request", exc_info=e) + cur = await self._database.execute( + """ + update + scanner.requests + set + status = 'failed' + where + pk = $1 + """, + request.pk, ) - except Exception as e: - logger.error("Couldn't process request", exc_info=e) - cur = await self._database.execute( - """ - update - scanner.requests - set - status = 'failed' - where - pk = $1 - """, - request.pk, - ) return True async def _run_request(self, request: Request) -> Resource: