Make plumber archive handling code re-useable

This commit is contained in:
Kovid Goyal 2025-03-06 08:47:58 +05:30
parent 0483d5f22f
commit 1fe527e351
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 50 additions and 43 deletions

View File

@ -0,0 +1,47 @@
#!/usr/bin/env python
# License: GPLv3 Copyright: 2025, Kovid Goyal <kovid at kovidgoyal.net>
import os
import re
from calibre import extract, filesystem_encoding, walk
ARCHIVE_FMTS = ('zip', 'rar', 'oebzip')
def unarchive(path, tdir):
extract(path, tdir)
files = list(walk(tdir))
files = [f if isinstance(f, str) else f.decode(filesystem_encoding)
for f in files]
from calibre.customize.ui import available_input_formats
fmts = set(available_input_formats())
fmts -= {'htm', 'html', 'xhtm', 'xhtml'}
fmts -= set(ARCHIVE_FMTS)
for ext in fmts:
for f in files:
if f.lower().endswith('.'+ext):
if ext in ['txt', 'rtf'] and os.stat(f).st_size < 2048:
continue
return f, ext
return find_html_index(files)
def find_html_index(files):
'''
Given a list of files, find the most likely root HTML file in the
list.
'''
html_pat = re.compile(r'\.(x){0,1}htm(l){0,1}$', re.IGNORECASE)
html_files = [f for f in files if html_pat.search(f) is not None]
if not html_files:
raise ValueError(_('Could not find an e-book inside the archive'))
html_files = [(f, os.stat(f).st_size) for f in html_files]
html_files.sort(key=lambda x: x[1])
html_files = [f[0] for f in html_files]
for q in ('toc', 'index'):
for f in html_files:
if os.path.splitext(os.path.basename(f))[0].lower() == q:
return f, os.path.splitext(f)[1].lower()[1:]
return html_files[-1], os.path.splitext(html_files[-1])[1].lower()[1:]

View File

@ -5,12 +5,11 @@ __docformat__ = 'restructuredtext en'
import json
import os
import pprint
import re
import shutil
import sys
from functools import partial
from calibre import extract, filesystem_encoding, get_types_map, isbytestring, walk
from calibre import filesystem_encoding, get_types_map, isbytestring
from calibre.constants import __version__
from calibre.customize.conversion import DummyReporter, OptionRecommendation
from calibre.customize.ui import (
@ -23,6 +22,7 @@ from calibre.customize.ui import (
run_plugins_on_postprocess,
run_plugins_on_preprocess,
)
from calibre.ebooks.conversion.archives import ARCHIVE_FMTS, unarchive
from calibre.ebooks.conversion.preprocess import HTMLPreProcessor
from calibre.ptempfile import PersistentTemporaryDirectory
from calibre.utils.date import parse_date
@ -74,9 +74,6 @@ class CompositeProgressReporter:
self.global_reporter(global_frac, msg)
ARCHIVE_FMTS = ('zip', 'rar', 'oebzip')
class Plumber:
'''
The `Plumber` manages the conversion pipeline. An UI should call the methods
@ -746,7 +743,7 @@ OptionRecommendation(name='search_replace',
if input_fmt in ARCHIVE_FMTS:
self.log('Processing archive...')
tdir = PersistentTemporaryDirectory('_pl_arc')
self.input, input_fmt = self.unarchive(self.input, tdir)
self.input, input_fmt = unarchive(self.input, tdir)
self.archive_input_tdir = tdir
if os.access(self.input, os.R_OK):
nfp = run_plugins_on_preprocess(self.input, input_fmt)
@ -811,43 +808,6 @@ OptionRecommendation(name='search_replace',
self.merge_plugin_recommendations()
@classmethod
def unarchive(self, path, tdir):
extract(path, tdir)
files = list(walk(tdir))
files = [f if isinstance(f, str) else f.decode(filesystem_encoding)
for f in files]
from calibre.customize.ui import available_input_formats
fmts = set(available_input_formats())
fmts -= {'htm', 'html', 'xhtm', 'xhtml'}
fmts -= set(ARCHIVE_FMTS)
for ext in fmts:
for f in files:
if f.lower().endswith('.'+ext):
if ext in ['txt', 'rtf'] and os.stat(f).st_size < 2048:
continue
return f, ext
return self.find_html_index(files)
@classmethod
def find_html_index(self, files):
'''
Given a list of files, find the most likely root HTML file in the
list.
'''
html_pat = re.compile(r'\.(x){0,1}htm(l){0,1}$', re.IGNORECASE)
html_files = [f for f in files if html_pat.search(f) is not None]
if not html_files:
raise ValueError(_('Could not find an e-book inside the archive'))
html_files = [(f, os.stat(f).st_size) for f in html_files]
html_files.sort(key=lambda x: x[1])
html_files = [f[0] for f in html_files]
for q in ('toc', 'index'):
for f in html_files:
if os.path.splitext(os.path.basename(f))[0].lower() == q:
return f, os.path.splitext(f)[1].lower()[1:]
return html_files[-1], os.path.splitext(html_files[-1])[1].lower()[1:]
def get_all_options(self):
ans = {}
for group in (self.input_options, self.pipeline_options,