From 62738f4d81352a29bd588d50419195824ad9f663 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Wed, 4 Jun 2025 12:00:36 +0530 Subject: [PATCH] Update 7zip wrapper code for removal of read method --- src/calibre/utils/seven_zip.py | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/calibre/utils/seven_zip.py b/src/calibre/utils/seven_zip.py index 3846626e9d..04d87ff933 100644 --- a/src/calibre/utils/seven_zip.py +++ b/src/calibre/utils/seven_zip.py @@ -1,6 +1,7 @@ #!/usr/bin/env python # License: GPL v3 Copyright: 2021, Kovid Goyal +import io import os import re @@ -17,6 +18,27 @@ def names(path_or_stream): return tuple(zf.getnames()) +class Writer: + + def __init__(self): + self.outputs = {} + + def create(self, filename): + b = self.outputs[filename] = io.BytesIO() + return b + + def asdatadict(self): + return {k: v.getvalue() for k, v in self.outputs.items()} + + +def read_file(archive, name): + w = Writer() + archive.extract(targets=[name], factory=w) + for v in w.outputs.values(): + return v.getvalue() + raise KeyError(f'No file named {name} in archive') + + def extract_member(path_or_stream, match=None, name=None): if iswindows and name is not None: name = name.replace(os.sep, '/') @@ -32,7 +54,7 @@ def extract_member(path_or_stream, match=None, name=None): with open_archive(path_or_stream) as ar: all_names = list(filter(is_match, ar.getnames())) if all_names: - return all_names[0], ar.read(all_names[:1])[all_names[0]].read() + return all_names[0], read_file(ar, all_names[0]) def extract_cover_image(stream): @@ -79,15 +101,21 @@ def test_basic(): with open_archive(os.path.join('a.7z')) as zf: if set(zf.getnames()) != set(tdata): raise ValueError('names not equal') - read_data = {name:af.read() for name, af in zf.readall().items()} + w = Writer() + zf.extractall(factory=w) + read_data = w.asdatadict() if read_data != tdata: raise ValueError('data not equal') + os.mkdir('ex') + extract('a.7z', 'ex') for name in tdata: if name not in '1 2 symlink'.split(): - with open(os.path.join(tdir, name), 'rb') as s: + with open(os.path.join(tdir, 'ex', name), 'rb') as s: if s.read() != tdata[name]: raise ValueError(f'Did not extract {name} properly') + if extract_member('a.7z', name='one.txt')[1] != tdata['one.txt']: + raise ValueError('extract_member failed') with TemporaryDirectory('test-7z') as tdir, CurrentDir(tdir): do_test()