mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-04 03:27:12 -05:00 
			
		
		
		
	Respect superuser for advanced queries, test coverage for object perms
This commit is contained in:
		
							parent
							
								
									d2a8076596
								
							
						
					
					
						commit
						e275a2736a
					
				@ -225,15 +225,19 @@ class DelayedQuery:
 | 
			
		||||
 | 
			
		||||
        user_criterias = [query.Term("has_owner", False)]
 | 
			
		||||
        if "user" in self.query_params:
 | 
			
		||||
            if self.query_params["is_superuser"]:  # superusers see all docs
 | 
			
		||||
                user_criterias = []
 | 
			
		||||
            else:
 | 
			
		||||
                user_criterias.append(query.Term("owner_id", self.query_params["user"]))
 | 
			
		||||
                user_criterias.append(
 | 
			
		||||
                    query.Term("viewer_id", str(self.query_params["user"])),
 | 
			
		||||
                )
 | 
			
		||||
        if len(criterias) > 0:
 | 
			
		||||
            if len(user_criterias) > 0:
 | 
			
		||||
                criterias.append(query.Or(user_criterias))
 | 
			
		||||
            return query.And(criterias)
 | 
			
		||||
        else:
 | 
			
		||||
            return query.Or(user_criterias)
 | 
			
		||||
            return query.Or(user_criterias) if len(user_criterias) > 0 else None
 | 
			
		||||
 | 
			
		||||
    def _get_query_sortedby(self):
 | 
			
		||||
        if "ordering" not in self.query_params:
 | 
			
		||||
 | 
			
		||||
@ -27,6 +27,7 @@ from django.contrib.auth.models import Permission
 | 
			
		||||
from django.contrib.auth.models import User
 | 
			
		||||
from django.test import override_settings
 | 
			
		||||
from django.utils import timezone
 | 
			
		||||
from guardian.shortcuts import assign_perm
 | 
			
		||||
from rest_framework import status
 | 
			
		||||
from rest_framework.test import APITestCase
 | 
			
		||||
from whoosh.writing import AsyncWriter
 | 
			
		||||
@ -253,8 +254,6 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
 | 
			
		||||
        response = self.client.get(f"/api/documents/{doc.pk}/thumb/")
 | 
			
		||||
        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
 | 
			
		||||
 | 
			
		||||
        from guardian.shortcuts import assign_perm
 | 
			
		||||
 | 
			
		||||
        assign_perm("view_document", user2, doc)
 | 
			
		||||
 | 
			
		||||
        response = self.client.get(f"/api/documents/{doc.pk}/download/")
 | 
			
		||||
@ -1064,6 +1063,92 @@ class TestDocumentApi(DirectoriesMixin, DocumentConsumeDelayMixin, APITestCase):
 | 
			
		||||
            ),
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    def test_search_filtering_respect_owner(self):
 | 
			
		||||
        """
 | 
			
		||||
        GIVEN:
 | 
			
		||||
            - Documents with owners set & without
 | 
			
		||||
        WHEN:
 | 
			
		||||
            - API reuqest for advanced query (search) is made by non-superuser
 | 
			
		||||
            - API reuqest for advanced query (search) is made by superuser
 | 
			
		||||
        THEN:
 | 
			
		||||
            - Only owned docs are returned for regular users
 | 
			
		||||
            - All docs are returned for superuser
 | 
			
		||||
        """
 | 
			
		||||
        superuser = User.objects.create_superuser("superuser")
 | 
			
		||||
        u1 = User.objects.create_user("user1")
 | 
			
		||||
        u2 = User.objects.create_user("user2")
 | 
			
		||||
        u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
 | 
			
		||||
        u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
 | 
			
		||||
 | 
			
		||||
        Document.objects.create(checksum="1", content="test 1", owner=u1)
 | 
			
		||||
        Document.objects.create(checksum="2", content="test 2", owner=u2)
 | 
			
		||||
        Document.objects.create(checksum="3", content="test 3", owner=u2)
 | 
			
		||||
        Document.objects.create(checksum="4", content="test 4")
 | 
			
		||||
 | 
			
		||||
        with AsyncWriter(index.open_index()) as writer:
 | 
			
		||||
            for doc in Document.objects.all():
 | 
			
		||||
                index.update_document(writer, doc)
 | 
			
		||||
 | 
			
		||||
        self.client.force_authenticate(user=u1)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test")
 | 
			
		||||
        self.assertEqual(r.data["count"], 2)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
 | 
			
		||||
        self.assertEqual(r.data["count"], 2)
 | 
			
		||||
 | 
			
		||||
        self.client.force_authenticate(user=u2)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test")
 | 
			
		||||
        self.assertEqual(r.data["count"], 3)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
 | 
			
		||||
        self.assertEqual(r.data["count"], 3)
 | 
			
		||||
 | 
			
		||||
        self.client.force_authenticate(user=superuser)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test")
 | 
			
		||||
        self.assertEqual(r.data["count"], 4)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
 | 
			
		||||
        self.assertEqual(r.data["count"], 4)
 | 
			
		||||
 | 
			
		||||
    def test_search_filtering_with_object_perms(self):
 | 
			
		||||
        """
 | 
			
		||||
        GIVEN:
 | 
			
		||||
            - Documents with granted view permissions to others
 | 
			
		||||
        WHEN:
 | 
			
		||||
            - API reuqest for advanced query (search) is made by user
 | 
			
		||||
        THEN:
 | 
			
		||||
            - Only docs with granted view permissions are returned
 | 
			
		||||
        """
 | 
			
		||||
        u1 = User.objects.create_user("user1")
 | 
			
		||||
        u2 = User.objects.create_user("user2")
 | 
			
		||||
        u1.user_permissions.add(*Permission.objects.filter(codename="view_document"))
 | 
			
		||||
        u2.user_permissions.add(*Permission.objects.filter(codename="view_document"))
 | 
			
		||||
 | 
			
		||||
        Document.objects.create(checksum="1", content="test 1", owner=u1)
 | 
			
		||||
        d2 = Document.objects.create(checksum="2", content="test 2", owner=u2)
 | 
			
		||||
        d3 = Document.objects.create(checksum="3", content="test 3", owner=u2)
 | 
			
		||||
        Document.objects.create(checksum="4", content="test 4")
 | 
			
		||||
 | 
			
		||||
        with AsyncWriter(index.open_index()) as writer:
 | 
			
		||||
            for doc in Document.objects.all():
 | 
			
		||||
                index.update_document(writer, doc)
 | 
			
		||||
 | 
			
		||||
        self.client.force_authenticate(user=u1)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test")
 | 
			
		||||
        self.assertEqual(r.data["count"], 2)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
 | 
			
		||||
        self.assertEqual(r.data["count"], 2)
 | 
			
		||||
 | 
			
		||||
        assign_perm("view_document", u1, d2)
 | 
			
		||||
        assign_perm("view_document", u1, d3)
 | 
			
		||||
 | 
			
		||||
        with AsyncWriter(index.open_index()) as writer:
 | 
			
		||||
            for doc in [d2, d3]:
 | 
			
		||||
                index.update_document(writer, doc)
 | 
			
		||||
 | 
			
		||||
        self.client.force_authenticate(user=u1)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test")
 | 
			
		||||
        self.assertEqual(r.data["count"], 4)
 | 
			
		||||
        r = self.client.get("/api/documents/?query=test&document_type__id__none=1")
 | 
			
		||||
        self.assertEqual(r.data["count"], 4)
 | 
			
		||||
 | 
			
		||||
    def test_search_sorting(self):
 | 
			
		||||
        c1 = Correspondent.objects.create(name="corres Ax")
 | 
			
		||||
        c2 = Correspondent.objects.create(name="corres Cx")
 | 
			
		||||
 | 
			
		||||
@ -604,6 +604,9 @@ class UnifiedSearchViewSet(DocumentViewSet):
 | 
			
		||||
                # pass user to query for perms
 | 
			
		||||
                self.request.query_params._mutable = True
 | 
			
		||||
                self.request.query_params["user"] = self.request.user.id
 | 
			
		||||
                self.request.query_params[
 | 
			
		||||
                    "is_superuser"
 | 
			
		||||
                ] = self.request.user.is_superuser
 | 
			
		||||
                self.request.query_params._mutable = False
 | 
			
		||||
 | 
			
		||||
            if "query" in self.request.query_params:
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user