Update 7zip wrapper code for removal of read method

This commit is contained in:
Kovid Goyal 2025-06-04 12:00:36 +05:30
parent ddcc2c4bbd
commit 62738f4d81
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python
# License: GPL v3 Copyright: 2021, Kovid Goyal <kovid at kovidgoyal.net>
import io
import os
import re
@ -17,6 +18,27 @@ def names(path_or_stream):
return tuple(zf.getnames())
class Writer:
def __init__(self):
self.outputs = {}
def create(self, filename):
b = self.outputs[filename] = io.BytesIO()
return b
def asdatadict(self):
return {k: v.getvalue() for k, v in self.outputs.items()}
def read_file(archive, name):
w = Writer()
archive.extract(targets=[name], factory=w)
for v in w.outputs.values():
return v.getvalue()
raise KeyError(f'No file named {name} in archive')
def extract_member(path_or_stream, match=None, name=None):
if iswindows and name is not None:
name = name.replace(os.sep, '/')
@ -32,7 +54,7 @@ def extract_member(path_or_stream, match=None, name=None):
with open_archive(path_or_stream) as ar:
all_names = list(filter(is_match, ar.getnames()))
if all_names:
return all_names[0], ar.read(all_names[:1])[all_names[0]].read()
return all_names[0], read_file(ar, all_names[0])
def extract_cover_image(stream):
@ -79,15 +101,21 @@ def test_basic():
with open_archive(os.path.join('a.7z')) as zf:
if set(zf.getnames()) != set(tdata):
raise ValueError('names not equal')
read_data = {name:af.read() for name, af in zf.readall().items()}
w = Writer()
zf.extractall(factory=w)
read_data = w.asdatadict()
if read_data != tdata:
raise ValueError('data not equal')
os.mkdir('ex')
extract('a.7z', 'ex')
for name in tdata:
if name not in '1 2 symlink'.split():
with open(os.path.join(tdir, name), 'rb') as s:
with open(os.path.join(tdir, 'ex', name), 'rb') as s:
if s.read() != tdata[name]:
raise ValueError(f'Did not extract {name} properly')
if extract_member('a.7z', name='one.txt')[1] != tdata['one.txt']:
raise ValueError('extract_member failed')
with TemporaryDirectory('test-7z') as tdir, CurrentDir(tdir):
do_test()