mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
Cleanup ISOData command
Cache downloaded bytes in memory and dont store a BytesIO instance, instead create one fresh on zip file access.
This commit is contained in:
parent
4804084665
commit
c721338b43
@ -8,51 +8,49 @@ import shutil
|
||||
import time
|
||||
import zipfile
|
||||
from contextlib import suppress
|
||||
from functools import lru_cache
|
||||
from io import BytesIO
|
||||
|
||||
from setup import Command, download_securely
|
||||
|
||||
|
||||
@lru_cache(2)
|
||||
def iso_codes_data():
|
||||
URL = 'https://salsa.debian.org/iso-codes-team/iso-codes/-/archive/main/iso-codes-main.zip'
|
||||
return download_securely(URL)
|
||||
|
||||
|
||||
class ISOData(Command):
|
||||
description = 'Get ISO codes name localization data'
|
||||
URL = 'https://salsa.debian.org/iso-codes-team/iso-codes/-/archive/main/iso-codes-main.zip'
|
||||
top_level_filename = 'iso-codes-main'
|
||||
_zip_data = None
|
||||
|
||||
def add_options(self, parser):
|
||||
with suppress(optparse.OptionConflictError): # ignore if option already added
|
||||
parser.add_option('--path-to-isocodes', help='Path to previously downloaded iso-codes-main.zip')
|
||||
|
||||
def run(self, opts):
|
||||
if self._zip_data is None:
|
||||
if opts.path_to_isocodes:
|
||||
with open(opts.path_to_isocodes, 'rb') as f:
|
||||
self._zip_data = BytesIO(f.read())
|
||||
# get top level directory
|
||||
top = {item.split('/')[0] for item in zipfile.ZipFile(self.zip_data).namelist()}
|
||||
assert len(top) == 1
|
||||
self.top_level = top.pop()
|
||||
else:
|
||||
self._zip_data = BytesIO(download_securely(self.URL))
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self._zip_data = None
|
||||
self.top_level = 'iso-codes-main'
|
||||
if self._zip_data is None and opts.path_to_isocodes:
|
||||
with open(opts.path_to_isocodes, 'rb') as f:
|
||||
self._zip_data = f.read()
|
||||
# get top level directory
|
||||
top = {item.split('/')[0] for item in zipfile.ZipFile(self.zip_data).namelist()}
|
||||
assert len(top) == 1
|
||||
self.top_level_filename = top.pop()
|
||||
|
||||
@property
|
||||
def zip_data(self):
|
||||
return self._zip_data
|
||||
return self._zip_data or iso_codes_data()
|
||||
|
||||
def db_data(self, name: str) -> bytes:
|
||||
with zipfile.ZipFile(self.zip_data) as zf:
|
||||
with zf.open(f'{self.top_level}/data/{name}') as f:
|
||||
with zipfile.ZipFile(BytesIO(self.zip_data)) as zf:
|
||||
with zf.open(f'{self.top_level_filename}/data/{name}') as f:
|
||||
return f.read()
|
||||
|
||||
def extract_po_files(self, name: str, output_dir: str) -> None:
|
||||
name = name.split('.', 1)[0]
|
||||
pat = f'{self.top_level}/{name}/*.po'
|
||||
if self.zip_data is None:
|
||||
self._zip_data = BytesIO(download_securely(self.URL))
|
||||
with zipfile.ZipFile(self.zip_data) as zf:
|
||||
pat = f'{self.top_level_filename}/{name}/*.po'
|
||||
with zipfile.ZipFile(BytesIO(self.zip_data)) as zf:
|
||||
for name in fnmatch.filter(zf.namelist(), pat):
|
||||
dest = os.path.join(output_dir, name.split('/')[-1])
|
||||
zi = zf.getinfo(name)
|
||||
|
Loading…
x
Reference in New Issue
Block a user