Modernize infra for loading plugins

Enable the new meta plugin finder and resource loading infrastructure from
python 3.7
This commit is contained in:
Kovid Goyal 2020-09-30 22:43:58 +05:30
parent 2938e2c203
commit bbb050b77c
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -6,9 +6,10 @@ __license__ = 'GPL v3'
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
import os, zipfile, posixpath, importlib, threading, re, imp, sys
import os, zipfile, posixpath, importlib, threading, re, sys
from collections import OrderedDict
from functools import partial
from importlib.machinery import ModuleSpec
from calibre import as_unicode
from calibre.customize import (Plugin, numeric_version, platform,
@ -114,76 +115,148 @@ def load_translations(namespace, zfp):
namespace['ngettext'] = trans.ngettext
class PluginLoader(object):
class CalibrePluginLoader:
__slots__ = (
'plugin_name', 'fullname_in_plugin', 'zip_file_path', '_is_package', 'names',
'filename', 'all_names'
)
def __init__(self, plugin_name, fullname_in_plugin, zip_file_path, names, filename, is_package, all_names):
self.plugin_name = plugin_name
self.fullname_in_plugin = fullname_in_plugin
self.zip_file_path = zip_file_path
self.names = names
self.filename = filename
self._is_package = is_package
self.all_names = all_names
def __eq__(self, other):
return (
self.__class__ == other.__class__ and
self.plugin_name == other.plugin_name and
self.fullname_in_plugin == other.fullname_in_plugin
)
def __hash__(self):
return hash(self.name) ^ hash(self.plugin_name) ^ hash(self.fullname_in_plugin)
def create_module(self, spec):
pass
def is_package(self, fullname):
return self._is_package
def get_source(self, fullname=None):
src = b''
if self.plugin_name and self.fullname_in_plugin and self.zip_file_path:
zinfo = self.names.get(self.fullname_in_plugin)
if zinfo is not None:
with zipfile.ZipFile(self.zip_file_path) as zf:
try:
src = zf.read(zinfo)
except Exception:
# Maybe the zip file changed from under us
src = zf.read(zinfo.filename)
return src.decode('utf-8').replace('\r\n', '\n')
def get_filename(self, fullname):
return self.filename
def get_code(self, fullname=None):
return compile(self.get_source(fullname), f'calibre_plugins.{self.plugin_name}.{self.fullname_in_plugin}',
'exec', dont_inherit=True)
def exec_module(self, module):
compiled = self.get_code()
module.__file__ = self.filename
if self.zip_file_path:
zfp = self.zip_file_path
module.__dict__['get_resources'] = partial(get_resources, zfp)
module.__dict__['get_icons'] = partial(get_icons, zfp)
module.__dict__['load_translations'] = partial(load_translations, module.__dict__, zfp)
exec(compiled, module.__dict__)
def resource_path(self, name):
raise FileNotFoundError(
f'{name} is not available as a filesystem path in calibre plugins')
def contents(self):
if not self._is_package:
return ()
zinfo = self.names.get(self.fullname_in_plugin)
if zinfo is None:
return ()
base = posixpath.dirname(zinfo.filename)
if base:
base += '/'
def is_ok(x):
if not base or x.startswith(base):
rest = x[len(base):]
return '/' not in rest
return False
return tuple(filter(is_ok, self.all_names))
def is_resource(self, name):
zinfo = self.names.get(self.fullname_in_plugin)
if zinfo is None:
return False
base = posixpath.dirname(zinfo.filename)
q = posixpath.join(base, name)
return q in self.all_names
def open_resource(self, name):
zinfo = self.names.get(self.fullname_in_plugin)
if zinfo is None:
raise FileNotFoundError(f'{self.fullname_in_plugin} not in plugin zip file')
base = posixpath.dirname(zinfo.filename)
q = posixpath.join(base, name)
with zipfile.ZipFile(self.zip_file_path) as zf:
return zf.open(q)
class CalibrePluginFinder:
def __init__(self):
self.loaded_plugins = {}
self._lock = threading.RLock()
self._identifier_pat = re.compile(r'[a-zA-Z][_0-9a-zA-Z]*')
def _get_actual_fullname(self, fullname):
def find_spec(self, fullname, path, target=None):
if not fullname.startswith('calibre_plugins'):
return
parts = fullname.split('.')
if parts[0] == 'calibre_plugins':
if len(parts) == 1:
return parts[0], None
if parts[0] != 'calibre_plugins':
return
plugin_name = fullname_in_plugin = zip_file_path = filename = None
all_names = frozenset()
names = OrderedDict()
if len(parts) > 1:
plugin_name = parts[1]
with self._lock:
names = self.loaded_plugins.get(plugin_name, None)
if names is None:
raise ImportError('No plugin named %r loaded'%plugin_name)
names = names[1]
fullname = '.'.join(parts[2:])
if not fullname:
fullname = '__init__'
if fullname in names:
return fullname, plugin_name
if fullname+'.__init__' in names:
return fullname+'.__init__', plugin_name
return None, None
zip_file_path, names, all_names = self.loaded_plugins.get(plugin_name, (None, None, None))
if zip_file_path is None:
return
fullname_in_plugin = '.'.join(parts[2:])
if not fullname_in_plugin:
fullname_in_plugin = '__init__'
if fullname_in_plugin not in names:
if fullname_in_plugin + '.__init__' in names:
fullname_in_plugin += '.__init__'
else:
return
is_package = fullname.count('.') < 2 or fullname == '__init__' or fullname.endswith('__init__')
if zip_file_path:
filename = posixpath.join(zip_file_path, *fullname_in_plugin.split('.')) + '.py'
def find_module(self, fullname, path=None):
fullname, plugin_name = self._get_actual_fullname(fullname)
if fullname is None and plugin_name is None:
return None
return self
def load_module(self, fullname):
import_name, plugin_name = self._get_actual_fullname(fullname)
if import_name is None and plugin_name is None:
raise ImportError('No plugin named %r is loaded'%fullname)
mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod.__file__ = "<calibre Plugin Loader>"
mod.__loader__ = self
if import_name.endswith('.__init__') or import_name in ('__init__',
'calibre_plugins'):
# We have a package
mod.__path__ = []
if plugin_name is not None:
# We have some actual code to load
with self._lock:
zfp, names = self.loaded_plugins.get(plugin_name, (None, None))
if names is None:
raise ImportError('No plugin named %r loaded'%plugin_name)
zinfo = names.get(import_name, None)
if zinfo is None:
raise ImportError('Plugin %r has no module named %r' %
(plugin_name, import_name))
with zipfile.ZipFile(zfp) as zf:
try:
code = zf.read(zinfo)
except:
# Maybe the zip file changed from under us
code = zf.read(zinfo.filename)
compiled = compile(code, 'calibre_plugins.%s.%s'%(plugin_name,
import_name), 'exec', dont_inherit=True)
mod.__dict__['get_resources'] = partial(get_resources, zfp)
mod.__dict__['get_icons'] = partial(get_icons, zfp)
mod.__dict__['load_translations'] = partial(load_translations, mod.__dict__, zfp)
exec(compiled, mod.__dict__)
return mod
return ModuleSpec(
fullname,
CalibrePluginLoader(plugin_name, fullname_in_plugin, zip_file_path, names, filename, is_package, all_names),
is_package=is_package, origin=filename
)
def load(self, path_to_zip_file):
if not os.access(path_to_zip_file, os.R_OK):
@ -231,9 +304,8 @@ class PluginLoader(object):
raise
def _locate_code(self, zf, path_to_zip_file):
names = [x if isinstance(x, unicode_type) else x.decode('utf-8') for x in
zf.namelist()]
names = [x[1:] if x[0] == '/' else x for x in names]
all_names = frozenset(zf.namelist())
names = [x[1:] if x[0] == '/' else x for x in all_names]
plugin_name = None
for name in names:
@ -291,13 +363,13 @@ class PluginLoader(object):
% path_to_zip_file)
with self._lock:
self.loaded_plugins[plugin_name] = (path_to_zip_file, names)
self.loaded_plugins[plugin_name] = path_to_zip_file, names, tuple(all_names)
return plugin_name
loader = PluginLoader()
sys.meta_path.insert(0, loader)
loader = CalibrePluginFinder()
sys.meta_path.append(loader)
if __name__ == '__main__':