Chore: refactor some consumer tasks to pre-flight plugin

This commit is contained in:
shamoon 2025-05-21 00:30:10 -07:00
parent bd5f05ff2b
commit 048b933371
No known key found for this signature in database
4 changed files with 169 additions and 139 deletions

View File

@ -98,15 +98,7 @@ class ConsumerStatusShortMessage(str, Enum):
FAILED = "failed" FAILED = "failed"
class ConsumerPlugin( class ConsumerPluginMixin:
AlwaysRunPluginMixin,
NoSetupPluginMixin,
NoCleanupPluginMixin,
LoggingMixin,
ConsumeTaskPlugin,
):
logging_name = "paperless.consumer"
def __init__( def __init__(
self, self,
input_doc: ConsumableDocument, input_doc: ConsumableDocument,
@ -155,88 +147,16 @@ class ConsumerPlugin(
self.log.error(log_message or message, exc_info=exc_info) self.log.error(log_message or message, exc_info=exc_info)
raise ConsumerError(f"{self.filename}: {log_message or message}") from exception raise ConsumerError(f"{self.filename}: {log_message or message}") from exception
def pre_check_file_exists(self):
"""
Confirm the input file still exists where it should
"""
if TYPE_CHECKING:
assert isinstance(self.input_doc.original_file, Path), (
self.input_doc.original_file
)
if not self.input_doc.original_file.is_file():
self._fail(
ConsumerStatusShortMessage.FILE_NOT_FOUND,
f"Cannot consume {self.input_doc.original_file}: File not found.",
)
def pre_check_duplicate(self): class ConsumerPlugin(
""" AlwaysRunPluginMixin,
Using the MD5 of the file, check this exact file doesn't already exist NoSetupPluginMixin,
""" NoCleanupPluginMixin,
with Path(self.input_doc.original_file).open("rb") as f: LoggingMixin,
checksum = hashlib.md5(f.read()).hexdigest() ConsumerPluginMixin,
existing_doc = Document.global_objects.filter( ConsumeTaskPlugin,
Q(checksum=checksum) | Q(archive_checksum=checksum),
)
if existing_doc.exists():
msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS
log_msg = f"Not consuming {self.filename}: It is a duplicate of {existing_doc.get().title} (#{existing_doc.get().pk})."
if existing_doc.first().deleted_at is not None:
msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS_IN_TRASH
log_msg += " Note: existing document is in the trash."
if settings.CONSUMER_DELETE_DUPLICATES:
Path(self.input_doc.original_file).unlink()
self._fail(
msg,
log_msg,
)
def pre_check_directories(self):
"""
Ensure all required directories exist before attempting to use them
"""
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
def pre_check_asn_value(self):
"""
Check that if override_asn is given, it is unique and within a valid range
"""
if self.metadata.asn is None:
# check not necessary in case no ASN gets set
return
# Validate the range is above zero and less than uint32_t max
# otherwise, Whoosh can't handle it in the index
if (
self.metadata.asn < Document.ARCHIVE_SERIAL_NUMBER_MIN
or self.metadata.asn > Document.ARCHIVE_SERIAL_NUMBER_MAX
): ):
self._fail( logging_name = "paperless.consumer"
ConsumerStatusShortMessage.ASN_RANGE,
f"Not consuming {self.filename}: "
f"Given ASN {self.metadata.asn} is out of range "
f"[{Document.ARCHIVE_SERIAL_NUMBER_MIN:,}, "
f"{Document.ARCHIVE_SERIAL_NUMBER_MAX:,}]",
)
existing_asn_doc = Document.global_objects.filter(
archive_serial_number=self.metadata.asn,
)
if existing_asn_doc.exists():
msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS
log_msg = f"Not consuming {self.filename}: Given ASN {self.metadata.asn} already exists!"
if existing_asn_doc.first().deleted_at is not None:
msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS_IN_TRASH
log_msg += " Note: existing document is in the trash."
self._fail(
msg,
log_msg,
)
def run_pre_consume_script(self): def run_pre_consume_script(self):
""" """
@ -365,20 +285,7 @@ class ConsumerPlugin(
tempdir = None tempdir = None
try: try:
self._send_progress( # Preflight has already run including progress update to 0%
0,
100,
ProgressStatusOptions.STARTED,
ConsumerStatusShortMessage.NEW_FILE,
)
# Make sure that preconditions for consuming the file are met.
self.pre_check_file_exists()
self.pre_check_directories()
self.pre_check_duplicate()
self.pre_check_asn_value()
self.log.info(f"Consuming {self.filename}") self.log.info(f"Consuming {self.filename}")
# For the actual work, copy the file into a tempdir # For the actual work, copy the file into a tempdir
@ -836,3 +743,113 @@ class ConsumerPlugin(
copy_basic_file_stats(source, target) copy_basic_file_stats(source, target)
except Exception: # pragma: no cover except Exception: # pragma: no cover
pass pass
class ConsumerPreflightPlugin(
NoCleanupPluginMixin,
NoSetupPluginMixin,
AlwaysRunPluginMixin,
LoggingMixin,
ConsumerPluginMixin,
ConsumeTaskPlugin,
):
NAME: str = "ConsumerPreflightPlugin"
logging_name = "paperless.consumer"
def pre_check_file_exists(self):
"""
Confirm the input file still exists where it should
"""
if TYPE_CHECKING:
assert isinstance(self.input_doc.original_file, Path), (
self.input_doc.original_file
)
if not self.input_doc.original_file.is_file():
self._fail(
ConsumerStatusShortMessage.FILE_NOT_FOUND,
f"Cannot consume {self.input_doc.original_file}: File not found.",
)
def pre_check_duplicate(self):
"""
Using the MD5 of the file, check this exact file doesn't already exist
"""
with Path(self.input_doc.original_file).open("rb") as f:
checksum = hashlib.md5(f.read()).hexdigest()
existing_doc = Document.global_objects.filter(
Q(checksum=checksum) | Q(archive_checksum=checksum),
)
if existing_doc.exists():
msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS
log_msg = f"Not consuming {self.filename}: It is a duplicate of {existing_doc.get().title} (#{existing_doc.get().pk})."
if existing_doc.first().deleted_at is not None:
msg = ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS_IN_TRASH
log_msg += " Note: existing document is in the trash."
if settings.CONSUMER_DELETE_DUPLICATES:
Path(self.input_doc.original_file).unlink()
self._fail(
msg,
log_msg,
)
def pre_check_directories(self):
"""
Ensure all required directories exist before attempting to use them
"""
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
settings.THUMBNAIL_DIR.mkdir(parents=True, exist_ok=True)
settings.ORIGINALS_DIR.mkdir(parents=True, exist_ok=True)
settings.ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
def pre_check_asn_value(self):
"""
Check that if override_asn is given, it is unique and within a valid range
"""
if self.metadata.asn is None:
# check not necessary in case no ASN gets set
return
# Validate the range is above zero and less than uint32_t max
# otherwise, Whoosh can't handle it in the index
if (
self.metadata.asn < Document.ARCHIVE_SERIAL_NUMBER_MIN
or self.metadata.asn > Document.ARCHIVE_SERIAL_NUMBER_MAX
):
self._fail(
ConsumerStatusShortMessage.ASN_RANGE,
f"Not consuming {self.filename}: "
f"Given ASN {self.metadata.asn} is out of range "
f"[{Document.ARCHIVE_SERIAL_NUMBER_MIN:,}, "
f"{Document.ARCHIVE_SERIAL_NUMBER_MAX:,}]",
)
existing_asn_doc = Document.global_objects.filter(
archive_serial_number=self.metadata.asn,
)
if existing_asn_doc.exists():
msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS
log_msg = f"Not consuming {self.filename}: Given ASN {self.metadata.asn} already exists!"
if existing_asn_doc.first().deleted_at is not None:
msg = ConsumerStatusShortMessage.ASN_ALREADY_EXISTS_IN_TRASH
log_msg += " Note: existing document is in the trash."
self._fail(
msg,
log_msg,
)
def run(self) -> None:
self._send_progress(
0,
100,
ProgressStatusOptions.STARTED,
ConsumerStatusShortMessage.NEW_FILE,
)
# Make sure that preconditions for consuming the file are met.
self.pre_check_file_exists()
self.pre_check_duplicate()
self.pre_check_directories()
self.pre_check_asn_value()

View File

@ -26,6 +26,7 @@ from documents.caching import clear_document_caches
from documents.classifier import DocumentClassifier from documents.classifier import DocumentClassifier
from documents.classifier import load_classifier from documents.classifier import load_classifier
from documents.consumer import ConsumerPlugin from documents.consumer import ConsumerPlugin
from documents.consumer import ConsumerPreflightPlugin
from documents.consumer import WorkflowTriggerPlugin from documents.consumer import WorkflowTriggerPlugin
from documents.data_models import ConsumableDocument from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides from documents.data_models import DocumentMetadataOverrides
@ -144,6 +145,7 @@ def consume_file(
overrides = DocumentMetadataOverrides() overrides = DocumentMetadataOverrides()
plugins: list[type[ConsumeTaskPlugin]] = [ plugins: list[type[ConsumeTaskPlugin]] = [
ConsumerPreflightPlugin,
CollatePlugin, CollatePlugin,
BarcodePlugin, BarcodePlugin,
WorkflowTriggerPlugin, WorkflowTriggerPlugin,

View File

@ -484,8 +484,8 @@ class TestConsumer(
self._assert_first_last_send_progress() self._assert_first_last_send_progress()
def testNotAFile(self): def testNotAFile(self):
with self.get_consumer(Path("non-existing-file")) as consumer:
with self.assertRaisesMessage(ConsumerError, "File not found"): with self.assertRaisesMessage(ConsumerError, "File not found"):
with self.get_consumer(Path("non-existing-file")) as consumer:
consumer.run() consumer.run()
self._assert_first_last_send_progress(last_status="FAILED") self._assert_first_last_send_progress(last_status="FAILED")
@ -493,8 +493,8 @@ class TestConsumer(
with self.get_consumer(self.get_test_file()) as consumer: with self.get_consumer(self.get_test_file()) as consumer:
consumer.run() consumer.run()
with self.get_consumer(self.get_test_file()) as consumer:
with self.assertRaisesMessage(ConsumerError, "It is a duplicate"): with self.assertRaisesMessage(ConsumerError, "It is a duplicate"):
with self.get_consumer(self.get_test_file()) as consumer:
consumer.run() consumer.run()
self._assert_first_last_send_progress(last_status="FAILED") self._assert_first_last_send_progress(last_status="FAILED")
@ -503,8 +503,8 @@ class TestConsumer(
with self.get_consumer(self.get_test_file()) as consumer: with self.get_consumer(self.get_test_file()) as consumer:
consumer.run() consumer.run()
with self.get_consumer(self.get_test_archive_file()) as consumer:
with self.assertRaisesMessage(ConsumerError, "It is a duplicate"): with self.assertRaisesMessage(ConsumerError, "It is a duplicate"):
with self.get_consumer(self.get_test_archive_file()) as consumer:
consumer.run() consumer.run()
self._assert_first_last_send_progress(last_status="FAILED") self._assert_first_last_send_progress(last_status="FAILED")
@ -521,8 +521,8 @@ class TestConsumer(
Document.objects.all().delete() Document.objects.all().delete()
with self.get_consumer(self.get_test_file()) as consumer:
with self.assertRaisesMessage(ConsumerError, "document is in the trash"): with self.assertRaisesMessage(ConsumerError, "document is in the trash"):
with self.get_consumer(self.get_test_file()) as consumer:
consumer.run() consumer.run()
def testAsnExists(self): def testAsnExists(self):
@ -532,11 +532,11 @@ class TestConsumer(
) as consumer: ) as consumer:
consumer.run() consumer.run()
with self.assertRaisesMessage(ConsumerError, "ASN 123 already exists"):
with self.get_consumer( with self.get_consumer(
self.get_test_file2(), self.get_test_file2(),
DocumentMetadataOverrides(asn=123), DocumentMetadataOverrides(asn=123),
) as consumer: ) as consumer:
with self.assertRaisesMessage(ConsumerError, "ASN 123 already exists"):
consumer.run() consumer.run()
def testAsnExistsInTrash(self): def testAsnExistsInTrash(self):
@ -549,22 +549,22 @@ class TestConsumer(
document = Document.objects.first() document = Document.objects.first()
document.delete() document.delete()
with self.assertRaisesMessage(ConsumerError, "document is in the trash"):
with self.get_consumer( with self.get_consumer(
self.get_test_file2(), self.get_test_file2(),
DocumentMetadataOverrides(asn=123), DocumentMetadataOverrides(asn=123),
) as consumer: ) as consumer:
with self.assertRaisesMessage(ConsumerError, "document is in the trash"):
consumer.run() consumer.run()
@mock.patch("documents.parsers.document_consumer_declaration.send") @mock.patch("documents.parsers.document_consumer_declaration.send")
def testNoParsers(self, m): def testNoParsers(self, m):
m.return_value = [] m.return_value = []
with self.get_consumer(self.get_test_file()) as consumer:
with self.assertRaisesMessage( with self.assertRaisesMessage(
ConsumerError, ConsumerError,
"sample.pdf: Unsupported mime type application/pdf", "sample.pdf: Unsupported mime type application/pdf",
): ):
with self.get_consumer(self.get_test_file()) as consumer:
consumer.run() consumer.run()
self._assert_first_last_send_progress(last_status="FAILED") self._assert_first_last_send_progress(last_status="FAILED")
@ -726,8 +726,8 @@ class TestConsumer(
dst = self.get_test_file() dst = self.get_test_file()
self.assertIsFile(dst) self.assertIsFile(dst)
with self.get_consumer(dst) as consumer:
with self.assertRaises(ConsumerError): with self.assertRaises(ConsumerError):
with self.get_consumer(dst) as consumer:
consumer.run() consumer.run()
self.assertIsNotFile(dst) self.assertIsNotFile(dst)
@ -751,11 +751,11 @@ class TestConsumer(
dst = self.get_test_file() dst = self.get_test_file()
self.assertIsFile(dst) self.assertIsFile(dst)
with self.get_consumer(dst) as consumer:
with self.assertRaisesRegex( with self.assertRaisesRegex(
ConsumerError, ConsumerError,
r"sample\.pdf: Not consuming sample\.pdf: It is a duplicate of sample \(#\d+\)", r"sample\.pdf: Not consuming sample\.pdf: It is a duplicate of sample \(#\d+\)",
): ):
with self.get_consumer(dst) as consumer:
consumer.run() consumer.run()
self.assertIsFile(dst) self.assertIsFile(dst)

View File

@ -21,6 +21,7 @@ from django.test import TransactionTestCase
from django.test import override_settings from django.test import override_settings
from documents.consumer import ConsumerPlugin from documents.consumer import ConsumerPlugin
from documents.consumer import ConsumerPreflightPlugin
from documents.data_models import ConsumableDocument from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides from documents.data_models import DocumentMetadataOverrides
from documents.data_models import DocumentSource from documents.data_models import DocumentSource
@ -344,12 +345,21 @@ class GetConsumerMixin:
) -> Generator[ConsumerPlugin, None, None]: ) -> Generator[ConsumerPlugin, None, None]:
# Store this for verification # Store this for verification
self.status = DummyProgressManager(filepath.name, None) self.status = DummyProgressManager(filepath.name, None)
reader = ConsumerPlugin( doc = ConsumableDocument(
ConsumableDocument(
source, source,
original_file=filepath, original_file=filepath,
mailrule_id=mailrule_id or None, mailrule_id=mailrule_id or None,
), )
preflight_plugin = ConsumerPreflightPlugin(
doc,
overrides or DocumentMetadataOverrides(),
self.status, # type: ignore
self.dirs.scratch_dir,
"task-id",
)
preflight_plugin.setup()
reader = ConsumerPlugin(
doc,
overrides or DocumentMetadataOverrides(), overrides or DocumentMetadataOverrides(),
self.status, # type: ignore self.status, # type: ignore
self.dirs.scratch_dir, self.dirs.scratch_dir,
@ -357,6 +367,7 @@ class GetConsumerMixin:
) )
reader.setup() reader.setup()
try: try:
preflight_plugin.run()
yield reader yield reader
finally: finally:
reader.cleanup() reader.cleanup()