Start work on editor tool to download external resources

This commit is contained in:
Kovid Goyal 2016-10-09 15:23:19 +05:30
parent 7914ada946
commit 5ad2a42f64

View File

@ -0,0 +1,122 @@
#!/usr/bin/env python2
# vim:fileencoding=utf-8
# License: GPLv3 Copyright: 2016, Kovid Goyal <kovid at kovidgoyal.net>
from __future__ import (unicode_literals, division, absolute_import,
print_function)
import shutil, os, posixpath, cgi, mimetypes
from collections import defaultdict
from contextlib import closing
from urlparse import urlparse
from multiprocessing.dummy import Pool
from functools import partial
from tempfile import NamedTemporaryFile
from urllib2 import urlopen
from calibre import as_unicode, sanitize_file_name2
from calibre.ebooks.oeb.polish.utils import guess_type
from calibre.ebooks.oeb.base import OEB_DOCS, iterlinks, barename, OEB_STYLES
from calibre.ptempfile import TemporaryDirectory
from calibre.web import get_download_filename_from_response
def is_external(url):
    """Return True if ``url`` uses one of the http, https, file or ftp schemes.

    Anything that cannot be parsed as a URL is treated as not external.
    """
    try:
        scheme = urlparse(url).scheme
    except Exception:
        return False
    return scheme in ('http', 'https', 'file', 'ftp')
def iterhtmllinks(container, name):
    """Yield ``(element, attribute, link)`` for external links in ``name``.

    Walks every link found in the parsed form of the file ``name`` and
    skips links that live on ``<a>`` elements as well as links that are
    not external according to ``is_external()``.
    """
    for elem, attribute, href, _pos in iterlinks(container.parsed(name)):
        tag_name = barename(elem.tag).lower()
        if tag_name == 'a' or not is_external(href):
            continue
        yield elem, attribute, href
def get_external_resources(container):
    """Map every external resource URL to the files that reference it.

    Iterates over ``container.mime_map`` and inspects each file that the
    container both knows about and that exists. HTML documents are scanned
    with ``iterhtmllinks()``; stylesheets are scanned with
    ``container.iterlinks()``.

    Returns a ``defaultdict(list)`` of ``url -> [name, ...]`` where each name
    is a file in the container that links to ``url``.
    """
    ans = defaultdict(list)
    for name, media_type in container.mime_map.iteritems():
        if not (container.has_name(name) and container.exists(name)):
            continue
        if media_type in OEB_DOCS:
            for el, attr, link in iterhtmllinks(container, name):
                ans[link].append(name)
        elif media_type in OEB_STYLES:
            # NOTE(review): assumes container.iterlinks() yields bare link
            # strings for stylesheets -- confirm against the container API.
            for link in container.iterlinks(name):
                # Stylesheets also contain links to files inside the book;
                # only external ones belong in the result, exactly as for
                # the HTML branch above.
                if is_external(link):
                    ans[link].append(name)
    return ans
def get_filename(original_url_parsed, response):
    """Pick a file name for the resource delivered by ``response``.

    The name comes from ``get_download_filename_from_response()``, falling
    back to the last component of the URL path and finally to ``'unknown'``.
    When the response carries a Content-Type whose value differs from the
    MIME type guessed from that name, the first extension that
    ``mimetypes`` knows for the content type is appended.
    """
    name = get_download_filename_from_response(response)
    name = name or posixpath.basename(original_url_parsed.path) or 'unknown'

    content_type = response.info().get('Content-Type', '')
    if content_type:
        # Drop any parameters such as "; charset=utf-8"
        content_type = cgi.parse_header(content_type)[0].lower()
    if not content_type:
        return name

    if guess_type(name) != content_type:
        extensions = mimetypes.guess_all_extensions(content_type)
        if extensions:
            name += extensions[0]
    return name
def download_one(tdir, timeout, url):
    """Download one external resource into a new temporary file in ``tdir``.

    :param tdir: Directory in which the temporary file is created. The file
        is not deleted by this function.
    :param timeout: Timeout passed to ``urlopen()`` for non ``file:`` URLs.
    :param url: The URL to fetch.

    :return: ``(True, (url, filename, path, mime_type))`` on success and
        ``(False, (url, error_message))`` on failure. Both shapes are
        two-tuples so that callers can always unpack ``ok, result``.
        Exceptions derived from ``Exception`` are never propagated.
    """
    try:
        purl = urlparse(url)
        is_local = purl.scheme == 'file'
        with NamedTemporaryFile(dir=tdir, delete=False) as dest:
            # NOTE(review): purl.path is used verbatim, so percent-escapes
            # in file: URLs are not decoded before opening.
            src = lopen(purl.path, 'rb') if is_local else urlopen(url, timeout=timeout)
            # closing() is used because the response object returned by
            # urllib2.urlopen() is not a context manager on Python 2. The
            # file name is computed inside the block so that the source is
            # closed even if that step fails.
            with closing(src):
                if is_local:
                    filename = os.path.basename(purl.path)
                else:
                    filename = get_filename(purl, src)
                shutil.copyfileobj(src, dest)
        filename = sanitize_file_name2(filename)
        mt = guess_type(filename)
        if mt in OEB_DOCS:
            raise ValueError('The external resource {} looks like a HTML document ({})'.format(url, filename))
        if not mt or mt == 'application/octet-stream' or '.' not in filename:
            raise ValueError('The external resource {} is not of a known type'.format(url))
        return True, (url, filename, dest.name, mt)
    except Exception as err:
        return False, (url, as_unicode(err))
def download_external_resoures(container, urls, timeout=60):
    """Download ``urls`` and add the downloaded files to ``container``.

    Downloads run through ``download_one()`` on a pool of ten worker
    threads, writing into a temporary directory. Every successful download
    is added to the container with ``add_file(...,
    modify_name_if_needed=True)``.

    :param container: The container that receives the downloaded files.
    :param urls: Iterable of URLs to download.
    :param timeout: Timeout handed to ``download_one()`` for each URL.

    :return: ``(replacements, failures)`` where ``replacements`` maps each
        downloaded URL to the name of the file added to the container and
        ``failures`` maps each failed URL to its error message.
    """
    replacements, failures = {}, {}
    with TemporaryDirectory('editor_download') as tdir:
        fetch = partial(download_one, tdir, timeout)
        pool = Pool(10)
        with closing(pool):
            # Results arrive in completion order, not in the order of urls
            for ok, result in pool.imap_unordered(fetch, urls):
                if not ok:
                    url, err = result
                    failures[url] = err
                    continue
                url, suggested_filename, downloaded_file, mt = result
                with lopen(downloaded_file, 'rb') as src:
                    replacements[url] = container.add_file(
                        suggested_filename, src, mt, modify_name_if_needed=True)
    return replacements, failures
def replacer(url_map):
    """Build a link-replacement callable backed by ``url_map``.

    The returned function maps a URL to ``url_map[url]`` when an entry
    exists (and is not None) and returns the URL unchanged otherwise. Its
    ``replaced`` attribute starts out False and becomes True once at least
    one URL has been substituted.
    """
    def replace(url):
        substitute = url_map.get(url)
        if substitute is None:
            return url
        replace.replaced = True
        return substitute

    replace.replaced = False
    return replace
def replace_resources(container, urls, replacements):
    """Rewrite links to downloaded resources so they point into the book.

    :param container: The container whose files are updated via
        ``replace_links()``.
    :param urls: Mapping of ``url -> names`` where ``names`` is either an
        iterable of file names that reference the URL (the shape produced
        by ``get_external_resources()``) or a single file name.
    :param replacements: Mapping of ``url -> name`` of the file in the
        container that now holds the content of that URL. URLs without an
        entry are left untouched.

    :return: True if at least one link was actually replaced.
    """
    url_maps = defaultdict(dict)
    for url, names in urls.iteritems():
        replacement = replacements.get(url)
        if replacement is None:
            continue
        # A bare string is one file name, not a sequence of names. The
        # tuple covers both byte and unicode strings (unicode_literals
        # makes type('') the unicode type on Python 2).
        if isinstance(names, (bytes, type(''))):
            names = (names,)
        for name in names:
            # The href is relative to the referencing file, so it has to be
            # computed separately for every file that uses the URL.
            url_maps[name][url] = container.name_to_href(replacement, name)

    changed = False
    for name, url_map in url_maps.iteritems():
        r = replacer(url_map)
        container.replace_links(name, r)
        changed |= r.replaced
    return changed