diff --git a/autosync/autosync/__init__.py b/autosync/autosync/__init__.py index 58727949..083dc892 100644 --- a/autosync/autosync/__init__.py +++ b/autosync/autosync/__init__.py @@ -1,67 +1,11 @@ -import logging -import os +async def main(): + import logging + from autosync.services.simkl import Simkl + from autosync.services.aggregate import Aggregate + from autosync.consumer import Consumer -import dataclasses_json -from datetime import datetime -from marshmallow import fields + logging.basicConfig(level=logging.INFO) -dataclasses_json.cfg.global_config.encoders[datetime] = datetime.isoformat -dataclasses_json.cfg.global_config.decoders[datetime] = datetime.fromisoformat -dataclasses_json.cfg.global_config.mm_fields[datetime] = fields.DateTime(format="iso") -dataclasses_json.cfg.global_config.encoders[datetime | None] = datetime.isoformat -dataclasses_json.cfg.global_config.decoders[datetime | None] = datetime.fromisoformat -dataclasses_json.cfg.global_config.mm_fields[datetime | None] = fields.DateTime( - format="iso" -) - -import pika -from pika import spec -from pika.adapters.blocking_connection import BlockingChannel -import pika.credentials -from autosync.models.message import Message -from autosync.services.aggregate import Aggregate - -from autosync.services.simkl import Simkl - - -logging.basicConfig(level=logging.INFO) -service = Aggregate([Simkl()]) - - -def on_message( - ch: BlockingChannel, - method: spec.Basic.Deliver, - properties: spec.BasicProperties, - body: bytes, -): - try: - message = Message.from_json(body) - service.update(message.value.user, message.value.resource, message.value) - except Exception as e: - logging.exception("Error processing message.", exc_info=e) - logging.exception("Body: %s", body) - - -def main(): - connection = pika.BlockingConnection( - pika.ConnectionParameters( - host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), - port=os.environ.get("RABBITMQ_PORT", 5672), - credentials=pika.credentials.PlainCredentials( - os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), - os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), - ), - ) - ) - channel = connection.channel() - - channel.exchange_declare(exchange="events.watched", exchange_type="topic") - result = channel.queue_declare("", exclusive=True) - queue_name = result.method.queue - channel.queue_bind(exchange="events.watched", queue=queue_name, routing_key="#") - - channel.basic_consume( - queue=queue_name, on_message_callback=on_message, auto_ack=True - ) - logging.info("Listening for autosync.") - channel.start_consuming() + service = Aggregate([Simkl()]) + async with Consumer() as consumer: + await consumer.listen(service) diff --git a/autosync/autosync/__main__.py b/autosync/autosync/__main__.py index 85d7bc52..a410d7b2 100644 --- a/autosync/autosync/__main__.py +++ b/autosync/autosync/__main__.py @@ -1,4 +1,6 @@ #!/usr/bin/env python + +import asyncio import autosync -autosync.main() +asyncio.run(autosync.main()) diff --git a/autosync/autosync/consumer.py b/autosync/autosync/consumer.py new file mode 100644 index 00000000..f530309d --- /dev/null +++ b/autosync/autosync/consumer.py @@ -0,0 +1,51 @@ +import asyncio +from msgspec import json +import os +from logging import getLogger +from aio_pika import ExchangeType, connect_robust +from aio_pika.abc import AbstractIncomingMessage + +from autosync.services.service import Service +from autosync.models.message import Message + + +logger = getLogger(__name__) + + +class Consumer: + QUEUE = "autosync" + + async def __aenter__(self): + self._con = await connect_robust( + host=os.environ.get("RABBITMQ_HOST", "rabbitmq"), + port=int(os.environ.get("RABBITMQ_PORT", "5672")), + login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"), + password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"), + ) + self._channel = await self._con.channel() + self._exchange = await self._channel.declare_exchange( + "events.watched", type=ExchangeType.TOPIC + ) + self._queue = await self._channel.declare_queue(self.QUEUE) + await self._queue.bind(self._exchange, routing_key="#") + return self + + async def __aexit__(self, exc_type, exc_value, exc_tb): + await self._con.close() + + async def listen(self, service: Service): + async def on_message(message: AbstractIncomingMessage): + try: + msg = json.decode(message.body, type=Message) + service.update(msg.value.user, msg.value.resource, msg.value) + await message.ack() + except Exception as e: + logger.exception("Unhandled error", exc_info=e) + await message.reject() + + # Allow up to 20 requests to run in parallel on the same listener. + # Since most work is calling API not doing that is a waste. + await self._channel.set_qos(prefetch_count=20) + await self._queue.consume(on_message) + logger.info("Listening for autosync.") + await asyncio.Future() diff --git a/autosync/autosync/models/episode.py b/autosync/autosync/models/episode.py index 6378e148..0623ec52 100644 --- a/autosync/autosync/models/episode.py +++ b/autosync/autosync/models/episode.py @@ -1,18 +1,12 @@ -from typing import Literal -from dataclasses import dataclass -from dataclasses_json import dataclass_json, LetterCase - +from msgspec import Struct from autosync.models.show import Show -from .metadataid import MetadataID +from .metadataid import EpisodeID -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass -class Episode: - external_id: dict[str, MetadataID] +class Episode(Struct, rename="camel", tag_field="kind", tag="episode"): + external_id: dict[str, EpisodeID] show: Show season_number: int episode_number: int absolute_number: int - kind: Literal["episode"] diff --git a/autosync/autosync/models/message.py b/autosync/autosync/models/message.py index d24f39fc..36d37d6c 100644 --- a/autosync/autosync/models/message.py +++ b/autosync/autosync/models/message.py @@ -1,6 +1,4 @@ -from dataclasses import dataclass -from dataclasses_json import DataClassJsonMixin, dataclass_json, LetterCase - +from msgspec import Struct from autosync.models.episode import Episode from autosync.models.movie import Movie from autosync.models.show import Show @@ -8,16 +6,12 @@ from autosync.models.user import User from autosync.models.watch_status import WatchStatus -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass class WatchStatusMessage(WatchStatus): user: User resource: Movie | Show | Episode -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass -class Message(DataClassJsonMixin): +class Message(Struct, rename="camel"): action: str type: str value: WatchStatusMessage diff --git a/autosync/autosync/models/metadataid.py b/autosync/autosync/models/metadataid.py index a9ec2267..13b67458 100644 --- a/autosync/autosync/models/metadataid.py +++ b/autosync/autosync/models/metadataid.py @@ -1,10 +1,14 @@ -from dataclasses import dataclass -from dataclasses_json import dataclass_json, LetterCase +from msgspec import Struct from typing import Optional -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass -class MetadataID: +class MetadataID(Struct, rename="camel"): data_id: str link: Optional[str] + + +class EpisodeID(Struct, rename="camel"): + show_id: str + season: Optional[int] + episode: int + link: Optional[str] diff --git a/autosync/autosync/models/movie.py b/autosync/autosync/models/movie.py index dd109ec6..873f1823 100644 --- a/autosync/autosync/models/movie.py +++ b/autosync/autosync/models/movie.py @@ -1,18 +1,14 @@ -from typing import Literal, Optional -from datetime import datetime -from dataclasses import dataclass -from dataclasses_json import dataclass_json, LetterCase +from typing import Optional +from datetime import date +from msgspec import Struct from .metadataid import MetadataID -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass -class Movie: +class Movie(Struct, rename="camel", tag_field="kind", tag="movie"): name: str - air_date: Optional[datetime] + air_date: Optional[date] external_id: dict[str, MetadataID] - kind: Literal["movie"] @property def year(self): diff --git a/autosync/autosync/models/show.py b/autosync/autosync/models/show.py index 261d215c..978b99b9 100644 --- a/autosync/autosync/models/show.py +++ b/autosync/autosync/models/show.py @@ -1,18 +1,14 @@ -from typing import Literal, Optional -from datetime import datetime -from dataclasses import dataclass -from dataclasses_json import dataclass_json, LetterCase +from typing import Optional +from datetime import date +from msgspec import Struct from .metadataid import MetadataID -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass -class Show: +class Show(Struct, rename="camel", tag_field="kind", tag="show"): name: str - start_air: Optional[datetime] + start_air: Optional[date] external_id: dict[str, MetadataID] - kind: Literal["show"] @property def year(self): diff --git a/autosync/autosync/models/user.py b/autosync/autosync/models/user.py index fe393499..51301a35 100644 --- a/autosync/autosync/models/user.py +++ b/autosync/autosync/models/user.py @@ -1,31 +1,23 @@ -from datetime import datetime, time -from dataclasses import dataclass -from dataclasses_json import dataclass_json, LetterCase +from msgspec import Struct +from datetime import datetime from typing import Optional -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass -class JwtToken: +class JwtToken(Struct): token_type: str access_token: str refresh_token: Optional[str] - expire_in: time expire_at: datetime -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass -class ExternalToken: +class ExternalToken(Struct, rename="camel"): id: str username: str - profileUrl: Optional[str] + profile_url: Optional[str] token: JwtToken -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass -class User: +class User(Struct, rename="camel", tag_field="kind", tag="user"): id: str username: str email: str diff --git a/autosync/autosync/models/watch_status.py b/autosync/autosync/models/watch_status.py index 2e85b542..815d8a66 100644 --- a/autosync/autosync/models/watch_status.py +++ b/autosync/autosync/models/watch_status.py @@ -1,9 +1,9 @@ from datetime import datetime -from dataclasses import dataclass -from dataclasses_json import dataclass_json, LetterCase from typing import Optional from enum import Enum +from msgspec import Struct + class Status(str, Enum): COMPLETED = "Completed" @@ -13,9 +13,7 @@ class Status(str, Enum): DELETED = "Deleted" -@dataclass_json(letter_case=LetterCase.CAMEL) -@dataclass -class WatchStatus: +class WatchStatus(Struct, rename="camel"): added_date: datetime played_date: Optional[datetime] status: Status diff --git a/autosync/autosync/services/aggregate.py b/autosync/autosync/services/aggregate.py index 6044ed46..2b5decb0 100644 --- a/autosync/autosync/services/aggregate.py +++ b/autosync/autosync/services/aggregate.py @@ -1,4 +1,4 @@ -import logging +from logging import getLogger from autosync.services.service import Service from ..models.user import User from ..models.show import Show @@ -6,11 +6,13 @@ from ..models.movie import Movie from ..models.episode import Episode from ..models.watch_status import WatchStatus +logger = getLogger(__name__) + class Aggregate(Service): def __init__(self, services: list[Service]): self._services = [x for x in services if x.enabled] - logging.info("Autosync enabled with %s", [x.name for x in self._services]) + logger.info("Autosync enabled with %s", [x.name for x in self._services]) @property def name(self) -> str: @@ -21,6 +23,6 @@ class Aggregate(Service): try: service.update(user, resource, status) except Exception as e: - logging.exception( + logger.exception( "Unhandled error on autosync %s:", service.name, exc_info=e ) diff --git a/autosync/autosync/services/simkl.py b/autosync/autosync/services/simkl.py index 74df8cbc..b9183dd6 100644 --- a/autosync/autosync/services/simkl.py +++ b/autosync/autosync/services/simkl.py @@ -1,6 +1,6 @@ import os import requests -import logging +from logging import getLogger from autosync.models.metadataid import MetadataID from autosync.services.service import Service @@ -10,6 +10,8 @@ from ..models.movie import Movie from ..models.episode import Episode from ..models.watch_status import WatchStatus, Status +logger = getLogger(__name__) + class Simkl(Service): def __init__(self) -> None: @@ -29,7 +31,7 @@ class Simkl(Service): watch_date = status.played_date or status.added_date - if resource.kind == "episode": + if isinstance(resource, Episode): if status.status != Status.COMPLETED: return @@ -60,10 +62,10 @@ class Simkl(Service): "simkl-api-key": self._api_key, }, ) - logging.info("Simkl response: %s %s", resp.status_code, resp.text) + logger.info("Simkl response: %s %s", resp.status_code, resp.text) return - category = "movies" if resource.kind == "movie" else "shows" + category = "movies" if isinstance(resource, Movie) else "shows" simkl_status = self._map_status(status.status) if simkl_status is None: @@ -89,7 +91,7 @@ class Simkl(Service): "simkl-api-key": self._api_key, }, ) - logging.info("Simkl response: %s %s", resp.status_code, resp.text) + logger.info("Simkl response: %s %s", resp.status_code, resp.text) def _map_status(self, status: Status): match status: diff --git a/autosync/requirements.txt b/autosync/requirements.txt index 0976c85c..dc75c80b 100644 --- a/autosync/requirements.txt +++ b/autosync/requirements.txt @@ -1,3 +1,3 @@ -pika +aio-pika +msgspec requests -dataclasses-json diff --git a/scanner/matcher/subscriber.py b/scanner/matcher/subscriber.py index ca2fc603..acbe3e36 100644 --- a/scanner/matcher/subscriber.py +++ b/scanner/matcher/subscriber.py @@ -2,13 +2,13 @@ import asyncio from typing import Union, Literal from msgspec import Struct, json import os -import logging +from logging import getLogger from aio_pika import connect_robust from aio_pika.abc import AbstractIncomingMessage from matcher.matcher import Matcher -logger = logging.getLogger(__name__) +logger = getLogger(__name__) class Message(Struct, tag_field="action", tag=str.lower):