Rewrite autosync to use msgspec and aio-pika (#435)

This commit is contained in:
Zoe Roux 2024-04-23 23:55:13 +02:00 committed by GitHub
parent 580109666f
commit a2a58422a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 113 additions and 138 deletions

View File

@ -1,67 +1,11 @@
import logging
import os
async def main():
import logging
from autosync.services.simkl import Simkl
from autosync.services.aggregate import Aggregate
from autosync.consumer import Consumer
import dataclasses_json
from datetime import datetime
from marshmallow import fields
logging.basicConfig(level=logging.INFO)
dataclasses_json.cfg.global_config.encoders[datetime] = datetime.isoformat
dataclasses_json.cfg.global_config.decoders[datetime] = datetime.fromisoformat
dataclasses_json.cfg.global_config.mm_fields[datetime] = fields.DateTime(format="iso")
dataclasses_json.cfg.global_config.encoders[datetime | None] = datetime.isoformat
dataclasses_json.cfg.global_config.decoders[datetime | None] = datetime.fromisoformat
dataclasses_json.cfg.global_config.mm_fields[datetime | None] = fields.DateTime(
format="iso"
)
import pika
from pika import spec
from pika.adapters.blocking_connection import BlockingChannel
import pika.credentials
from autosync.models.message import Message
from autosync.services.aggregate import Aggregate
from autosync.services.simkl import Simkl
logging.basicConfig(level=logging.INFO)
service = Aggregate([Simkl()])
def on_message(
ch: BlockingChannel,
method: spec.Basic.Deliver,
properties: spec.BasicProperties,
body: bytes,
):
try:
message = Message.from_json(body)
service.update(message.value.user, message.value.resource, message.value)
except Exception as e:
logging.exception("Error processing message.", exc_info=e)
logging.exception("Body: %s", body)
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
port=os.environ.get("RABBITMQ_PORT", 5672),
credentials=pika.credentials.PlainCredentials(
os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
),
)
)
channel = connection.channel()
channel.exchange_declare(exchange="events.watched", exchange_type="topic")
result = channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange="events.watched", queue=queue_name, routing_key="#")
channel.basic_consume(
queue=queue_name, on_message_callback=on_message, auto_ack=True
)
logging.info("Listening for autosync.")
channel.start_consuming()
service = Aggregate([Simkl()])
async with Consumer() as consumer:
await consumer.listen(service)

View File

@ -1,4 +1,6 @@
#!/usr/bin/env python
import asyncio
import autosync
autosync.main()
asyncio.run(autosync.main())

View File

@ -0,0 +1,51 @@
import asyncio
from msgspec import json
import os
from logging import getLogger
from aio_pika import ExchangeType, connect_robust
from aio_pika.abc import AbstractIncomingMessage
from autosync.services.service import Service
from autosync.models.message import Message
logger = getLogger(__name__)
class Consumer:
QUEUE = "autosync"
async def __aenter__(self):
self._con = await connect_robust(
host=os.environ.get("RABBITMQ_HOST", "rabbitmq"),
port=int(os.environ.get("RABBITMQ_PORT", "5672")),
login=os.environ.get("RABBITMQ_DEFAULT_USER", "guest"),
password=os.environ.get("RABBITMQ_DEFAULT_PASS", "guest"),
)
self._channel = await self._con.channel()
self._exchange = await self._channel.declare_exchange(
"events.watched", type=ExchangeType.TOPIC
)
self._queue = await self._channel.declare_queue(self.QUEUE)
await self._queue.bind(self._exchange, routing_key="#")
return self
async def __aexit__(self, exc_type, exc_value, exc_tb):
await self._con.close()
async def listen(self, service: Service):
async def on_message(message: AbstractIncomingMessage):
try:
msg = json.decode(message.body, type=Message)
service.update(msg.value.user, msg.value.resource, msg.value)
await message.ack()
except Exception as e:
logger.exception("Unhandled error", exc_info=e)
await message.reject()
# Allow up to 20 requests to run in parallel on the same listener.
# Since most work is calling API not doing that is a waste.
await self._channel.set_qos(prefetch_count=20)
await self._queue.consume(on_message)
logger.info("Listening for autosync.")
await asyncio.Future()

View File

@ -1,18 +1,12 @@
from typing import Literal
from dataclasses import dataclass
from dataclasses_json import dataclass_json, LetterCase
from msgspec import Struct
from autosync.models.show import Show
from .metadataid import MetadataID
from .metadataid import EpisodeID
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class Episode:
external_id: dict[str, MetadataID]
class Episode(Struct, rename="camel", tag_field="kind", tag="episode"):
external_id: dict[str, EpisodeID]
show: Show
season_number: int
episode_number: int
absolute_number: int
kind: Literal["episode"]

View File

@ -1,6 +1,4 @@
from dataclasses import dataclass
from dataclasses_json import DataClassJsonMixin, dataclass_json, LetterCase
from msgspec import Struct
from autosync.models.episode import Episode
from autosync.models.movie import Movie
from autosync.models.show import Show
@ -8,16 +6,12 @@ from autosync.models.user import User
from autosync.models.watch_status import WatchStatus
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class WatchStatusMessage(WatchStatus):
user: User
resource: Movie | Show | Episode
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class Message(DataClassJsonMixin):
class Message(Struct, rename="camel"):
action: str
type: str
value: WatchStatusMessage

View File

@ -1,10 +1,14 @@
from dataclasses import dataclass
from dataclasses_json import dataclass_json, LetterCase
from msgspec import Struct
from typing import Optional
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class MetadataID:
class MetadataID(Struct, rename="camel"):
data_id: str
link: Optional[str]
class EpisodeID(Struct, rename="camel"):
show_id: str
season: Optional[int]
episode: int
link: Optional[str]

View File

@ -1,18 +1,14 @@
from typing import Literal, Optional
from datetime import datetime
from dataclasses import dataclass
from dataclasses_json import dataclass_json, LetterCase
from typing import Optional
from datetime import date
from msgspec import Struct
from .metadataid import MetadataID
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class Movie:
class Movie(Struct, rename="camel", tag_field="kind", tag="movie"):
name: str
air_date: Optional[datetime]
air_date: Optional[date]
external_id: dict[str, MetadataID]
kind: Literal["movie"]
@property
def year(self):

View File

@ -1,18 +1,14 @@
from typing import Literal, Optional
from datetime import datetime
from dataclasses import dataclass
from dataclasses_json import dataclass_json, LetterCase
from typing import Optional
from datetime import date
from msgspec import Struct
from .metadataid import MetadataID
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class Show:
class Show(Struct, rename="camel", tag_field="kind", tag="show"):
name: str
start_air: Optional[datetime]
start_air: Optional[date]
external_id: dict[str, MetadataID]
kind: Literal["show"]
@property
def year(self):

View File

@ -1,31 +1,23 @@
from datetime import datetime, time
from dataclasses import dataclass
from dataclasses_json import dataclass_json, LetterCase
from msgspec import Struct
from datetime import datetime
from typing import Optional
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class JwtToken:
class JwtToken(Struct):
token_type: str
access_token: str
refresh_token: Optional[str]
expire_in: time
expire_at: datetime
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class ExternalToken:
class ExternalToken(Struct, rename="camel"):
id: str
username: str
profileUrl: Optional[str]
profile_url: Optional[str]
token: JwtToken
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class User:
class User(Struct, rename="camel", tag_field="kind", tag="user"):
id: str
username: str
email: str

View File

@ -1,9 +1,9 @@
from datetime import datetime
from dataclasses import dataclass
from dataclasses_json import dataclass_json, LetterCase
from typing import Optional
from enum import Enum
from msgspec import Struct
class Status(str, Enum):
COMPLETED = "Completed"
@ -13,9 +13,7 @@ class Status(str, Enum):
DELETED = "Deleted"
@dataclass_json(letter_case=LetterCase.CAMEL)
@dataclass
class WatchStatus:
class WatchStatus(Struct, rename="camel"):
added_date: datetime
played_date: Optional[datetime]
status: Status

View File

@ -1,4 +1,4 @@
import logging
from logging import getLogger
from autosync.services.service import Service
from ..models.user import User
from ..models.show import Show
@ -6,11 +6,13 @@ from ..models.movie import Movie
from ..models.episode import Episode
from ..models.watch_status import WatchStatus
logger = getLogger(__name__)
class Aggregate(Service):
def __init__(self, services: list[Service]):
self._services = [x for x in services if x.enabled]
logging.info("Autosync enabled with %s", [x.name for x in self._services])
logger.info("Autosync enabled with %s", [x.name for x in self._services])
@property
def name(self) -> str:
@ -21,6 +23,6 @@ class Aggregate(Service):
try:
service.update(user, resource, status)
except Exception as e:
logging.exception(
logger.exception(
"Unhandled error on autosync %s:", service.name, exc_info=e
)

View File

@ -1,6 +1,6 @@
import os
import requests
import logging
from logging import getLogger
from autosync.models.metadataid import MetadataID
from autosync.services.service import Service
@ -10,6 +10,8 @@ from ..models.movie import Movie
from ..models.episode import Episode
from ..models.watch_status import WatchStatus, Status
logger = getLogger(__name__)
class Simkl(Service):
def __init__(self) -> None:
@ -29,7 +31,7 @@ class Simkl(Service):
watch_date = status.played_date or status.added_date
if resource.kind == "episode":
if isinstance(resource, Episode):
if status.status != Status.COMPLETED:
return
@ -60,10 +62,10 @@ class Simkl(Service):
"simkl-api-key": self._api_key,
},
)
logging.info("Simkl response: %s %s", resp.status_code, resp.text)
logger.info("Simkl response: %s %s", resp.status_code, resp.text)
return
category = "movies" if resource.kind == "movie" else "shows"
category = "movies" if isinstance(resource, Movie) else "shows"
simkl_status = self._map_status(status.status)
if simkl_status is None:
@ -89,7 +91,7 @@ class Simkl(Service):
"simkl-api-key": self._api_key,
},
)
logging.info("Simkl response: %s %s", resp.status_code, resp.text)
logger.info("Simkl response: %s %s", resp.status_code, resp.text)
def _map_status(self, status: Status):
match status:

View File

@ -1,3 +1,3 @@
pika
aio-pika
msgspec
requests
dataclasses-json

View File

@ -2,13 +2,13 @@ import asyncio
from typing import Union, Literal
from msgspec import Struct, json
import os
import logging
from logging import getLogger
from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage
from matcher.matcher import Matcher
logger = logging.getLogger(__name__)
logger = getLogger(__name__)
class Message(Struct, tag_field="action", tag=str.lower):