TCR Output: Fix TCR compression adding junk to the end of the text. Remove compression level option.

This commit is contained in:
Kovid Goyal 2010-12-31 09:31:46 -07:00
commit 54a2ab69e5
2 changed files with 112 additions and 105 deletions

View File

@ -6,11 +6,118 @@ __docformat__ = 'restructuredtext en'
import re import re
class TCRCompressor(object):
'''
TCR compression takes the form header+code_dict+coded_text.
The header is always "!!8-Bit!!". The code dict is a list of 256 strings.
The list takes the form 1 byte length and then a string. Each position in
The list corresponds to a code found in the file. The coded text is
string of characters values. for instance the character Q represents the
value 81 which corresponds to the string in the code list at position 81.
'''
def _reset(self):
# List of indexes in the codes list that are empty and can hold new codes
self.unused_codes = set()
self.coded_txt = ''
# Generate initial codes from text.
# The index of the list will be the code that represents the characters at that location
# in the list
self.codes = []
def _combine_codes(self):
'''
Combine two codes that always appear in pair into a single code.
The intent is to create more unused codes.
'''
possible_codes = []
a_code = set(re.findall('(?msu).', self.coded_txt))
for code in a_code:
single_code = set(re.findall('(?msu)%s.' % re.escape(code), self.coded_txt))
if len(single_code) == 1:
possible_codes.append(single_code.pop())
for code in possible_codes:
self.coded_txt = self.coded_txt.replace(code, code[0])
self.codes[ord(code[0])] = '%s%s' % (self.codes[ord(code[0])], self.codes[ord(code[1])])
def _free_unused_codes(self):
'''
Look for codes that do no not appear in the coded text and add them to
the list of free codes.
'''
for i in xrange(256):
if i not in self.unused_codes:
if chr(i) not in self.coded_txt:
self.unused_codes.add(i)
def _new_codes(self):
'''
Create new codes from codes that occur in pairs often.
'''
possible_new_codes = list(set(re.findall('(?msu)..', self.coded_txt)))
new_codes_count = []
for c in possible_new_codes:
count = self.coded_txt.count(c)
# Less than 3 occurrences will not produce any size reduction.
if count > 2:
new_codes_count.append((c, count))
# Arrange the codes in order of least to most occurring.
possible_new_codes = [x[0] for x in sorted(new_codes_count, key=lambda c: c[1])]
return possible_new_codes
def compress(self, txt):
self._reset()
self.codes = list(set(re.findall('(?msu).', txt)))
# Replace the text with their corresponding code
for c in txt:
self.coded_txt += chr(self.codes.index(c))
# Zero the unused codes and record which are unused.
for i in range(len(self.codes), 256):
self.codes.append('')
self.unused_codes.add(i)
self._combine_codes()
possible_codes = self._new_codes()
while possible_codes and self.unused_codes:
while possible_codes and self.unused_codes:
unused_code = self.unused_codes.pop()
# Take the last possible codes and split it into individual
# codes. The last possible code is the most often occurring.
code1, code2 = possible_codes.pop()
self.codes[unused_code] = '%s%s' % (self.codes[ord(code1)], self.codes[ord(code2)])
self.coded_txt = self.coded_txt.replace('%s%s' % (code1, code2), chr(unused_code))
self._combine_codes()
self._free_unused_codes()
possible_codes = self._new_codes()
self._free_unused_codes()
# Generate the code dictionary.
code_dict = []
for i in xrange(0, 256):
if i in self.unused_codes:
code_dict.append(chr(0))
else:
code_dict.append(chr(len(self.codes[i])) + self.codes[i])
# Join the identifier with the dictionary and coded text.
return '!!8-Bit!!'+''.join(code_dict)+self.coded_txt
def decompress(stream): def decompress(stream):
txt = [] txt = []
stream.seek(0) stream.seek(0)
if stream.read(9) != '!!8-Bit!!': if stream.read(9) != '!!8-Bit!!':
raise ValueError('File %s contaions an invalid TCR header.' % stream.name) raise ValueError('File %s contains an invalid TCR header.' % stream.name)
# Codes that the file contents are broken down into. # Codes that the file contents are broken down into.
entries = [] entries = []
@ -26,101 +133,6 @@ def decompress(stream):
return ''.join(txt) return ''.join(txt)
def compress(txt):
def compress(txt, level=5): t = TCRCompressor()
''' return t.compress(txt)
TCR compression takes the form header+code_list+coded_text.
The header is always "!!8-Bit!!". The code list is a list of 256 strings.
The list takes the form 1 byte length and then a string. Each position in
The list corresponds to a code found in the file. The coded text is
string of characters vaules. for instance the character Q represents the
value 81 which corresponds to the string in the code list at position 81.
'''
# Turn each unique character into a coded value.
# The code of the string at a given position are represented by the position
# they occupy in the list.
codes = list(set(re.findall('(?msu).', txt)))
for i in range(len(codes), 256):
codes.append('')
# Set the compression level.
if level <= 1:
new_length = 256
if level >= 10:
new_length = 1
else:
new_length = int(256 * (10 - level) * .1)
new_length = 1 if new_length < 1 else new_length
# Replace txt with codes.
coded_txt = ''
for c in txt:
coded_txt += chr(codes.index(c))
txt = coded_txt
# Start compressing the text.
new = True
merged = True
while new or merged:
# Merge codes that always follow another code
merge = []
merged = False
for i in xrange(256):
if codes[i] != '':
# Find all codes that are next to i.
fall = list(set(re.findall('(?msu)%s.' % re.escape(chr(i)), txt)))
# 1 if only one code comes after i.
if len(fall) == 1:
# We are searching codes and each code is always 1 character.
j = ord(fall[0][1:2])
# Only merge if the total length of the string represented by
# code is less than 256.
if len(codes[i]) + len(codes[j]) < 256:
merge.append((i, j))
if merge:
merged = True
for i, j in merge:
# Merge the string for j into the string for i.
if i == j:
# Don't use += here just in case something goes wrong. This
# will prevent out of control memory consumption. This is
# unecessary but when creating this routine it happened due
# to an error.
codes[i] = codes[i] + codes[i]
else:
codes[i] = codes[i] + codes[j]
txt = txt.replace(chr(i)+chr(j), chr(i))
if chr(j) not in txt:
codes[j] = ''
new = False
if '' in codes:
# Create a list of codes based on combinations of codes that are next
# to each other. The amount of savings for the new code is calculated.
new_codes = []
for c in list(set(re.findall('(?msu)..', txt))):
i = ord(c[0:1])
j = ord(c[1:2])
if codes[i]+codes[j] in codes:
continue
savings = txt.count(chr(i)+chr(j)) - len(codes[i]) - len(codes[j])
if savings > 2 and len(codes[i]) + len(codes[j]) < 256:
new_codes.append((savings, i, j, codes[i], codes[j]))
if new_codes:
new = True
# Sort the codes from highest savings to lowest.
new_codes.sort(lambda x, y: -1 if x[0] > y[0] else 1 if x[0] < y[0] else 0)
# The shorter new_length the more chances time merging will happen
# giving more changes for better codes to be created. However,
# the shorter new_lengh the longer it will take to compress.
new_codes = new_codes[:new_length]
for code in new_codes:
if '' not in codes:
break
c = codes.index('')
codes[c] = code[3]+code[4]
txt = txt.replace(chr(code[1])+chr(code[2]), chr(c))
# Generate the code dictionary.
header = []
for code in codes:
header.append(chr(len(code))+code)
for i in xrange(len(header), 256):
header.append(chr(0))
# Join the identifier with the dictionary and coded text.
return '!!8-Bit!!'+''.join(header)+txt

View File

@ -22,11 +22,6 @@ class TCROutput(OutputFormatPlugin):
level=OptionRecommendation.LOW, level=OptionRecommendation.LOW,
help=_('Specify the character encoding of the output document. ' \ help=_('Specify the character encoding of the output document. ' \
'The default is utf-8.')), 'The default is utf-8.')),
OptionRecommendation(name='compression_level', recommended_value=5,
level=OptionRecommendation.LOW,
help=_('Specify the compression level to use. Scale 1 - 10. 1 ' \
'being the lowest compression but the fastest and 10 being the ' \
'highest compression but the slowest.')),
]) ])
def convert(self, oeb_book, output_path, input_plugin, opts, log): def convert(self, oeb_book, output_path, input_plugin, opts, log):
@ -48,7 +43,7 @@ class TCROutput(OutputFormatPlugin):
txt = writer.extract_content(oeb_book, opts).encode(opts.output_encoding, 'replace') txt = writer.extract_content(oeb_book, opts).encode(opts.output_encoding, 'replace')
log.info('Compressing text...') log.info('Compressing text...')
txt = compress(txt, opts.compression_level) txt = compress(txt)
out_stream.seek(0) out_stream.seek(0)
out_stream.truncate() out_stream.truncate()