mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-04 03:27:12 -05:00 
			
		
		
		
	Chore: refactor consumer plugin checks to a pre-flight plugin (#9994)
This commit is contained in:
		
							parent
							
								
									42100588d5
								
							
						
					
					
						commit
						e97cfb9b5e
					
				@ -98,15 +98,7 @@ class ConsumerStatusShortMessage(str, Enum):
 | 
			
		||||
    FAILED = "failed"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ConsumerPlugin(
 | 
			
		||||
    AlwaysRunPluginMixin,
 | 
			
		||||
    NoSetupPluginMixin,
 | 
			
		||||
    NoCleanupPluginMixin,
 | 
			
		||||
    LoggingMixin,
 | 
			
		||||
    ConsumeTaskPlugin,
 | 
			
		||||
):
 | 
			
		||||
    logging_name = "paperless.consumer"
 | 
			
		||||
 | 
			
		||||
class ConsumerPluginMixin:
 | 
			
		||||
    def __init__(
 | 
			
		||||
        self,
 | 
			
		||||
        input_doc: ConsumableDocument,
 | 
			
		||||
@ -155,88 +147,16 @@ class ConsumerPlugin(
 | 
			
		||||
        self.log.error(log_message or message, exc_info=exc_info)
 | 
			
		||||
        raise ConsumerError(f"{self.filename}: {log_message or message}") from exception
 | 
			
		||||
 | 
			
		||||
    def pre_check_file_exists(self):
 | 
			
		||||
        """
 | 
			
		||||
        Confirm the input file still exists where it should
 | 
			
		||||
        """
 | 
			
		||||
        if TYPE_CHECKING:
 | 
			
		||||
            assert isinstance(self.input_doc.original_file, Path), (
 | 
			
		||||
                self.input_doc.original_file
 | 
			
		||||
            )
 | 
			
		||||
        if not self.input_doc.original_file.is_file():
 | 
			
		||||
            self._fail(
 | 
			
		||||
                ConsumerStatusShortMessage.FILE_NOT_FOUND,
 | 
			
		||||
                f"Cannot consume {self.input_doc.original_file}: File not found.",
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def pre_check_duplicate(self):
 | 
			
		||||
        """
 | 
			
		||||
        Using the MD5 of the file, check this exact file doesn't already exist
 | 
			
		||||
        """
 | 
			
		||||
        with Path(self.input_doc.original_file).open("rb") as f:
 | 
			
		||||
            checksum = hashlib.md5(f.read()).hexdigest()
 | 
			
		||||
        existing_doc = Document.global_objects.filter(
 | 
			
		||||
            Q(checksum=checksum) | Q(archive_checksum=checksum),
 | 
			
		||||
        )
 | 
			
		||||
        if existing_doc.exists():
 | 
			
		||||
            msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS
 | 
			
		||||
            log_msg = f"Not consuming {self.filename}: It is a duplicate of {existing_doc.get().title} (#{existing_doc.get().pk})."
 | 
			
		||||
 | 
			
		||||
            if existing_doc.first().deleted_at is not None:
 | 
			
		||||
                msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS_IN_TRASH
 | 
			
		||||
                log_msg += " Note: existing document is in the trash."
 | 
			
		||||
 | 
			
		||||
            if settings.CONSUMER_DELETE_DUPLICATES:
 | 
			
		||||
                Path(self.input_doc.original_file).unlink()
 | 
			
		||||
            self._fail(
 | 
			
		||||
                msg,
 | 
			
		||||
                log_msg,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def pre_check_directories(self):
 | 
			
		||||
        """
 | 
			
		||||
        Ensure all required directories exist before attempting to use them
 | 
			
		||||
        """
 | 
			
		||||
        settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
 | 
			
		||||
        settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
 | 
			
		||||
        settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
 | 
			
		||||
        settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
 | 
			
		||||
 | 
			
		||||
    def pre_check_asn_value(self):
 | 
			
		||||
        """
 | 
			
		||||
        Check that if override_asn is given, it is unique and within a valid range
 | 
			
		||||
        """
 | 
			
		||||
        if self.metadata.asn is None:
 | 
			
		||||
            # check not necessary in case no ASN gets set
 | 
			
		||||
            return
 | 
			
		||||
        # Validate the range is above zero and less than uint32_t max
 | 
			
		||||
        # otherwise, Whoosh can't handle it in the index
 | 
			
		||||
        if (
 | 
			
		||||
            self.metadata.asn < Document.ARCHIVE_SERIAL_NUMBER_MIN
 | 
			
		||||
            or self.metadata.asn > Document.ARCHIVE_SERIAL_NUMBER_MAX
 | 
			
		||||
        ):
 | 
			
		||||
            self._fail(
 | 
			
		||||
                ConsumerStatusShortMessage.ASN_RANGE,
 | 
			
		||||
                f"Not consuming {self.filename}: "
 | 
			
		||||
                f"Given ASN {self.metadata.asn} is out of range "
 | 
			
		||||
                f"[{Document.ARCHIVE_SERIAL_NUMBER_MIN:,}, "
 | 
			
		||||
                f"{Document.ARCHIVE_SERIAL_NUMBER_MAX:,}]",
 | 
			
		||||
            )
 | 
			
		||||
        existing_asn_doc = Document.global_objects.filter(
 | 
			
		||||
            archive_serial_number=self.metadata.asn,
 | 
			
		||||
        )
 | 
			
		||||
        if existing_asn_doc.exists():
 | 
			
		||||
            msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS
 | 
			
		||||
            log_msg = f"Not consuming {self.filename}: Given ASN {self.metadata.asn} already exists!"
 | 
			
		||||
 | 
			
		||||
            if existing_asn_doc.first().deleted_at is not None:
 | 
			
		||||
                msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS_IN_TRASH
 | 
			
		||||
                log_msg += " Note: existing document is in the trash."
 | 
			
		||||
 | 
			
		||||
            self._fail(
 | 
			
		||||
                msg,
 | 
			
		||||
                log_msg,
 | 
			
		||||
            )
 | 
			
		||||
class ConsumerPlugin(
 | 
			
		||||
    AlwaysRunPluginMixin,
 | 
			
		||||
    NoSetupPluginMixin,
 | 
			
		||||
    NoCleanupPluginMixin,
 | 
			
		||||
    LoggingMixin,
 | 
			
		||||
    ConsumerPluginMixin,
 | 
			
		||||
    ConsumeTaskPlugin,
 | 
			
		||||
):
 | 
			
		||||
    logging_name = "paperless.consumer"
 | 
			
		||||
 | 
			
		||||
    def run_pre_consume_script(self):
 | 
			
		||||
        """
 | 
			
		||||
@ -366,20 +286,7 @@ class ConsumerPlugin(
 | 
			
		||||
        tempdir = None
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            self._send_progress(
 | 
			
		||||
                0,
 | 
			
		||||
                100,
 | 
			
		||||
                ProgressStatusOptions.STARTED,
 | 
			
		||||
                ConsumerStatusShortMessage.NEW_FILE,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # Make sure that preconditions for consuming the file are met.
 | 
			
		||||
 | 
			
		||||
            self.pre_check_file_exists()
 | 
			
		||||
            self.pre_check_directories()
 | 
			
		||||
            self.pre_check_duplicate()
 | 
			
		||||
            self.pre_check_asn_value()
 | 
			
		||||
 | 
			
		||||
            # Preflight has already run including progress update to 0%
 | 
			
		||||
            self.log.info(f"Consuming {self.filename}")
 | 
			
		||||
 | 
			
		||||
            # For the actual work, copy the file into a tempdir
 | 
			
		||||
@ -837,3 +744,113 @@ class ConsumerPlugin(
 | 
			
		||||
            copy_basic_file_stats(source, target)
 | 
			
		||||
        except Exception:  # pragma: no cover
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ConsumerPreflightPlugin(
 | 
			
		||||
    NoCleanupPluginMixin,
 | 
			
		||||
    NoSetupPluginMixin,
 | 
			
		||||
    AlwaysRunPluginMixin,
 | 
			
		||||
    LoggingMixin,
 | 
			
		||||
    ConsumerPluginMixin,
 | 
			
		||||
    ConsumeTaskPlugin,
 | 
			
		||||
):
 | 
			
		||||
    NAME: str = "ConsumerPreflightPlugin"
 | 
			
		||||
    logging_name = "paperless.consumer"
 | 
			
		||||
 | 
			
		||||
    def pre_check_file_exists(self):
 | 
			
		||||
        """
 | 
			
		||||
        Confirm the input file still exists where it should
 | 
			
		||||
        """
 | 
			
		||||
        if TYPE_CHECKING:
 | 
			
		||||
            assert isinstance(self.input_doc.original_file, Path), (
 | 
			
		||||
                self.input_doc.original_file
 | 
			
		||||
            )
 | 
			
		||||
        if not self.input_doc.original_file.is_file():
 | 
			
		||||
            self._fail(
 | 
			
		||||
                ConsumerStatusShortMessage.FILE_NOT_FOUND,
 | 
			
		||||
                f"Cannot consume {self.input_doc.original_file}: File not found.",
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def pre_check_duplicate(self):
 | 
			
		||||
        """
 | 
			
		||||
        Using the MD5 of the file, check this exact file doesn't already exist
 | 
			
		||||
        """
 | 
			
		||||
        with Path(self.input_doc.original_file).open("rb") as f:
 | 
			
		||||
            checksum = hashlib.md5(f.read()).hexdigest()
 | 
			
		||||
        existing_doc = Document.global_objects.filter(
 | 
			
		||||
            Q(checksum=checksum) | Q(archive_checksum=checksum),
 | 
			
		||||
        )
 | 
			
		||||
        if existing_doc.exists():
 | 
			
		||||
            msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS
 | 
			
		||||
            log_msg = f"Not consuming {self.filename}: It is a duplicate of {existing_doc.get().title} (#{existing_doc.get().pk})."
 | 
			
		||||
 | 
			
		||||
            if existing_doc.first().deleted_at is not None:
 | 
			
		||||
                msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS_IN_TRASH
 | 
			
		||||
                log_msg += " Note: existing document is in the trash."
 | 
			
		||||
 | 
			
		||||
            if settings.CONSUMER_DELETE_DUPLICATES:
 | 
			
		||||
                Path(self.input_doc.original_file).unlink()
 | 
			
		||||
            self._fail(
 | 
			
		||||
                msg,
 | 
			
		||||
                log_msg,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def pre_check_directories(self):
 | 
			
		||||
        """
 | 
			
		||||
        Ensure all required directories exist before attempting to use them
 | 
			
		||||
        """
 | 
			
		||||
        settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
 | 
			
		||||
        settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
 | 
			
		||||
        settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
 | 
			
		||||
        settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
 | 
			
		||||
 | 
			
		||||
    def pre_check_asn_value(self):
 | 
			
		||||
        """
 | 
			
		||||
        Check that if override_asn is given, it is unique and within a valid range
 | 
			
		||||
        """
 | 
			
		||||
        if self.metadata.asn is None:
 | 
			
		||||
            # check not necessary in case no ASN gets set
 | 
			
		||||
            return
 | 
			
		||||
        # Validate the range is above zero and less than uint32_t max
 | 
			
		||||
        # otherwise, Whoosh can't handle it in the index
 | 
			
		||||
        if (
 | 
			
		||||
            self.metadata.asn < Document.ARCHIVE_SERIAL_NUMBER_MIN
 | 
			
		||||
            or self.metadata.asn > Document.ARCHIVE_SERIAL_NUMBER_MAX
 | 
			
		||||
        ):
 | 
			
		||||
            self._fail(
 | 
			
		||||
                ConsumerStatusShortMessage.ASN_RANGE,
 | 
			
		||||
                f"Not consuming {self.filename}: "
 | 
			
		||||
                f"Given ASN {self.metadata.asn} is out of range "
 | 
			
		||||
                f"[{Document.ARCHIVE_SERIAL_NUMBER_MIN:,}, "
 | 
			
		||||
                f"{Document.ARCHIVE_SERIAL_NUMBER_MAX:,}]",
 | 
			
		||||
            )
 | 
			
		||||
        existing_asn_doc = Document.global_objects.filter(
 | 
			
		||||
            archive_serial_number=self.metadata.asn,
 | 
			
		||||
        )
 | 
			
		||||
        if existing_asn_doc.exists():
 | 
			
		||||
            msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS
 | 
			
		||||
            log_msg = f"Not consuming {self.filename}: Given ASN {self.metadata.asn} already exists!"
 | 
			
		||||
 | 
			
		||||
            if existing_asn_doc.first().deleted_at is not None:
 | 
			
		||||
                msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS_IN_TRASH
 | 
			
		||||
                log_msg += " Note: existing document is in the trash."
 | 
			
		||||
 | 
			
		||||
            self._fail(
 | 
			
		||||
                msg,
 | 
			
		||||
                log_msg,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    def run(self) -> None:
 | 
			
		||||
        self._send_progress(
 | 
			
		||||
            0,
 | 
			
		||||
            100,
 | 
			
		||||
            ProgressStatusOptions.STARTED,
 | 
			
		||||
            ConsumerStatusShortMessage.NEW_FILE,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # Make sure that preconditions for consuming the file are met.
 | 
			
		||||
 | 
			
		||||
        self.pre_check_file_exists()
 | 
			
		||||
        self.pre_check_duplicate()
 | 
			
		||||
        self.pre_check_directories()
 | 
			
		||||
        self.pre_check_asn_value()
 | 
			
		||||
 | 
			
		||||
@ -26,6 +26,7 @@ from documents.caching import clear_document_caches
 | 
			
		||||
from documents.classifier import DocumentClassifier
 | 
			
		||||
from documents.classifier import load_classifier
 | 
			
		||||
from documents.consumer import ConsumerPlugin
 | 
			
		||||
from documents.consumer import ConsumerPreflightPlugin
 | 
			
		||||
from documents.consumer import WorkflowTriggerPlugin
 | 
			
		||||
from documents.data_models import ConsumableDocument
 | 
			
		||||
from documents.data_models import DocumentMetadataOverrides
 | 
			
		||||
@ -144,6 +145,7 @@ def consume_file(
 | 
			
		||||
        overrides = DocumentMetadataOverrides()
 | 
			
		||||
 | 
			
		||||
    plugins: list[type[ConsumeTaskPlugin]] = [
 | 
			
		||||
        ConsumerPreflightPlugin,
 | 
			
		||||
        CollatePlugin,
 | 
			
		||||
        BarcodePlugin,
 | 
			
		||||
        WorkflowTriggerPlugin,
 | 
			
		||||
 | 
			
		||||
@ -484,8 +484,8 @@ class TestConsumer(
 | 
			
		||||
        self._assert_first_last_send_progress()
 | 
			
		||||
 | 
			
		||||
    def testNotAFile(self):
 | 
			
		||||
        with self.get_consumer(Path("non-existing-file")) as consumer:
 | 
			
		||||
            with self.assertRaisesMessage(ConsumerError, "File not found"):
 | 
			
		||||
        with self.assertRaisesMessage(ConsumerError, "File not found"):
 | 
			
		||||
            with self.get_consumer(Path("non-existing-file")) as consumer:
 | 
			
		||||
                consumer.run()
 | 
			
		||||
        self._assert_first_last_send_progress(last_status="FAILED")
 | 
			
		||||
 | 
			
		||||
@ -493,8 +493,8 @@ class TestConsumer(
 | 
			
		||||
        with self.get_consumer(self.get_test_file()) as consumer:
 | 
			
		||||
            consumer.run()
 | 
			
		||||
 | 
			
		||||
        with self.get_consumer(self.get_test_file()) as consumer:
 | 
			
		||||
            with self.assertRaisesMessage(ConsumerError, "It is a duplicate"):
 | 
			
		||||
        with self.assertRaisesMessage(ConsumerError, "It is a duplicate"):
 | 
			
		||||
            with self.get_consumer(self.get_test_file()) as consumer:
 | 
			
		||||
                consumer.run()
 | 
			
		||||
 | 
			
		||||
        self._assert_first_last_send_progress(last_status="FAILED")
 | 
			
		||||
@ -503,8 +503,8 @@ class TestConsumer(
 | 
			
		||||
        with self.get_consumer(self.get_test_file()) as consumer:
 | 
			
		||||
            consumer.run()
 | 
			
		||||
 | 
			
		||||
        with self.get_consumer(self.get_test_archive_file()) as consumer:
 | 
			
		||||
            with self.assertRaisesMessage(ConsumerError, "It is a duplicate"):
 | 
			
		||||
        with self.assertRaisesMessage(ConsumerError, "It is a duplicate"):
 | 
			
		||||
            with self.get_consumer(self.get_test_archive_file()) as consumer:
 | 
			
		||||
                consumer.run()
 | 
			
		||||
 | 
			
		||||
        self._assert_first_last_send_progress(last_status="FAILED")
 | 
			
		||||
@ -521,8 +521,8 @@ class TestConsumer(
 | 
			
		||||
 | 
			
		||||
        Document.objects.all().delete()
 | 
			
		||||
 | 
			
		||||
        with self.get_consumer(self.get_test_file()) as consumer:
 | 
			
		||||
            with self.assertRaisesMessage(ConsumerError, "document is in the trash"):
 | 
			
		||||
        with self.assertRaisesMessage(ConsumerError, "document is in the trash"):
 | 
			
		||||
            with self.get_consumer(self.get_test_file()) as consumer:
 | 
			
		||||
                consumer.run()
 | 
			
		||||
 | 
			
		||||
    def testAsnExists(self):
 | 
			
		||||
@ -532,11 +532,11 @@ class TestConsumer(
 | 
			
		||||
        ) as consumer:
 | 
			
		||||
            consumer.run()
 | 
			
		||||
 | 
			
		||||
        with self.get_consumer(
 | 
			
		||||
            self.get_test_file2(),
 | 
			
		||||
            DocumentMetadataOverrides(asn=123),
 | 
			
		||||
        ) as consumer:
 | 
			
		||||
            with self.assertRaisesMessage(ConsumerError, "ASN 123 already exists"):
 | 
			
		||||
        with self.assertRaisesMessage(ConsumerError, "ASN 123 already exists"):
 | 
			
		||||
            with self.get_consumer(
 | 
			
		||||
                self.get_test_file2(),
 | 
			
		||||
                DocumentMetadataOverrides(asn=123),
 | 
			
		||||
            ) as consumer:
 | 
			
		||||
                consumer.run()
 | 
			
		||||
 | 
			
		||||
    def testAsnExistsInTrash(self):
 | 
			
		||||
@ -549,22 +549,22 @@ class TestConsumer(
 | 
			
		||||
            document = Document.objects.first()
 | 
			
		||||
            document.delete()
 | 
			
		||||
 | 
			
		||||
        with self.get_consumer(
 | 
			
		||||
            self.get_test_file2(),
 | 
			
		||||
            DocumentMetadataOverrides(asn=123),
 | 
			
		||||
        ) as consumer:
 | 
			
		||||
            with self.assertRaisesMessage(ConsumerError, "document is in the trash"):
 | 
			
		||||
        with self.assertRaisesMessage(ConsumerError, "document is in the trash"):
 | 
			
		||||
            with self.get_consumer(
 | 
			
		||||
                self.get_test_file2(),
 | 
			
		||||
                DocumentMetadataOverrides(asn=123),
 | 
			
		||||
            ) as consumer:
 | 
			
		||||
                consumer.run()
 | 
			
		||||
 | 
			
		||||
    @mock.patch("documents.parsers.document_consumer_declaration.send")
 | 
			
		||||
    def testNoParsers(self, m):
 | 
			
		||||
        m.return_value = []
 | 
			
		||||
 | 
			
		||||
        with self.get_consumer(self.get_test_file()) as consumer:
 | 
			
		||||
            with self.assertRaisesMessage(
 | 
			
		||||
                ConsumerError,
 | 
			
		||||
                "sample.pdf: Unsupported mime type application/pdf",
 | 
			
		||||
            ):
 | 
			
		||||
        with self.assertRaisesMessage(
 | 
			
		||||
            ConsumerError,
 | 
			
		||||
            "sample.pdf: Unsupported mime type application/pdf",
 | 
			
		||||
        ):
 | 
			
		||||
            with self.get_consumer(self.get_test_file()) as consumer:
 | 
			
		||||
                consumer.run()
 | 
			
		||||
 | 
			
		||||
        self._assert_first_last_send_progress(last_status="FAILED")
 | 
			
		||||
@ -726,8 +726,8 @@ class TestConsumer(
 | 
			
		||||
        dst = self.get_test_file()
 | 
			
		||||
        self.assertIsFile(dst)
 | 
			
		||||
 | 
			
		||||
        with self.get_consumer(dst) as consumer:
 | 
			
		||||
            with self.assertRaises(ConsumerError):
 | 
			
		||||
        with self.assertRaises(ConsumerError):
 | 
			
		||||
            with self.get_consumer(dst) as consumer:
 | 
			
		||||
                consumer.run()
 | 
			
		||||
 | 
			
		||||
        self.assertIsNotFile(dst)
 | 
			
		||||
@ -751,11 +751,11 @@ class TestConsumer(
 | 
			
		||||
        dst = self.get_test_file()
 | 
			
		||||
        self.assertIsFile(dst)
 | 
			
		||||
 | 
			
		||||
        with self.get_consumer(dst) as consumer:
 | 
			
		||||
            with self.assertRaisesRegex(
 | 
			
		||||
                ConsumerError,
 | 
			
		||||
                r"sample\.pdf: Not consuming sample\.pdf: It is a duplicate of sample \(#\d+\)",
 | 
			
		||||
            ):
 | 
			
		||||
        with self.assertRaisesRegex(
 | 
			
		||||
            ConsumerError,
 | 
			
		||||
            r"sample\.pdf: Not consuming sample\.pdf: It is a duplicate of sample \(#\d+\)",
 | 
			
		||||
        ):
 | 
			
		||||
            with self.get_consumer(dst) as consumer:
 | 
			
		||||
                consumer.run()
 | 
			
		||||
 | 
			
		||||
        self.assertIsFile(dst)
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ from django.test import TransactionTestCase
 | 
			
		||||
from django.test import override_settings
 | 
			
		||||
 | 
			
		||||
from documents.consumer import ConsumerPlugin
 | 
			
		||||
from documents.consumer import ConsumerPreflightPlugin
 | 
			
		||||
from documents.data_models import ConsumableDocument
 | 
			
		||||
from documents.data_models import DocumentMetadataOverrides
 | 
			
		||||
from documents.data_models import DocumentSource
 | 
			
		||||
@ -344,12 +345,21 @@ class GetConsumerMixin:
 | 
			
		||||
    ) -> Generator[ConsumerPlugin, None, None]:
 | 
			
		||||
        # Store this for verification
 | 
			
		||||
        self.status = DummyProgressManager(filepath.name, None)
 | 
			
		||||
        doc = ConsumableDocument(
 | 
			
		||||
            source,
 | 
			
		||||
            original_file=filepath,
 | 
			
		||||
            mailrule_id=mailrule_id or None,
 | 
			
		||||
        )
 | 
			
		||||
        preflight_plugin = ConsumerPreflightPlugin(
 | 
			
		||||
            doc,
 | 
			
		||||
            overrides or DocumentMetadataOverrides(),
 | 
			
		||||
            self.status,  # type: ignore
 | 
			
		||||
            self.dirs.scratch_dir,
 | 
			
		||||
            "task-id",
 | 
			
		||||
        )
 | 
			
		||||
        preflight_plugin.setup()
 | 
			
		||||
        reader = ConsumerPlugin(
 | 
			
		||||
            ConsumableDocument(
 | 
			
		||||
                source,
 | 
			
		||||
                original_file=filepath,
 | 
			
		||||
                mailrule_id=mailrule_id or None,
 | 
			
		||||
            ),
 | 
			
		||||
            doc,
 | 
			
		||||
            overrides or DocumentMetadataOverrides(),
 | 
			
		||||
            self.status,  # type: ignore
 | 
			
		||||
            self.dirs.scratch_dir,
 | 
			
		||||
@ -357,6 +367,7 @@ class GetConsumerMixin:
 | 
			
		||||
        )
 | 
			
		||||
        reader.setup()
 | 
			
		||||
        try:
 | 
			
		||||
            preflight_plugin.run()
 | 
			
		||||
            yield reader
 | 
			
		||||
        finally:
 | 
			
		||||
            reader.cleanup()
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user