mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
automated upgrade of python code using ruff and ./setup.py upgrade_source_code
This commit is contained in:
parent
dd71135591
commit
3e58252176
@ -3,7 +3,6 @@ requires-python = ">=3.10"
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 160
|
||||
target-version = 'py310'
|
||||
builtins = ['_', 'I', 'P']
|
||||
include = ['*.py', '*.recipe']
|
||||
exclude = [
|
||||
|
@ -5,7 +5,6 @@ afriquexxi.info
|
||||
'''
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from calibre import browser
|
||||
|
@ -3,9 +3,9 @@
|
||||
|
||||
import json
|
||||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
import mechanize
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from calibre import browser
|
||||
from calibre.web.feeds.news import BasicNewsRecipe
|
||||
|
@ -5,7 +5,6 @@ contretemps.eu
|
||||
'''
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from calibre.web.feeds.news import BasicNewsRecipe
|
||||
|
@ -1,7 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
''' orientxxi.info '''
|
||||
from datetime import datetime
|
||||
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from calibre import browser
|
||||
|
@ -15,7 +15,7 @@ import sys
|
||||
import sysconfig
|
||||
import textwrap
|
||||
from functools import partial
|
||||
from typing import List, NamedTuple
|
||||
from typing import NamedTuple
|
||||
|
||||
from setup import SRC, Command, isbsd, isfreebsd, ishaiku, islinux, ismacos, iswindows
|
||||
|
||||
@ -24,14 +24,14 @@ isunix = islinux or ismacos or isbsd or ishaiku
|
||||
py_lib = os.path.join(sys.prefix, 'libs', 'python%d%d.lib' % sys.version_info[:2])
|
||||
|
||||
class CompileCommand(NamedTuple):
|
||||
cmd: List[str]
|
||||
cmd: list[str]
|
||||
src: str
|
||||
dest: str
|
||||
|
||||
|
||||
class LinkCommand(NamedTuple):
|
||||
cmd: List[str]
|
||||
objects: List[str]
|
||||
cmd: list[str]
|
||||
objects: list[str]
|
||||
dest: str
|
||||
|
||||
|
||||
@ -209,11 +209,11 @@ class Environment(NamedTuple):
|
||||
cc: str
|
||||
cxx: str
|
||||
linker: str
|
||||
base_cflags: List[str]
|
||||
base_cxxflags: List[str]
|
||||
base_ldflags: List[str]
|
||||
cflags: List[str]
|
||||
ldflags: List[str]
|
||||
base_cflags: list[str]
|
||||
base_cxxflags: list[str]
|
||||
base_ldflags: list[str]
|
||||
cflags: list[str]
|
||||
ldflags: list[str]
|
||||
make: str
|
||||
internal_inc_prefix: str
|
||||
external_inc_prefix: str
|
||||
@ -228,10 +228,10 @@ class Environment(NamedTuple):
|
||||
dest_ext: str
|
||||
std_prefix: str
|
||||
|
||||
def inc_dirs_to_cflags(self, dirs) -> List[str]:
|
||||
def inc_dirs_to_cflags(self, dirs) -> list[str]:
|
||||
return [self.external_inc_prefix+x for x in dirs]
|
||||
|
||||
def lib_dirs_to_ldflags(self, dirs) -> List[str]:
|
||||
def lib_dirs_to_ldflags(self, dirs) -> list[str]:
|
||||
return [self.libdir_prefix+x for x in dirs if x]
|
||||
|
||||
def libraries_to_ldflags(self, libs):
|
||||
|
@ -3900,4 +3900,4 @@ data = [('aa_DJ',
|
||||
('thousep', ','),
|
||||
('yesexpr', '^[yY]'),
|
||||
('noexpr', '^[nNcC]')])]
|
||||
# }}}
|
||||
# }}}
|
||||
|
@ -1,4 +1,3 @@
|
||||
# -* coding: utf-8 -*-
|
||||
#
|
||||
# License: MIT (see LICENSE file provided)
|
||||
# vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4:
|
||||
@ -34,25 +33,14 @@ default_encoding = 'utf-8'
|
||||
# python 2/3 compatibility helpers {{{
|
||||
|
||||
|
||||
if sys.version_info < (3,):
|
||||
PY3 = False
|
||||
text_type = unicode
|
||||
PY3 = True
|
||||
text_type = str
|
||||
|
||||
def b(s):
|
||||
return s
|
||||
def b(s):
|
||||
return s.encode()
|
||||
|
||||
def u(s):
|
||||
return unicode(s, "unicode_escape")
|
||||
|
||||
else:
|
||||
PY3 = True
|
||||
text_type = str
|
||||
|
||||
def b(s):
|
||||
return s.encode("latin-1")
|
||||
|
||||
def u(s):
|
||||
return s
|
||||
def u(s):
|
||||
return s
|
||||
# }}}
|
||||
# _pofile_or_mofile {{{
|
||||
|
||||
@ -390,7 +378,7 @@ class _BaseFile(list):
|
||||
# But if pickling, we never want to check for duplicates anyway.
|
||||
if getattr(self, 'check_for_duplicates', False) and entry in self:
|
||||
raise ValueError('Entry "%s" already exists' % entry.msgid)
|
||||
super(_BaseFile, self).append(entry)
|
||||
super().append(entry)
|
||||
|
||||
def insert(self, index, entry):
|
||||
"""
|
||||
@ -408,7 +396,7 @@ class _BaseFile(list):
|
||||
"""
|
||||
if self.check_for_duplicates and entry in self:
|
||||
raise ValueError('Entry "%s" already exists' % entry.msgid)
|
||||
super(_BaseFile, self).insert(index, entry)
|
||||
super().insert(index, entry)
|
||||
|
||||
def metadata_as_entry(self):
|
||||
"""
|
||||
@ -420,7 +408,7 @@ class _BaseFile(list):
|
||||
strs = []
|
||||
for name, value in mdata:
|
||||
# Strip whitespace off each line in a multi-line entry
|
||||
strs.append('%s: %s' % (name, value))
|
||||
strs.append(f'{name}: {value}')
|
||||
e.msgstr = '\n'.join(strs) + '\n'
|
||||
if self.metadata_is_fuzzy:
|
||||
e.flags.append('fuzzy')
|
||||
@ -444,7 +432,7 @@ class _BaseFile(list):
|
||||
string, controls how universal newlines works
|
||||
"""
|
||||
if self.fpath is None and fpath is None:
|
||||
raise IOError('You must provide a file path to save() method')
|
||||
raise OSError('You must provide a file path to save() method')
|
||||
contents = getattr(self, repr_method)()
|
||||
if fpath is None:
|
||||
fpath = self.fpath
|
||||
@ -452,7 +440,7 @@ class _BaseFile(list):
|
||||
with open(fpath, 'wb') as fhandle:
|
||||
fhandle.write(contents)
|
||||
else:
|
||||
with io.open(
|
||||
with open(
|
||||
fpath,
|
||||
'w',
|
||||
encoding=self.encoding,
|
||||
@ -730,10 +718,10 @@ class POFile(_BaseFile):
|
||||
object POFile, the reference catalog.
|
||||
"""
|
||||
# Store entries in dict/set for faster access
|
||||
self_entries = dict(
|
||||
(entry.msgid_with_context, entry) for entry in self
|
||||
)
|
||||
refpot_msgids = set(entry.msgid_with_context for entry in refpot)
|
||||
self_entries = {
|
||||
entry.msgid_with_context: entry for entry in self
|
||||
}
|
||||
refpot_msgids = {entry.msgid_with_context for entry in refpot}
|
||||
# Merge entries that are in the refpot
|
||||
for entry in refpot:
|
||||
e = self_entries.get(entry.msgid_with_context)
|
||||
@ -822,7 +810,7 @@ class MOFile(_BaseFile):
|
||||
# class _BaseEntry {{{
|
||||
|
||||
|
||||
class _BaseEntry(object):
|
||||
class _BaseEntry:
|
||||
"""
|
||||
Base class for :class:`~polib.POEntry` and :class:`~polib.MOEntry` classes.
|
||||
This class should **not** be instantiated directly.
|
||||
@ -942,16 +930,16 @@ class _BaseEntry(object):
|
||||
# quick and dirty trick to get the real field name
|
||||
fieldname = fieldname[9:]
|
||||
|
||||
ret = ['%s%s%s "%s"' % (delflag, fieldname, plural_index,
|
||||
ret = ['{}{}{} "{}"'.format(delflag, fieldname, plural_index,
|
||||
escape(lines.pop(0)))]
|
||||
for line in lines:
|
||||
ret.append('%s"%s"' % (delflag, escape(line)))
|
||||
ret.append(f'{delflag}"{escape(line)}"')
|
||||
return ret
|
||||
|
||||
@property
|
||||
def msgid_with_context(self):
|
||||
if self.msgctxt:
|
||||
return '%s%s%s' % (self.msgctxt, "\x04", self.msgid)
|
||||
return '{}{}{}'.format(self.msgctxt, "\x04", self.msgid)
|
||||
return self.msgid
|
||||
# }}}
|
||||
# class POEntry {{{
|
||||
@ -1023,14 +1011,14 @@ class POEntry(_BaseEntry):
|
||||
break_long_words=False
|
||||
)
|
||||
else:
|
||||
ret.append('%s%s' % (c[1], comment))
|
||||
ret.append(f'{c[1]}{comment}')
|
||||
|
||||
# occurrences (with text wrapping as xgettext does)
|
||||
if not self.obsolete and self.occurrences:
|
||||
filelist = []
|
||||
for fpath, lineno in self.occurrences:
|
||||
if lineno:
|
||||
filelist.append('%s:%s' % (fpath, lineno))
|
||||
filelist.append(f'{fpath}:{lineno}')
|
||||
else:
|
||||
filelist.append(fpath)
|
||||
filestr = ' '.join(filelist)
|
||||
@ -1238,7 +1226,7 @@ class MOEntry(_BaseEntry):
|
||||
# class _POFileParser {{{
|
||||
|
||||
|
||||
class _POFileParser(object):
|
||||
class _POFileParser:
|
||||
"""
|
||||
A finite state machine to efficiently and correctly parse po
|
||||
file format.
|
||||
@ -1264,10 +1252,10 @@ class _POFileParser(object):
|
||||
enc = kwargs.get('encoding', default_encoding)
|
||||
if _is_file(pofile):
|
||||
try:
|
||||
self.fhandle = io.open(pofile, 'rt', encoding=enc)
|
||||
self.fhandle = open(pofile, encoding=enc)
|
||||
except LookupError:
|
||||
enc = default_encoding
|
||||
self.fhandle = io.open(pofile, 'rt', encoding=enc)
|
||||
self.fhandle = open(pofile, encoding=enc)
|
||||
else:
|
||||
self.fhandle = pofile.splitlines()
|
||||
|
||||
@ -1373,7 +1361,7 @@ class _POFileParser(object):
|
||||
if tokens[0] in keywords and nb_tokens > 1:
|
||||
line = line[len(tokens[0]):].lstrip()
|
||||
if re.search(r'([^\\]|^)"', line[1:-1]):
|
||||
raise IOError('Syntax error in po file %s(line %s): '
|
||||
raise OSError('Syntax error in po file %s(line %s): '
|
||||
'unescaped double quote found' %
|
||||
(fpath, self.current_line))
|
||||
self.current_token = line
|
||||
@ -1391,7 +1379,7 @@ class _POFileParser(object):
|
||||
elif line[:1] == '"':
|
||||
# we are on a continuation line
|
||||
if re.search(r'([^\\]|^)"', line[1:-1]):
|
||||
raise IOError('Syntax error in po file %s(line %s): '
|
||||
raise OSError('Syntax error in po file %s(line %s): '
|
||||
'unescaped double quote found' %
|
||||
(fpath, self.current_line))
|
||||
self.process('mc')
|
||||
@ -1420,7 +1408,7 @@ class _POFileParser(object):
|
||||
|
||||
elif tokens[0] == '#|':
|
||||
if nb_tokens <= 1:
|
||||
raise IOError('Syntax error in po file %s(line %s)' %
|
||||
raise OSError('Syntax error in po file %s(line %s)' %
|
||||
(fpath, self.current_line))
|
||||
|
||||
# Remove the marker and any whitespace right after that.
|
||||
@ -1434,14 +1422,14 @@ class _POFileParser(object):
|
||||
|
||||
if nb_tokens == 2:
|
||||
# Invalid continuation line.
|
||||
raise IOError('Syntax error in po file %s(line %s): '
|
||||
raise OSError('Syntax error in po file %s(line %s): '
|
||||
'invalid continuation line' %
|
||||
(fpath, self.current_line))
|
||||
|
||||
# we are on a "previous translation" comment line,
|
||||
if tokens[1] not in prev_keywords:
|
||||
# Unknown keyword in previous translation comment.
|
||||
raise IOError('Syntax error in po file %s(line %s): '
|
||||
raise OSError('Syntax error in po file %s(line %s): '
|
||||
'unknown keyword %s' %
|
||||
(fpath, self.current_line,
|
||||
tokens[1]))
|
||||
@ -1453,7 +1441,7 @@ class _POFileParser(object):
|
||||
self.process(prev_keywords[tokens[1]])
|
||||
|
||||
else:
|
||||
raise IOError('Syntax error in po file %s(line %s)' %
|
||||
raise OSError('Syntax error in po file %s(line %s)' %
|
||||
(fpath, self.current_line))
|
||||
|
||||
if self.current_entry and len(tokens) > 0 and \
|
||||
@ -1524,7 +1512,7 @@ class _POFileParser(object):
|
||||
fpath = '%s ' % self.instance.fpath if self.instance.fpath else ''
|
||||
if hasattr(self.fhandle, 'close'):
|
||||
self.fhandle.close()
|
||||
raise IOError('Syntax error in po file %s(line %s)' %
|
||||
raise OSError('Syntax error in po file %s(line %s)' %
|
||||
(fpath, self.current_line))
|
||||
|
||||
# state handlers
|
||||
@ -1673,7 +1661,7 @@ class _POFileParser(object):
|
||||
# class _MOFileParser {{{
|
||||
|
||||
|
||||
class _MOFileParser(object):
|
||||
class _MOFileParser:
|
||||
"""
|
||||
A class to parse binary mo files.
|
||||
"""
|
||||
@ -1729,14 +1717,14 @@ class _MOFileParser(object):
|
||||
elif magic_number == MOFile.MAGIC_SWAPPED:
|
||||
ii = '>II'
|
||||
else:
|
||||
raise IOError('Invalid mo file, magic number is incorrect !')
|
||||
raise OSError('Invalid mo file, magic number is incorrect !')
|
||||
self.instance.magic_number = magic_number
|
||||
# parse the version number and the number of strings
|
||||
version, numofstrings = self._readbinary(ii, 8)
|
||||
# from MO file format specs: "A program seeing an unexpected major
|
||||
# revision number should stop reading the MO file entirely"
|
||||
if version >> 16 not in (0, 1):
|
||||
raise IOError('Invalid mo file, unexpected major revision number')
|
||||
raise OSError('Invalid mo file, unexpected major revision number')
|
||||
self.instance.version = version
|
||||
# original strings and translation strings hash table offset
|
||||
msgids_hash_offset, msgstrs_hash_offset = self._readbinary(ii, 8)
|
||||
@ -1777,8 +1765,8 @@ class _MOFileParser(object):
|
||||
entry = self._build_entry(
|
||||
msgid=msgid_tokens[0],
|
||||
msgid_plural=msgid_tokens[1],
|
||||
msgstr_plural=dict((k, v) for k, v in
|
||||
enumerate(msgstr.split(b('\0'))))
|
||||
msgstr_plural={k: v for k, v in
|
||||
enumerate(msgstr.split(b('\0')))}
|
||||
)
|
||||
else:
|
||||
entry = self._build_entry(msgid=msgid, msgstr=msgstr)
|
||||
|
@ -17,7 +17,6 @@ import time
|
||||
import uuid
|
||||
from contextlib import closing, suppress
|
||||
from functools import partial
|
||||
from typing import Optional
|
||||
|
||||
import apsw
|
||||
|
||||
@ -1007,7 +1006,7 @@ class DB:
|
||||
def add_notes_resource(self, path_or_stream, name, mtime=None) -> int:
|
||||
return self.notes.add_resource(self.conn, path_or_stream, name, mtime=mtime)
|
||||
|
||||
def get_notes_resource(self, resource_hash) -> Optional[dict]:
|
||||
def get_notes_resource(self, resource_hash) -> dict | None:
|
||||
return self.notes.get_resource_data(self.conn, resource_hash)
|
||||
|
||||
def notes_resources_used_by(self, field, item_id):
|
||||
|
@ -14,13 +14,13 @@ import sys
|
||||
import traceback
|
||||
import weakref
|
||||
from collections import defaultdict
|
||||
from collections.abc import MutableSet, Set
|
||||
from collections.abc import Iterable, MutableSet, Set
|
||||
from functools import partial, wraps
|
||||
from io import DEFAULT_BUFFER_SIZE, BytesIO
|
||||
from queue import Queue
|
||||
from threading import Lock
|
||||
from time import mktime, monotonic, sleep, time
|
||||
from typing import Iterable, NamedTuple, Optional, Tuple
|
||||
from typing import NamedTuple
|
||||
|
||||
from calibre import as_unicode, detect_ncpus, isbytestring
|
||||
from calibre.constants import iswindows, preferred_encoding
|
||||
@ -725,7 +725,7 @@ class Cache:
|
||||
return self.backend.add_notes_resource(path_or_stream_or_data, name, mtime)
|
||||
|
||||
@read_api
|
||||
def get_notes_resource(self, resource_hash) -> Optional[dict]:
|
||||
def get_notes_resource(self, resource_hash) -> dict | None:
|
||||
' Return a dict containing the resource data and name or None if no resource with the specified hash is found '
|
||||
return self.backend.get_notes_resource(resource_hash)
|
||||
|
||||
@ -3370,7 +3370,7 @@ class Cache:
|
||||
return dict.fromkeys(relpaths)
|
||||
|
||||
@read_api
|
||||
def list_extra_files(self, book_id, use_cache=False, pattern='') -> Tuple[ExtraFile, ...]:
|
||||
def list_extra_files(self, book_id, use_cache=False, pattern='') -> tuple[ExtraFile, ...]:
|
||||
'''
|
||||
Get information about extra files in the book's directory.
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
#!/usr/bin/env python
|
||||
# License: GPLv3 Copyright: 2023, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
from collections.abc import Sequence
|
||||
from dataclasses import dataclass
|
||||
from typing import Sequence
|
||||
|
||||
COVER_FILE_NAME = 'cover.jpg'
|
||||
METADATA_FILE_NAME = 'metadata.opf'
|
||||
|
@ -7,9 +7,9 @@ __docformat__ = 'restructuredtext en'
|
||||
|
||||
import sys
|
||||
from collections import Counter, defaultdict
|
||||
from collections.abc import Iterable
|
||||
from functools import partial
|
||||
from threading import Lock
|
||||
from typing import Iterable
|
||||
|
||||
from calibre.db.tables import MANY_MANY, MANY_ONE, ONE_ONE, null
|
||||
from calibre.db.utils import atof, force_to_bool
|
||||
|
@ -8,7 +8,6 @@ import time
|
||||
from collections import defaultdict
|
||||
from contextlib import suppress
|
||||
from itertools import count, repeat
|
||||
from typing import Optional
|
||||
|
||||
import apsw
|
||||
import xxhash
|
||||
@ -368,7 +367,7 @@ class Notes:
|
||||
name = f'{base_name}-{c}{ext}'
|
||||
return resource_hash
|
||||
|
||||
def get_resource_data(self, conn, resource_hash) -> Optional[dict]:
|
||||
def get_resource_data(self, conn, resource_hash) -> dict | None:
|
||||
ans = None
|
||||
for (name,) in conn.execute('SELECT name FROM notes_db.resources WHERE hash=?', (resource_hash,)):
|
||||
path = self.path_for_resource(resource_hash)
|
||||
|
@ -7,8 +7,8 @@ __docformat__ = 'restructuredtext en'
|
||||
|
||||
import numbers
|
||||
from collections import defaultdict
|
||||
from collections.abc import Iterable
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Iterable
|
||||
|
||||
from calibre.ebooks.metadata import author_to_author_sort
|
||||
from calibre.utils.date import UNDEFINED_DATE, parse_date, utc_tz
|
||||
|
@ -2,7 +2,6 @@ __license__ = 'GPL v3'
|
||||
__copyright__ = '2011, John Schember <john at nachtimwald.com>, refactored: 2022, Vaso Peras-Likodric <vaso at vipl.in.rs>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
from typing import Dict, Optional
|
||||
|
||||
'''
|
||||
Generates and writes an APNX page mapping file.
|
||||
@ -29,14 +28,14 @@ class APNXBuilder:
|
||||
Create an APNX file using a pseudo page mapping.
|
||||
"""
|
||||
|
||||
generators: Dict[str, IPageGenerator] = {
|
||||
generators: dict[str, IPageGenerator] = {
|
||||
FastPageGenerator.instance.name(): FastPageGenerator.instance,
|
||||
AccuratePageGenerator.instance.name(): AccuratePageGenerator.instance,
|
||||
PagebreakPageGenerator.instance.name(): PagebreakPageGenerator.instance,
|
||||
# ExactPageGenerator.instance.name(): ExactPageGenerator.instance,
|
||||
}
|
||||
|
||||
def write_apnx(self, mobi_file_path: str, apnx_path: str, method: Optional[str] = None, page_count: int = 0):
|
||||
def write_apnx(self, mobi_file_path: str, apnx_path: str, method: str | None = None, page_count: int = 0):
|
||||
"""
|
||||
If you want a fixed number of pages (such as from a custom column) then
|
||||
pass in a value to page_count, otherwise a count will be estimated
|
||||
@ -61,7 +60,7 @@ class APNXBuilder:
|
||||
fsync(apnxf)
|
||||
|
||||
@staticmethod
|
||||
def get_apnx_meta(mobi_file_path) -> Dict[str, str]:
|
||||
def get_apnx_meta(mobi_file_path) -> dict[str, str]:
|
||||
import uuid
|
||||
apnx_meta = {
|
||||
'guid': str(uuid.uuid4()).replace('-', '')[:8],
|
||||
|
@ -2,7 +2,6 @@ __license__ = 'GPL v3'
|
||||
__copyright__ = '2022, Vaso Peras-Likodric <vaso at vipl.in.rs>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from calibre.devices.kindle.apnx_page_generator.generators.fast_page_generator import FastPageGenerator
|
||||
from calibre.devices.kindle.apnx_page_generator.i_page_generator import IPageGenerator, mobi_html
|
||||
@ -16,10 +15,10 @@ class AccuratePageGenerator(IPageGenerator):
|
||||
def name(self) -> str:
|
||||
return "accurate"
|
||||
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
return FastPageGenerator.instance.generate(mobi_file_path, real_count)
|
||||
|
||||
def _generate(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
"""
|
||||
A more accurate but much more resource intensive and slower
|
||||
method to calculate the page length.
|
||||
|
@ -2,7 +2,6 @@ __license__ = 'GPL v3'
|
||||
__copyright__ = '2022, Vaso Peras-Likodric <vaso at vipl.in.rs>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from calibre.devices.kindle.apnx_page_generator.generators.fast_page_generator import FastPageGenerator
|
||||
from calibre.devices.kindle.apnx_page_generator.i_page_generator import IPageGenerator, mobi_html_length
|
||||
@ -16,10 +15,10 @@ class ExactPageGenerator(IPageGenerator):
|
||||
def name(self) -> str:
|
||||
return "exact"
|
||||
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
return FastPageGenerator.instance.generate(mobi_file_path, real_count)
|
||||
|
||||
def _generate(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
"""
|
||||
Given a specified page count (such as from a custom column),
|
||||
create our array of pages for the apnx file by dividing by
|
||||
|
@ -2,7 +2,6 @@ __license__ = 'GPL v3'
|
||||
__copyright__ = '2022, Vaso Peras-Likodric <vaso at vipl.in.rs>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
from typing import Optional
|
||||
|
||||
from calibre.devices.kindle.apnx_page_generator.i_page_generator import IPageGenerator, mobi_html_length
|
||||
from calibre.devices.kindle.apnx_page_generator.pages import Pages
|
||||
@ -13,10 +12,10 @@ class FastPageGenerator(IPageGenerator):
|
||||
def name(self) -> str:
|
||||
return "fast"
|
||||
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
raise Exception("Fast calculation impossible.")
|
||||
|
||||
def _generate(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
"""
|
||||
2300 characters of uncompressed text per page. This is
|
||||
not meant to map 1 to 1 to a print book but to be a
|
||||
|
@ -3,7 +3,6 @@ __copyright__ = '2022, Vaso Peras-Likodric <vaso at vipl.in.rs>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
import re
|
||||
from typing import Optional
|
||||
|
||||
from calibre.devices.kindle.apnx_page_generator.generators.fast_page_generator import FastPageGenerator
|
||||
from calibre.devices.kindle.apnx_page_generator.i_page_generator import IPageGenerator, mobi_html
|
||||
@ -15,10 +14,10 @@ class PagebreakPageGenerator(IPageGenerator):
|
||||
def name(self) -> str:
|
||||
return "pagebreak"
|
||||
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
return FastPageGenerator.instance.generate(mobi_file_path, real_count)
|
||||
|
||||
def _generate(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
""" Determine pages based on the presence of <*pagebreak*/>. """
|
||||
html = mobi_html(mobi_file_path)
|
||||
pages = []
|
||||
|
@ -4,7 +4,6 @@ __docformat__ = 'restructuredtext en'
|
||||
|
||||
import struct
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from typing import Optional
|
||||
|
||||
from calibre.devices.kindle.apnx_page_generator.pages import Pages
|
||||
from calibre.ebooks.pdb.header import PdbHeaderReader
|
||||
@ -15,14 +14,14 @@ from polyglot.builtins import as_bytes
|
||||
class IPageGenerator(metaclass=ABCMeta):
|
||||
|
||||
@abstractmethod
|
||||
def _generate(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def _generate_fallback(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
pass
|
||||
|
||||
def generate(self, mobi_file_path: str, real_count: Optional[int]) -> Pages:
|
||||
def generate(self, mobi_file_path: str, real_count: int | None) -> Pages:
|
||||
try:
|
||||
result = self._generate(mobi_file_path, real_count)
|
||||
if result.number_of_pages > 0:
|
||||
|
@ -2,32 +2,31 @@ __license__ = 'GPL v3'
|
||||
__copyright__ = '2022, Vaso Peras-Likodric <vaso at vipl.in.rs>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
from typing import List, Tuple, Union
|
||||
|
||||
from calibre.devices.kindle.apnx_page_generator.page_number_type import PageNumberTypes
|
||||
|
||||
|
||||
class PageGroup:
|
||||
"""Simulate constructor overloading"""
|
||||
def __init__(self, page_locations: Union[int, List[int]], page_number_type: PageNumberTypes, first_value: int,
|
||||
page_labels: Union[str, List[str], None] = None):
|
||||
def __init__(self, page_locations: int | list[int], page_number_type: PageNumberTypes, first_value: int,
|
||||
page_labels: str | list[str] | None = None):
|
||||
if page_locations.__class__ is int:
|
||||
self.page_locations: List[int] = [page_locations]
|
||||
self.page_locations: list[int] = [page_locations]
|
||||
else:
|
||||
self.page_locations: List[int] = page_locations
|
||||
self.page_locations: list[int] = page_locations
|
||||
self.__page_number_type: PageNumberTypes = page_number_type
|
||||
self.__first_value = first_value
|
||||
if page_number_type == PageNumberTypes.Custom:
|
||||
assert page_labels is not None
|
||||
if page_labels.__class__ is str:
|
||||
assert 1 == len(self.page_locations) and len(page_labels) > 0
|
||||
self.__page_number_labels: List[str] = [page_labels]
|
||||
self.__page_number_labels: list[str] = [page_labels]
|
||||
else:
|
||||
assert len(page_labels) == len(self.page_locations)
|
||||
assert all(len(label) > 0 for label in page_labels)
|
||||
self.__page_number_labels: List[str] = page_labels
|
||||
self.__page_number_labels: list[str] = page_labels
|
||||
|
||||
def append(self, page_location: Union[int, Tuple[int, str]]) -> None:
|
||||
def append(self, page_location: int | tuple[int, str]) -> None:
|
||||
if page_location.__class__ is int:
|
||||
assert self.__page_number_type != PageNumberTypes.Custom
|
||||
self.page_locations.append(page_location)
|
||||
@ -54,4 +53,4 @@ class PageGroup:
|
||||
values = str(self.__first_value)
|
||||
else:
|
||||
values = "|".join(self.__page_number_labels)
|
||||
return "({},{},{})".format(starting_location, self.__page_number_type.value, values)
|
||||
return f"({starting_location},{self.__page_number_type.value},{values})"
|
||||
|
@ -3,18 +3,17 @@ __copyright__ = '2022, Vaso Peras-Likodric <vaso at vipl.in.rs>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
||||
import itertools
|
||||
from typing import List, Optional
|
||||
|
||||
from calibre.devices.kindle.apnx_page_generator.page_group import PageGroup
|
||||
from calibre.devices.kindle.apnx_page_generator.page_number_type import PageNumberTypes
|
||||
|
||||
|
||||
class Pages:
|
||||
def __init__(self, page_locations: Optional[List[int]] = None):
|
||||
def __init__(self, page_locations: list[int] | None = None):
|
||||
if page_locations.__class__ is list:
|
||||
self.__pages_groups: List[PageGroup] = [PageGroup(page_locations, PageNumberTypes.Arabic, 1)]
|
||||
self.__pages_groups: list[PageGroup] = [PageGroup(page_locations, PageNumberTypes.Arabic, 1)]
|
||||
else:
|
||||
self.__pages_groups: List[PageGroup] = []
|
||||
self.__pages_groups: list[PageGroup] = []
|
||||
|
||||
def append(self, page_location: PageGroup) -> None:
|
||||
self.__pages_groups.append(page_location)
|
||||
@ -34,7 +33,7 @@ class Pages:
|
||||
return ",".join(result)
|
||||
|
||||
@property
|
||||
def page_locations(self) -> List[int]:
|
||||
def page_locations(self) -> list[int]:
|
||||
return list(itertools.chain.from_iterable(list(map(lambda pg: pg.page_locations, self.__pages_groups))))
|
||||
|
||||
@property
|
||||
|
@ -12,8 +12,9 @@ import os
|
||||
import posixpath
|
||||
import sys
|
||||
import traceback
|
||||
from collections.abc import Sequence
|
||||
from io import BytesIO
|
||||
from typing import NamedTuple, Sequence
|
||||
from typing import NamedTuple
|
||||
|
||||
from calibre import prints
|
||||
from calibre.constants import iswindows, numeric_version
|
||||
|
@ -13,7 +13,6 @@ from collections import defaultdict, deque
|
||||
from datetime import datetime
|
||||
from itertools import chain
|
||||
from operator import attrgetter
|
||||
from typing import Dict, Tuple
|
||||
|
||||
from calibre import force_unicode, human_readable, prints
|
||||
from calibre.constants import iswindows
|
||||
@ -121,7 +120,7 @@ class FileOrFolder:
|
||||
return not self.files and not self.folders
|
||||
|
||||
@property
|
||||
def id_map(self) -> Dict[int, 'FileOrFolder']:
|
||||
def id_map(self) -> dict[int, 'FileOrFolder']:
|
||||
return self.fs_cache().id_maps[self.storage_id]
|
||||
|
||||
@property
|
||||
@ -141,7 +140,7 @@ class FileOrFolder:
|
||||
return self.fs_cache().storage(self.storage_id)
|
||||
|
||||
@property
|
||||
def full_path(self) -> Tuple[str, ...]:
|
||||
def full_path(self) -> tuple[str, ...]:
|
||||
parts = deque()
|
||||
parts.append(self.name)
|
||||
p = self.parent
|
||||
|
@ -159,7 +159,7 @@ class ConnectionListener(Thread):
|
||||
device_socket = None
|
||||
self.driver._debug('driver is not answering')
|
||||
|
||||
except socket.timeout:
|
||||
except TimeoutError:
|
||||
pass
|
||||
except OSError:
|
||||
x = sys.exc_info()[1]
|
||||
@ -648,7 +648,7 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
|
||||
if not wait_for_response:
|
||||
return None, None
|
||||
return self._receive_from_client(print_debug_info=print_debug_info)
|
||||
except socket.timeout:
|
||||
except TimeoutError:
|
||||
self._debug('timeout communicating with device')
|
||||
self._close_device_socket()
|
||||
raise TimeoutError('Device did not respond in reasonable time')
|
||||
@ -676,7 +676,7 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
|
||||
self._debug('receive after decode') # , v)
|
||||
return (self.reverse_opcodes[v[0]], v[1])
|
||||
self._debug('protocol error -- empty json string')
|
||||
except socket.timeout:
|
||||
except TimeoutError:
|
||||
self._debug('timeout communicating with device')
|
||||
self._close_device_socket()
|
||||
raise TimeoutError('Device did not respond in reasonable time')
|
||||
@ -1202,7 +1202,7 @@ class SMART_DEVICE_APP(DeviceConfig, DevicePlugin):
|
||||
pass
|
||||
|
||||
return True
|
||||
except socket.timeout:
|
||||
except TimeoutError:
|
||||
self._close_device_socket()
|
||||
except OSError:
|
||||
x = sys.exc_info()[1]
|
||||
|
@ -61,7 +61,7 @@ class PDFInput(InputFormatPlugin):
|
||||
from calibre.ebooks.pdf.reflow import PDFDocument
|
||||
from calibre.utils.cleantext import clean_ascii_chars
|
||||
pdftohtml(os.getcwd(), stream.name, self.opts.no_images, as_xml=True)
|
||||
with open(u'index.xml', 'rb') as f:
|
||||
with open('index.xml', 'rb') as f:
|
||||
xml = clean_ascii_chars(f.read())
|
||||
PDFDocument(xml, self.opts, self.log)
|
||||
else:
|
||||
|
@ -40,7 +40,7 @@ class FDST:
|
||||
def __str__(self):
|
||||
ans = ['FDST record']
|
||||
def a(k, v):
|
||||
return ans.append('{}: {}'.format(k, v))
|
||||
return ans.append(f'{k}: {v}')
|
||||
a('Offset to sections', self.sec_off)
|
||||
a('Number of section records', self.num_sections)
|
||||
ans.append('**** %d Sections ****'% len(self.sections))
|
||||
|
@ -14,7 +14,6 @@ import sys
|
||||
from collections import defaultdict
|
||||
from itertools import count
|
||||
from operator import attrgetter
|
||||
from typing import Optional
|
||||
|
||||
from lxml import etree, html
|
||||
|
||||
@ -1027,7 +1026,7 @@ class Manifest:
|
||||
# }}}
|
||||
|
||||
@property
|
||||
def data_as_bytes_or_none(self) -> Optional[bytes]:
|
||||
def data_as_bytes_or_none(self) -> bytes | None:
|
||||
if self._loader is None:
|
||||
return None
|
||||
return self._loader(getattr(self, 'html_input_href', self.href))
|
||||
|
@ -385,7 +385,7 @@ class Text(Element):
|
||||
return self.raw
|
||||
|
||||
def dump(self, f):
|
||||
f.write('T top={}, left={}, width={}, height={}: '.format(self.top, self.left, self.width, self.height))
|
||||
f.write(f'T top={self.top}, left={self.left}, width={self.width}, height={self.height}: ')
|
||||
f.write(self.to_html().encode('utf-8'))
|
||||
f.write('\n')
|
||||
|
||||
@ -422,7 +422,7 @@ class Paragraph(Text):
|
||||
return self.raw
|
||||
|
||||
def dump(self, f):
|
||||
f.write('P top={}, left={}, width={}, height={}: '.format(self.top, self.left, self.width, self.height))
|
||||
f.write(f'P top={self.top}, left={self.left}, width={self.width}, height={self.height}: ')
|
||||
f.write(self.to_html().encode('utf-8'))
|
||||
f.write('\n')
|
||||
|
||||
|
@ -1,6 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
__license__ = 'GPL 3'
|
||||
__copyright__ = '2010 Hiroshi Miura <miurahr@linux.com>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
@ -1,6 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
__license__ = 'GPL 3'
|
||||
__copyright__ = '2010 Hiroshi Miura <miurahr@linux.com>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
@ -1,6 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
__license__ = 'GPL 3'
|
||||
__copyright__ = '2009, John Schember <john@nachtimwald.com>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
@ -1,6 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
__license__ = 'GPL 3'
|
||||
__copyright__ = '2010 Hiroshi Miura <miurahr@linux.com>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
@ -1,6 +1,3 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
|
||||
__license__ = 'GPL 3'
|
||||
__copyright__ = '2010 Hiroshi Miura <miurahr@linux.com>'
|
||||
__docformat__ = 'restructuredtext en'
|
||||
|
@ -106,7 +106,7 @@ class AllGUIActions(InterfaceAction):
|
||||
for n,v in kbd.keys_map.items():
|
||||
act_name = kbd.shortcuts[n]['name'].lower()
|
||||
if act_name in lower_names:
|
||||
shortcuts = list((sc.toString() for sc in v))
|
||||
shortcuts = list(sc.toString() for sc in v)
|
||||
shortcut_map[act_name] = f'\t{", ".join(shortcuts)}'
|
||||
|
||||
# This function constructs a menu action, dealing with the action being
|
||||
|
@ -97,12 +97,12 @@ class DocViewer(Dialog):
|
||||
|
||||
b = self.back_button = self.bb.addButton(_('&Back'), QDialogButtonBox.ButtonRole.ActionRole)
|
||||
b.clicked.connect(self.back)
|
||||
b.setToolTip((_('Displays the previously viewed function')))
|
||||
b.setToolTip(_('Displays the previously viewed function'))
|
||||
b.setEnabled(False)
|
||||
|
||||
b = self.bb.addButton(_('Show &all functions'), QDialogButtonBox.ButtonRole.ActionRole)
|
||||
b.clicked.connect(self.show_all_functions_button_clicked)
|
||||
b.setToolTip((_('Shows a list of all built-in functions in alphabetic order')))
|
||||
b.setToolTip(_('Shows a list of all built-in functions in alphabetic order'))
|
||||
|
||||
def back(self):
|
||||
if not self.back_stack:
|
||||
|
@ -3,7 +3,7 @@
|
||||
|
||||
|
||||
from time import monotonic
|
||||
from typing import NamedTuple, Tuple
|
||||
from typing import NamedTuple
|
||||
|
||||
from qt.core import QObject, QTimer, pyqtSignal
|
||||
|
||||
@ -18,7 +18,7 @@ class ExtraFile(NamedTuple):
|
||||
|
||||
class ExtraFiles(NamedTuple):
|
||||
last_changed_at: float
|
||||
files: Tuple[ExtraFile, ...]
|
||||
files: tuple[ExtraFile, ...]
|
||||
|
||||
|
||||
class ExtraFilesWatcher(QObject):
|
||||
|
@ -3,8 +3,8 @@
|
||||
|
||||
import time
|
||||
import traceback
|
||||
from collections.abc import Iterator
|
||||
from operator import attrgetter
|
||||
from typing import Iterator, List
|
||||
|
||||
from qt.core import (
|
||||
QAbstractItemView,
|
||||
@ -102,7 +102,7 @@ class TrashList(QListWidget):
|
||||
|
||||
restore_item = pyqtSignal(object, object)
|
||||
|
||||
def __init__(self, entries: List[TrashEntry], parent: 'TrashView', is_books: bool):
|
||||
def __init__(self, entries: list[TrashEntry], parent: 'TrashView', is_books: bool):
|
||||
super().__init__(parent)
|
||||
self.is_books = is_books
|
||||
self.db = parent.db
|
||||
|
@ -8,12 +8,13 @@ import os
|
||||
import re
|
||||
import sys
|
||||
from collections import deque
|
||||
from collections.abc import Iterable, Iterator
|
||||
from contextlib import suppress
|
||||
from dataclasses import dataclass
|
||||
from functools import lru_cache
|
||||
from itertools import count
|
||||
from time import monotonic
|
||||
from typing import BinaryIO, Iterable, Iterator
|
||||
from typing import BinaryIO
|
||||
|
||||
from qt.core import (
|
||||
QAudio,
|
||||
|
@ -2,7 +2,7 @@
|
||||
# License: GPL v3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
from contextlib import suppress
|
||||
from typing import List, NamedTuple, Optional, Tuple
|
||||
from typing import NamedTuple
|
||||
|
||||
from css_parser.css import CSSRule
|
||||
from css_selectors import Select, SelectorError
|
||||
@ -21,9 +21,9 @@ class NoMatchingRuleFound(KeyError):
|
||||
|
||||
|
||||
class RuleLocation(NamedTuple):
|
||||
rule_address: List[int]
|
||||
rule_address: list[int]
|
||||
file_name: str
|
||||
style_tag_address: Optional[Tuple[int, List[int]]] = None
|
||||
style_tag_address: tuple[int, list[int]] | None = None
|
||||
|
||||
|
||||
def rule_matches_elem(rule, elem, select, class_name):
|
||||
|
@ -13,7 +13,6 @@ import traceback
|
||||
from contextlib import suppress
|
||||
from functools import lru_cache, partial
|
||||
from io import BytesIO
|
||||
from typing import Union
|
||||
|
||||
from calibre import as_unicode
|
||||
from calibre.constants import iswindows
|
||||
@ -162,7 +161,7 @@ def is_ip_trusted(remote_addr, trusted_ips):
|
||||
return False
|
||||
|
||||
|
||||
def is_local_address(addr: Union[ipaddress.IPv4Address, ipaddress.IPv6Address, None]):
|
||||
def is_local_address(addr: ipaddress.IPv4Address | ipaddress.IPv6Address | None):
|
||||
if addr is None:
|
||||
return False
|
||||
if addr.is_loopback:
|
||||
|
@ -92,7 +92,7 @@ class LoopTest(BaseTest):
|
||||
with self.assertRaises(socket.timeout):
|
||||
res = conn.getresponse()
|
||||
if int(res.status) == int(http_client.REQUEST_TIMEOUT):
|
||||
raise socket.timeout('Timeout')
|
||||
raise TimeoutError('Timeout')
|
||||
raise Exception('Got unexpected response: code: {} {} headers: {!r} data: {!r}'.format(
|
||||
res.status, res.reason, res.getheaders(), res.read()))
|
||||
self.ae(pool.busy, 1)
|
||||
|
@ -6,8 +6,8 @@ import shutil
|
||||
import stat
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from collections.abc import Callable
|
||||
from contextlib import suppress
|
||||
from typing import Callable, Dict, List, Set, Tuple, Union
|
||||
|
||||
from calibre.constants import filesystem_encoding, iswindows
|
||||
from calibre.utils.filenames import make_long_path_useable, samefile, windows_hardlink
|
||||
@ -16,14 +16,14 @@ if iswindows:
|
||||
from calibre_extensions import winutil
|
||||
|
||||
WINDOWS_SLEEP_FOR_RETRY_TIME = 2 # seconds
|
||||
WindowsFileId = Tuple[int, int, int]
|
||||
WindowsFileId = tuple[int, int, int]
|
||||
|
||||
class UnixFileCopier:
|
||||
|
||||
def __init__(self, delete_all=False, allow_move=False):
|
||||
self.delete_all = delete_all
|
||||
self.allow_move = allow_move
|
||||
self.copy_map: Dict[str, str] = {}
|
||||
self.copy_map: dict[str, str] = {}
|
||||
|
||||
def register(self, path: str, dest: str) -> None:
|
||||
self.copy_map[path] = dest
|
||||
@ -81,12 +81,12 @@ class WindowsFileCopier:
|
||||
def __init__(self, delete_all=False, allow_move=False):
|
||||
self.delete_all = delete_all
|
||||
self.allow_move = allow_move
|
||||
self.path_to_fileid_map : Dict[str, WindowsFileId] = {}
|
||||
self.fileid_to_paths_map: Dict[WindowsFileId, Set[str]] = defaultdict(set)
|
||||
self.path_to_handle_map: Dict[str, 'winutil.Handle'] = {}
|
||||
self.folder_to_handle_map: Dict[str, 'winutil.Handle'] = {}
|
||||
self.folders: List[str] = []
|
||||
self.copy_map: Dict[str, str] = {}
|
||||
self.path_to_fileid_map : dict[str, WindowsFileId] = {}
|
||||
self.fileid_to_paths_map: dict[WindowsFileId, set[str]] = defaultdict(set)
|
||||
self.path_to_handle_map: dict[str, 'winutil.Handle'] = {}
|
||||
self.folder_to_handle_map: dict[str, 'winutil.Handle'] = {}
|
||||
self.folders: list[str] = []
|
||||
self.copy_map: dict[str, str] = {}
|
||||
|
||||
def register(self, path: str, dest: str) -> None:
|
||||
with suppress(OSError):
|
||||
@ -184,11 +184,11 @@ class WindowsFileCopier:
|
||||
winutil.move_file(make_long_path_useable(src_path), make_long_path_useable(dest_path))
|
||||
|
||||
|
||||
def get_copier(delete_all=False, allow_move=False) -> Union[UnixFileCopier, WindowsFileCopier]:
|
||||
def get_copier(delete_all=False, allow_move=False) -> UnixFileCopier | WindowsFileCopier:
|
||||
return (WindowsFileCopier if iswindows else UnixFileCopier)(delete_all, allow_move)
|
||||
|
||||
|
||||
def rename_files(src_to_dest_map: Dict[str, str]) -> None:
|
||||
def rename_files(src_to_dest_map: dict[str, str]) -> None:
|
||||
' Rename a bunch of files. On Windows all files are locked before renaming so no other process can interfere. '
|
||||
copier = get_copier(allow_move=True)
|
||||
for s, d in src_to_dest_map.items():
|
||||
@ -197,7 +197,7 @@ def rename_files(src_to_dest_map: Dict[str, str]) -> None:
|
||||
copier.rename_all()
|
||||
|
||||
|
||||
def copy_files(src_to_dest_map: Dict[str, str], delete_source: bool = False) -> None:
|
||||
def copy_files(src_to_dest_map: dict[str, str], delete_source: bool = False) -> None:
|
||||
copier = get_copier(delete_source)
|
||||
for s, d in src_to_dest_map.items():
|
||||
if not samefile(s, d):
|
||||
@ -211,7 +211,7 @@ def identity_transform(src_path: str, dest_path: str) -> str:
|
||||
|
||||
|
||||
def register_folder_recursively(
|
||||
src: str, copier: Union[UnixFileCopier, WindowsFileCopier], dest_dir: str,
|
||||
src: str, copier: UnixFileCopier | WindowsFileCopier, dest_dir: str,
|
||||
transform_destination_filename: Callable[[str, str], str] = identity_transform,
|
||||
read_only: bool = False
|
||||
) -> None:
|
||||
|
@ -1,5 +1,4 @@
|
||||
#!/usr/bin/env python
|
||||
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:fdm=marker:ai
|
||||
|
||||
|
||||
__license__ = 'GPL v3'
|
||||
|
@ -8,7 +8,6 @@ import os
|
||||
import secrets
|
||||
import stat
|
||||
import struct
|
||||
from typing import Optional, Union
|
||||
|
||||
from calibre.constants import ismacos, iswindows
|
||||
|
||||
@ -44,7 +43,7 @@ class SharedMemory:
|
||||
'''
|
||||
_fd: int = -1
|
||||
_name: str = ''
|
||||
_mmap: Optional[mmap.mmap] = None
|
||||
_mmap: mmap.mmap | None = None
|
||||
_size: int = 0
|
||||
size_fmt = '!I'
|
||||
num_bytes_for_size = struct.calcsize(size_fmt)
|
||||
@ -158,7 +157,7 @@ class SharedMemory:
|
||||
def flush(self) -> None:
|
||||
self.mmap.flush()
|
||||
|
||||
def write_data_with_size(self, data: Union[str, bytes]) -> None:
|
||||
def write_data_with_size(self, data: str | bytes) -> None:
|
||||
if isinstance(data, str):
|
||||
data = data.encode('utf-8')
|
||||
sz = struct.pack(self.size_fmt, len(data))
|
||||
|
@ -9,9 +9,10 @@ import calendar
|
||||
import json
|
||||
import os
|
||||
import zipfile
|
||||
from collections.abc import Sequence
|
||||
from datetime import timedelta
|
||||
from threading import RLock
|
||||
from typing import Dict, NamedTuple, Optional, Sequence
|
||||
from typing import NamedTuple
|
||||
|
||||
from lxml import etree
|
||||
from lxml.builder import ElementMaker
|
||||
@ -298,7 +299,7 @@ class RecipeCustomization(NamedTuple):
|
||||
add_title_tag: bool = False
|
||||
custom_tags: Sequence[str] = ()
|
||||
keep_issues: int = 0
|
||||
recipe_specific_options: Optional[Dict[str, str]] = None
|
||||
recipe_specific_options: dict[str, str] | None = None
|
||||
|
||||
|
||||
class SchedulerConfig:
|
||||
|
@ -1,6 +1,5 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
from __future__ import absolute_import, division, print_function, unicode_literals
|
||||
|
||||
import json
|
||||
from pprint import pprint
|
||||
|
Loading…
x
Reference in New Issue
Block a user