mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-03 19:17:13 -05:00 
			
		
		
		
	testing the updated migration
This commit is contained in:
		
							parent
							
								
									1ba89ddd09
								
							
						
					
					
						commit
						69d7f8c180
					
				@ -6,21 +6,19 @@ from pathlib import Path
 | 
				
			|||||||
from django.conf import settings
 | 
					from django.conf import settings
 | 
				
			||||||
from django.test import override_settings
 | 
					from django.test import override_settings
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from documents.sanity_checker import SanityFailedError
 | 
					 | 
				
			||||||
from documents.tasks import sanity_check
 | 
					 | 
				
			||||||
from documents.tests.utils import DirectoriesMixin, TestMigrations
 | 
					from documents.tests.utils import DirectoriesMixin, TestMigrations
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
STORAGE_TYPE_GPG = "gpg"
 | 
					STORAGE_TYPE_GPG = "gpg"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def archive_name_from_filename_old(filename):
 | 
					def archive_name_from_filename(filename):
 | 
				
			||||||
    return os.path.splitext(filename)[0] + ".pdf"
 | 
					    return os.path.splitext(filename)[0] + ".pdf"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def archive_path_old(self):
 | 
					def archive_path_old(self):
 | 
				
			||||||
    if self.filename:
 | 
					    if self.filename:
 | 
				
			||||||
        fname = archive_name_from_filename_old(self.filename)
 | 
					        fname = archive_name_from_filename(self.filename)
 | 
				
			||||||
    else:
 | 
					    else:
 | 
				
			||||||
        fname = "{:07}.pdf".format(self.pk)
 | 
					        fname = "{:07}.pdf".format(self.pk)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -30,24 +28,14 @@ def archive_path_old(self):
 | 
				
			|||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def archive_name_from_filename_new(filename):
 | 
					def archive_path_new(doc):
 | 
				
			||||||
    name, ext = os.path.splitext(filename)
 | 
					        if doc.archive_filename is not None:
 | 
				
			||||||
    if ext == ".pdf":
 | 
					 | 
				
			||||||
        return filename
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        return filename + ".pdf"
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
def archive_path_new(self):
 | 
					 | 
				
			||||||
    if self.filename:
 | 
					 | 
				
			||||||
        fname = archive_name_from_filename_new(self.filename)
 | 
					 | 
				
			||||||
    else:
 | 
					 | 
				
			||||||
        fname = "{:07}.pdf".format(self.pk)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
            return os.path.join(
 | 
					            return os.path.join(
 | 
				
			||||||
                settings.ARCHIVE_DIR,
 | 
					                settings.ARCHIVE_DIR,
 | 
				
			||||||
        fname
 | 
					                str(doc.archive_filename)
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					        else:
 | 
				
			||||||
 | 
					            return None
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def source_path(doc):
 | 
					def source_path(doc):
 | 
				
			||||||
@ -75,22 +63,25 @@ def thumbnail_path(doc):
 | 
				
			|||||||
    )
 | 
					    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
def make_test_document(document_class, title: str, filename: str, mime_type: str, original: str, archive: str = None, new: bool = False):
 | 
					def make_test_document(document_class, title: str, mime_type: str, original: str, original_filename: str, archive: str = None, archive_filename: str = None):
 | 
				
			||||||
    doc = document_class()
 | 
					    doc = document_class()
 | 
				
			||||||
    doc.filename = filename
 | 
					    doc.filename = original_filename
 | 
				
			||||||
    doc.title = title
 | 
					    doc.title = title
 | 
				
			||||||
    doc.mime_type = mime_type
 | 
					    doc.mime_type = mime_type
 | 
				
			||||||
    doc.content = "the content, does not matter for this test"
 | 
					    doc.content = "the content, does not matter for this test"
 | 
				
			||||||
 | 
					    doc.save()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    shutil.copy2(original, source_path(doc))
 | 
					    shutil.copy2(original, source_path(doc))
 | 
				
			||||||
    with open(original, "rb") as f:
 | 
					    with open(original, "rb") as f:
 | 
				
			||||||
        doc.checksum = hashlib.md5(f.read()).hexdigest()
 | 
					        doc.checksum = hashlib.md5(f.read()).hexdigest()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if archive:
 | 
					    if archive:
 | 
				
			||||||
        if new:
 | 
					        if archive_filename:
 | 
				
			||||||
 | 
					            doc.archive_filename = archive_filename
 | 
				
			||||||
            shutil.copy2(archive, archive_path_new(doc))
 | 
					            shutil.copy2(archive, archive_path_new(doc))
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            shutil.copy2(archive, archive_path_old(doc))
 | 
					            shutil.copy2(archive, archive_path_old(doc))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        with open(archive, "rb") as f:
 | 
					        with open(archive, "rb") as f:
 | 
				
			||||||
            doc.archive_checksum = hashlib.md5(f.read()).hexdigest()
 | 
					            doc.archive_checksum = hashlib.md5(f.read()).hexdigest()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -101,36 +92,42 @@ def make_test_document(document_class, title: str, filename: str, mime_type: str
 | 
				
			|||||||
    return doc
 | 
					    return doc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@override_settings(PAPERLESS_FILENAME_FORMAT="{title}")
 | 
					simple_jpg = os.path.join(os.path.dirname(__file__), "samples", "simple.jpg")
 | 
				
			||||||
 | 
					simple_pdf = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
 | 
				
			||||||
 | 
					simple_pdf2 = os.path.join(os.path.dirname(__file__), "samples", "documents", "originals", "0000002.pdf")
 | 
				
			||||||
 | 
					simple_txt = os.path.join(os.path.dirname(__file__), "samples", "simple.txt")
 | 
				
			||||||
 | 
					simple_png = os.path.join(os.path.dirname(__file__), "samples", "simple-noalpha.png")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@override_settings(PAPERLESS_FILENAME_FORMAT="")
 | 
				
			||||||
class TestMigrateArchiveFiles(DirectoriesMixin, TestMigrations):
 | 
					class TestMigrateArchiveFiles(DirectoriesMixin, TestMigrations):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    migrate_from = '1011_auto_20210101_2340'
 | 
					    migrate_from = '1011_auto_20210101_2340'
 | 
				
			||||||
    migrate_to = '1012_fix_archive_files'
 | 
					    migrate_to = '1012_fix_archive_files'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def setUpBeforeMigration(self, apps):
 | 
					    def setUpBeforeMigration(self, apps):
 | 
				
			||||||
        simple_jpg = os.path.join(os.path.dirname(__file__), "samples", "simple.jpg")
 | 
					 | 
				
			||||||
        simple_pdf = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
 | 
					 | 
				
			||||||
        simple_pdf2 = os.path.join(os.path.dirname(__file__), "samples", "documents", "originals", "0000002.pdf")
 | 
					 | 
				
			||||||
        simple_txt = os.path.join(os.path.dirname(__file__), "samples", "simple.txt")
 | 
					 | 
				
			||||||
        simple_png = os.path.join(os.path.dirname(__file__), "samples", "simple-noalpha.png")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        Document = apps.get_model("documents", "Document")
 | 
					        Document = apps.get_model("documents", "Document")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.doc_unrelated = make_test_document(Document, "unrelated", "unrelated.txt", "application/pdf", simple_pdf2, simple_pdf2)
 | 
					        doc_no_archive = make_test_document(Document, "no_archive", "text/plain", simple_txt, "no_archive.txt")
 | 
				
			||||||
        self.doc_no_archive = make_test_document(Document, "no_archive", "no_archive.txt", "text/plain", simple_txt)
 | 
					        clash1 = make_test_document(Document, "clash", "application/pdf", simple_pdf, "clash.pdf", simple_pdf)
 | 
				
			||||||
        self.clashA = make_test_document(Document, "clash", "clash.pdf", "application/pdf", simple_pdf, simple_pdf)
 | 
					        clash2 = make_test_document(Document, "clash", "image/jpeg", simple_jpg, "clash.jpg", simple_pdf)
 | 
				
			||||||
        self.clashB = make_test_document(Document, "clash", "clash.jpg", "image/jpeg", simple_jpg, simple_pdf)
 | 
					        clash3 = make_test_document(Document, "clash", "image/png", simple_png, "clash.png", simple_pdf)
 | 
				
			||||||
        self.clashC = make_test_document(Document, "clash", "clash.png", "image/png", simple_png, simple_pdf)
 | 
					        clash4 = make_test_document(Document, "clash.png", "application/pdf", simple_pdf2, "clash.png.pdf", simple_pdf2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.assertEqual(archive_path_old(self.clashA), archive_path_old(self.clashB))
 | 
					        self.assertEqual(archive_path_old(clash1), archive_path_old(clash2))
 | 
				
			||||||
        self.assertEqual(archive_path_old(self.clashA), archive_path_old(self.clashC))
 | 
					        self.assertEqual(archive_path_old(clash1), archive_path_old(clash3))
 | 
				
			||||||
        self.assertRaises(SanityFailedError, sanity_check)
 | 
					        self.assertNotEqual(archive_path_old(clash1), archive_path_old(clash4))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def testArchiveFilesMigrated(self):
 | 
					    def testArchiveFilesMigrated(self):
 | 
				
			||||||
        Document = self.apps.get_model('documents', 'Document')
 | 
					        Document = self.apps.get_model('documents', 'Document')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for doc in Document.objects.all():
 | 
					        for doc in Document.objects.all():
 | 
				
			||||||
            self.assertTrue(os.path.isfile(archive_path_new(self.clashB)))
 | 
					            if doc.archive_checksum:
 | 
				
			||||||
 | 
					                self.assertIsNotNone(doc.archive_filename)
 | 
				
			||||||
 | 
					                self.assertTrue(os.path.isfile(archive_path_new(doc)))
 | 
				
			||||||
 | 
					            else:
 | 
				
			||||||
 | 
					                self.assertIsNone(doc.archive_filename)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            with open(source_path(doc), "rb") as f:
 | 
					            with open(source_path(doc), "rb") as f:
 | 
				
			||||||
                original_checksum = hashlib.md5(f.read()).hexdigest()
 | 
					                original_checksum = hashlib.md5(f.read()).hexdigest()
 | 
				
			||||||
            self.assertEqual(original_checksum, doc.checksum)
 | 
					            self.assertEqual(original_checksum, doc.checksum)
 | 
				
			||||||
@ -143,32 +140,32 @@ class TestMigrateArchiveFiles(DirectoriesMixin, TestMigrations):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
        self.assertEqual(Document.objects.filter(archive_checksum__isnull=False).count(), 4)
 | 
					        self.assertEqual(Document.objects.filter(archive_checksum__isnull=False).count(), 4)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # this will raise errors when any inconsistencies remain after migration
 | 
					
 | 
				
			||||||
        sanity_check()
 | 
					@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/{title}")
 | 
				
			||||||
 | 
					class TestMigrateArchiveFilesWithFilenameFormat(TestMigrateArchiveFiles):
 | 
				
			||||||
 | 
					    pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@override_settings(PAPERLESS_FILENAME_FORMAT="")
 | 
				
			||||||
class TestMigrateArchiveFilesBackwards(DirectoriesMixin, TestMigrations):
 | 
					class TestMigrateArchiveFilesBackwards(DirectoriesMixin, TestMigrations):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    migrate_from = '1012_fix_archive_files'
 | 
					    migrate_from = '1012_fix_archive_files'
 | 
				
			||||||
    migrate_to = '1011_auto_20210101_2340'
 | 
					    migrate_to = '1011_auto_20210101_2340'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def setUpBeforeMigration(self, apps):
 | 
					    def setUpBeforeMigration(self, apps):
 | 
				
			||||||
        simple_jpg = os.path.join(os.path.dirname(__file__), "samples", "simple.jpg")
 | 
					 | 
				
			||||||
        simple_pdf = os.path.join(os.path.dirname(__file__), "samples", "simple.pdf")
 | 
					 | 
				
			||||||
        simple_pdf2 = os.path.join(os.path.dirname(__file__), "samples", "documents", "originals", "0000002.pdf")
 | 
					 | 
				
			||||||
        simple_txt = os.path.join(os.path.dirname(__file__), "samples", "simple.txt")
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Document = apps.get_model("documents", "Document")
 | 
					        Document = apps.get_model("documents", "Document")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.doc_unrelated = make_test_document(Document, "unrelated", "unrelated.txt", "application/pdf", simple_pdf2, simple_pdf2, new=True)
 | 
					        doc_unrelated = make_test_document(Document, "unrelated", "application/pdf", simple_pdf2, "unrelated.txt", simple_pdf2, "unrelated.pdf")
 | 
				
			||||||
        self.doc_no_archive = make_test_document(Document, "no_archive", "no_archive.txt", "text/plain", simple_txt, new=True)
 | 
					        doc_no_archive = make_test_document(Document, "no_archive", "text/plain", simple_txt, "no_archive.txt")
 | 
				
			||||||
        self.clashB = make_test_document(Document, "clash", "clash.jpg", "image/jpeg", simple_jpg, simple_pdf, new=True)
 | 
					        clashB = make_test_document(Document, "clash", "image/jpeg", simple_jpg, "clash.jpg", simple_pdf, "clash_02.pdf")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def testArchiveFilesReverted(self):
 | 
					    def testArchiveFilesReverted(self):
 | 
				
			||||||
        Document = self.apps.get_model('documents', 'Document')
 | 
					        Document = self.apps.get_model('documents', 'Document')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for doc in Document.objects.all():
 | 
					        for doc in Document.objects.all():
 | 
				
			||||||
            self.assertTrue(os.path.isfile(archive_path_old(self.clashB)))
 | 
					            if doc.archive_checksum:
 | 
				
			||||||
 | 
					                self.assertTrue(os.path.isfile(archive_path_old(doc)))
 | 
				
			||||||
            with open(source_path(doc), "rb") as f:
 | 
					            with open(source_path(doc), "rb") as f:
 | 
				
			||||||
                original_checksum = hashlib.md5(f.read()).hexdigest()
 | 
					                original_checksum = hashlib.md5(f.read()).hexdigest()
 | 
				
			||||||
            self.assertEqual(original_checksum, doc.checksum)
 | 
					            self.assertEqual(original_checksum, doc.checksum)
 | 
				
			||||||
@ -178,3 +175,36 @@ class TestMigrateArchiveFilesBackwards(DirectoriesMixin, TestMigrations):
 | 
				
			|||||||
                with open(archive_path_old(doc), "rb") as f:
 | 
					                with open(archive_path_old(doc), "rb") as f:
 | 
				
			||||||
                    archive_checksum = hashlib.md5(f.read()).hexdigest()
 | 
					                    archive_checksum = hashlib.md5(f.read()).hexdigest()
 | 
				
			||||||
                self.assertEqual(archive_checksum, doc.archive_checksum)
 | 
					                self.assertEqual(archive_checksum, doc.archive_checksum)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.assertEqual(Document.objects.filter(archive_checksum__isnull=False).count(), 2)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@override_settings(PAPERLESS_FILENAME_FORMAT="{correspondent}/{title}")
 | 
				
			||||||
 | 
					class TestMigrateArchiveFilesBackwardsWithFilenameFormat(TestMigrateArchiveFilesBackwards):
 | 
				
			||||||
 | 
					    pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					@override_settings(PAPERLESS_FILENAME_FORMAT="")
 | 
				
			||||||
 | 
					class TestMigrateArchiveFilesBackwardsErrors(DirectoriesMixin, TestMigrations):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    migrate_from = '1012_fix_archive_files'
 | 
				
			||||||
 | 
					    migrate_to = '1011_auto_20210101_2340'
 | 
				
			||||||
 | 
					    auto_migrate = False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_filename_clash(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Document = self.apps.get_model("documents", "Document")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.clashA = make_test_document(Document, "clash", "application/pdf", simple_pdf, "clash.pdf", simple_pdf, "clash_02.pdf")
 | 
				
			||||||
 | 
					        self.clashB = make_test_document(Document, "clash", "image/jpeg", simple_jpg, "clash.jpg", simple_pdf, "clash_01.pdf")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.assertRaisesMessage(ValueError, "would clash with another archive filename", self.performMigration)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def test_filename_exists(self):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        Document = self.apps.get_model("documents", "Document")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.clashA = make_test_document(Document, "clash", "application/pdf", simple_pdf, "clash.pdf", simple_pdf, "clash.pdf")
 | 
				
			||||||
 | 
					        self.clashB = make_test_document(Document, "clash", "image/jpeg", simple_jpg, "clash.jpg", simple_pdf, "clash_01.pdf")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        self.assertRaisesMessage(ValueError, "file already exists.", self.performMigration)
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user