Add some type hints

This commit is contained in:
krateng 2024-03-14 17:51:28 +01:00
parent 163746c06e
commit 915808a020

View File

@ -1,3 +1,5 @@
from typing import TypedDict, Optional, cast
import sqlalchemy as sql import sqlalchemy as sql
from sqlalchemy.dialects.sqlite import insert as sqliteinsert from sqlalchemy.dialects.sqlite import insert as sqliteinsert
import json import json
@ -213,6 +215,25 @@ def set_maloja_info(info,dbconn=None):
# The last two fields are not returned under normal circumstances # The last two fields are not returned under normal circumstances
class AlbumDict(TypedDict):
albumtitle: str
artists: list[str]
class TrackDict(TypedDict):
artists: list[str]
title: str
album: AlbumDict
length: int | None
class ScrobbleDict(TypedDict):
time: int
track: TrackDict
duration: int
origin: str
extra: Optional[dict]
rawscrobble: Optional[dict]
##### Conversions between DB and dicts ##### Conversions between DB and dicts
@ -222,73 +243,78 @@ def set_maloja_info(info,dbconn=None):
### DB -> DICT ### DB -> DICT
def scrobbles_db_to_dict(rows,include_internal=False,dbconn=None): def scrobbles_db_to_dict(rows, include_internal=False, dbconn=None) -> list[ScrobbleDict]:
tracks = get_tracks_map(set(row.track_id for row in rows),dbconn=dbconn) tracks: list[TrackDict] = get_tracks_map(set(row.track_id for row in rows), dbconn=dbconn)
return [ return [
{ cast({
**{ **{
"time": row.timestamp, "time": row.timestamp,
"track": tracks[row.track_id], "track": tracks[row.track_id],
"duration": row.duration, "duration": row.duration,
"origin":row.origin, "origin": row.origin
}, },
**({ **({
"extra": json.loads(row.extra or '{}'), "extra": json.loads(row.extra or '{}'),
"rawscrobble": json.loads(row.rawscrobble or '{}') "rawscrobble": json.loads(row.rawscrobble or '{}')
} if include_internal else {}) } if include_internal else {})
} }, ScrobbleDict)
for row in rows for row in rows
] ]
def scrobble_db_to_dict(row,dbconn=None):
def scrobble_db_to_dict(row, dbconn=None) -> ScrobbleDict:
return scrobbles_db_to_dict([row], dbconn=dbconn)[0] return scrobbles_db_to_dict([row], dbconn=dbconn)[0]
def tracks_db_to_dict(rows,dbconn=None):
def tracks_db_to_dict(rows, dbconn=None) -> list[TrackDict]:
artists = get_artists_of_tracks(set(row.id for row in rows), dbconn=dbconn) artists = get_artists_of_tracks(set(row.id for row in rows), dbconn=dbconn)
albums = get_albums_map(set(row.album_id for row in rows), dbconn=dbconn) albums = get_albums_map(set(row.album_id for row in rows), dbconn=dbconn)
return [ return [
{ cast({
"artists":artists[row.id], "artists":artists[row.id],
"title":row.title, "title":row.title,
"album":albums.get(row.album_id), "album":albums.get(row.album_id),
"length":row.length "length":row.length
} }, TrackDict)
for row in rows for row in rows
] ]
def track_db_to_dict(row,dbconn=None):
def track_db_to_dict(row, dbconn=None) -> TrackDict:
return tracks_db_to_dict([row], dbconn=dbconn)[0] return tracks_db_to_dict([row], dbconn=dbconn)[0]
def artists_db_to_dict(rows,dbconn=None):
def artists_db_to_dict(rows, dbconn=None) -> list[str]:
return [ return [
row.name row.name
for row in rows for row in rows
] ]
def artist_db_to_dict(row,dbconn=None):
def artist_db_to_dict(row, dbconn=None) -> str:
return artists_db_to_dict([row], dbconn=dbconn)[0] return artists_db_to_dict([row], dbconn=dbconn)[0]
def albums_db_to_dict(rows,dbconn=None):
def albums_db_to_dict(rows, dbconn=None) -> list[AlbumDict]:
artists = get_artists_of_albums(set(row.id for row in rows), dbconn=dbconn) artists = get_artists_of_albums(set(row.id for row in rows), dbconn=dbconn)
return [ return [
{ cast({
"artists": artists.get(row.id), "artists": artists.get(row.id),
"albumtitle": row.albtitle, "albumtitle": row.albtitle,
} }, AlbumDict)
for row in rows for row in rows
] ]
def album_db_to_dict(row,dbconn=None):
def album_db_to_dict(row, dbconn=None) -> AlbumDict:
return albums_db_to_dict([row], dbconn=dbconn)[0] return albums_db_to_dict([row], dbconn=dbconn)[0]
### DICT -> DB ### DICT -> DB
# These should return None when no data is in the dict so they can be used for update statements # These should return None when no data is in the dict so they can be used for update statements
def scrobble_dict_to_db(info,update_album=False,dbconn=None): def scrobble_dict_to_db(info: ScrobbleDict, update_album=False, dbconn=None):
return { return {
"timestamp": info.get('time'), "timestamp": info.get('time'),
"origin": info.get('origin'), "origin": info.get('origin'),
@ -298,20 +324,23 @@ def scrobble_dict_to_db(info,update_album=False,dbconn=None):
"rawscrobble": json.dumps(info.get('rawscrobble')) if info.get('rawscrobble') else None "rawscrobble": json.dumps(info.get('rawscrobble')) if info.get('rawscrobble') else None
} }
def track_dict_to_db(info,dbconn=None):
def track_dict_to_db(info: TrackDict, dbconn=None):
return { return {
"title": info.get('title'), "title": info.get('title'),
"title_normalized": normalize_name(info.get('title', '')) or None, "title_normalized": normalize_name(info.get('title', '')) or None,
"length": info.get('length') "length": info.get('length')
} }
def artist_dict_to_db(info,dbconn=None):
def artist_dict_to_db(info: str, dbconn=None):
return { return {
"name": info, "name": info,
"name_normalized": normalize_name(info) "name_normalized": normalize_name(info)
} }
def album_dict_to_db(info,dbconn=None):
def album_dict_to_db(info: AlbumDict, dbconn=None):
return { return {
"albtitle": info.get('albumtitle'), "albtitle": info.get('albumtitle'),
"albtitle_normalized": normalize_name(info.get('albumtitle')) "albtitle_normalized": normalize_name(info.get('albumtitle'))
@ -320,14 +349,13 @@ def album_dict_to_db(info,dbconn=None):
##### Actual Database interactions ##### Actual Database interactions
# TODO: remove all resolve_id args and do that logic outside the caching to improve hit chances # TODO: remove all resolve_id args and do that logic outside the caching to improve hit chances
# TODO: maybe also factor out all intitial get entity funcs (some here, some in __init__) and throw exceptions # TODO: maybe also factor out all intitial get entity funcs (some here, some in __init__) and throw exceptions
@connection_provider @connection_provider
def add_scrobble(scrobbledict,update_album=False,dbconn=None): def add_scrobble(scrobbledict: ScrobbleDict, update_album=False, dbconn=None):
_, ex, er = add_scrobbles([scrobbledict], update_album=update_album, dbconn=dbconn) _, ex, er = add_scrobbles([scrobbledict], update_album=update_album, dbconn=dbconn)
if er > 0: if er > 0:
raise exc.DuplicateTimestamp(existing_scrobble=None, rejected_scrobble=scrobbledict) raise exc.DuplicateTimestamp(existing_scrobble=None, rejected_scrobble=scrobbledict)
@ -335,8 +363,9 @@ def add_scrobble(scrobbledict,update_album=False,dbconn=None):
elif ex > 0: elif ex > 0:
raise exc.DuplicateScrobble(scrobble=scrobbledict) raise exc.DuplicateScrobble(scrobble=scrobbledict)
@connection_provider @connection_provider
def add_scrobbles(scrobbleslist,update_album=False,dbconn=None): def add_scrobbles(scrobbleslist: list[ScrobbleDict], update_album=False, dbconn=None) -> tuple[int, int, int]:
with SCROBBLE_LOCK: with SCROBBLE_LOCK:
@ -365,13 +394,13 @@ def add_scrobbles(scrobbleslist,update_album=False,dbconn=None):
else: else:
errors += 1 errors += 1
if errors > 0: log(f"{errors} Scrobbles have not been written to database (duplicate timestamps)!", color='red') if errors > 0: log(f"{errors} Scrobbles have not been written to database (duplicate timestamps)!", color='red')
if exists > 0: log(f"{exists} Scrobbles have not been written to database (already exist)", color='orange') if exists > 0: log(f"{exists} Scrobbles have not been written to database (already exist)", color='orange')
return success, exists, errors return success, exists, errors
@connection_provider @connection_provider
def delete_scrobble(scrobble_id,dbconn=None): def delete_scrobble(scrobble_id: int, dbconn=None) -> bool:
with SCROBBLE_LOCK: with SCROBBLE_LOCK:
@ -385,7 +414,7 @@ def delete_scrobble(scrobble_id,dbconn=None):
@connection_provider @connection_provider
def add_track_to_album(track_id,album_id,replace=False,dbconn=None): def add_track_to_album(track_id: int, album_id: int, replace=False, dbconn=None) -> bool:
conditions = [ conditions = [
DB['tracks'].c.id == track_id DB['tracks'].c.id == track_id
@ -414,37 +443,37 @@ def add_track_to_album(track_id,album_id,replace=False,dbconn=None):
# ALL OF RECORDED HISTORY in order to display top weeks # ALL OF RECORDED HISTORY in order to display top weeks
# lmao # lmao
# TODO: figure out something better # TODO: figure out something better
return True return True
@connection_provider @connection_provider
def add_tracks_to_albums(track_to_album_id_dict,replace=False,dbconn=None): def add_tracks_to_albums(track_to_album_id_dict: dict[int, int], replace=False, dbconn=None) -> bool:
for track_id in track_to_album_id_dict: for track_id in track_to_album_id_dict:
add_track_to_album(track_id,track_to_album_id_dict[track_id], replace=replace, dbconn=dbconn) add_track_to_album(track_id,track_to_album_id_dict[track_id], replace=replace, dbconn=dbconn)
return True
@connection_provider @connection_provider
def remove_album(*track_ids,dbconn=None): def remove_album(*track_ids: list[int], dbconn=None) -> bool:
DB['tracks'].update().where( DB['tracks'].update().where(
DB['tracks'].c.track_id.in_(track_ids) DB['tracks'].c.track_id.in_(track_ids)
).values( ).values(
album_id=None album_id=None
) )
return True
### these will 'get' the ID of an entity, creating it if necessary ### these will 'get' the ID of an entity, creating it if necessary
@cached_wrapper @cached_wrapper
@connection_provider @connection_provider
def get_track_id(trackdict,create_new=True,update_album=False,dbconn=None): def get_track_id(trackdict: TrackDict, create_new=True, update_album=False, dbconn=None) -> int | None:
ntitle = normalize_name(trackdict['title']) ntitle = normalize_name(trackdict['title'])
artist_ids = [get_artist_id(a, create_new=create_new, dbconn=dbconn) for a in trackdict['artists']] artist_ids = [get_artist_id(a, create_new=create_new, dbconn=dbconn) for a in trackdict['artists']]
artist_ids = list(set(artist_ids)) artist_ids = list(set(artist_ids))
op = DB['tracks'].select().where( op = DB['tracks'].select().where(
DB['tracks'].c.title_normalized == ntitle DB['tracks'].c.title_normalized == ntitle
) )
@ -472,10 +501,10 @@ def get_track_id(trackdict,create_new=True,update_album=False,dbconn=None):
album_id = get_album_id(trackdict['album'],create_new=(update_album or not row.album_id),dbconn=dbconn) album_id = get_album_id(trackdict['album'],create_new=(update_album or not row.album_id),dbconn=dbconn)
add_track_to_album(row.id,album_id,replace=update_album,dbconn=dbconn) add_track_to_album(row.id,album_id,replace=update_album,dbconn=dbconn)
return row.id return row.id
if not create_new: return None if not create_new:
return None
#print("Creating new track") #print("Creating new track")
op = DB['tracks'].insert().values( op = DB['tracks'].insert().values(
@ -497,9 +526,10 @@ def get_track_id(trackdict,create_new=True,update_album=False,dbconn=None):
add_track_to_album(track_id, get_album_id(trackdict['album'], dbconn=dbconn), dbconn=dbconn) add_track_to_album(track_id, get_album_id(trackdict['album'], dbconn=dbconn), dbconn=dbconn)
return track_id return track_id
@cached_wrapper @cached_wrapper
@connection_provider @connection_provider
def get_artist_id(artistname,create_new=True,dbconn=None): def get_artist_id(artistname: str, create_new=True, dbconn=None) -> int | None:
nname = normalize_name(artistname) nname = normalize_name(artistname)
#print("looking for",nname) #print("looking for",nname)
@ -511,7 +541,8 @@ def get_artist_id(artistname,create_new=True,dbconn=None):
#print("ID for",artistname,"was",row[0]) #print("ID for",artistname,"was",row[0])
return row.id return row.id
if not create_new: return None if not create_new:
return None
op = DB['artists'].insert().values( op = DB['artists'].insert().values(
name=artistname, name=artistname,
@ -524,7 +555,7 @@ def get_artist_id(artistname,create_new=True,dbconn=None):
@cached_wrapper @cached_wrapper
@connection_provider @connection_provider
def get_album_id(albumdict,create_new=True,ignore_albumartists=False,dbconn=None): def get_album_id(albumdict: AlbumDict, create_new=True, ignore_albumartists=False, dbconn=None) -> int | None:
ntitle = normalize_name(albumdict['albumtitle']) ntitle = normalize_name(albumdict['albumtitle'])
artist_ids = [get_artist_id(a, dbconn=dbconn) for a in (albumdict.get('artists') or [])] artist_ids = [get_artist_id(a, dbconn=dbconn) for a in (albumdict.get('artists') or [])]
artist_ids = list(set(artist_ids)) artist_ids = list(set(artist_ids))
@ -554,8 +585,8 @@ def get_album_id(albumdict,create_new=True,ignore_albumartists=False,dbconn=None
#print("ID for",albumdict['title'],"was",row[0]) #print("ID for",albumdict['title'],"was",row[0])
return row.id return row.id
if not create_new: return None if not create_new:
return None
op = DB['albums'].insert().values( op = DB['albums'].insert().values(
**album_dict_to_db(albumdict, dbconn=dbconn) **album_dict_to_db(albumdict, dbconn=dbconn)
@ -573,13 +604,10 @@ def get_album_id(albumdict,create_new=True,ignore_albumartists=False,dbconn=None
return album_id return album_id
### Edit existing ### Edit existing
@connection_provider @connection_provider
def edit_scrobble(scrobble_id,scrobbleupdatedict,dbconn=None): def edit_scrobble(scrobble_id: int, scrobbleupdatedict: dict, dbconn=None) -> bool:
dbentry = scrobble_dict_to_db(scrobbleupdatedict,dbconn=dbconn) dbentry = scrobble_dict_to_db(scrobbleupdatedict,dbconn=dbconn)
dbentry = {k: v for k, v in dbentry.items() if v} dbentry = {k: v for k, v in dbentry.items() if v}
@ -595,81 +623,82 @@ def edit_scrobble(scrobble_id,scrobbleupdatedict,dbconn=None):
) )
dbconn.execute(op) dbconn.execute(op)
return True
# edit function only for primary db information (not linked fields) # edit function only for primary db information (not linked fields)
@connection_provider @connection_provider
def edit_artist(id,artistupdatedict,dbconn=None): def edit_artist(artist_id: int, artistupdatedict: str, dbconn=None) -> bool:
artist = get_artist(id) artist = get_artist(artist_id)
changedartist = artistupdatedict # well changedartist = artistupdatedict # well
dbentry = artist_dict_to_db(artistupdatedict, dbconn=dbconn) dbentry = artist_dict_to_db(artistupdatedict, dbconn=dbconn)
dbentry = {k: v for k, v in dbentry.items() if v} dbentry = {k: v for k, v in dbentry.items() if v}
existing_artist_id = get_artist_id(changedartist, create_new=False, dbconn=dbconn) existing_artist_id = get_artist_id(changedartist, create_new=False, dbconn=dbconn)
if existing_artist_id not in (None,id): if existing_artist_id not in (None, artist_id):
raise exc.ArtistExists(changedartist) raise exc.ArtistExists(changedartist)
op = DB['artists'].update().where( op = DB['artists'].update().where(
DB['artists'].c.id==id DB['artists'].c.id == artist_id
).values( ).values(
**dbentry **dbentry
) )
result = dbconn.execute(op) result = dbconn.execute(op)
return True return True
# edit function only for primary db information (not linked fields) # edit function only for primary db information (not linked fields)
@connection_provider @connection_provider
def edit_track(id,trackupdatedict,dbconn=None): def edit_track(track_id: int, trackupdatedict: dict, dbconn=None) -> bool:
track = get_track(id,dbconn=dbconn) track = get_track(track_id, dbconn=dbconn)
changedtrack = {**track,**trackupdatedict} changedtrack: TrackDict = {**track, **trackupdatedict}
dbentry = track_dict_to_db(trackupdatedict, dbconn=dbconn) dbentry = track_dict_to_db(trackupdatedict, dbconn=dbconn)
dbentry = {k: v for k, v in dbentry.items() if v} dbentry = {k: v for k, v in dbentry.items() if v}
existing_track_id = get_track_id(changedtrack, create_new=False, dbconn=dbconn) existing_track_id = get_track_id(changedtrack, create_new=False, dbconn=dbconn)
if existing_track_id not in (None,id): if existing_track_id not in (None, track_id):
raise exc.TrackExists(changedtrack) raise exc.TrackExists(changedtrack)
op = DB['tracks'].update().where( op = DB['tracks'].update().where(
DB['tracks'].c.id==id DB['tracks'].c.id == track_id
).values( ).values(
**dbentry **dbentry
) )
result = dbconn.execute(op) result = dbconn.execute(op)
return True return True
# edit function only for primary db information (not linked fields) # edit function only for primary db information (not linked fields)
@connection_provider @connection_provider
def edit_album(id,albumupdatedict,dbconn=None): def edit_album(album_id: int, albumupdatedict: dict, dbconn=None) -> bool:
album = get_album(id,dbconn=dbconn) album = get_album(album_id, dbconn=dbconn)
changedalbum = {**album,**albumupdatedict} changedalbum: AlbumDict = {**album, **albumupdatedict}
dbentry = album_dict_to_db(albumupdatedict, dbconn=dbconn) dbentry = album_dict_to_db(albumupdatedict, dbconn=dbconn)
dbentry = {k: v for k, v in dbentry.items() if v} dbentry = {k: v for k, v in dbentry.items() if v}
existing_album_id = get_album_id(changedalbum, create_new=False, dbconn=dbconn) existing_album_id = get_album_id(changedalbum, create_new=False, dbconn=dbconn)
if existing_album_id not in (None,id): if existing_album_id not in (None, album_id):
raise exc.TrackExists(changedalbum) raise exc.TrackExists(changedalbum)
op = DB['albums'].update().where( op = DB['albums'].update().where(
DB['albums'].c.id==id DB['albums'].c.id == album_id
).values( ).values(
**dbentry **dbentry
) )
result = dbconn.execute(op) result = dbconn.execute(op)
return True return True
### Edit associations ### Edit associations
@connection_provider @connection_provider
def add_artists_to_tracks(track_ids,artist_ids,dbconn=None): def add_artists_to_tracks(track_ids: list[int], artist_ids: list[int], dbconn=None) -> bool:
op = DB['trackartists'].insert().values([ op = DB['trackartists'].insert().values([
{'track_id': track_id, 'artist_id': artist_id} {'track_id': track_id, 'artist_id': artist_id}
@ -677,15 +706,14 @@ def add_artists_to_tracks(track_ids,artist_ids,dbconn=None):
]) ])
result = dbconn.execute(op) result = dbconn.execute(op)
# the resulting tracks could now be duplicates of existing ones # the resulting tracks could now be duplicates of existing ones
# this also takes care of clean_db # this also takes care of clean_db
merge_duplicate_tracks(dbconn=dbconn) merge_duplicate_tracks(dbconn=dbconn)
return True return True
@connection_provider @connection_provider
def remove_artists_from_tracks(track_ids,artist_ids,dbconn=None): def remove_artists_from_tracks(track_ids: list[int], artist_ids: list[int], dbconn=None) -> bool:
# only tracks that have at least one other artist # only tracks that have at least one other artist
subquery = DB['trackartists'].select().where( subquery = DB['trackartists'].select().where(
@ -703,16 +731,14 @@ def remove_artists_from_tracks(track_ids,artist_ids,dbconn=None):
) )
result = dbconn.execute(op) result = dbconn.execute(op)
# the resulting tracks could now be duplicates of existing ones # the resulting tracks could now be duplicates of existing ones
# this also takes care of clean_db # this also takes care of clean_db
merge_duplicate_tracks(dbconn=dbconn) merge_duplicate_tracks(dbconn=dbconn)
return True return True
@connection_provider @connection_provider
def add_artists_to_albums(album_ids,artist_ids,dbconn=None): def add_artists_to_albums(album_ids: list[int], artist_ids: list[int], dbconn=None) -> bool:
op = DB['albumartists'].insert().values([ op = DB['albumartists'].insert().values([
{'album_id':album_id,'artist_id':artist_id} {'album_id':album_id,'artist_id':artist_id}
@ -720,16 +746,14 @@ def add_artists_to_albums(album_ids,artist_ids,dbconn=None):
]) ])
result = dbconn.execute(op) result = dbconn.execute(op)
# the resulting albums could now be duplicates of existing ones # the resulting albums could now be duplicates of existing ones
# this also takes care of clean_db # this also takes care of clean_db
merge_duplicate_albums(dbconn=dbconn) merge_duplicate_albums(dbconn=dbconn)
return True return True
@connection_provider @connection_provider
def remove_artists_from_albums(album_ids,artist_ids,dbconn=None): def remove_artists_from_albums(album_ids: list[int], artist_ids: list[int], dbconn=None) -> bool:
# no check here, albums are allowed to have zero artists # no check here, albums are allowed to have zero artists
@ -741,17 +765,16 @@ def remove_artists_from_albums(album_ids,artist_ids,dbconn=None):
) )
result = dbconn.execute(op) result = dbconn.execute(op)
# the resulting albums could now be duplicates of existing ones # the resulting albums could now be duplicates of existing ones
# this also takes care of clean_db # this also takes care of clean_db
merge_duplicate_albums(dbconn=dbconn) merge_duplicate_albums(dbconn=dbconn)
return True return True
### Merge ### Merge
@connection_provider @connection_provider
def merge_tracks(target_id,source_ids,dbconn=None): def merge_tracks(target_id: int, source_ids: list[int], dbconn=None) -> bool:
op = DB['scrobbles'].update().where( op = DB['scrobbles'].update().where(
DB['scrobbles'].c.track_id.in_(source_ids) DB['scrobbles'].c.track_id.in_(source_ids)
@ -760,11 +783,11 @@ def merge_tracks(target_id,source_ids,dbconn=None):
) )
result = dbconn.execute(op) result = dbconn.execute(op)
clean_db(dbconn=dbconn) clean_db(dbconn=dbconn)
return True return True
@connection_provider @connection_provider
def merge_artists(target_id,source_ids,dbconn=None): def merge_artists(target_id: int, source_ids: list[int], dbconn=None) -> bool:
# some tracks could already have multiple of the to be merged artists # some tracks could already have multiple of the to be merged artists
@ -792,7 +815,6 @@ def merge_artists(target_id,source_ids,dbconn=None):
result = dbconn.execute(op) result = dbconn.execute(op)
# same for albums # same for albums
op = DB['albumartists'].select().where( op = DB['albumartists'].select().where(
DB['albumartists'].c.artist_id.in_(source_ids + [target_id]) DB['albumartists'].c.artist_id.in_(source_ids + [target_id])
@ -813,7 +835,6 @@ def merge_artists(target_id,source_ids,dbconn=None):
result = dbconn.execute(op) result = dbconn.execute(op)
# tracks_artists = {} # tracks_artists = {}
# for row in result: # for row in result:
# tracks_artists.setdefault(row.track_id,[]).append(row.artist_id) # tracks_artists.setdefault(row.track_id,[]).append(row.artist_id)
@ -833,12 +854,11 @@ def merge_artists(target_id,source_ids,dbconn=None):
merge_duplicate_tracks(artist_id=target_id, dbconn=dbconn) merge_duplicate_tracks(artist_id=target_id, dbconn=dbconn)
merge_duplicate_albums(artist_id=target_id, dbconn=dbconn) merge_duplicate_albums(artist_id=target_id, dbconn=dbconn)
clean_db(dbconn=dbconn) clean_db(dbconn=dbconn)
return True return True
@connection_provider @connection_provider
def merge_albums(target_id,source_ids,dbconn=None): def merge_albums(target_id: int, source_ids: list[int], dbconn=None) -> bool:
op = DB['tracks'].update().where( op = DB['tracks'].update().where(
DB['tracks'].c.album_id.in_(source_ids) DB['tracks'].c.album_id.in_(source_ids)
@ -847,7 +867,6 @@ def merge_albums(target_id,source_ids,dbconn=None):
) )
result = dbconn.execute(op) result = dbconn.execute(op)
clean_db(dbconn=dbconn) clean_db(dbconn=dbconn)
return True return True
@ -1622,40 +1641,43 @@ def get_credited_artists(*artists,dbconn=None):
@cached_wrapper @cached_wrapper
@connection_provider @connection_provider
def get_track(id,dbconn=None): def get_track(track_id: int, dbconn=None) -> TrackDict:
op = DB['tracks'].select().where( op = DB['tracks'].select().where(
DB['tracks'].c.id==id DB['tracks'].c.id == track_id
) )
result = dbconn.execute(op).all() result = dbconn.execute(op).all()
trackinfo = result[0] trackinfo = result[0]
return track_db_to_dict(trackinfo, dbconn=dbconn) return track_db_to_dict(trackinfo, dbconn=dbconn)
@cached_wrapper @cached_wrapper
@connection_provider @connection_provider
def get_artist(id,dbconn=None): def get_artist(artist_id: int, dbconn=None) -> str:
op = DB['artists'].select().where( op = DB['artists'].select().where(
DB['artists'].c.id==id DB['artists'].c.id == artist_id
) )
result = dbconn.execute(op).all() result = dbconn.execute(op).all()
artistinfo = result[0] artistinfo = result[0]
return artist_db_to_dict(artistinfo, dbconn=dbconn) return artist_db_to_dict(artistinfo, dbconn=dbconn)
@cached_wrapper @cached_wrapper
@connection_provider @connection_provider
def get_album(id,dbconn=None): def get_album(album_id: int, dbconn=None) -> AlbumDict:
op = DB['albums'].select().where( op = DB['albums'].select().where(
DB['albums'].c.id==id DB['albums'].c.id == album_id
) )
result = dbconn.execute(op).all() result = dbconn.execute(op).all()
albuminfo = result[0] albuminfo = result[0]
return album_db_to_dict(albuminfo, dbconn=dbconn) return album_db_to_dict(albuminfo, dbconn=dbconn)
@cached_wrapper @cached_wrapper
@connection_provider @connection_provider
def get_scrobble(timestamp, include_internal=False, dbconn=None): def get_scrobble(timestamp: int, include_internal=False, dbconn=None) -> ScrobbleDict:
op = DB['scrobbles'].select().where( op = DB['scrobbles'].select().where(
DB['scrobbles'].c.timestamp == timestamp DB['scrobbles'].c.timestamp == timestamp
) )
@ -1664,6 +1686,7 @@ def get_scrobble(timestamp, include_internal=False, dbconn=None):
scrobble = result[0] scrobble = result[0]
return scrobbles_db_to_dict(rows=[scrobble], include_internal=include_internal)[0] return scrobbles_db_to_dict(rows=[scrobble], include_internal=include_internal)[0]
@cached_wrapper @cached_wrapper
@connection_provider @connection_provider
def search_artist(searchterm,dbconn=None): def search_artist(searchterm,dbconn=None):