diff --git a/maloja/database/__init__.py b/maloja/database/__init__.py
index 0c4b786..dc7dd43 100644
--- a/maloja/database/__init__.py
+++ b/maloja/database/__init__.py
@@ -10,6 +10,7 @@ from ..thirdparty import proxy_scrobble_all
 from ..globalconf import data_dir, malojaconfig, apikeystore
 #db
 from . import sqldb
+from .cache import db_query, db_aggregate
 
 # doreah toolkit
 from doreah.logging import log
@@ -33,7 +34,6 @@ import unicodedata
 from collections import namedtuple
 from threading import Lock
 import yaml, json
-import lru
 import math
 
 # url handling
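The `lru` import leaves `__init__.py` together with everything that uses it. For reference, this is the third-party lru-dict package; its fixed-size mapping and `set_size` are what the cache shrinking further down relies on. A quick standalone sketch of the behavior (illustrative only):

```python
import lru  # pip install lru-dict

cache = lru.LRU(2)      # mapping that holds at most 2 entries
cache["a"] = 1
cache["b"] = 2
cache["c"] = 3          # evicts "a", the least recently used key
print("a" in cache)     # False
cache.set_size(1)       # shrinking evicts down to the most recent entry
print(len(cache))       # 1
cache.set_size(2)       # growing again keeps current entries, allows more
```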
@@ -556,165 +556,6 @@ def start_db():
 
 
 
-###
-## Caches in front of DB
-## the volatile caches are intended mainly for excessive site navigation during one session
-## the permanent caches are there to save data that is hard to calculate and never changes (old charts)
-###
-
-
-
-import copy
-
-if malojaconfig["USE_DB_CACHE"]:
-	def db_query(**kwargs):
-		return db_query_cached(**kwargs)
-	def db_aggregate(**kwargs):
-		return db_aggregate_cached(**kwargs)
-else:
-	def db_query(**kwargs):
-		return db_query_full(**kwargs)
-	def db_aggregate(**kwargs):
-		return db_aggregate_full(**kwargs)
-
-
-csz = malojaconfig["DB_CACHE_ENTRIES"]
-cmp = malojaconfig["DB_MAX_MEMORY"]
-try:
-	import psutil
-	use_psutil = True
-except:
-	use_psutil = False
-
-cache_query = lru.LRU(csz)
-cache_query_perm = lru.LRU(csz)
-cache_aggregate = lru.LRU(csz)
-cache_aggregate_perm = lru.LRU(csz)
-
-perm_caching = malojaconfig["CACHE_DATABASE_PERM"]
-temp_caching = malojaconfig["CACHE_DATABASE_SHORT"]
-
-cachestats = {
-	"cache_query":{
-		"hits_perm":0,
-		"hits_tmp":0,
-		"misses":0,
-		"objperm":cache_query_perm,
-		"objtmp":cache_query,
-		"name":"Query Cache"
-	},
-	"cache_aggregate":{
-		"hits_perm":0,
-		"hits_tmp":0,
-		"misses":0,
-		"objperm":cache_aggregate_perm,
-		"objtmp":cache_aggregate,
-		"name":"Aggregate Cache"
-	}
-}
-
-from doreah.regular import runhourly
-
-@runhourly
-def log_stats():
-	logstr = "{name}: {hitsperm} Perm Hits, {hitstmp} Tmp Hits, {misses} Misses; Current Size: {sizeperm}/{sizetmp}"
-	for s in (cachestats["cache_query"],cachestats["cache_aggregate"]):
-		log(logstr.format(name=s["name"],hitsperm=s["hits_perm"],hitstmp=s["hits_tmp"],misses=s["misses"],
-			sizeperm=len(s["objperm"]),sizetmp=len(s["objtmp"])),module="debug")
-
-def db_query_cached(**kwargs):
-	global cache_query, cache_query_perm
-	key = utilities.serialize(kwargs)
-
-	eligible_permanent_caching = (
-		"timerange" in kwargs and
-		not kwargs["timerange"].active() and
-		perm_caching
-	)
-	eligible_temporary_caching = (
-		not eligible_permanent_caching and
-		temp_caching
-	)
-
-	# hit permanent cache for past timeranges
-	if eligible_permanent_caching and key in cache_query_perm:
-		cachestats["cache_query"]["hits_perm"] += 1
-		return copy.copy(cache_query_perm.get(key))
-
-	# hit short term cache
-	elif eligible_temporary_caching and key in cache_query:
-		cachestats["cache_query"]["hits_tmp"] += 1
-		return copy.copy(cache_query.get(key))
-
-	else:
-		cachestats["cache_query"]["misses"] += 1
-		result = db_query_full(**kwargs)
-		if eligible_permanent_caching: cache_query_perm[key] = result
-		elif eligible_temporary_caching: cache_query[key] = result
-
-		if use_psutil:
-			reduce_caches_if_low_ram()
-
-		return result
-
-
-def db_aggregate_cached(**kwargs):
-	global cache_aggregate, cache_aggregate_perm
-	key = utilities.serialize(kwargs)
-
-	eligible_permanent_caching = (
-		"timerange" in kwargs and
-		not kwargs["timerange"].active() and
-		perm_caching
-	)
-	eligible_temporary_caching = (
-		not eligible_permanent_caching and
-		temp_caching
-	)
-
-	# hit permanent cache for past timeranges
-	if eligible_permanent_caching and key in cache_aggregate_perm:
-		cachestats["cache_aggregate"]["hits_perm"] += 1
-		return copy.copy(cache_aggregate_perm.get(key))
-
-	# hit short term cache
-	elif eligible_temporary_caching and key in cache_aggregate:
-		cachestats["cache_aggregate"]["hits_tmp"] += 1
-		return copy.copy(cache_aggregate.get(key))
-
-	else:
-		cachestats["cache_aggregate"]["misses"] += 1
-		result = db_aggregate_full(**kwargs)
-		if eligible_permanent_caching: cache_aggregate_perm[key] = result
-		elif eligible_temporary_caching: cache_aggregate[key] = result
-
-		if use_psutil:
-			reduce_caches_if_low_ram()
-
-		return result
-
-def invalidate_caches():
-	global cache_query, cache_aggregate
-	cache_query.clear()
-	cache_aggregate.clear()
-	log("Database caches invalidated.")
-
-def reduce_caches(to=0.75):
-	global cache_query, cache_aggregate, cache_query_perm, cache_aggregate_perm
-	for c in cache_query, cache_aggregate, cache_query_perm, cache_aggregate_perm:
-		currentsize = len(c)
-		if currentsize > 100:
-			targetsize = max(int(currentsize * to),10)
-			c.set_size(targetsize)
-			c.set_size(csz)
-
-def reduce_caches_if_low_ram():
-	ramprct = psutil.virtual_memory().percent
-	if ramprct > cmp:
-		log("{prct}% RAM usage, reducing caches!".format(prct=ramprct),module="debug")
-		ratio = (cmp / ramprct) ** 3
-		reduce_caches(to=ratio)
 
 
 ####
 ## Database queries
@@ -735,7 +576,10 @@ def db_query_full(artist=None,artists=None,title=None,track=None,timerange=None,
 	#artist = None
 
 	if artists is not None and title is not None:
-		return sqldb.get_scrobbles_of_track(track={"artists":artists,"title":title},since=since,to=to)
+		track = {'artists':artists,'title':title}
+
+	if track is not None:
+		return sqldb.get_scrobbles_of_track(track=track,since=since,to=to)
 
 	if artist is not None:
 		return sqldb.get_scrobbles_of_artist(artist=artist,since=since,to=to)
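The `db_query_full` hunk folds the `artists`+`title` case into the existing `track` parameter rather than building the dict inline in a return: the pair is just another way of identifying a track, so both identifications now flow through the same `sqldb.get_scrobbles_of_track` call. A minimal sketch of the resulting branch order (signature trimmed, sqldb calls stubbed; the real function carries more parameters and further fallthrough branches):

```python
def db_query_full(artist=None, artists=None, title=None, track=None, since=None, to=None):
	# artists+title identifies a track, so normalize it into `track`
	if artists is not None and title is not None:
		track = {'artists': artists, 'title': title}

	if track is not None:
		return ('scrobbles_of_track', track, since, to)
	if artist is not None:
		return ('scrobbles_of_artist', artist, since, to)
	return ('all_scrobbles', since, to)

# both ways of naming the track now hit the same branch:
assert db_query_full(track={'artists': ['A'], 'title': 'T'}) == \
       db_query_full(artists=['A'], title='T')
```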
diff --git a/maloja/database/cache.py b/maloja/database/cache.py
new file mode 100644
index 0000000..90a672b
--- /dev/null
+++ b/maloja/database/cache.py
@@ -0,0 +1,158 @@
+
+###
+## Caches in front of DB
+## the volatile caches are intended mainly for excessive site navigation during one session
+## the permanent caches are there to save data that is hard to calculate and never changes (old charts)
+###
+
+import psutil
+import copy
+import lru
+
+from doreah.logging import log
+
+from ..globalconf import malojaconfig
+from .. import utilities
+from .. import database as dbmain
+
+if malojaconfig["USE_DB_CACHE"]:
+	def db_query(**kwargs):
+		return db_query_cached(**kwargs)
+	def db_aggregate(**kwargs):
+		return db_aggregate_cached(**kwargs)
+else:
+	def db_query(**kwargs):
+		return dbmain.db_query_full(**kwargs)
+	def db_aggregate(**kwargs):
+		return dbmain.db_aggregate_full(**kwargs)
+
+
+csz = malojaconfig["DB_CACHE_ENTRIES"]
+cmp = malojaconfig["DB_MAX_MEMORY"]
+
+cache_query = lru.LRU(csz)
+cache_query_perm = lru.LRU(csz)
+cache_aggregate = lru.LRU(csz)
+cache_aggregate_perm = lru.LRU(csz)
+
+perm_caching = malojaconfig["CACHE_DATABASE_PERM"]
+temp_caching = malojaconfig["CACHE_DATABASE_SHORT"]
+
+cachestats = {
+	"cache_query":{
+		"hits_perm":0,
+		"hits_tmp":0,
+		"misses":0,
+		"objperm":cache_query_perm,
+		"objtmp":cache_query,
+		"name":"Query Cache"
+	},
+	"cache_aggregate":{
+		"hits_perm":0,
+		"hits_tmp":0,
+		"misses":0,
+		"objperm":cache_aggregate_perm,
+		"objtmp":cache_aggregate,
+		"name":"Aggregate Cache"
+	}
+}
+
+from doreah.regular import runhourly
+
+@runhourly
+def log_stats():
+	logstr = "{name}: {hitsperm} Perm Hits, {hitstmp} Tmp Hits, {misses} Misses; Current Size: {sizeperm}/{sizetmp}"
+	for s in (cachestats["cache_query"],cachestats["cache_aggregate"]):
+		log(logstr.format(name=s["name"],hitsperm=s["hits_perm"],hitstmp=s["hits_tmp"],misses=s["misses"],
+			sizeperm=len(s["objperm"]),sizetmp=len(s["objtmp"])),module="debug")
+
+def db_query_cached(**kwargs):
+	global cache_query, cache_query_perm
+	key = utilities.serialize(kwargs)
+
+	eligible_permanent_caching = (
+		"timerange" in kwargs and
+		not kwargs["timerange"].active() and
+		perm_caching
+	)
+	eligible_temporary_caching = (
+		not eligible_permanent_caching and
+		temp_caching
+	)
+
+	# hit permanent cache for past timeranges
+	if eligible_permanent_caching and key in cache_query_perm:
+		cachestats["cache_query"]["hits_perm"] += 1
+		return copy.copy(cache_query_perm.get(key))
+
+	# hit short term cache
+	elif eligible_temporary_caching and key in cache_query:
+		cachestats["cache_query"]["hits_tmp"] += 1
+		return copy.copy(cache_query.get(key))
+
+	else:
+		cachestats["cache_query"]["misses"] += 1
+		result = dbmain.db_query_full(**kwargs)
+		if eligible_permanent_caching: cache_query_perm[key] = result
+		elif eligible_temporary_caching: cache_query[key] = result
+
+		reduce_caches_if_low_ram()
+
+		return result
+
+
+def db_aggregate_cached(**kwargs):
+	global cache_aggregate, cache_aggregate_perm
+	key = utilities.serialize(kwargs)
+
+	eligible_permanent_caching = (
+		"timerange" in kwargs and
+		not kwargs["timerange"].active() and
+		perm_caching
+	)
+	eligible_temporary_caching = (
+		not eligible_permanent_caching and
+		temp_caching
+	)
+
+	# hit permanent cache for past timeranges
+	if eligible_permanent_caching and key in cache_aggregate_perm:
+		cachestats["cache_aggregate"]["hits_perm"] += 1
+		return copy.copy(cache_aggregate_perm.get(key))
+
+	# hit short term cache
+	elif eligible_temporary_caching and key in cache_aggregate:
+		cachestats["cache_aggregate"]["hits_tmp"] += 1
+		return copy.copy(cache_aggregate.get(key))
+
+	else:
+		cachestats["cache_aggregate"]["misses"] += 1
+		result = dbmain.db_aggregate_full(**kwargs)
+		if eligible_permanent_caching: cache_aggregate_perm[key] = result
+		elif eligible_temporary_caching: cache_aggregate[key] = result
+
+		reduce_caches_if_low_ram()
+
+		return result
+
+def invalidate_caches():
+	global cache_query, cache_aggregate
+	cache_query.clear()
+	cache_aggregate.clear()
+	log("Database caches invalidated.")
+
+def reduce_caches(to=0.75):
+	global cache_query, cache_aggregate, cache_query_perm, cache_aggregate_perm
+	for c in cache_query, cache_aggregate, cache_query_perm, cache_aggregate_perm:
+		currentsize = len(c)
+		if currentsize > 100:
+			targetsize = max(int(currentsize * to),10)
+			c.set_size(targetsize)
+			c.set_size(csz)
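+
+def reduce_caches_if_low_ram():
+	ramprct = psutil.virtual_memory().percent
+	if ramprct > cmp:
+		log("{prct}% RAM usage, reducing caches!".format(prct=ramprct),module="debug")
+		ratio = (cmp / ramprct) ** 3
+		reduce_caches(to=ratio)

In the new module, `DB_MAX_MEMORY` (`cmp`) is a RAM-usage percentage threshold: once `psutil` reports more than that, every cache is shrunk to `(cmp / ramprct) ** 3` of its current size, so the cut gets sharper the further usage overshoots. A few hypothetical readings make the curve concrete:

```python
# Worked example of the shrink ratio in reduce_caches_if_low_ram().
max_memory = 75            # hypothetical DB_MAX_MEMORY threshold, percent
for ram in (76, 80, 90):   # hypothetical psutil.virtual_memory().percent readings
	ratio = (max_memory / ram) ** 3
	print(f"{ram}% RAM used -> keep {ratio:.0%} of each cache")
# 76% RAM used -> keep 96% of each cache
# 80% RAM used -> keep 82% of each cache
# 90% RAM used -> keep 58% of each cache
```

Note that `reduce_caches` applies the ratio via `set_size(targetsize)` and then immediately calls `set_size(csz)` again, so each cache is truncated once but keeps its full configured capacity for future growth.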
diff --git a/maloja/database/sqldb.py b/maloja/database/sqldb.py
index 1a88ab2..708a3d0 100644
--- a/maloja/database/sqldb.py
+++ b/maloja/database/sqldb.py
@@ -226,13 +226,16 @@ def get_scrobbles_of_artist(artist,since,to):
 
 	artist_id = get_artist_id(artist)
 
+	jointable = sql.join(DB['scrobbles'],DB['trackartists'],DB['scrobbles'].c.track_id == DB['trackartists'].c.track_id)
+
 	with engine.begin() as conn:
-		op = DB['scrobbles'].select().where(
+		op = jointable.select().where(
 			DB['scrobbles'].c.timestamp<=to,
 			DB['scrobbles'].c.timestamp>=since,
+			DB['trackartists'].c.artist_id==artist_id
 		)
 		result = conn.execute(op).all()
 
+	result = [scrobble_db_to_dict(row) for row in result]
 	return result
 
 
@@ -244,9 +247,11 @@ def get_scrobbles_of_track(track,since,to):
 		op = DB['scrobbles'].select().where(
 			DB['scrobbles'].c.timestamp<=to,
 			DB['scrobbles'].c.timestamp>=since,
+			DB['scrobbles'].c.track_id==track_id
 		)
 		result = conn.execute(op).all()
 
+	result = [scrobble_db_to_dict(row) for row in result]
 	return result
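The `sqldb` hunks make both fetchers actually filter by their subject: scrobble rows only carry a `track_id`, so the artist lookup has to go through the `trackartists` mapping table via a join, and both functions now convert raw rows to dicts before returning. A standalone sketch of the join, assuming SQLAlchemy Core tables shaped like the `DB` dict in `sqldb.py` (table and column names taken from the diff, data values invented):

```python
import sqlalchemy as sql

meta = sql.MetaData()
scrobbles = sql.Table('scrobbles', meta,
	sql.Column('timestamp', sql.Integer, primary_key=True),
	sql.Column('track_id', sql.Integer))
trackartists = sql.Table('trackartists', meta,
	sql.Column('track_id', sql.Integer),
	sql.Column('artist_id', sql.Integer))

# scrobbles -> trackartists join, filtered the same way as the new hunk
jointable = sql.join(scrobbles, trackartists,
	scrobbles.c.track_id == trackartists.c.track_id)
op = jointable.select().where(
	scrobbles.c.timestamp <= 2000000000,
	scrobbles.c.timestamp >= 0,
	trackartists.c.artist_id == 1,
)
print(op)  # SELECT ... FROM scrobbles JOIN trackartists ON scrobbles.track_id = trackartists.track_id WHERE ...
```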