Implement creation of .xz files

This commit is contained in:
Kovid Goyal 2015-08-08 09:23:35 +05:30
parent d37b124fb5
commit e6fa5795fe
2 changed files with 141 additions and 12 deletions

View File

@ -315,6 +315,27 @@ static SRes report_progress(void *p, UInt64 in_size, UInt64 out_size) {
return SZ_OK;
}
static PyObject*
get_lzma2_properties(int preset) {
CLzma2EncHandle lzma2 = NULL;
CLzma2EncProps props;
Byte props_out = 0;
SRes res = SZ_OK;
lzma2 = Lzma2Enc_Create(&allocator, &allocator);
if (lzma2 == NULL) { PyErr_NoMemory(); goto exit; }
// Initialize parameters based on the preset
init_props(&props, preset);
res = Lzma2Enc_SetProps(lzma2, &props);
if (res != SZ_OK) { SET_ERROR(res); goto exit; }
props_out = Lzma2Enc_WriteProperties(lzma2);
exit:
if (lzma2) Lzma2Enc_Destroy(lzma2);
if (PyErr_Occurred()) return NULL;
return Py_BuildValue("s#", &props_out, 1);
}
static PyObject*
compress(PyObject *self, PyObject *args) {
PyObject *read = NULL, *write = NULL, *progress_callback = NULL;
@ -393,15 +414,21 @@ static PyMethodDef lzma_binding_methods[] = {
PyMODINIT_FUNC
initlzma_binding(void) {
PyObject *m = NULL;
PyObject *m = NULL, *preset_map = NULL, *temp = NULL;
int i = 0;
init_crc_table();
LZMAError = PyErr_NewException("lzma_binding.error", NULL, NULL);
if (!LZMAError) return;
m = Py_InitModule3("lzma_binding", lzma_binding_methods,
"Bindings to the LZMA (de)compression C code"
);
m = Py_InitModule3("lzma_binding", lzma_binding_methods, "Bindings to the LZMA (de)compression C code");
if (m == NULL) return;
preset_map = PyTuple_New(10);
if (preset_map == NULL) return;
for (i = 0; i < 10; i++) {
temp = get_lzma2_properties(i);
if (temp == NULL) return;
PyTuple_SET_ITEM(preset_map, i, temp);
}
PyModule_AddObject(m, "preset_map", preset_map);
Py_INCREF(LZMAError);
PyModule_AddObject(m, "error", LZMAError);
if (m == NULL) return;
}

View File

@ -6,19 +6,31 @@ from __future__ import (unicode_literals, division, absolute_import,
__license__ = 'GPL v3'
__copyright__ = '2015, Kovid Goyal <kovid at kovidgoyal.net>'
# See http://tukaani.org/xz/xz-file-format.txt for file format details
from collections import namedtuple
from io import BytesIO
from hashlib import sha256
from struct import unpack, error as struct_error
from struct import unpack, error as struct_error, pack
from binascii import crc32 as _crc32
from calibre.ptempfile import SpooledTemporaryFile
from lzma.errors import NotXZ, InvalidXZ, lzma
HEADER_MAGIC = b'\xfd7zXZ\0'
FOOTER_MAGIC = b'YZ'
DELTA_FILTER_ID = 0x03
LZMA2_FILTER_ID = 0x21
def align(raw):
extra = len(raw) % 4
if extra:
raw += b'\0' * (4 - extra)
return raw
def as_bytes(*args):
return bytes(bytearray(args))
def crc32(raw, start=0):
return 0xFFFFFFFF & _crc32(raw, start)
@ -89,12 +101,16 @@ class CRCChecker(object):
if self.func is not crc32:
self.code = 0xFFFFFFFFFFFFFFFFL & self.code
@property
def code_as_bytes(self):
return pack(self.fmt, self.code)
def check(self, raw):
return self.code == unpack(self.fmt, raw)[0]
class Sha256Checker(object):
def __init__(self):
def __init__(self, *args):
self.h = sha256()
self.func = self.h.update
self.code = None
@ -104,7 +120,7 @@ class Sha256Checker(object):
self.func(raw)
def finish(self):
self.code = self.h.digest()
self.code = self.code_as_bytes = self.h.digest()
self.h = self.func = None
def check(self, raw):
@ -113,6 +129,10 @@ class Sha256Checker(object):
class DummyChecker(object):
size = 0
code_as_bytes = None
def __init__(self, *args):
pass
def __call__(self, raw):
pass
@ -287,7 +307,7 @@ def read_stream_footer(f, check_type, index_size):
backward_size = 4 * (1 + backward_size)
if backward_size != index_size:
raise InvalidXZ('Footer backward size != actual index size')
if f.read(2) != b'YZ':
if f.read(2) != FOOTER_MAGIC:
raise InvalidXZ('Stream footer has incorrect magic bytes')
if crc != crc32(raw):
raise InvalidXZ('Stream footer CRC mismatch')
@ -310,10 +330,17 @@ def read_stream(f, outfile):
read_stream_footer(f, check_type, index_size)
def decompress(raw, outfile=None):
'''
Decompress the specified data.
:param raw: A bytestring or a file-like object open for reading
:outfile: A file like object open for writing.
The decompressed data is written into it. If not specified then a SpooledTemporaryFile
is created and returned by this function.
'''
if isinstance(raw, bytes):
raw = BytesIO(raw)
outfile = outfile or SpooledTemporaryFile(50 * 1024 * 1024, '_xz_decompress')
outfile.seek(0)
while True:
read_stream(raw, outfile)
pos = raw.tell()
@ -332,16 +359,91 @@ def decompress(raw, outfile=None):
raise InvalidXZ('Found trailing garbage between streams')
return outfile
def compress(raw, outfile=None, level=5, check_type='crc64'):
'''
Compress the specified data into a .xz stream (which can be written directly as
an .xz file.
:param raw: A bytestring or a file-like object open for reading
:outfile: A file like object open for writing.
The .xz stream is written into it. If not specified then a SpooledTemporaryFile
is created and returned by this function.
:level: An integer between 0 and 9 with 0 being fastest/worst compression and 9 being
slowest/best compression
:check_type: The type of data integrity check to write into the output .xz stream.
Should be one of: 'crc32', 'crc64', 'sha256', or None
'''
if isinstance(raw, bytes):
raw = BytesIO(raw)
outfile = outfile or SpooledTemporaryFile(50 * 1024 * 1024, '_xz_decompress')
# Write stream header
outfile.write(HEADER_MAGIC)
check_type = {'crc':1, 'crc32':1, 'sha256':0xa, None:0, '':0, 'none':0, 'None':0}.get(check_type, 4)
stream_flags = as_bytes(0, check_type)
outfile.write(stream_flags)
outfile.write(pack(b'<I', crc32(stream_flags)))
# Write block header
filter_flags = encode_var_int(LZMA2_FILTER_ID) + encode_var_int(1) + lzma.preset_map[level]
block_header = align(b'\0\0' + filter_flags)
bhs = ((4 + len(block_header)) // 4) - 1
block_header = as_bytes(bhs) + block_header[1:]
block_header += pack(b'<I', crc32(block_header))
start = outfile.tell()
outfile.write(block_header)
# Write compressed data and check
checker = {0:DummyChecker, 1:CRCChecker, 4:CRCChecker, 0xa:Sha256Checker}[check_type](check_type)
uncompressed_size = [0]
def read(n):
ans = raw.read(n)
if ans:
uncompressed_size[0] += len(ans)
checker(ans)
return ans
unpadded_size = outfile.tell() - start
pos = outfile.tell()
if pos % 4:
outfile.write(b'\0' * (4 - (pos % 4)))
checker.finish()
if check_type:
cc = checker.code_as_bytes
outfile.write(cc)
unpadded_size += len(cc)
# Write index
index = b'\0' + encode_var_int(1)
index += encode_var_int(unpadded_size) + encode_var_int(uncompressed_size[0])
if len(index) % 4:
index += b'\0' * (4 - len(index) % 4)
outfile.write(index), outfile.write(pack(b'<I', crc32(index)))
# Write stream footer
backwards_size = pack(b'<I', ((len(index) + 4) // 4) - 1)
outfile.write(pack(b'<I', crc32(backwards_size + stream_flags)))
outfile.write(backwards_size), outfile.write(stream_flags), outfile.write(FOOTER_MAGIC)
def test_lzma2():
raw = P('template-functions.json', allow_user_override=False, data=True)
ibuf, obuf = BytesIO(raw), BytesIO()
props = lzma.compress(ibuf.read, obuf.write)
props = lzma.compress(ibuf.read, obuf.write, False)
cc = obuf.getvalue()
ibuf, obuf = BytesIO(cc), BytesIO()
LZMA2Filter(props, 0, 1)(ibuf, obuf)
if obuf.getvalue() != raw:
raise ValueError('Roundtripping via LZMA2 failed')
def test_xz():
raw = P('template-functions.json', allow_user_override=False, data=True)
ibuf, obuf = BytesIO(raw), BytesIO()
compress(ibuf, obuf, check_type='sha256')
cc = obuf.getvalue()
ibuf, obuf = BytesIO(cc), BytesIO()
decompress(ibuf, obuf)
if obuf.getvalue() != raw:
raise ValueError('Roundtripping via XZ failed')
if __name__ == '__main__':
import sys
decompress(open(sys.argv[-1], 'rb'))