mirror of
				https://github.com/paperless-ngx/paperless-ngx.git
				synced 2025-11-03 19:17:13 -05:00 
			
		
		
		
	Merge pull request #2827 from paperless-ngx/feature-owner-aware-unique-model-names
Feature: owner-aware unique model name constraint
This commit is contained in:
		
						commit
						2042b85056
					
				@ -0,0 +1,107 @@
 | 
			
		||||
# Generated by Django 4.1.5 on 2023-03-04 22:33
 | 
			
		||||
 | 
			
		||||
from django.db import migrations, models
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Migration(migrations.Migration):
 | 
			
		||||
 | 
			
		||||
    dependencies = [
 | 
			
		||||
        ("documents", "1032_alter_correspondent_matching_algorithm_and_more"),
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    operations = [
 | 
			
		||||
        migrations.AlterModelOptions(
 | 
			
		||||
            name="documenttype",
 | 
			
		||||
            options={
 | 
			
		||||
                "ordering": ("name",),
 | 
			
		||||
                "verbose_name": "document type",
 | 
			
		||||
                "verbose_name_plural": "document types",
 | 
			
		||||
            },
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AlterModelOptions(
 | 
			
		||||
            name="tag",
 | 
			
		||||
            options={
 | 
			
		||||
                "ordering": ("name",),
 | 
			
		||||
                "verbose_name": "tag",
 | 
			
		||||
                "verbose_name_plural": "tags",
 | 
			
		||||
            },
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AlterField(
 | 
			
		||||
            model_name="correspondent",
 | 
			
		||||
            name="name",
 | 
			
		||||
            field=models.CharField(max_length=128, verbose_name="name"),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AlterField(
 | 
			
		||||
            model_name="documenttype",
 | 
			
		||||
            name="name",
 | 
			
		||||
            field=models.CharField(max_length=128, verbose_name="name"),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AlterField(
 | 
			
		||||
            model_name="storagepath",
 | 
			
		||||
            name="name",
 | 
			
		||||
            field=models.CharField(max_length=128, verbose_name="name"),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AlterField(
 | 
			
		||||
            model_name="tag",
 | 
			
		||||
            name="name",
 | 
			
		||||
            field=models.CharField(max_length=128, verbose_name="name"),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AddConstraint(
 | 
			
		||||
            model_name="correspondent",
 | 
			
		||||
            constraint=models.UniqueConstraint(
 | 
			
		||||
                fields=("name", "owner"),
 | 
			
		||||
                name="documents_correspondent_unique_name_owner",
 | 
			
		||||
            ),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AddConstraint(
 | 
			
		||||
            model_name="correspondent",
 | 
			
		||||
            constraint=models.UniqueConstraint(
 | 
			
		||||
                condition=models.Q(("owner__isnull", True)),
 | 
			
		||||
                fields=("name",),
 | 
			
		||||
                name="documents_correspondent_name_uniq",
 | 
			
		||||
            ),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AddConstraint(
 | 
			
		||||
            model_name="documenttype",
 | 
			
		||||
            constraint=models.UniqueConstraint(
 | 
			
		||||
                fields=("name", "owner"),
 | 
			
		||||
                name="documents_documenttype_unique_name_owner",
 | 
			
		||||
            ),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AddConstraint(
 | 
			
		||||
            model_name="documenttype",
 | 
			
		||||
            constraint=models.UniqueConstraint(
 | 
			
		||||
                condition=models.Q(("owner__isnull", True)),
 | 
			
		||||
                fields=("name",),
 | 
			
		||||
                name="documents_documenttype_name_uniq",
 | 
			
		||||
            ),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AddConstraint(
 | 
			
		||||
            model_name="storagepath",
 | 
			
		||||
            constraint=models.UniqueConstraint(
 | 
			
		||||
                fields=("name", "owner"), name="documents_storagepath_unique_name_owner"
 | 
			
		||||
            ),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AddConstraint(
 | 
			
		||||
            model_name="storagepath",
 | 
			
		||||
            constraint=models.UniqueConstraint(
 | 
			
		||||
                condition=models.Q(("owner__isnull", True)),
 | 
			
		||||
                fields=("name",),
 | 
			
		||||
                name="documents_storagepath_name_uniq",
 | 
			
		||||
            ),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AddConstraint(
 | 
			
		||||
            model_name="tag",
 | 
			
		||||
            constraint=models.UniqueConstraint(
 | 
			
		||||
                fields=("name", "owner"), name="documents_tag_unique_name_owner"
 | 
			
		||||
            ),
 | 
			
		||||
        ),
 | 
			
		||||
        migrations.AddConstraint(
 | 
			
		||||
            model_name="tag",
 | 
			
		||||
            constraint=models.UniqueConstraint(
 | 
			
		||||
                condition=models.Q(("owner__isnull", True)),
 | 
			
		||||
                fields=("name",),
 | 
			
		||||
                name="documents_tag_name_uniq",
 | 
			
		||||
            ),
 | 
			
		||||
        ),
 | 
			
		||||
    ]
 | 
			
		||||
@ -23,7 +23,20 @@ ALL_STATES = sorted(states.ALL_STATES)
 | 
			
		||||
TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MatchingModel(models.Model):
 | 
			
		||||
class ModelWithOwner(models.Model):
 | 
			
		||||
    owner = models.ForeignKey(
 | 
			
		||||
        User,
 | 
			
		||||
        blank=True,
 | 
			
		||||
        null=True,
 | 
			
		||||
        on_delete=models.SET_NULL,
 | 
			
		||||
        verbose_name=_("owner"),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    class Meta:
 | 
			
		||||
        abstract = True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MatchingModel(ModelWithOwner):
 | 
			
		||||
 | 
			
		||||
    MATCH_NONE = 0
 | 
			
		||||
    MATCH_ANY = 1
 | 
			
		||||
@ -43,7 +56,7 @@ class MatchingModel(models.Model):
 | 
			
		||||
        (MATCH_AUTO, _("Automatic")),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    name = models.CharField(_("name"), max_length=128, unique=True)
 | 
			
		||||
    name = models.CharField(_("name"), max_length=128)
 | 
			
		||||
 | 
			
		||||
    match = models.CharField(_("match"), max_length=256, blank=True)
 | 
			
		||||
 | 
			
		||||
@ -58,32 +71,29 @@ class MatchingModel(models.Model):
 | 
			
		||||
    class Meta:
 | 
			
		||||
        abstract = True
 | 
			
		||||
        ordering = ("name",)
 | 
			
		||||
        constraints = [
 | 
			
		||||
            models.UniqueConstraint(
 | 
			
		||||
                fields=["name", "owner"],
 | 
			
		||||
                name="%(app_label)s_%(class)s_unique_name_owner",
 | 
			
		||||
            ),
 | 
			
		||||
            models.UniqueConstraint(
 | 
			
		||||
                name="%(app_label)s_%(class)s_name_uniq",
 | 
			
		||||
                fields=["name"],
 | 
			
		||||
                condition=models.Q(owner__isnull=True),
 | 
			
		||||
            ),
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
    def __str__(self):
 | 
			
		||||
        return self.name
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ModelWithOwner(models.Model):
 | 
			
		||||
    owner = models.ForeignKey(
 | 
			
		||||
        User,
 | 
			
		||||
        blank=True,
 | 
			
		||||
        null=True,
 | 
			
		||||
        on_delete=models.SET_NULL,
 | 
			
		||||
        verbose_name=_("owner"),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    class Meta:
 | 
			
		||||
        abstract = True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Correspondent(MatchingModel, ModelWithOwner):
 | 
			
		||||
    class Meta:
 | 
			
		||||
        ordering = ("name",)
 | 
			
		||||
class Correspondent(MatchingModel):
 | 
			
		||||
    class Meta(MatchingModel.Meta):
 | 
			
		||||
        verbose_name = _("correspondent")
 | 
			
		||||
        verbose_name_plural = _("correspondents")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class Tag(MatchingModel, ModelWithOwner):
 | 
			
		||||
class Tag(MatchingModel):
 | 
			
		||||
 | 
			
		||||
    color = models.CharField(_("color"), max_length=7, default="#a6cee3")
 | 
			
		||||
 | 
			
		||||
@ -96,25 +106,24 @@ class Tag(MatchingModel, ModelWithOwner):
 | 
			
		||||
        ),
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    class Meta:
 | 
			
		||||
    class Meta(MatchingModel.Meta):
 | 
			
		||||
        verbose_name = _("tag")
 | 
			
		||||
        verbose_name_plural = _("tags")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class DocumentType(MatchingModel, ModelWithOwner):
 | 
			
		||||
    class Meta:
 | 
			
		||||
class DocumentType(MatchingModel):
 | 
			
		||||
    class Meta(MatchingModel.Meta):
 | 
			
		||||
        verbose_name = _("document type")
 | 
			
		||||
        verbose_name_plural = _("document types")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class StoragePath(MatchingModel, ModelWithOwner):
 | 
			
		||||
class StoragePath(MatchingModel):
 | 
			
		||||
    path = models.CharField(
 | 
			
		||||
        _("path"),
 | 
			
		||||
        max_length=512,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    class Meta:
 | 
			
		||||
        ordering = ("name",)
 | 
			
		||||
    class Meta(MatchingModel.Meta):
 | 
			
		||||
        verbose_name = _("storage path")
 | 
			
		||||
        verbose_name_plural = _("storage paths")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -68,6 +68,25 @@ class MatchingModelSerializer(serializers.ModelSerializer):
 | 
			
		||||
 | 
			
		||||
    slug = SerializerMethodField()
 | 
			
		||||
 | 
			
		||||
    def validate(self, data):
 | 
			
		||||
        # see https://github.com/encode/django-rest-framework/issues/7173
 | 
			
		||||
        name = data["name"] if "name" in data else self.instance.name
 | 
			
		||||
        owner = (
 | 
			
		||||
            data["owner"]
 | 
			
		||||
            if "owner" in data
 | 
			
		||||
            else self.user
 | 
			
		||||
            if hasattr(self, "user")
 | 
			
		||||
            else None
 | 
			
		||||
        )
 | 
			
		||||
        if ("name" in data or "owner" in data) and self.Meta.model.objects.filter(
 | 
			
		||||
            name=name,
 | 
			
		||||
            owner=owner,
 | 
			
		||||
        ).exists():
 | 
			
		||||
            raise serializers.ValidationError(
 | 
			
		||||
                {"error": "Object violates owner / name unique constraint"},
 | 
			
		||||
            )
 | 
			
		||||
        return data
 | 
			
		||||
 | 
			
		||||
    def validate_match(self, match):
 | 
			
		||||
        if (
 | 
			
		||||
            "matching_algorithm" in self.initial_data
 | 
			
		||||
@ -186,6 +205,17 @@ class OwnedObjectSerializer(serializers.ModelSerializer, SetPermissionsMixin):
 | 
			
		||||
    def update(self, instance, validated_data):
 | 
			
		||||
        if "set_permissions" in validated_data:
 | 
			
		||||
            self._set_permissions(validated_data["set_permissions"], instance)
 | 
			
		||||
        if "owner" in validated_data and "name" in self.Meta.fields:
 | 
			
		||||
            name = validated_data["name"] if "name" in validated_data else instance.name
 | 
			
		||||
            not_unique = (
 | 
			
		||||
                self.Meta.model.objects.exclude(pk=instance.pk)
 | 
			
		||||
                .filter(owner=validated_data["owner"], name=name)
 | 
			
		||||
                .exists()
 | 
			
		||||
            )
 | 
			
		||||
            if not_unique:
 | 
			
		||||
                raise serializers.ValidationError(
 | 
			
		||||
                    {"error": "Object violates owner / name unique constraint"},
 | 
			
		||||
                )
 | 
			
		||||
        return super().update(instance, validated_data)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -20,6 +20,8 @@ except ImportError:
 | 
			
		||||
    import backports.zoneinfo as zoneinfo
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
from django.db import transaction
 | 
			
		||||
from django.db.utils import IntegrityError
 | 
			
		||||
from django.conf import settings
 | 
			
		||||
from django.contrib.auth.models import Group
 | 
			
		||||
from django.contrib.auth.models import Permission
 | 
			
		||||
@ -1844,6 +1846,100 @@ class TestDocumentApi(DirectoriesMixin, APITestCase):
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
 | 
			
		||||
 | 
			
		||||
    def test_tag_unique_name_and_owner(self):
 | 
			
		||||
        """
 | 
			
		||||
        GIVEN:
 | 
			
		||||
            - Multiple users
 | 
			
		||||
            - Tags owned by particular users
 | 
			
		||||
        WHEN:
 | 
			
		||||
            - API request for creating items which are unique by name and owner
 | 
			
		||||
        THEN:
 | 
			
		||||
            - Unique items are created
 | 
			
		||||
            - Non-unique items are not allowed
 | 
			
		||||
        """
 | 
			
		||||
        user1 = User.objects.create_user(username="test1")
 | 
			
		||||
        user1.user_permissions.add(*Permission.objects.filter(codename="add_tag"))
 | 
			
		||||
        user1.save()
 | 
			
		||||
 | 
			
		||||
        user2 = User.objects.create_user(username="test2")
 | 
			
		||||
        user2.user_permissions.add(*Permission.objects.filter(codename="add_tag"))
 | 
			
		||||
        user2.save()
 | 
			
		||||
 | 
			
		||||
        # User 1 creates tag 1 owned by user 1 by default
 | 
			
		||||
        # No issue
 | 
			
		||||
        self.client.force_authenticate(user1)
 | 
			
		||||
        response = self.client.post("/api/tags/", {"name": "tag 1"}, format="json")
 | 
			
		||||
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
 | 
			
		||||
 | 
			
		||||
        # User 2 creates tag 1 owned by user 2 by default
 | 
			
		||||
        # No issue
 | 
			
		||||
        self.client.force_authenticate(user2)
 | 
			
		||||
        response = self.client.post("/api/tags/", {"name": "tag 1"}, format="json")
 | 
			
		||||
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
 | 
			
		||||
 | 
			
		||||
        # User 2 creates tag 2 owned by user 1
 | 
			
		||||
        # No issue
 | 
			
		||||
        self.client.force_authenticate(user2)
 | 
			
		||||
        response = self.client.post(
 | 
			
		||||
            "/api/tags/",
 | 
			
		||||
            {"name": "tag 2", "owner": user1.pk},
 | 
			
		||||
            format="json",
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
 | 
			
		||||
 | 
			
		||||
        # User 1 creates tag 2 owned by user 1 by default
 | 
			
		||||
        # Not allowed, would create tag2/user1 which already exists
 | 
			
		||||
        self.client.force_authenticate(user1)
 | 
			
		||||
        response = self.client.post(
 | 
			
		||||
            "/api/tags/",
 | 
			
		||||
            {"name": "tag 2"},
 | 
			
		||||
            format="json",
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
 | 
			
		||||
 | 
			
		||||
        # User 1 creates tag 2 owned by user 1
 | 
			
		||||
        # Not allowed, would create tag2/user1 which already exists
 | 
			
		||||
        response = self.client.post(
 | 
			
		||||
            "/api/tags/",
 | 
			
		||||
            {"name": "tag 2", "owner": user1.pk},
 | 
			
		||||
            format="json",
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
 | 
			
		||||
 | 
			
		||||
    def test_tag_unique_name_and_owner_enforced_on_update(self):
 | 
			
		||||
        """
 | 
			
		||||
        GIVEN:
 | 
			
		||||
            - Multiple users
 | 
			
		||||
            - Tags owned by particular users
 | 
			
		||||
        WHEN:
 | 
			
		||||
            - API request for to update tag in such as way as makes it non-unqiue
 | 
			
		||||
        THEN:
 | 
			
		||||
            - Unique items are created
 | 
			
		||||
            - Non-unique items are not allowed on update
 | 
			
		||||
        """
 | 
			
		||||
        user1 = User.objects.create_user(username="test1")
 | 
			
		||||
        user1.user_permissions.add(*Permission.objects.filter(codename="change_tag"))
 | 
			
		||||
        user1.save()
 | 
			
		||||
 | 
			
		||||
        user2 = User.objects.create_user(username="test2")
 | 
			
		||||
        user2.user_permissions.add(*Permission.objects.filter(codename="change_tag"))
 | 
			
		||||
        user2.save()
 | 
			
		||||
 | 
			
		||||
        # Create name tag 1 owned by user 1
 | 
			
		||||
        # Create name tag 1 owned by user 2
 | 
			
		||||
        Tag.objects.create(name="tag 1", owner=user1)
 | 
			
		||||
        tag2 = Tag.objects.create(name="tag 1", owner=user2)
 | 
			
		||||
 | 
			
		||||
        # User 2 attempts to change the owner of tag to user 1
 | 
			
		||||
        # Not allowed, would change to tag1/user1 which already exists
 | 
			
		||||
        self.client.force_authenticate(user2)
 | 
			
		||||
        response = self.client.patch(
 | 
			
		||||
            f"/api/tags/{tag2.id}/",
 | 
			
		||||
            {"owner": user1.pk},
 | 
			
		||||
            format="json",
 | 
			
		||||
        )
 | 
			
		||||
        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestDocumentApiV2(DirectoriesMixin, APITestCase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user