mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-25 15:52:35 -04:00 
			
		
		
		
	Adds option to allow a user to export directory to a zipfile
This commit is contained in:
		
							parent
							
								
									2704bcb979
								
							
						
					
					
						commit
						a6b7beaf6b
					
				| @ -2,6 +2,7 @@ import hashlib | |||||||
| import json | import json | ||||||
| import os | import os | ||||||
| import shutil | import shutil | ||||||
|  | import tempfile | ||||||
| import time | import time | ||||||
| 
 | 
 | ||||||
| import tqdm | import tqdm | ||||||
| @ -12,6 +13,7 @@ from django.core import serializers | |||||||
| from django.core.management.base import BaseCommand | from django.core.management.base import BaseCommand | ||||||
| from django.core.management.base import CommandError | from django.core.management.base import CommandError | ||||||
| from django.db import transaction | from django.db import transaction | ||||||
|  | from django.utils import timezone | ||||||
| from documents.models import Comment | from documents.models import Comment | ||||||
| from documents.models import Correspondent | from documents.models import Correspondent | ||||||
| from documents.models import Document | from documents.models import Document | ||||||
| @ -76,6 +78,7 @@ class Command(BaseCommand): | |||||||
|             "do not belong to the current export, such as files from " |             "do not belong to the current export, such as files from " | ||||||
|             "deleted documents.", |             "deleted documents.", | ||||||
|         ) |         ) | ||||||
|  | 
 | ||||||
|         parser.add_argument( |         parser.add_argument( | ||||||
|             "--no-progress-bar", |             "--no-progress-bar", | ||||||
|             default=False, |             default=False, | ||||||
| @ -83,6 +86,14 @@ class Command(BaseCommand): | |||||||
|             help="If set, the progress bar will not be shown", |             help="If set, the progress bar will not be shown", | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|  |         parser.add_argument( | ||||||
|  |             "-z", | ||||||
|  |             "--zip", | ||||||
|  |             default=False, | ||||||
|  |             action="store_true", | ||||||
|  |             help="Export the documents to a zip file in the given directory", | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|     def __init__(self, *args, **kwargs): |     def __init__(self, *args, **kwargs): | ||||||
|         BaseCommand.__init__(self, *args, **kwargs) |         BaseCommand.__init__(self, *args, **kwargs) | ||||||
|         self.target = None |         self.target = None | ||||||
| @ -98,6 +109,19 @@ class Command(BaseCommand): | |||||||
|         self.compare_checksums = options["compare_checksums"] |         self.compare_checksums = options["compare_checksums"] | ||||||
|         self.use_filename_format = options["use_filename_format"] |         self.use_filename_format = options["use_filename_format"] | ||||||
|         self.delete = options["delete"] |         self.delete = options["delete"] | ||||||
|  |         zip_export: bool = options["zip"] | ||||||
|  | 
 | ||||||
|  |         # If zipping, save the original target for later and | ||||||
|  |         # get a temporary directory for the target | ||||||
|  |         temp_dir = None | ||||||
|  |         original_target = None | ||||||
|  |         if zip_export: | ||||||
|  |             original_target = self.target | ||||||
|  |             temp_dir = tempfile.TemporaryDirectory( | ||||||
|  |                 dir=settings.SCRATCH_DIR, | ||||||
|  |                 prefix="paperless-export", | ||||||
|  |             ) | ||||||
|  |             self.target = temp_dir.name | ||||||
| 
 | 
 | ||||||
|         if not os.path.exists(self.target): |         if not os.path.exists(self.target): | ||||||
|             raise CommandError("That path doesn't exist") |             raise CommandError("That path doesn't exist") | ||||||
| @ -105,8 +129,26 @@ class Command(BaseCommand): | |||||||
|         if not os.access(self.target, os.W_OK): |         if not os.access(self.target, os.W_OK): | ||||||
|             raise CommandError("That path doesn't appear to be writable") |             raise CommandError("That path doesn't appear to be writable") | ||||||
| 
 | 
 | ||||||
|         with FileLock(settings.MEDIA_LOCK): |         try: | ||||||
|             self.dump(options["no_progress_bar"]) |             with FileLock(settings.MEDIA_LOCK): | ||||||
|  |                 self.dump(options["no_progress_bar"]) | ||||||
|  | 
 | ||||||
|  |                 # We've written everything to the temporary directory in this case, | ||||||
|  |                 # now make an archive in the original target, with all files stored | ||||||
|  |                 if zip_export: | ||||||
|  |                     shutil.make_archive( | ||||||
|  |                         os.path.join( | ||||||
|  |                             original_target, | ||||||
|  |                             f"export-{timezone.localdate().isoformat()}", | ||||||
|  |                         ), | ||||||
|  |                         format="zip", | ||||||
|  |                         root_dir=temp_dir.name, | ||||||
|  |                     ) | ||||||
|  | 
 | ||||||
|  |         finally: | ||||||
|  |             # Always cleanup the temporary directory, if one was created | ||||||
|  |             if zip_export and temp_dir is not None: | ||||||
|  |                 temp_dir.cleanup() | ||||||
| 
 | 
 | ||||||
|     def dump(self, progress_bar_disable=False): |     def dump(self, progress_bar_disable=False): | ||||||
|         # 1. Take a snapshot of what files exist in the current export folder |         # 1. Take a snapshot of what files exist in the current export folder | ||||||
|  | |||||||
| @ -5,10 +5,12 @@ import shutil | |||||||
| import tempfile | import tempfile | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
| from unittest import mock | from unittest import mock | ||||||
|  | from zipfile import ZipFile | ||||||
| 
 | 
 | ||||||
| from django.core.management import call_command | from django.core.management import call_command | ||||||
| from django.test import override_settings | from django.test import override_settings | ||||||
| from django.test import TestCase | from django.test import TestCase | ||||||
|  | from django.utils import timezone | ||||||
| from documents.management.commands import document_exporter | from documents.management.commands import document_exporter | ||||||
| from documents.models import Comment | from documents.models import Comment | ||||||
| from documents.models import Correspondent | from documents.models import Correspondent | ||||||
| @ -365,3 +367,56 @@ class TestExportImport(DirectoriesMixin, TestCase): | |||||||
|             mime_type="application/pdf", |             mime_type="application/pdf", | ||||||
|         ) |         ) | ||||||
|         self.assertRaises(FileNotFoundError, call_command, "document_exporter", target) |         self.assertRaises(FileNotFoundError, call_command, "document_exporter", target) | ||||||
|  | 
 | ||||||
|  |     @override_settings(PASSPHRASE="test") | ||||||
|  |     def test_export_zipped(self): | ||||||
|  |         shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) | ||||||
|  |         shutil.copytree( | ||||||
|  |             os.path.join(os.path.dirname(__file__), "samples", "documents"), | ||||||
|  |             os.path.join(self.dirs.media_dir, "documents"), | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         args = ["document_exporter", self.target, "--zip"] | ||||||
|  | 
 | ||||||
|  |         call_command(*args) | ||||||
|  | 
 | ||||||
|  |         expected_file = os.path.join( | ||||||
|  |             self.target, | ||||||
|  |             f"export-{timezone.localdate().isoformat()}.zip", | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         self.assertTrue(os.path.isfile(expected_file)) | ||||||
|  | 
 | ||||||
|  |         with ZipFile(expected_file) as zip: | ||||||
|  |             self.assertEqual(len(zip.namelist()), 11) | ||||||
|  |             self.assertIn("manifest.json", zip.namelist()) | ||||||
|  |             self.assertIn("version.json", zip.namelist()) | ||||||
|  | 
 | ||||||
|  |     @override_settings(PASSPHRASE="test") | ||||||
|  |     def test_export_zipped_format(self): | ||||||
|  |         shutil.rmtree(os.path.join(self.dirs.media_dir, "documents")) | ||||||
|  |         shutil.copytree( | ||||||
|  |             os.path.join(os.path.dirname(__file__), "samples", "documents"), | ||||||
|  |             os.path.join(self.dirs.media_dir, "documents"), | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         args = ["document_exporter", self.target, "--zip", "--use-filename-format"] | ||||||
|  | 
 | ||||||
|  |         with override_settings( | ||||||
|  |             FILENAME_FORMAT="{created_year}/{correspondent}/{title}", | ||||||
|  |         ): | ||||||
|  |             call_command(*args) | ||||||
|  | 
 | ||||||
|  |         expected_file = os.path.join( | ||||||
|  |             self.target, | ||||||
|  |             f"export-{timezone.localdate().isoformat()}.zip", | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         self.assertTrue(os.path.isfile(expected_file)) | ||||||
|  | 
 | ||||||
|  |         with ZipFile(expected_file) as zip: | ||||||
|  |             print(zip.namelist()) | ||||||
|  |             # Extras are from the directories, which also appear in the listing | ||||||
|  |             self.assertEqual(len(zip.namelist()), 14) | ||||||
|  |             self.assertIn("manifest.json", zip.namelist()) | ||||||
|  |             self.assertIn("version.json", zip.namelist()) | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user