paperless-ngx/src/paperless/tests/test_websockets.py
dependabot[bot] 4ddac79f0f
Chore(deps): Bump the small-changes group across 1 directory with 3 updates (#10880)
* Chore(deps): Bump the small-changes group across 1 directory with 3 updates

Bumps the small-changes group with 3 updates in the / directory: [ocrmypdf](https://github.com/ocrmypdf/OCRmyPDF), [mkdocs-material](https://github.com/squidfunk/mkdocs-material) and [ruff](https://github.com/astral-sh/ruff).


Updates `ocrmypdf` from 16.10.4 to 16.11.0
- [Release notes](https://github.com/ocrmypdf/OCRmyPDF/releases)
- [Changelog](https://github.com/ocrmypdf/OCRmyPDF/blob/main/docs/release_notes.md)
- [Commits](https://github.com/ocrmypdf/OCRmyPDF/compare/v16.10.4...v16.11.0)

Updates `mkdocs-material` from 9.6.19 to 9.6.20
- [Release notes](https://github.com/squidfunk/mkdocs-material/releases)
- [Changelog](https://github.com/squidfunk/mkdocs-material/blob/master/CHANGELOG)
- [Commits](https://github.com/squidfunk/mkdocs-material/compare/9.6.19...9.6.20)

Updates `ruff` from 0.12.12 to 0.13.0
- [Release notes](https://github.com/astral-sh/ruff/releases)
- [Changelog](https://github.com/astral-sh/ruff/blob/main/CHANGELOG.md)
- [Commits](https://github.com/astral-sh/ruff/compare/0.12.12...0.13.0)

---
updated-dependencies:
- dependency-name: ocrmypdf
  dependency-version: 16.11.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: small-changes
- dependency-name: mkdocs-material
  dependency-version: 9.6.20
  dependency-type: direct:production
  update-type: version-update:semver-patch
  dependency-group: small-changes
- dependency-name: ruff
  dependency-version: 0.13.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: small-changes
...

Signed-off-by: dependabot[bot] <support@github.com>

* Applies the new Ruff rule for unpacking

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Trenton H <797416+stumpylog@users.noreply.github.com>
2025-09-17 13:16:34 -07:00

208 lines
6.7 KiB
Python

from unittest import mock
from channels.layers import get_channel_layer
from channels.testing import WebsocketCommunicator
from django.test import TestCase
from django.test import override_settings
from documents.plugins.helpers import DocumentsStatusManager
from documents.plugins.helpers import ProgressManager
from documents.plugins.helpers import ProgressStatusOptions
from paperless.asgi import application
TEST_CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels.layers.InMemoryChannelLayer",
},
}
@override_settings(CHANNEL_LAYERS=TEST_CHANNEL_LAYERS)
class TestWebSockets(TestCase):
async def test_no_auth(self):
communicator = WebsocketCommunicator(application, "/ws/status/")
connected, _ = await communicator.connect()
self.assertFalse(connected)
await communicator.disconnect()
@mock.patch("paperless.consumers.StatusConsumer.close")
@mock.patch("paperless.consumers.StatusConsumer._authenticated")
async def test_close_on_no_auth(self, _authenticated, mock_close):
_authenticated.return_value = True
communicator = WebsocketCommunicator(application, "/ws/status/")
connected, _ = await communicator.connect()
self.assertTrue(connected)
message = {"type": "status_update", "data": {"task_id": "test"}}
_authenticated.return_value = False
channel_layer = get_channel_layer()
await channel_layer.group_send(
"status_updates",
message,
)
await communicator.receive_nothing()
mock_close.assert_called_once()
mock_close.reset_mock()
message = {"type": "documents_deleted", "data": {"documents": [1, 2, 3]}}
await channel_layer.group_send(
"status_updates",
message,
)
await communicator.receive_nothing()
mock_close.assert_called_once()
@mock.patch("paperless.consumers.StatusConsumer._authenticated")
async def test_auth(self, _authenticated):
_authenticated.return_value = True
communicator = WebsocketCommunicator(application, "/ws/status/")
connected, _ = await communicator.connect()
self.assertTrue(connected)
await communicator.disconnect()
@mock.patch("paperless.consumers.StatusConsumer._authenticated")
async def test_receive_status_update(self, _authenticated):
_authenticated.return_value = True
communicator = WebsocketCommunicator(application, "/ws/status/")
connected, _ = await communicator.connect()
self.assertTrue(connected)
message = {"type": "status_update", "data": {"task_id": "test"}}
channel_layer = get_channel_layer()
await channel_layer.group_send(
"status_updates",
message,
)
response = await communicator.receive_json_from()
self.assertEqual(response, message)
await communicator.disconnect()
async def test_status_update_check_perms(self):
communicator = WebsocketCommunicator(application, "/ws/status/")
communicator.scope["user"] = mock.Mock()
communicator.scope["user"].is_authenticated = True
communicator.scope["user"].is_superuser = False
communicator.scope["user"].id = 1
connected, _ = await communicator.connect()
self.assertTrue(connected)
# Test as owner
message = {"type": "status_update", "data": {"task_id": "test", "owner_id": 1}}
channel_layer = get_channel_layer()
await channel_layer.group_send(
"status_updates",
message,
)
response = await communicator.receive_json_from()
self.assertEqual(response, message)
# Test with a group that the user belongs to
communicator.scope["user"].groups.filter.return_value.exists.return_value = True
message = {
"type": "status_update",
"data": {"task_id": "test", "owner_id": 2, "groups_can_view": [1]},
}
channel_layer = get_channel_layer()
await channel_layer.group_send(
"status_updates",
message,
)
response = await communicator.receive_json_from()
self.assertEqual(response, message)
# Test with a different owner_id
message = {"type": "status_update", "data": {"task_id": "test", "owner_id": 2}}
channel_layer = get_channel_layer()
await channel_layer.group_send(
"status_updates",
message,
)
response = await communicator.receive_nothing()
self.assertNotEqual(response, message)
await communicator.disconnect()
@mock.patch("paperless.consumers.StatusConsumer._authenticated")
async def test_receive_documents_deleted(self, _authenticated):
_authenticated.return_value = True
communicator = WebsocketCommunicator(application, "/ws/status/")
connected, _ = await communicator.connect()
self.assertTrue(connected)
message = {"type": "documents_deleted", "data": {"documents": [1, 2, 3]}}
channel_layer = get_channel_layer()
await channel_layer.group_send(
"status_updates",
message,
)
response = await communicator.receive_json_from()
self.assertEqual(response, message)
await communicator.disconnect()
@mock.patch("channels.layers.InMemoryChannelLayer.group_send")
def test_manager_send_progress(self, mock_group_send):
with ProgressManager(task_id="test") as manager:
manager.send_progress(
ProgressStatusOptions.STARTED,
"Test message",
1,
10,
extra_args={
"foo": "bar",
},
)
message = mock_group_send.call_args[0][1]
self.assertEqual(
message,
{
"type": "status_update",
"data": {
"filename": None,
"task_id": "test",
"current_progress": 1,
"max_progress": 10,
"status": ProgressStatusOptions.STARTED,
"message": "Test message",
"foo": "bar",
},
},
)
@mock.patch("channels.layers.InMemoryChannelLayer.group_send")
def test_manager_send_documents_deleted(self, mock_group_send):
with DocumentsStatusManager() as manager:
manager.send_documents_deleted([1, 2, 3])
message = mock_group_send.call_args[0][1]
self.assertEqual(
message,
{
"type": "documents_deleted",
"data": {
"documents": [1, 2, 3],
},
},
)