mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-06-23 15:30:45 -04:00
Export notes as a single HTML file with embedded images
This commit is contained in:
parent
7df0ec7bee
commit
d3ae36a965
@ -1029,12 +1029,12 @@ class DB:
|
||||
def restore_notes(self, report_progress):
|
||||
self.notes.restore(self.conn, self.tables, report_progress)
|
||||
|
||||
def import_note(self, field, item_id, html_file_path):
|
||||
def import_note(self, field, item_id, html, basedir, ctime, mtime):
|
||||
id_val = self.tables[field].id_map[item_id]
|
||||
return self.notes.import_note(self.conn, field, item_id, id_val, html_file_path)
|
||||
return self.notes.import_note(self.conn, field, item_id, id_val, html, basedir, ctime, mtime)
|
||||
|
||||
def export_note(self, field, item_id, dest_dir):
|
||||
return self.notes.export_note(self.conn, field, item_id, dest_dir)
|
||||
def export_note(self, field, item_id):
|
||||
return self.notes.export_note(self.conn, field, item_id)
|
||||
|
||||
def initialize_fts(self, dbref):
|
||||
self.fts = None
|
||||
|
@ -706,12 +706,16 @@ class Cache:
|
||||
return self.backend.unretire_note_for(field, item_id)
|
||||
|
||||
@read_api
|
||||
def export_note(self, field, item_id, dest_dir):
|
||||
return self.backend.export_note(field, item_id, dest_dir)
|
||||
def export_note(self, field, item_id):
|
||||
return self.backend.export_note(field, item_id)
|
||||
|
||||
@write_api
|
||||
def import_note(self, field, item_id, path_to_html_file):
|
||||
return self.backend.import_note(field, item_id, path_to_html_file)
|
||||
with open(path_to_html_file, 'rb') as f:
|
||||
html = f.read()
|
||||
st = os.stat(f.fileno())
|
||||
basedir = os.path.dirname(os.path.abspath(path_to_html_file))
|
||||
return self.backend.import_note(field, item_id, html, basedir, st.st_ctime, st.st_mtime)
|
||||
|
||||
@write_api # we need to use write locking as SQLITE gives a locked table error if multiple FTS queries are made at the same time
|
||||
def notes_search(
|
||||
|
@ -13,6 +13,7 @@ NOTES_DB_NAME = 'notes.db'
|
||||
DATA_DIR_NAME = 'data'
|
||||
DATA_FILE_PATTERN = f'{DATA_DIR_NAME}/**/*'
|
||||
BOOK_ID_PATH_TEMPLATE = ' ({})'
|
||||
RESOURCE_URL_SCHEME = 'calres'
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -33,7 +33,6 @@ copy_marked_up_text = cmt()
|
||||
SEP = b'\0\x1c\0'
|
||||
DOC_NAME = 'doc.html'
|
||||
METADATA_EXT = '.metadata'
|
||||
RESOURCE_URL_SCHEME = 'calres'
|
||||
|
||||
|
||||
def hash_data(data: bytes) -> str:
|
||||
@ -467,26 +466,22 @@ class Notes:
|
||||
report_progress('', i)
|
||||
return errors
|
||||
|
||||
def export_note(self, conn, field_name, item_id, dest_dir):
|
||||
def export_note(self, conn, field_name, item_id):
|
||||
nd = self.get_note_data(conn, field_name, item_id)
|
||||
if nd is None:
|
||||
return ''
|
||||
from .exim import export_note
|
||||
resources = {}
|
||||
for rh in nd['resource_hashes']:
|
||||
p = make_long_path_useable(self.path_for_resource(rh))
|
||||
if os.path.exists(p):
|
||||
for (name,) in conn.execute('SELECT name FROM notes_db.resources WHERE hash=?', (rh,)):
|
||||
resources[rh] = (p, name)
|
||||
return export_note(nd, resources, dest_dir)
|
||||
|
||||
def import_note(self, conn, field_name, item_id, item_value, html_file_path):
|
||||
def get_resource(rhash):
|
||||
return self.get_resource_data(conn, rhash)
|
||||
return export_note(nd['doc'], get_resource)
|
||||
|
||||
def import_note(self, conn, field_name, item_id, item_value, html, basedir, ctime=None, mtime=None):
|
||||
from .exim import import_note
|
||||
def add_resource(path, name):
|
||||
return self.add_resource(conn, path, name)
|
||||
st = os.stat(html_file_path)
|
||||
doc, searchable_text, resources = import_note(html_file_path, add_resource)
|
||||
def add_resource(path_or_stream_or_data, name):
|
||||
return self.add_resource(conn, path_or_stream_or_data, name)
|
||||
doc, searchable_text, resources = import_note(html, basedir, add_resource)
|
||||
return self.set_note(
|
||||
conn, field_name, item_id, item_value, marked_up_text=doc, used_resource_hashes=resources, searchable_text=searchable_text,
|
||||
ctime=st.st_ctime, mtime=st.st_mtime
|
||||
ctime=ctime, mtime=mtime
|
||||
)
|
||||
|
@ -1,19 +1,20 @@
|
||||
#!/usr/bin/env python
|
||||
# License: GPLv3 Copyright: 2023, Kovid Goyal <kovid at kovidgoyal.net>
|
||||
|
||||
import base64
|
||||
import os
|
||||
import shutil
|
||||
from html5_parser import parse
|
||||
from lxml import html
|
||||
from urllib.parse import unquote, urlparse
|
||||
|
||||
from calibre import guess_extension, guess_type
|
||||
from calibre.db.constants import RESOURCE_URL_SCHEME
|
||||
from calibre.ebooks.chardet import xml_to_unicode
|
||||
from calibre.ebooks.oeb.transforms.rasterize import data_url
|
||||
from calibre.utils.cleantext import clean_xml_chars
|
||||
from calibre.utils.filenames import get_long_path_name, make_long_path_useable
|
||||
from calibre.utils.html2text import html2text
|
||||
|
||||
from .connect import DOC_NAME, RESOURCE_URL_SCHEME
|
||||
|
||||
|
||||
def parse_html(raw):
|
||||
try:
|
||||
@ -22,42 +23,51 @@ def parse_html(raw):
|
||||
return parse(clean_xml_chars(raw), maybe_xhtml=False, sanitize_names=True)
|
||||
|
||||
|
||||
def export_note(note_data: dict, resources: dict[str, tuple[str, str]], dest_dir: str) -> str:
|
||||
for rhash, (path, name) in resources.items():
|
||||
d = os.path.join(dest_dir, name)
|
||||
shutil.copy2(path, d)
|
||||
root = parse_html(note_data['doc'])
|
||||
def export_note(note_doc: str, get_resource) -> str:
|
||||
root = parse_html(note_doc)
|
||||
for img in root.xpath('//img[@src]'):
|
||||
img.attrib.pop('pre-import-src', None)
|
||||
img.attrib.pop('data-pre-import-src', None)
|
||||
try:
|
||||
purl = urlparse(img.get('src'))
|
||||
except Exception:
|
||||
continue
|
||||
if purl.scheme == RESOURCE_URL_SCHEME:
|
||||
rhash = f'{purl.hostname}:{purl.path[1:]}'
|
||||
x = resources.get(rhash)
|
||||
if x is not None:
|
||||
img.set('src', x[1])
|
||||
x = get_resource(rhash)
|
||||
if x:
|
||||
img.set('src', data_url(guess_type(x['name'])[0], x['data']))
|
||||
img.set('data-filename', x['name'])
|
||||
|
||||
shtml = html.tostring(root, encoding='utf-8')
|
||||
with open(os.path.join(dest_dir, DOC_NAME), 'wb') as f:
|
||||
f.write(shtml)
|
||||
os.utime(f.name, times=(note_data['ctime'], note_data['mtime']))
|
||||
return DOC_NAME
|
||||
return html.tostring(root, encoding='unicode')
|
||||
|
||||
|
||||
def import_note(path_to_html_file: str, add_resource) -> dict:
|
||||
path_to_html_file = path_to_html_file
|
||||
with open(make_long_path_useable(path_to_html_file), 'rb') as f:
|
||||
raw = f.read()
|
||||
shtml = xml_to_unicode(raw, strip_encoding_pats=True, assume_utf8=True)[0]
|
||||
basedir = os.path.dirname(os.path.abspath(path_to_html_file))
|
||||
basedir = os.path.normcase(get_long_path_name(basedir) + os.sep)
|
||||
def import_note(shtml: str | bytes, basedir: str, add_resource) -> dict:
|
||||
shtml = xml_to_unicode(shtml, strip_encoding_pats=True, assume_utf8=True)[0]
|
||||
basedir = os.path.normcase(get_long_path_name(os.path.abspath(basedir)) + os.sep)
|
||||
root = parse_html(shtml)
|
||||
resources = set()
|
||||
|
||||
def ar(img, path_or_data, name):
|
||||
rhash = add_resource(path_or_data, name)
|
||||
scheme, digest = rhash.split(':', 1)
|
||||
img.set('src', f'{RESOURCE_URL_SCHEME}://{scheme}/{digest}')
|
||||
resources.add(rhash)
|
||||
|
||||
for img in root.xpath('//img[@src]'):
|
||||
src = img.attrib.pop('src')
|
||||
img.set('pre-import-src', src)
|
||||
img.set('data-pre-import-src', src)
|
||||
if src.startswith('data:'):
|
||||
d = src.split(':', 1)[-1]
|
||||
menc, payload = d.partition(',')
|
||||
mt, enc = menc.partition(';')
|
||||
if enc != 'base64':
|
||||
continue
|
||||
try:
|
||||
d = base64.standard_b64decode(payload)
|
||||
except Exception:
|
||||
continue
|
||||
ar(img, d, img.get('data-filename') or ('image' + guess_extension(mt, strict=False)))
|
||||
continue
|
||||
try:
|
||||
purl = urlparse(src)
|
||||
except Exception:
|
||||
@ -65,12 +75,13 @@ def import_note(path_to_html_file: str, add_resource) -> dict:
|
||||
if purl.scheme in ('', 'file'):
|
||||
path = unquote(purl.path)
|
||||
if not os.path.isabs(path):
|
||||
if not basedir:
|
||||
continue
|
||||
path = os.path.join(basedir, path)
|
||||
q = os.path.normcase(get_long_path_name(os.path.abspath(path)))
|
||||
if q.startswith(basedir) and os.path.exists(make_long_path_useable(path)):
|
||||
rhash = add_resource(make_long_path_useable(path), os.path.basename(path))
|
||||
scheme, digest = rhash.split(':', 1)
|
||||
img.set('src', f'{RESOURCE_URL_SCHEME}://{scheme}/{digest}')
|
||||
resources.add(rhash)
|
||||
ar(img, make_long_path_useable(path), os.path.basename(path))
|
||||
shtml = html.tostring(root, encoding='unicode')
|
||||
for img in root.xpath('//img[@src]'):
|
||||
del img.attrib['src']
|
||||
return shtml, html2text(shtml, default_image_alt=' '), resources
|
||||
|
@ -135,14 +135,11 @@ def test_cache_api(self: 'NotesTest'):
|
||||
note_id = cache.import_note('authors', author_id, f.name)
|
||||
self.assertGreater(note_id, 0)
|
||||
self.assertIn('<p>test simple exim <img', cache.notes_for('authors', author_id))
|
||||
edir = os.path.join(tdir, 'e')
|
||||
os.mkdir(edir)
|
||||
index_name = cache.export_note('authors', author_id, edir)
|
||||
with open(os.path.join(edir, index_name)) as f:
|
||||
self.assertIn(doc.replace('r 1.png', 'r%201.png'), f.read())
|
||||
for x in ('r 1.png', 'r 2.png'):
|
||||
with open(os.path.join(idir, x), 'rb') as a, open(os.path.join(edir, x), 'rb') as b:
|
||||
self.assertEqual(a.read(), b.read())
|
||||
exported = cache.export_note('authors', author_id)
|
||||
self.assertIn('<p>test simple exim <img src="', exported)
|
||||
from html5_parser import parse
|
||||
root = parse(exported)
|
||||
self.ae(root.xpath('//img/@data-filename'), ['r 1.png', 'r 2.png'])
|
||||
|
||||
|
||||
def test_fts(self: 'NotesTest'):
|
||||
|
Loading…
x
Reference in New Issue
Block a user