mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-10-31 02:27:10 -04:00 
			
		
		
		
	Incremental llm index update, add scheduled llm index task
This commit is contained in:
		
							parent
							
								
									f6a3882199
								
							
						
					
					
						commit
						2481a66544
					
				| @ -1816,3 +1816,10 @@ current backend. This setting is required to be set to use the AI features. | ||||
| : The URL to use for the AI backend. This is required for the Ollama backend only. | ||||
| 
 | ||||
|     Defaults to None. | ||||
| 
 | ||||
| #### [`PAPERLESS_LLM_INDEX_TASK_CRON=<cron expression>`](#PAPERLESS_LLM_INDEX_TASK_CRON) {#PAPERLESS_LLM_INDEX_TASK_CRON} | ||||
| 
 | ||||
| : Configures the schedule on which the AI embeddings (the LLM index) are updated for all documents. The update only runs if | ||||
| AI is enabled and an LLM embedding backend is set. | ||||
| 
 | ||||
|     Defaults to `10 2 * * *`, i.e. once per day at 02:10. | ||||
|  | ||||
| @ -2,20 +2,20 @@ from django.core.management import BaseCommand | ||||
| from django.db import transaction | ||||
| 
 | ||||
| from documents.management.commands.mixins import ProgressBarMixin | ||||
| from documents.tasks import llm_index_rebuild | ||||
| from documents.tasks import llmindex_index | ||||
| 
 | ||||
| 
 | ||||
| class Command(ProgressBarMixin, BaseCommand): | ||||
|     help = "Manages the LLM-based vector index for Paperless." | ||||
| 
 | ||||
|     def add_arguments(self, parser): | ||||
|         parser.add_argument("command", choices=["rebuild"]) | ||||
|         parser.add_argument("command", choices=["rebuild", "update"]) | ||||
|         self.add_argument_progress_bar_mixin(parser) | ||||
| 
 | ||||
|     def handle(self, *args, **options): | ||||
|         self.handle_progress_bar_mixin(**options) | ||||
|         with transaction.atomic(): | ||||
|             llm_index_rebuild( | ||||
|             llmindex_index( | ||||
|                 progress_bar_disable=self.no_progress_bar, | ||||
|                 rebuild=options["command"] == "rebuild", | ||||
|             ) | ||||
|  | ||||
| @ -56,7 +56,7 @@ from documents.signals.handlers import cleanup_document_deletion | ||||
| from documents.signals.handlers import run_workflows | ||||
| from paperless.ai.indexing import llm_index_add_or_update_document | ||||
| from paperless.ai.indexing import llm_index_remove_document | ||||
| from paperless.ai.indexing import rebuild_llm_index | ||||
| from paperless.ai.indexing import update_llm_index | ||||
| from paperless.config import AIConfig | ||||
| 
 | ||||
| if settings.AUDIT_LOG_ENABLED: | ||||
| @ -532,11 +532,14 @@ def check_scheduled_workflows(): | ||||
|                         ) | ||||
| 
 | ||||
| 
 | ||||
| def llm_index_rebuild(*, progress_bar_disable=False, rebuild=False): | ||||
|     rebuild_llm_index( | ||||
|         progress_bar_disable=progress_bar_disable, | ||||
|         rebuild=rebuild, | ||||
|     ) | ||||
| @shared_task | ||||
| def llmindex_index(*, progress_bar_disable=False, rebuild=False): | ||||
|     ai_config = AIConfig() | ||||
|     if ai_config.llm_index_enabled(): | ||||
|         update_llm_index( | ||||
|             progress_bar_disable=progress_bar_disable, | ||||
|             rebuild=rebuild, | ||||
|         ) | ||||
| 
 | ||||
| 
 | ||||
| @shared_task | ||||
| @ -552,6 +555,6 @@ def remove_document_from_llm_index(document): | ||||
| # TODO: schedule to run periodically | ||||
| @shared_task | ||||
| def rebuild_llm_index_task(): | ||||
|     from paperless.ai.indexing import rebuild_llm_index | ||||
|     from paperless.ai.indexing import update_llm_index | ||||
| 
 | ||||
|     rebuild_llm_index(rebuild=True) | ||||
|     update_llm_index(rebuild=True) | ||||
|  | ||||
| @ -8,6 +8,7 @@ from django.conf import settings | ||||
| from llama_index.core import Document as LlamaDocument | ||||
| from llama_index.core import StorageContext | ||||
| from llama_index.core import VectorStoreIndex | ||||
| from llama_index.core import load_index_from_storage | ||||
| from llama_index.core.node_parser import SimpleNodeParser | ||||
| from llama_index.core.retrievers import VectorIndexRetriever | ||||
| from llama_index.core.schema import BaseNode | ||||
| @ -70,7 +71,7 @@ def build_document_node(document: Document) -> list[BaseNode]: | ||||
| 
 | ||||
|     text = build_llm_index_text(document) | ||||
|     metadata = { | ||||
|         "document_id": document.id, | ||||
|         "document_id": str(document.id), | ||||
|         "title": document.title, | ||||
|         "tags": [t.name for t in document.tags.all()], | ||||
|         "correspondent": document.correspondent.name | ||||
| @ -81,32 +82,29 @@ def build_document_node(document: Document) -> list[BaseNode]: | ||||
|         else None, | ||||
|         "created": document.created.isoformat() if document.created else None, | ||||
|         "added": document.added.isoformat() if document.added else None, | ||||
|         "modified": document.modified.isoformat(), | ||||
|     } | ||||
|     doc = LlamaDocument(text=text, metadata=metadata) | ||||
|     parser = SimpleNodeParser() | ||||
|     return parser.get_nodes_from_documents([doc]) | ||||
| 
 | ||||
| 
 | ||||
| def load_or_build_index(storage_context, embed_model, nodes=None): | ||||
| def load_or_build_index(storage_context: StorageContext, embed_model, nodes=None): | ||||
|     """ | ||||
|     Load an existing VectorStoreIndex if present, | ||||
|     or build a new one using provided nodes if storage is empty. | ||||
|     """ | ||||
|     try: | ||||
|         return load_index_from_storage(storage_context=storage_context) | ||||
|     except ValueError as e: | ||||
|         logger.debug("Failed to load index from storage: %s", e) | ||||
|         if not nodes: | ||||
|             return None | ||||
|         return VectorStoreIndex( | ||||
|             nodes=nodes, | ||||
|             storage_context=storage_context, | ||||
|             embed_model=embed_model, | ||||
|         ) | ||||
|     except ValueError as e: | ||||
|         if "One of nodes, objects, or index_struct must be provided" in str(e): | ||||
|             if not nodes: | ||||
|                 return None | ||||
|             return VectorStoreIndex( | ||||
|                 nodes=nodes, | ||||
|                 storage_context=storage_context, | ||||
|                 embed_model=embed_model, | ||||
|             ) | ||||
|         raise | ||||
| 
 | ||||
| 
 | ||||
| def remove_document_docstore_nodes(document: Document, index: VectorStoreIndex): | ||||
| @ -125,31 +123,74 @@ def remove_document_docstore_nodes(document: Document, index: VectorStoreIndex): | ||||
|         index.docstore.delete_document(node_id) | ||||
| 
 | ||||
| 
 | ||||
| def rebuild_llm_index(*, progress_bar_disable=False, rebuild=False): | ||||
| def update_llm_index(*, progress_bar_disable=False, rebuild=False): | ||||
|     """ | ||||
|     Rebuilds the LLM index from scratch. | ||||
|     Rebuild or update the LLM index. | ||||
|     """ | ||||
|     embed_model = get_embedding_model() | ||||
|     llama_settings.Settings.embed_model = embed_model | ||||
| 
 | ||||
|     storage_context = get_or_create_storage_context(rebuild=rebuild) | ||||
| 
 | ||||
|     nodes = [] | ||||
| 
 | ||||
|     for document in tqdm.tqdm(Document.objects.all(), disable=progress_bar_disable): | ||||
|         document_nodes = build_document_node(document) | ||||
|         nodes.extend(document_nodes) | ||||
|     documents = Document.objects.all() | ||||
|     if not documents.exists(): | ||||
|         logger.warning("No documents found to index.") | ||||
|         return | ||||
| 
 | ||||
|     if not nodes: | ||||
|         raise RuntimeError( | ||||
|             "No nodes to index — check that documents are available and have content.", | ||||
|     if rebuild: | ||||
|         # Rebuild index from scratch | ||||
|         for document in tqdm.tqdm(documents, disable=progress_bar_disable): | ||||
|             document_nodes = build_document_node(document) | ||||
|             nodes.extend(document_nodes) | ||||
| 
 | ||||
|         VectorStoreIndex( | ||||
|             nodes=nodes, | ||||
|             storage_context=storage_context, | ||||
|             embed_model=embed_model, | ||||
|             show_progress=not progress_bar_disable, | ||||
|         ) | ||||
|     else: | ||||
|         # Update existing index | ||||
|         index = load_or_build_index(storage_context, embed_model) | ||||
|         all_node_ids = list(index.docstore.docs.keys()) | ||||
|         existing_nodes = { | ||||
|             node.metadata.get("document_id"): node | ||||
|             for node in index.docstore.get_nodes(all_node_ids) | ||||
|         } | ||||
| 
 | ||||
|         node_ids_to_remove = [] | ||||
| 
 | ||||
|         for document in tqdm.tqdm(documents, disable=progress_bar_disable): | ||||
|             doc_id = str(document.id) | ||||
|             document_modified = document.modified.isoformat() | ||||
| 
 | ||||
|             if doc_id in existing_nodes: | ||||
|                 node = existing_nodes[doc_id] | ||||
|                 node_modified = node.metadata.get("modified") | ||||
| 
 | ||||
|                 if node_modified == document_modified: | ||||
|                     continue | ||||
| 
 | ||||
|                 node_ids_to_remove.append(node.node_id) | ||||
|                 nodes.extend(build_document_node(document)) | ||||
|             else: | ||||
|                 # New document, add it | ||||
|                 nodes.extend(build_document_node(document)) | ||||
| 
 | ||||
|         if node_ids_to_remove or nodes: | ||||
|             logger.info( | ||||
|                 "Updating LLM index with %d new nodes and removing %d old nodes.", | ||||
|                 len(nodes), | ||||
|                 len(node_ids_to_remove), | ||||
|             ) | ||||
|             if node_ids_to_remove: | ||||
|                 index.delete_nodes(node_ids_to_remove) | ||||
|             if nodes: | ||||
|                 index.insert_nodes(nodes) | ||||
|         else: | ||||
|             logger.info("No changes detected, skipping llm index rebuild.") | ||||
| 
 | ||||
|     VectorStoreIndex( | ||||
|         nodes=nodes, | ||||
|         storage_context=storage_context, | ||||
|         embed_model=embed_model, | ||||
|     ) | ||||
|     storage_context.persist(persist_dir=settings.LLM_INDEX_DIR) | ||||
| 
 | ||||
| 
 | ||||
| @ -187,6 +228,7 @@ def llm_index_remove_document(document: Document): | ||||
|     storage_context = get_or_create_storage_context(rebuild=False) | ||||
| 
 | ||||
|     index = load_or_build_index(storage_context, embed_model) | ||||
| 
 | ||||
|     if index is None: | ||||
|         return | ||||
| 
 | ||||
|  | ||||
| @ -201,6 +201,4 @@ class AIConfig(BaseConfig): | ||||
|         self.llm_url = app_config.llm_url or settings.LLM_URL | ||||
| 
 | ||||
|     def llm_index_enabled(self) -> bool: | ||||
|         return ( | ||||
|             self.ai_enabled and self.llm_embedding_backend and self.llm_embedding_model | ||||
|         ) | ||||
|         return self.ai_enabled and self.llm_embedding_backend | ||||
|  | ||||
| @ -234,6 +234,20 @@ def _parse_beat_schedule() -> dict: | ||||
|                 "expires": 59.0 * 60.0, | ||||
|             }, | ||||
|         }, | ||||
|         { | ||||
|             "name": "Rebuild LLM index", | ||||
|             "env_key": "PAPERLESS_LLM_INDEX_TASK_CRON", | ||||
|             # Default daily at 02:10 | ||||
|             "env_default": "10 2 * * *", | ||||
|             "task": "documents.tasks.llmindex_index", | ||||
|             "options": { | ||||
|                 # 1 hour before default schedule sends again | ||||
|                 "expires": 23.0 * 60.0 * 60.0, | ||||
|                 "kwargs": { | ||||
|                     "progress_bar_disable": True, | ||||
|                 }, | ||||
|             }, | ||||
|         }, | ||||
|     ] | ||||
|     for task in tasks: | ||||
|         # Either get the environment setting or use the default | ||||
|  | ||||
| @ -53,7 +53,7 @@ class FakeEmbedding(BaseEmbedding): | ||||
| def test_build_document_node(real_document): | ||||
|     nodes = indexing.build_document_node(real_document) | ||||
|     assert len(nodes) > 0 | ||||
|     assert nodes[0].metadata["document_id"] == real_document.id | ||||
|     assert nodes[0].metadata["document_id"] == str(real_document.id) | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.django_db | ||||
| @ -63,8 +63,11 @@ def test_rebuild_llm_index( | ||||
|     mock_embed_model, | ||||
| ): | ||||
|     with patch("documents.models.Document.objects.all") as mock_all: | ||||
|         mock_all.return_value = [real_document] | ||||
|         indexing.rebuild_llm_index(rebuild=True) | ||||
|         mock_queryset = MagicMock() | ||||
|         mock_queryset.exists.return_value = True | ||||
|         mock_queryset.__iter__.return_value = iter([real_document]) | ||||
|         mock_all.return_value = mock_queryset | ||||
|         indexing.update_llm_index(rebuild=True) | ||||
| 
 | ||||
|         assert any(temp_llm_index_dir.glob("*.json")) | ||||
| 
 | ||||
| @ -75,7 +78,7 @@ def test_add_or_update_document_updates_existing_entry( | ||||
|     real_document, | ||||
|     mock_embed_model, | ||||
| ): | ||||
|     indexing.rebuild_llm_index(rebuild=True) | ||||
|     indexing.update_llm_index(rebuild=True) | ||||
|     indexing.llm_index_add_or_update_document(real_document) | ||||
| 
 | ||||
|     assert any(temp_llm_index_dir.glob("*.json")) | ||||
| @ -87,7 +90,7 @@ def test_remove_document_deletes_node_from_docstore( | ||||
|     real_document, | ||||
|     mock_embed_model, | ||||
| ): | ||||
|     indexing.rebuild_llm_index(rebuild=True) | ||||
|     indexing.update_llm_index(rebuild=True) | ||||
|     indexing.llm_index_add_or_update_document(real_document) | ||||
|     indexing.llm_index_remove_document(real_document) | ||||
| 
 | ||||
| @ -100,10 +103,17 @@ def test_rebuild_llm_index_no_documents( | ||||
|     mock_embed_model, | ||||
| ): | ||||
|     with patch("documents.models.Document.objects.all") as mock_all: | ||||
|         mock_all.return_value = [] | ||||
|         mock_queryset = MagicMock() | ||||
|         mock_queryset.exists.return_value = False | ||||
|         mock_queryset.__iter__.return_value = iter([]) | ||||
|         mock_all.return_value = mock_queryset | ||||
| 
 | ||||
|         with pytest.raises(RuntimeError, match="No nodes to index"): | ||||
|             indexing.rebuild_llm_index(rebuild=True) | ||||
|         # check log message | ||||
|         with patch("paperless.ai.indexing.logger") as mock_logger: | ||||
|             indexing.update_llm_index(rebuild=True) | ||||
|             mock_logger.warning.assert_called_once_with( | ||||
|                 "No documents found to index.", | ||||
|             ) | ||||
| 
 | ||||
| 
 | ||||
| def test_query_similar_documents( | ||||
|  | ||||
| @ -161,6 +161,7 @@ class TestCeleryScheduleParsing(TestCase): | ||||
|     SANITY_EXPIRE_TIME = ((7.0 * 24.0) - 1.0) * 60.0 * 60.0 | ||||
|     EMPTY_TRASH_EXPIRE_TIME = 23.0 * 60.0 * 60.0 | ||||
|     RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME = 59.0 * 60.0 | ||||
|     LLM_INDEX_EXPIRE_TIME = 23.0 * 60.0 * 60.0 | ||||
| 
 | ||||
|     def test_schedule_configuration_default(self): | ||||
|         """ | ||||
| @ -205,6 +206,16 @@ class TestCeleryScheduleParsing(TestCase): | ||||
|                     "schedule": crontab(minute="5", hour="*/1"), | ||||
|                     "options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME}, | ||||
|                 }, | ||||
|                 "Rebuild LLM index": { | ||||
|                     "task": "documents.tasks.llmindex_index", | ||||
|                     "schedule": crontab(minute=10, hour=2), | ||||
|                     "options": { | ||||
|                         "expires": self.LLM_INDEX_EXPIRE_TIME, | ||||
|                         "kwargs": { | ||||
|                             "progress_bar_disable": True, | ||||
|                         }, | ||||
|                     }, | ||||
|                 }, | ||||
|             }, | ||||
|             schedule, | ||||
|         ) | ||||
| @ -257,6 +268,16 @@ class TestCeleryScheduleParsing(TestCase): | ||||
|                     "schedule": crontab(minute="5", hour="*/1"), | ||||
|                     "options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME}, | ||||
|                 }, | ||||
|                 "Rebuild LLM index": { | ||||
|                     "task": "documents.tasks.llmindex_index", | ||||
|                     "schedule": crontab(minute=10, hour=2), | ||||
|                     "options": { | ||||
|                         "expires": self.LLM_INDEX_EXPIRE_TIME, | ||||
|                         "kwargs": { | ||||
|                             "progress_bar_disable": True, | ||||
|                         }, | ||||
|                     }, | ||||
|                 }, | ||||
|             }, | ||||
|             schedule, | ||||
|         ) | ||||
| @ -301,6 +322,16 @@ class TestCeleryScheduleParsing(TestCase): | ||||
|                     "schedule": crontab(minute="5", hour="*/1"), | ||||
|                     "options": {"expires": self.RUN_SCHEDULED_WORKFLOWS_EXPIRE_TIME}, | ||||
|                 }, | ||||
|                 "Rebuild LLM index": { | ||||
|                     "task": "documents.tasks.llmindex_index", | ||||
|                     "schedule": crontab(minute=10, hour=2), | ||||
|                     "options": { | ||||
|                         "expires": self.LLM_INDEX_EXPIRE_TIME, | ||||
|                         "kwargs": { | ||||
|                             "progress_bar_disable": True, | ||||
|                         }, | ||||
|                     }, | ||||
|                 }, | ||||
|             }, | ||||
|             schedule, | ||||
|         ) | ||||
| @ -323,6 +354,7 @@ class TestCeleryScheduleParsing(TestCase): | ||||
|                 "PAPERLESS_INDEX_TASK_CRON": "disable", | ||||
|                 "PAPERLESS_EMPTY_TRASH_TASK_CRON": "disable", | ||||
|                 "PAPERLESS_WORKFLOW_SCHEDULED_TASK_CRON": "disable", | ||||
|                 "PAPERLESS_LLM_INDEX_TASK_CRON": "disable", | ||||
|             }, | ||||
|         ): | ||||
|             schedule = _parse_beat_schedule() | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user