mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
Refactor out sanity_check as a utility method and re-organize utility methods into their own module
This commit is contained in:
parent
4016e852d8
commit
e7e3b86573
@ -5,7 +5,7 @@ __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
|
||||
Device drivers.
|
||||
'''
|
||||
|
||||
import sys, time, pprint, operator, re, os
|
||||
import sys, time, pprint, operator
|
||||
from functools import partial
|
||||
from StringIO import StringIO
|
||||
|
||||
@ -27,112 +27,6 @@ def strftime(epoch, zone=time.gmtime):
|
||||
src[2] = INVERSE_MONTH_MAP[int(src[2])]
|
||||
return ' '.join(src)
|
||||
|
||||
def build_template_regexp(template):
|
||||
from calibre import prints
|
||||
|
||||
def replfunc(match, seen=None):
|
||||
v = match.group(1)
|
||||
if v in ['authors', 'author_sort']:
|
||||
v = 'author'
|
||||
if v in ('title', 'series', 'series_index', 'isbn', 'author'):
|
||||
if v not in seen:
|
||||
seen.add(v)
|
||||
return '(?P<' + v + '>.+?)'
|
||||
return '(.+?)'
|
||||
s = set()
|
||||
f = partial(replfunc, seen=s)
|
||||
|
||||
try:
|
||||
template = template.rpartition('/')[2]
|
||||
return re.compile(re.sub('{([^}]*)}', f, template) + '([_\d]*$)')
|
||||
except:
|
||||
prints(u'Failed to parse template: %r'%template)
|
||||
template = u'{title} - {authors}'
|
||||
return re.compile(re.sub('{([^}]*)}', f, template) + '([_\d]*$)')
|
||||
|
||||
def create_upload_path(mdata, fname, template, sanitize,
|
||||
prefix_path='',
|
||||
path_type=os.path,
|
||||
maxlen=250,
|
||||
use_subdirs=True,
|
||||
news_in_folder=True,
|
||||
filename_callback=lambda x, y:x,
|
||||
sanitize_path_components=lambda x: x
|
||||
):
|
||||
from calibre.library.save_to_disk import get_components, config
|
||||
from calibre.utils.filenames import shorten_components_to
|
||||
|
||||
special_tag = None
|
||||
if mdata.tags:
|
||||
for t in mdata.tags:
|
||||
if t.startswith(_('News')) or t.startswith('/'):
|
||||
special_tag = t
|
||||
break
|
||||
|
||||
if mdata.tags and _('News') in mdata.tags:
|
||||
try:
|
||||
p = mdata.pubdate
|
||||
date = (p.year, p.month, p.day)
|
||||
except:
|
||||
today = time.localtime()
|
||||
date = (today[0], today[1], today[2])
|
||||
template = u"{title}_%d-%d-%d" % date
|
||||
|
||||
fname = sanitize(fname)
|
||||
ext = path_type.splitext(fname)[1]
|
||||
|
||||
opts = config().parse()
|
||||
if not isinstance(template, unicode):
|
||||
template = template.decode('utf-8')
|
||||
app_id = str(getattr(mdata, 'application_id', ''))
|
||||
id_ = mdata.get('id', fname)
|
||||
extra_components = get_components(template, mdata, id_,
|
||||
timefmt=opts.send_timefmt, length=maxlen-len(app_id)-1)
|
||||
if not extra_components:
|
||||
extra_components.append(sanitize(filename_callback(fname,
|
||||
mdata)))
|
||||
else:
|
||||
extra_components[-1] = sanitize(filename_callback(extra_components[-1]+ext, mdata))
|
||||
|
||||
if extra_components[-1] and extra_components[-1][0] in ('.', '_'):
|
||||
extra_components[-1] = 'x' + extra_components[-1][1:]
|
||||
|
||||
if special_tag is not None:
|
||||
name = extra_components[-1]
|
||||
extra_components = []
|
||||
tag = special_tag
|
||||
if tag.startswith(_('News')):
|
||||
if news_in_folder:
|
||||
extra_components.append('News')
|
||||
else:
|
||||
for c in tag.split('/'):
|
||||
c = sanitize(c)
|
||||
if not c: continue
|
||||
extra_components.append(c)
|
||||
extra_components.append(name)
|
||||
|
||||
if not use_subdirs:
|
||||
extra_components = extra_components[-1:]
|
||||
|
||||
def remove_trailing_periods(x):
|
||||
ans = x
|
||||
while ans.endswith('.'):
|
||||
ans = ans[:-1].strip()
|
||||
if not ans:
|
||||
ans = 'x'
|
||||
return ans
|
||||
|
||||
extra_components = list(map(remove_trailing_periods, extra_components))
|
||||
components = shorten_components_to(maxlen - len(prefix_path), extra_components)
|
||||
components = sanitize_path_components(components)
|
||||
if prefix_path:
|
||||
filepath = path_type.join(prefix_path, *components)
|
||||
else:
|
||||
filepath = path_type.join(*components)
|
||||
|
||||
return filepath
|
||||
|
||||
|
||||
def get_connected_device():
|
||||
from calibre.customize.ui import device_plugins
|
||||
from calibre.devices.scanner import DeviceScanner
|
||||
|
@ -55,7 +55,7 @@ class MTPDeviceBase(DevicePlugin):
|
||||
return False
|
||||
|
||||
def build_template_regexp(self):
|
||||
from calibre.devices import build_template_regexp
|
||||
from calibre.devices.utils import build_template_regexp
|
||||
return build_template_regexp(self.save_template)
|
||||
|
||||
@property
|
||||
|
@ -33,9 +33,11 @@ class MTP_DEVICE(BASE):
|
||||
BACKLOADING_ERROR_MESSAGE = None
|
||||
MANAGES_DEVICE_PRESENCE = True
|
||||
FORMATS = ['epub', 'azw3', 'mobi', 'pdf']
|
||||
DEVICE_PLUGBOARD_NAME = 'MTP_DEVICE'
|
||||
|
||||
def open(self, devices, library_uuid):
|
||||
self.current_library_uuid = library_uuid
|
||||
self.plugboards = self.plugboard_func = None
|
||||
BASE.open(self, devices, library_uuid)
|
||||
|
||||
# Device information {{{
|
||||
@ -228,8 +230,12 @@ class MTP_DEVICE(BASE):
|
||||
|
||||
# Sending files to the device {{{
|
||||
|
||||
def set_plugboards(self, plugboards, pb_func):
|
||||
self.plugboards = plugboards
|
||||
self.plugboard_func = pb_func
|
||||
|
||||
def create_upload_path(self, path, mdata, fname):
|
||||
from calibre.devices import create_upload_path
|
||||
from calibre.devices.utils import create_upload_path
|
||||
from calibre.utils.filenames import ascii_filename as sanitize
|
||||
filepath = create_upload_path(mdata, fname, self.save_template, sanitize,
|
||||
prefix_path=path,
|
||||
@ -240,6 +246,12 @@ class MTP_DEVICE(BASE):
|
||||
)
|
||||
return tuple(x.lower() for x in filepath.split('/'))
|
||||
|
||||
def upload_books(self, files, names, on_card=None, end_session=True,
|
||||
metadata=None):
|
||||
from calibre.devices.utils import sanity_check
|
||||
sanity_check(on_card, files, self.card_prefix(), self.free_space())
|
||||
raise NotImplementedError()
|
||||
|
||||
# }}}
|
||||
|
||||
# Settings {{{
|
||||
|
@ -15,8 +15,7 @@ import os, subprocess, time, re, sys, glob
|
||||
from itertools import repeat
|
||||
|
||||
from calibre.devices.interface import DevicePlugin
|
||||
from calibre.devices.errors import (DeviceError, FreeSpaceError,
|
||||
WrongDestinationError)
|
||||
from calibre.devices.errors import DeviceError
|
||||
from calibre.devices.usbms.deviceconfig import DeviceConfig
|
||||
from calibre.constants import iswindows, islinux, isosx, isfreebsd, plugins
|
||||
from calibre.utils.filenames import ascii_filename as sanitize
|
||||
@ -976,20 +975,8 @@ class Device(DeviceConfig, DevicePlugin):
|
||||
return self.EBOOK_DIR_CARD_A
|
||||
|
||||
def _sanity_check(self, on_card, files):
|
||||
if on_card == 'carda' and not self._card_a_prefix:
|
||||
raise WrongDestinationError(_(
|
||||
'The reader has no storage card %s. You may have changed '
|
||||
'the default send to device action. Right click on the send '
|
||||
'to device button and reset the default action to be '
|
||||
'"Send to main memory".')%'A')
|
||||
elif on_card == 'cardb' and not self._card_b_prefix:
|
||||
raise WrongDestinationError(_(
|
||||
'The reader has no storage card %s. You may have changed '
|
||||
'the default send to device action. Right click on the send '
|
||||
'to device button and reset the default action to be '
|
||||
'"Send to main memory".')%'B')
|
||||
elif on_card and on_card not in ('carda', 'cardb'):
|
||||
raise DeviceError(_('Selected slot: %s is not supported.') % on_card)
|
||||
from calibre.devices.utils import sanity_check
|
||||
sanity_check(on_card, files, self.card_prefix(), self.free_space())
|
||||
|
||||
def get_dest_dir(prefix, candidates):
|
||||
if isinstance(candidates, basestring):
|
||||
@ -1014,19 +1001,6 @@ class Device(DeviceConfig, DevicePlugin):
|
||||
candidates = self.get_main_ebook_dir(for_upload=True)
|
||||
path = get_dest_dir(self._main_prefix, candidates)
|
||||
|
||||
def get_size(obj):
|
||||
path = getattr(obj, 'name', obj)
|
||||
return os.path.getsize(path)
|
||||
|
||||
sizes = [get_size(f) for f in files]
|
||||
size = sum(sizes)
|
||||
|
||||
if not on_card and size > self.free_space()[0] - 2*1024*1024:
|
||||
raise FreeSpaceError(_("There is insufficient free space in main memory"))
|
||||
if on_card == 'carda' and size > self.free_space()[1] - 1024*1024:
|
||||
raise FreeSpaceError(_("There is insufficient free space on the storage card"))
|
||||
if on_card == 'cardb' and size > self.free_space()[2] - 1024*1024:
|
||||
raise FreeSpaceError(_("There is insufficient free space on the storage card"))
|
||||
return path
|
||||
|
||||
def filename_callback(self, default, mi):
|
||||
@ -1056,7 +1030,7 @@ class Device(DeviceConfig, DevicePlugin):
|
||||
pass
|
||||
|
||||
def create_upload_path(self, path, mdata, fname, create_dirs=True):
|
||||
from calibre.devices import create_upload_path
|
||||
from calibre.devices.utils import create_upload_path
|
||||
settings = self.settings()
|
||||
filepath = create_upload_path(mdata, fname, self.save_template(), sanitize,
|
||||
prefix_path=os.path.abspath(path),
|
||||
|
@ -404,7 +404,7 @@ class USBMS(CLI, Device):
|
||||
|
||||
@classmethod
|
||||
def build_template_regexp(cls):
|
||||
from calibre.devices import build_template_regexp
|
||||
from calibre.devices.utils import build_template_regexp
|
||||
return build_template_regexp(cls.save_template())
|
||||
|
||||
@classmethod
|
||||
|
148
src/calibre/devices/utils.py
Normal file
148
src/calibre/devices/utils.py
Normal file
@ -0,0 +1,148 @@
|
||||
#!/usr/bin/env python
|
||||
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:fdm=marker:ai
|
||||
from __future__ import (unicode_literals, division, absolute_import,
|
||||
print_function)
|
||||
|
||||
__license__ = 'GPL v3'
|
||||
__copyright__ = '2012, Kovid Goyal <kovid at kovidgoyal.net>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
import os, time, re
|
||||
from functools import partial
|
||||
|
||||
from calibre.devices.errors import DeviceError, WrongDestinationError, FreeSpaceError
|
||||
|
||||
def sanity_check(on_card, files, card_prefixes, free_space):
|
||||
if on_card == 'carda' and not card_prefixes[0]:
|
||||
raise WrongDestinationError(_(
|
||||
'The reader has no storage card %s. You may have changed '
|
||||
'the default send to device action. Right click on the send '
|
||||
'to device button and reset the default action to be '
|
||||
'"Send to main memory".')%'A')
|
||||
elif on_card == 'cardb' and not card_prefixes[1]:
|
||||
raise WrongDestinationError(_(
|
||||
'The reader has no storage card %s. You may have changed '
|
||||
'the default send to device action. Right click on the send '
|
||||
'to device button and reset the default action to be '
|
||||
'"Send to main memory".')%'B')
|
||||
elif on_card and on_card not in ('carda', 'cardb'):
|
||||
raise DeviceError(_('Selected slot: %s is not supported.') % on_card)
|
||||
|
||||
size = 0
|
||||
for f in files:
|
||||
size += os.path.getsize(getattr(f, 'name', f))
|
||||
|
||||
if not on_card and size > free_space[0] - 2*1024*1024:
|
||||
raise FreeSpaceError(_("There is insufficient free space in main memory"))
|
||||
if on_card == 'carda' and size > free_space[1] - 1024*1024:
|
||||
raise FreeSpaceError(_("There is insufficient free space on the storage card"))
|
||||
if on_card == 'cardb' and size > free_space[2] - 1024*1024:
|
||||
raise FreeSpaceError(_("There is insufficient free space on the storage card"))
|
||||
|
||||
def build_template_regexp(template):
|
||||
from calibre import prints
|
||||
|
||||
def replfunc(match, seen=None):
|
||||
v = match.group(1)
|
||||
if v in ['authors', 'author_sort']:
|
||||
v = 'author'
|
||||
if v in ('title', 'series', 'series_index', 'isbn', 'author'):
|
||||
if v not in seen:
|
||||
seen.add(v)
|
||||
return '(?P<' + v + '>.+?)'
|
||||
return '(.+?)'
|
||||
s = set()
|
||||
f = partial(replfunc, seen=s)
|
||||
|
||||
try:
|
||||
template = template.rpartition('/')[2]
|
||||
return re.compile(re.sub('{([^}]*)}', f, template) + '([_\d]*$)')
|
||||
except:
|
||||
prints(u'Failed to parse template: %r'%template)
|
||||
template = u'{title} - {authors}'
|
||||
return re.compile(re.sub('{([^}]*)}', f, template) + '([_\d]*$)')
|
||||
|
||||
def create_upload_path(mdata, fname, template, sanitize,
|
||||
prefix_path='',
|
||||
path_type=os.path,
|
||||
maxlen=250,
|
||||
use_subdirs=True,
|
||||
news_in_folder=True,
|
||||
filename_callback=lambda x, y:x,
|
||||
sanitize_path_components=lambda x: x
|
||||
):
|
||||
from calibre.library.save_to_disk import get_components, config
|
||||
from calibre.utils.filenames import shorten_components_to
|
||||
|
||||
special_tag = None
|
||||
if mdata.tags:
|
||||
for t in mdata.tags:
|
||||
if t.startswith(_('News')) or t.startswith('/'):
|
||||
special_tag = t
|
||||
break
|
||||
|
||||
if mdata.tags and _('News') in mdata.tags:
|
||||
try:
|
||||
p = mdata.pubdate
|
||||
date = (p.year, p.month, p.day)
|
||||
except:
|
||||
today = time.localtime()
|
||||
date = (today[0], today[1], today[2])
|
||||
template = u"{title}_%d-%d-%d" % date
|
||||
|
||||
fname = sanitize(fname)
|
||||
ext = path_type.splitext(fname)[1]
|
||||
|
||||
opts = config().parse()
|
||||
if not isinstance(template, unicode):
|
||||
template = template.decode('utf-8')
|
||||
app_id = str(getattr(mdata, 'application_id', ''))
|
||||
id_ = mdata.get('id', fname)
|
||||
extra_components = get_components(template, mdata, id_,
|
||||
timefmt=opts.send_timefmt, length=maxlen-len(app_id)-1)
|
||||
if not extra_components:
|
||||
extra_components.append(sanitize(filename_callback(fname,
|
||||
mdata)))
|
||||
else:
|
||||
extra_components[-1] = sanitize(filename_callback(extra_components[-1]+ext, mdata))
|
||||
|
||||
if extra_components[-1] and extra_components[-1][0] in ('.', '_'):
|
||||
extra_components[-1] = 'x' + extra_components[-1][1:]
|
||||
|
||||
if special_tag is not None:
|
||||
name = extra_components[-1]
|
||||
extra_components = []
|
||||
tag = special_tag
|
||||
if tag.startswith(_('News')):
|
||||
if news_in_folder:
|
||||
extra_components.append('News')
|
||||
else:
|
||||
for c in tag.split('/'):
|
||||
c = sanitize(c)
|
||||
if not c: continue
|
||||
extra_components.append(c)
|
||||
extra_components.append(name)
|
||||
|
||||
if not use_subdirs:
|
||||
extra_components = extra_components[-1:]
|
||||
|
||||
def remove_trailing_periods(x):
|
||||
ans = x
|
||||
while ans.endswith('.'):
|
||||
ans = ans[:-1].strip()
|
||||
if not ans:
|
||||
ans = 'x'
|
||||
return ans
|
||||
|
||||
extra_components = list(map(remove_trailing_periods, extra_components))
|
||||
components = shorten_components_to(maxlen - len(prefix_path), extra_components)
|
||||
components = sanitize_path_components(components)
|
||||
if prefix_path:
|
||||
filepath = path_type.join(prefix_path, *components)
|
||||
else:
|
||||
filepath = path_type.join(*components)
|
||||
|
||||
return filepath
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user