mirror of
				https://github.com/immich-app/immich.git
				synced 2025-11-04 03:27:09 -05:00 
			
		
		
		
	Coming from nixos, this test fails when cuda is enabled. https://github.com/NixOS/nixpkgs/pull/382896 Solution as proposed by @mertalev
		
			
				
	
	
		
			1100 lines
		
	
	
		
			48 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			1100 lines
		
	
	
		
			48 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import json
 | 
						|
import os
 | 
						|
from io import BytesIO
 | 
						|
from pathlib import Path
 | 
						|
from random import randint
 | 
						|
from types import SimpleNamespace
 | 
						|
from typing import Any, Callable
 | 
						|
from unittest import mock
 | 
						|
 | 
						|
import cv2
 | 
						|
import numpy as np
 | 
						|
import onnxruntime as ort
 | 
						|
import orjson
 | 
						|
import pytest
 | 
						|
from fastapi import HTTPException
 | 
						|
from fastapi.testclient import TestClient
 | 
						|
from PIL import Image
 | 
						|
from pytest import MonkeyPatch
 | 
						|
from pytest_mock import MockerFixture
 | 
						|
 | 
						|
from immich_ml.config import Settings, settings
 | 
						|
from immich_ml.main import load, preload_models
 | 
						|
from immich_ml.models.base import InferenceModel
 | 
						|
from immich_ml.models.cache import ModelCache
 | 
						|
from immich_ml.models.clip.textual import MClipTextualEncoder, OpenClipTextualEncoder
 | 
						|
from immich_ml.models.clip.visual import OpenClipVisualEncoder
 | 
						|
from immich_ml.models.facial_recognition.detection import FaceDetector
 | 
						|
from immich_ml.models.facial_recognition.recognition import FaceRecognizer
 | 
						|
from immich_ml.schemas import ModelFormat, ModelTask, ModelType
 | 
						|
from immich_ml.sessions.ann import AnnSession
 | 
						|
from immich_ml.sessions.ort import OrtSession
 | 
						|
from immich_ml.sessions.rknn import RknnSession, run_inference
 | 
						|
 | 
						|
 | 
						|
class TestBase:
 | 
						|
    def test_sets_default_cache_dir(self) -> None:
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert encoder.cache_dir == Path(settings.cache_folder) / "clip" / "ViT-B-32__openai"
 | 
						|
 | 
						|
    def test_sets_cache_dir_kwarg(self) -> None:
 | 
						|
        cache_dir = Path("/test_cache")
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=cache_dir)
 | 
						|
 | 
						|
        assert encoder.cache_dir == cache_dir
 | 
						|
 | 
						|
    def test_sets_default_model_format(self, mocker: MockerFixture) -> None:
 | 
						|
        mocker.patch.object(settings, "ann", True)
 | 
						|
        mocker.patch("immich_ml.sessions.ann.loader.is_available", False)
 | 
						|
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert encoder.model_format == ModelFormat.ONNX
 | 
						|
 | 
						|
    def test_sets_default_model_format_to_armnn_if_available(self, path: mock.Mock, mocker: MockerFixture) -> None:
 | 
						|
        mocker.patch.object(settings, "ann", True)
 | 
						|
        mocker.patch("immich_ml.sessions.ann.loader.is_available", True)
 | 
						|
        path.suffix = ".armnn"
 | 
						|
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=path)
 | 
						|
 | 
						|
        assert encoder.model_format == ModelFormat.ARMNN
 | 
						|
 | 
						|
    def test_sets_model_format_kwarg(self, mocker: MockerFixture) -> None:
 | 
						|
        mocker.patch.object(settings, "ann", False)
 | 
						|
        mocker.patch("immich_ml.sessions.ann.loader.is_available", False)
 | 
						|
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", model_format=ModelFormat.ARMNN)
 | 
						|
 | 
						|
        assert encoder.model_format == ModelFormat.ARMNN
 | 
						|
 | 
						|
    def test_sets_default_model_format_to_rknn_if_available(self, mocker: MockerFixture) -> None:
 | 
						|
        mocker.patch.object(settings, "rknn", True)
 | 
						|
        mocker.patch("immich_ml.sessions.rknn.is_available", True)
 | 
						|
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert encoder.model_format == ModelFormat.RKNN
 | 
						|
 | 
						|
    def test_casts_cache_dir_string_to_path(self) -> None:
 | 
						|
        cache_dir = "/test_cache"
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=cache_dir)
 | 
						|
 | 
						|
        assert encoder.cache_dir == Path(cache_dir)
 | 
						|
 | 
						|
    def test_clear_cache(self, rmtree: mock.Mock, path: mock.Mock, info: mock.Mock) -> None:
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=path)
 | 
						|
        encoder.clear_cache()
 | 
						|
 | 
						|
        rmtree.assert_called_once_with(encoder.cache_dir)
 | 
						|
        info.assert_called_with(f"Cleared cache directory for model '{encoder.model_name}'.")
 | 
						|
 | 
						|
    def test_clear_cache_warns_if_path_does_not_exist(
 | 
						|
        self, rmtree: mock.Mock, path: mock.Mock, warning: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        path.return_value.exists.return_value = False
 | 
						|
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=path)
 | 
						|
        encoder.clear_cache()
 | 
						|
 | 
						|
        rmtree.assert_not_called()
 | 
						|
        warning.assert_called_once()
 | 
						|
 | 
						|
    def test_clear_cache_raises_exception_if_vulnerable_to_symlink_attack(
 | 
						|
        self, rmtree: mock.Mock, path: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        rmtree.avoids_symlink_attacks = False
 | 
						|
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=path)
 | 
						|
        with pytest.raises(RuntimeError):
 | 
						|
            encoder.clear_cache()
 | 
						|
 | 
						|
        rmtree.assert_not_called()
 | 
						|
 | 
						|
    def test_clear_cache_replaces_file_with_dir_if_path_is_file(
 | 
						|
        self, rmtree: mock.Mock, path: mock.Mock, warning: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        path.return_value.is_dir.return_value = False
 | 
						|
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=path)
 | 
						|
        encoder.clear_cache()
 | 
						|
 | 
						|
        rmtree.assert_not_called()
 | 
						|
        path.return_value.unlink.assert_called_once()
 | 
						|
        path.return_value.mkdir.assert_called_once()
 | 
						|
        warning.assert_called_once()
 | 
						|
 | 
						|
    def test_download(self, snapshot_download: mock.Mock) -> None:
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="/path/to/cache")
 | 
						|
        encoder.download()
 | 
						|
 | 
						|
        snapshot_download.assert_called_once_with(
 | 
						|
            "immich-app/ViT-B-32__openai",
 | 
						|
            cache_dir=encoder.cache_dir,
 | 
						|
            local_dir=encoder.cache_dir,
 | 
						|
            ignore_patterns=["*.armnn", "*.rknn"],
 | 
						|
        )
 | 
						|
 | 
						|
    def test_download_downloads_armnn_if_preferred_format(self, snapshot_download: mock.Mock) -> None:
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", model_format=ModelFormat.ARMNN)
 | 
						|
        encoder.download()
 | 
						|
 | 
						|
        snapshot_download.assert_called_once_with(
 | 
						|
            "immich-app/ViT-B-32__openai",
 | 
						|
            cache_dir=encoder.cache_dir,
 | 
						|
            local_dir=encoder.cache_dir,
 | 
						|
            ignore_patterns=["*.rknn"],
 | 
						|
        )
 | 
						|
 | 
						|
    def test_download_downloads_rknn_if_preferred_format(self, snapshot_download: mock.Mock) -> None:
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", model_format=ModelFormat.RKNN)
 | 
						|
        encoder.download()
 | 
						|
 | 
						|
        snapshot_download.assert_called_once_with(
 | 
						|
            "immich-app/ViT-B-32__openai",
 | 
						|
            cache_dir=encoder.cache_dir,
 | 
						|
            local_dir=encoder.cache_dir,
 | 
						|
            ignore_patterns=["*.armnn"],
 | 
						|
        )
 | 
						|
 | 
						|
    def test_throws_exception_if_model_path_does_not_exist(
 | 
						|
        self, snapshot_download: mock.Mock, ort_session: mock.Mock, path: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        path.return_value.__truediv__.return_value.__truediv__.return_value.is_file.return_value = False
 | 
						|
 | 
						|
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=path)
 | 
						|
 | 
						|
        with pytest.raises(FileNotFoundError):
 | 
						|
            encoder.load()
 | 
						|
 | 
						|
        snapshot_download.assert_called_once()
 | 
						|
        ort_session.assert_not_called()
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.usefixtures("ort_session")
 | 
						|
class TestOrtSession:
 | 
						|
    CPU_EP = ["CPUExecutionProvider"]
 | 
						|
    CUDA_EP = ["CUDAExecutionProvider", "CPUExecutionProvider"]
 | 
						|
    OV_EP = ["OpenVINOExecutionProvider", "CPUExecutionProvider"]
 | 
						|
    CUDA_EP_OUT_OF_ORDER = ["CPUExecutionProvider", "CUDAExecutionProvider"]
 | 
						|
    TRT_EP = ["TensorrtExecutionProvider", "CUDAExecutionProvider", "CPUExecutionProvider"]
 | 
						|
    ROCM_EP = ["ROCMExecutionProvider", "CPUExecutionProvider"]
 | 
						|
 | 
						|
    @pytest.mark.providers(CPU_EP)
 | 
						|
    def test_sets_cpu_provider(self, providers: list[str]) -> None:
 | 
						|
        session = OrtSession("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert session.providers == self.CPU_EP
 | 
						|
 | 
						|
    @pytest.mark.providers(CUDA_EP)
 | 
						|
    def test_sets_cuda_provider_if_available(self, providers: list[str]) -> None:
 | 
						|
        session = OrtSession("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert session.providers == self.CUDA_EP
 | 
						|
 | 
						|
    @pytest.mark.ov_device_ids(["GPU.0", "CPU"])
 | 
						|
    @pytest.mark.providers(OV_EP)
 | 
						|
    def test_sets_openvino_provider_if_available(self, providers: list[str], ov_device_ids: list[str]) -> None:
 | 
						|
        session = OrtSession("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert session.providers == self.OV_EP
 | 
						|
 | 
						|
    @pytest.mark.ov_device_ids(["CPU"])
 | 
						|
    @pytest.mark.providers(OV_EP)
 | 
						|
    def test_avoids_openvino_if_gpu_not_available(self, providers: list[str], ov_device_ids: list[str]) -> None:
 | 
						|
        session = OrtSession("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert session.providers == self.CPU_EP
 | 
						|
 | 
						|
    @pytest.mark.providers(CUDA_EP_OUT_OF_ORDER)
 | 
						|
    def test_sets_providers_in_correct_order(self, providers: list[str]) -> None:
 | 
						|
        session = OrtSession("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert session.providers == self.CUDA_EP
 | 
						|
 | 
						|
    @pytest.mark.providers(TRT_EP)
 | 
						|
    def test_ignores_unsupported_providers(self, providers: list[str]) -> None:
 | 
						|
        session = OrtSession("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert session.providers == self.CUDA_EP
 | 
						|
 | 
						|
    @pytest.mark.providers(ROCM_EP)
 | 
						|
    def test_uses_rocm(self, providers: list[str]) -> None:
 | 
						|
        session = OrtSession("ViT-B-32__openai")
 | 
						|
 | 
						|
        assert session.providers == self.ROCM_EP
 | 
						|
 | 
						|
    def test_sets_provider_kwarg(self) -> None:
 | 
						|
        providers = ["CUDAExecutionProvider"]
 | 
						|
        session = OrtSession("ViT-B-32__openai", providers=providers)
 | 
						|
 | 
						|
        assert session.providers == providers
 | 
						|
 | 
						|
    @pytest.mark.ov_device_ids(["GPU.0", "CPU"])
 | 
						|
    def test_sets_default_provider_options(self, ov_device_ids: list[str]) -> None:
 | 
						|
        model_path = "/cache/ViT-B-32__openai/model.onnx"
 | 
						|
        session = OrtSession(model_path, providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"])
 | 
						|
 | 
						|
        assert session.provider_options == [
 | 
						|
            {"device_type": "GPU.0", "precision": "FP32", "cache_dir": "/cache/ViT-B-32__openai/openvino"},
 | 
						|
            {"arena_extend_strategy": "kSameAsRequested"},
 | 
						|
        ]
 | 
						|
 | 
						|
    def test_sets_provider_options_for_openvino(self) -> None:
 | 
						|
        model_path = "/cache/ViT-B-32__openai/textual/model.onnx"
 | 
						|
        os.environ["MACHINE_LEARNING_DEVICE_ID"] = "1"
 | 
						|
 | 
						|
        session = OrtSession(model_path, providers=["OpenVINOExecutionProvider"])
 | 
						|
 | 
						|
        assert session.provider_options == [
 | 
						|
            {
 | 
						|
                "device_type": "GPU.1",
 | 
						|
                "precision": "FP32",
 | 
						|
                "cache_dir": "/cache/ViT-B-32__openai/textual/openvino",
 | 
						|
            }
 | 
						|
        ]
 | 
						|
 | 
						|
    def test_sets_provider_options_for_cuda(self) -> None:
 | 
						|
        os.environ["MACHINE_LEARNING_DEVICE_ID"] = "1"
 | 
						|
 | 
						|
        session = OrtSession("ViT-B-32__openai", providers=["CUDAExecutionProvider"])
 | 
						|
 | 
						|
        assert session.provider_options == [{"arena_extend_strategy": "kSameAsRequested", "device_id": "1"}]
 | 
						|
 | 
						|
    def test_sets_provider_options_for_rocm(self) -> None:
 | 
						|
        os.environ["MACHINE_LEARNING_DEVICE_ID"] = "1"
 | 
						|
 | 
						|
        session = OrtSession("ViT-B-32__openai", providers=["ROCMExecutionProvider"])
 | 
						|
 | 
						|
        assert session.provider_options == [{"arena_extend_strategy": "kSameAsRequested", "device_id": "1"}]
 | 
						|
 | 
						|
    def test_sets_provider_options_kwarg(self) -> None:
 | 
						|
        session = OrtSession(
 | 
						|
            "ViT-B-32__openai",
 | 
						|
            providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"],
 | 
						|
            provider_options=[],
 | 
						|
        )
 | 
						|
 | 
						|
        assert session.provider_options == []
 | 
						|
 | 
						|
    def test_sets_default_sess_options_if_cpu(self) -> None:
 | 
						|
        session = OrtSession("ViT-B-32__openai", providers=["CPUExecutionProvider"])
 | 
						|
 | 
						|
        assert session.sess_options.execution_mode == ort.ExecutionMode.ORT_SEQUENTIAL
 | 
						|
        assert session.sess_options.inter_op_num_threads == 1
 | 
						|
        assert session.sess_options.intra_op_num_threads == 2
 | 
						|
        assert session.sess_options.enable_cpu_mem_arena is False
 | 
						|
 | 
						|
    def test_sets_default_sess_options_does_not_set_threads_if_non_cpu_and_default_threads(self) -> None:
 | 
						|
        session = OrtSession("ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"])
 | 
						|
 | 
						|
        assert session.sess_options.inter_op_num_threads == 0
 | 
						|
        assert session.sess_options.intra_op_num_threads == 0
 | 
						|
 | 
						|
    def test_sets_default_sess_options_sets_threads_if_non_cpu_and_set_threads(self, mocker: MockerFixture) -> None:
 | 
						|
        mock_settings = mocker.patch("immich_ml.sessions.ort.settings", autospec=True)
 | 
						|
        mock_settings.model_inter_op_threads = 2
 | 
						|
        mock_settings.model_intra_op_threads = 4
 | 
						|
 | 
						|
        session = OrtSession("ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"])
 | 
						|
 | 
						|
        assert session.sess_options.inter_op_num_threads == 2
 | 
						|
        assert session.sess_options.intra_op_num_threads == 4
 | 
						|
 | 
						|
    def test_sets_sess_options_kwarg(self) -> None:
 | 
						|
        sess_options = ort.SessionOptions()
 | 
						|
        session = OrtSession(
 | 
						|
            "ViT-B-32__openai",
 | 
						|
            providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"],
 | 
						|
            provider_options=[],
 | 
						|
            sess_options=sess_options,
 | 
						|
        )
 | 
						|
 | 
						|
        assert sess_options is session.sess_options
 | 
						|
 | 
						|
 | 
						|
class TestAnnSession:
 | 
						|
    def test_creates_ann_session(self, ann_session: mock.Mock, info: mock.Mock) -> None:
 | 
						|
        model_path = mock.MagicMock(spec=Path)
 | 
						|
        cache_dir = mock.MagicMock(spec=Path)
 | 
						|
 | 
						|
        AnnSession(model_path, cache_dir)
 | 
						|
 | 
						|
        ann_session.assert_called_once_with(tuning_level=2, tuning_file=(cache_dir / "gpu-tuning.ann").as_posix())
 | 
						|
        ann_session.return_value.load.assert_called_once_with(
 | 
						|
            model_path.as_posix(), cached_network_path=model_path.with_suffix(".anncache").as_posix(), fp16=False
 | 
						|
        )
 | 
						|
        info.assert_has_calls(
 | 
						|
            [
 | 
						|
                mock.call("Loading ANN model %s ...", model_path),
 | 
						|
                mock.call("Loaded ANN model with ID %d", ann_session.return_value.load.return_value),
 | 
						|
            ]
 | 
						|
        )
 | 
						|
 | 
						|
    def test_get_inputs(self, ann_session: mock.Mock) -> None:
 | 
						|
        ann_session.return_value.load.return_value = 123
 | 
						|
        ann_session.return_value.input_shapes = {123: [(1, 3, 224, 224)]}
 | 
						|
        session = AnnSession(Path("ViT-B-32__openai"))
 | 
						|
 | 
						|
        inputs = session.get_inputs()
 | 
						|
 | 
						|
        assert len(inputs) == 1
 | 
						|
        assert inputs[0].name is None
 | 
						|
        assert inputs[0].shape == (1, 3, 224, 224)
 | 
						|
 | 
						|
    def test_get_outputs(self, ann_session: mock.Mock) -> None:
 | 
						|
        ann_session.return_value.load.return_value = 123
 | 
						|
        ann_session.return_value.output_shapes = {123: [(1, 3, 224, 224)]}
 | 
						|
        session = AnnSession(Path("ViT-B-32__openai"))
 | 
						|
 | 
						|
        outputs = session.get_outputs()
 | 
						|
 | 
						|
        assert len(outputs) == 1
 | 
						|
        assert outputs[0].name is None
 | 
						|
        assert outputs[0].shape == (1, 3, 224, 224)
 | 
						|
 | 
						|
    def test_run(self, ann_session: mock.Mock, mocker: MockerFixture) -> None:
 | 
						|
        ann_session.return_value.load.return_value = 123
 | 
						|
        np_spy = mocker.spy(np, "ascontiguousarray")
 | 
						|
        session = AnnSession(Path("ViT-B-32__openai"))
 | 
						|
        [input1, input2] = [np.random.rand(1, 3, 224, 224).astype(np.float32) for _ in range(2)]
 | 
						|
        input_feed = {"input.1": input1, "input.2": input2}
 | 
						|
 | 
						|
        session.run(None, input_feed)
 | 
						|
 | 
						|
        ann_session.return_value.execute.assert_called_once_with(123, [input1, input2])
 | 
						|
        assert np_spy.call_count == 2
 | 
						|
        np_spy.assert_has_calls([mock.call(input1), mock.call(input2)])
 | 
						|
 | 
						|
 | 
						|
class TestRknnSession:
 | 
						|
    def test_creates_rknn_session(self, rknn_session: mock.Mock, info: mock.Mock, mocker: MockerFixture) -> None:
 | 
						|
        model_path = mock.MagicMock(spec=Path)
 | 
						|
        tpe = 1
 | 
						|
        mocker.patch("immich_ml.sessions.rknn.soc_name", "rk3566")
 | 
						|
        mocker.patch("immich_ml.sessions.rknn.is_available", True)
 | 
						|
        RknnSession(model_path)
 | 
						|
 | 
						|
        rknn_session.assert_called_once_with(model_path=model_path.as_posix(), tpes=tpe, func=run_inference)
 | 
						|
 | 
						|
        info.assert_has_calls([mock.call(f"Loaded RKNN model from {model_path} with {tpe} threads.")])
 | 
						|
 | 
						|
    def test_run_rknn(self, rknn_session: mock.Mock, mocker: MockerFixture) -> None:
 | 
						|
        rknn_session.return_value.load.return_value = 123
 | 
						|
        np_spy = mocker.spy(np, "ascontiguousarray")
 | 
						|
        mocker.patch("immich_ml.sessions.rknn.soc_name", "rk3566")
 | 
						|
        session = RknnSession(Path("ViT-B-32__openai"))
 | 
						|
        [input1, input2] = [np.random.rand(1, 3, 224, 224).astype(np.float32) for _ in range(2)]
 | 
						|
        input_feed = {"input.1": input1, "input.2": input2}
 | 
						|
 | 
						|
        session.run(None, input_feed)
 | 
						|
 | 
						|
        rknn_session.return_value.put.assert_called_once_with([input1, input2])
 | 
						|
        np_spy.call_count == 2
 | 
						|
        np_spy.assert_has_calls([mock.call(input1), mock.call(input2)])
 | 
						|
 | 
						|
 | 
						|
class TestCLIP:
 | 
						|
    embedding = np.random.rand(512).astype(np.float32)
 | 
						|
    cache_dir = Path("test_cache")
 | 
						|
 | 
						|
    def test_basic_image(
 | 
						|
        self,
 | 
						|
        pil_image: Image.Image,
 | 
						|
        mocker: MockerFixture,
 | 
						|
        clip_model_cfg: dict[str, Any],
 | 
						|
        clip_preprocess_cfg: Callable[[Path], dict[str, Any]],
 | 
						|
    ) -> None:
 | 
						|
        mocker.patch.object(OpenClipVisualEncoder, "download")
 | 
						|
        mocker.patch.object(OpenClipVisualEncoder, "model_cfg", clip_model_cfg)
 | 
						|
        mocker.patch.object(OpenClipVisualEncoder, "preprocess_cfg", clip_preprocess_cfg)
 | 
						|
 | 
						|
        mocked = mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
 | 
						|
        mocked.run.return_value = [[self.embedding]]
 | 
						|
 | 
						|
        clip_encoder = OpenClipVisualEncoder("ViT-B-32__openai", cache_dir="test_cache")
 | 
						|
        embedding_str = clip_encoder.predict(pil_image)
 | 
						|
        assert isinstance(embedding_str, str)
 | 
						|
        embedding = orjson.loads(embedding_str)
 | 
						|
        assert isinstance(embedding, list)
 | 
						|
        assert len(embedding) == clip_model_cfg["embed_dim"]
 | 
						|
        mocked.run.assert_called_once()
 | 
						|
 | 
						|
    def test_basic_text(
 | 
						|
        self,
 | 
						|
        mocker: MockerFixture,
 | 
						|
        clip_model_cfg: dict[str, Any],
 | 
						|
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
 | 
						|
    ) -> None:
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "download")
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
 | 
						|
 | 
						|
        mocked = mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
 | 
						|
        mocked.run.return_value = [[self.embedding]]
 | 
						|
        mocker.patch("immich_ml.models.clip.textual.Tokenizer.from_file", autospec=True)
 | 
						|
 | 
						|
        clip_encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache")
 | 
						|
        embedding_str = clip_encoder.predict("test search query")
 | 
						|
        assert isinstance(embedding_str, str)
 | 
						|
        embedding = orjson.loads(embedding_str)
 | 
						|
        assert isinstance(embedding, list)
 | 
						|
        assert len(embedding) == clip_model_cfg["embed_dim"]
 | 
						|
        mocked.run.assert_called_once()
 | 
						|
 | 
						|
    def test_openclip_tokenizer(
 | 
						|
        self,
 | 
						|
        mocker: MockerFixture,
 | 
						|
        clip_model_cfg: dict[str, Any],
 | 
						|
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
 | 
						|
    ) -> None:
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "download")
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
 | 
						|
        mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
 | 
						|
        mock_tokenizer = mocker.patch("immich_ml.models.clip.textual.Tokenizer.from_file", autospec=True).return_value
 | 
						|
        mock_ids = [randint(0, 50000) for _ in range(77)]
 | 
						|
        mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids)
 | 
						|
 | 
						|
        clip_encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache")
 | 
						|
        clip_encoder._load()
 | 
						|
        tokens = clip_encoder.tokenize("test   search query")
 | 
						|
 | 
						|
        assert "text" in tokens
 | 
						|
        assert isinstance(tokens["text"], np.ndarray)
 | 
						|
        assert tokens["text"].shape == (1, 77)
 | 
						|
        assert tokens["text"].dtype == np.int32
 | 
						|
        assert np.allclose(tokens["text"], np.array([mock_ids], dtype=np.int32), atol=0)
 | 
						|
        mock_tokenizer.encode.assert_called_once_with("test search query")
 | 
						|
 | 
						|
    def test_openclip_tokenizer_canonicalizes_text(
 | 
						|
        self,
 | 
						|
        mocker: MockerFixture,
 | 
						|
        clip_model_cfg: dict[str, Any],
 | 
						|
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
 | 
						|
    ) -> None:
 | 
						|
        clip_model_cfg["text_cfg"]["tokenizer_kwargs"] = {"clean": "canonicalize"}
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "download")
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
 | 
						|
        mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
 | 
						|
        mock_tokenizer = mocker.patch("immich_ml.models.clip.textual.Tokenizer.from_file", autospec=True).return_value
 | 
						|
        mock_ids = [randint(0, 50000) for _ in range(77)]
 | 
						|
        mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids)
 | 
						|
 | 
						|
        clip_encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache")
 | 
						|
        clip_encoder._load()
 | 
						|
        tokens = clip_encoder.tokenize("Test   Search Query!")
 | 
						|
 | 
						|
        assert "text" in tokens
 | 
						|
        assert isinstance(tokens["text"], np.ndarray)
 | 
						|
        assert tokens["text"].shape == (1, 77)
 | 
						|
        assert tokens["text"].dtype == np.int32
 | 
						|
        assert np.allclose(tokens["text"], np.array([mock_ids], dtype=np.int32), atol=0)
 | 
						|
        mock_tokenizer.encode.assert_called_once_with("test search query")
 | 
						|
 | 
						|
    def test_openclip_tokenizer_adds_flores_token_for_nllb(
 | 
						|
        self,
 | 
						|
        mocker: MockerFixture,
 | 
						|
        clip_model_cfg: dict[str, Any],
 | 
						|
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
 | 
						|
    ) -> None:
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "download")
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
 | 
						|
        mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
 | 
						|
        mock_tokenizer = mocker.patch("immich_ml.models.clip.textual.Tokenizer.from_file", autospec=True).return_value
 | 
						|
        mock_ids = [randint(0, 50000) for _ in range(77)]
 | 
						|
        mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids)
 | 
						|
 | 
						|
        clip_encoder = OpenClipTextualEncoder("nllb-clip-base-siglip__mrl", cache_dir="test_cache")
 | 
						|
        clip_encoder._load()
 | 
						|
        clip_encoder.tokenize("test search query", language="de")
 | 
						|
 | 
						|
        mock_tokenizer.encode.assert_called_once_with("deu_Latntest search query")
 | 
						|
 | 
						|
    def test_openclip_tokenizer_removes_country_code_from_language_for_nllb_if_not_found(
 | 
						|
        self,
 | 
						|
        mocker: MockerFixture,
 | 
						|
        clip_model_cfg: dict[str, Any],
 | 
						|
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
 | 
						|
    ) -> None:
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "download")
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
 | 
						|
        mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
 | 
						|
        mock_tokenizer = mocker.patch("immich_ml.models.clip.textual.Tokenizer.from_file", autospec=True).return_value
 | 
						|
        mock_ids = [randint(0, 50000) for _ in range(77)]
 | 
						|
        mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids)
 | 
						|
 | 
						|
        clip_encoder = OpenClipTextualEncoder("nllb-clip-base-siglip__mrl", cache_dir="test_cache")
 | 
						|
        clip_encoder._load()
 | 
						|
        clip_encoder.tokenize("test search query", language="de-CH")
 | 
						|
 | 
						|
        mock_tokenizer.encode.assert_called_once_with("deu_Latntest search query")
 | 
						|
 | 
						|
    def test_openclip_tokenizer_falls_back_to_english_for_nllb_if_language_code_not_found(
 | 
						|
        self,
 | 
						|
        mocker: MockerFixture,
 | 
						|
        clip_model_cfg: dict[str, Any],
 | 
						|
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
 | 
						|
        warning: mock.Mock,
 | 
						|
    ) -> None:
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "download")
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
 | 
						|
        mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
 | 
						|
        mock_tokenizer = mocker.patch("immich_ml.models.clip.textual.Tokenizer.from_file", autospec=True).return_value
 | 
						|
        mock_ids = [randint(0, 50000) for _ in range(77)]
 | 
						|
        mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids)
 | 
						|
 | 
						|
        clip_encoder = OpenClipTextualEncoder("nllb-clip-base-siglip__mrl", cache_dir="test_cache")
 | 
						|
        clip_encoder._load()
 | 
						|
        clip_encoder.tokenize("test search query", language="unknown")
 | 
						|
 | 
						|
        mock_tokenizer.encode.assert_called_once_with("eng_Latntest search query")
 | 
						|
        warning.assert_called_once_with("Language 'unknown' not found, defaulting to 'en'")
 | 
						|
 | 
						|
    def test_openclip_tokenizer_does_not_add_flores_token_for_non_nllb_model(
 | 
						|
        self,
 | 
						|
        mocker: MockerFixture,
 | 
						|
        clip_model_cfg: dict[str, Any],
 | 
						|
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
 | 
						|
    ) -> None:
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "download")
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
 | 
						|
        mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
 | 
						|
        mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
 | 
						|
        mock_tokenizer = mocker.patch("immich_ml.models.clip.textual.Tokenizer.from_file", autospec=True).return_value
 | 
						|
        mock_ids = [randint(0, 50000) for _ in range(77)]
 | 
						|
        mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids)
 | 
						|
 | 
						|
        clip_encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache")
 | 
						|
        clip_encoder._load()
 | 
						|
        clip_encoder.tokenize("test search query", language="de")
 | 
						|
 | 
						|
        mock_tokenizer.encode.assert_called_once_with("test search query")
 | 
						|
 | 
						|
    def test_mclip_tokenizer(
 | 
						|
        self,
 | 
						|
        mocker: MockerFixture,
 | 
						|
        clip_model_cfg: dict[str, Any],
 | 
						|
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
 | 
						|
    ) -> None:
 | 
						|
        mocker.patch.object(MClipTextualEncoder, "download")
 | 
						|
        mocker.patch.object(MClipTextualEncoder, "model_cfg", clip_model_cfg)
 | 
						|
        mocker.patch.object(MClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
 | 
						|
        mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
 | 
						|
        mock_tokenizer = mocker.patch("immich_ml.models.clip.textual.Tokenizer.from_file", autospec=True).return_value
 | 
						|
        mock_ids = [randint(0, 50000) for _ in range(77)]
 | 
						|
        mock_attention_mask = [randint(0, 1) for _ in range(77)]
 | 
						|
        mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids, attention_mask=mock_attention_mask)
 | 
						|
 | 
						|
        clip_encoder = MClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache")
 | 
						|
        clip_encoder._load()
 | 
						|
        tokens = clip_encoder.tokenize("test search query")
 | 
						|
 | 
						|
        assert "input_ids" in tokens
 | 
						|
        assert "attention_mask" in tokens
 | 
						|
        assert isinstance(tokens["input_ids"], np.ndarray)
 | 
						|
        assert isinstance(tokens["attention_mask"], np.ndarray)
 | 
						|
        assert tokens["input_ids"].shape == (1, 77)
 | 
						|
        assert tokens["attention_mask"].shape == (1, 77)
 | 
						|
        assert np.allclose(tokens["input_ids"], np.array([mock_ids], dtype=np.int32), atol=0)
 | 
						|
        assert np.allclose(tokens["attention_mask"], np.array([mock_attention_mask], dtype=np.int32), atol=0)
 | 
						|
 | 
						|
 | 
						|
class TestFaceRecognition:
 | 
						|
    def test_set_min_score(self, snapshot_download: mock.Mock, ort_session: mock.Mock, path: mock.Mock) -> None:
 | 
						|
        path.return_value.__truediv__.return_value.__truediv__.return_value.suffix = ".onnx"
 | 
						|
 | 
						|
        face_detector = FaceDetector("buffalo_s", min_score=0.5, cache_dir="test_cache")
 | 
						|
        face_detector.load()
 | 
						|
 | 
						|
        assert face_detector.min_score == 0.5
 | 
						|
        assert face_detector.model.det_thresh == 0.5
 | 
						|
 | 
						|
    def test_detection(self, cv_image: cv2.Mat, mocker: MockerFixture) -> None:
 | 
						|
        mocker.patch.object(FaceDetector, "load")
 | 
						|
        face_detector = FaceDetector("buffalo_s", min_score=0.0, cache_dir="test_cache")
 | 
						|
 | 
						|
        det_model = mock.Mock()
 | 
						|
        num_faces = 2
 | 
						|
        bbox = np.random.rand(num_faces, 4).astype(np.float32)
 | 
						|
        scores = np.array([[0.67]] * num_faces).astype(np.float32)
 | 
						|
        kpss = np.random.rand(num_faces, 5, 2).astype(np.float32)
 | 
						|
        det_model.detect.return_value = (np.concatenate([bbox, scores], axis=-1), kpss)
 | 
						|
        face_detector.model = det_model
 | 
						|
 | 
						|
        faces = face_detector.predict(cv_image)
 | 
						|
 | 
						|
        assert isinstance(faces, dict)
 | 
						|
        assert isinstance(faces.get("boxes", None), np.ndarray)
 | 
						|
        assert isinstance(faces.get("landmarks", None), np.ndarray)
 | 
						|
        assert isinstance(faces.get("scores", None), np.ndarray)
 | 
						|
        assert np.equal(faces["boxes"], bbox.round()).all()
 | 
						|
        assert np.equal(faces["landmarks"], kpss).all()
 | 
						|
        assert np.equal(faces["scores"], scores).all()
 | 
						|
        det_model.detect.assert_called_once()
 | 
						|
 | 
						|
    def test_recognition(self, cv_image: cv2.Mat, mocker: MockerFixture) -> None:
 | 
						|
        mocker.patch.object(FaceRecognizer, "load")
 | 
						|
        face_recognizer = FaceRecognizer("buffalo_s", min_score=0.0, cache_dir="test_cache")
 | 
						|
 | 
						|
        num_faces = 2
 | 
						|
        bbox = np.random.rand(num_faces, 4).astype(np.float32)
 | 
						|
        scores = np.array([0.67] * num_faces).astype(np.float32)
 | 
						|
        kpss = np.random.rand(num_faces, 5, 2).astype(np.float32)
 | 
						|
        faces = {"boxes": bbox, "landmarks": kpss, "scores": scores}
 | 
						|
 | 
						|
        rec_model = mock.Mock()
 | 
						|
        embedding = np.random.rand(num_faces, 512).astype(np.float32)
 | 
						|
        rec_model.get_feat.return_value = embedding
 | 
						|
        face_recognizer.model = rec_model
 | 
						|
 | 
						|
        faces = face_recognizer.predict(cv_image, faces)
 | 
						|
 | 
						|
        assert isinstance(faces, list)
 | 
						|
        assert len(faces) == num_faces
 | 
						|
        for face in faces:
 | 
						|
            assert isinstance(face.get("boundingBox"), dict)
 | 
						|
            assert set(face["boundingBox"]) == {"x1", "y1", "x2", "y2"}
 | 
						|
            assert all(isinstance(val, np.float32) for val in face["boundingBox"].values())
 | 
						|
            embedding_str = face.get("embedding")
 | 
						|
            assert isinstance(embedding_str, str)
 | 
						|
            embedding = orjson.loads(embedding_str)
 | 
						|
            assert isinstance(embedding, list)
 | 
						|
            assert len(embedding) == 512
 | 
						|
            assert isinstance(face.get("score", None), np.float32)
 | 
						|
 | 
						|
        rec_model.get_feat.assert_called_once()
 | 
						|
        call_args = rec_model.get_feat.call_args_list[0].args
 | 
						|
        assert len(call_args) == 1
 | 
						|
        assert isinstance(call_args[0], list)
 | 
						|
        assert isinstance(call_args[0][0], np.ndarray)
 | 
						|
        assert call_args[0][0].shape == (112, 112, 3)
 | 
						|
 | 
						|
    def test_recognition_adds_batch_axis_for_ort(
 | 
						|
        self, ort_session: mock.Mock, path: mock.Mock, mocker: MockerFixture
 | 
						|
    ) -> None:
 | 
						|
        onnx = mocker.patch("immich_ml.models.facial_recognition.recognition.onnx", autospec=True)
 | 
						|
        update_dims = mocker.patch(
 | 
						|
            "immich_ml.models.facial_recognition.recognition.update_inputs_outputs_dims", autospec=True
 | 
						|
        )
 | 
						|
        mocker.patch("immich_ml.models.base.InferenceModel.download")
 | 
						|
        mocker.patch("immich_ml.models.facial_recognition.recognition.ArcFaceONNX")
 | 
						|
        ort_session.return_value.get_inputs.return_value = [SimpleNamespace(name="input.1", shape=(1, 3, 224, 224))]
 | 
						|
        ort_session.return_value.get_outputs.return_value = [SimpleNamespace(name="output.1", shape=(1, 800))]
 | 
						|
        path.return_value.__truediv__.return_value.__truediv__.return_value.suffix = ".onnx"
 | 
						|
 | 
						|
        proto = mock.Mock()
 | 
						|
 | 
						|
        input_dims = mock.Mock()
 | 
						|
        input_dims.name = "input.1"
 | 
						|
        input_dims.type.tensor_type.shape.dim = [SimpleNamespace(dim_value=size) for size in [1, 3, 224, 224]]
 | 
						|
        proto.graph.input = [input_dims]
 | 
						|
 | 
						|
        output_dims = mock.Mock()
 | 
						|
        output_dims.name = "output.1"
 | 
						|
        output_dims.type.tensor_type.shape.dim = [SimpleNamespace(dim_value=size) for size in [1, 800]]
 | 
						|
        proto.graph.output = [output_dims]
 | 
						|
 | 
						|
        onnx.load.return_value = proto
 | 
						|
 | 
						|
        face_recognizer = FaceRecognizer("buffalo_s", cache_dir=path)
 | 
						|
        face_recognizer.load()
 | 
						|
 | 
						|
        assert face_recognizer.batch_size is None
 | 
						|
        update_dims.assert_called_once_with(proto, {"input.1": ["batch", 3, 224, 224]}, {"output.1": ["batch", 800]})
 | 
						|
        onnx.save.assert_called_once_with(update_dims.return_value, face_recognizer.model_path)
 | 
						|
 | 
						|
    def test_recognition_does_not_add_batch_axis_if_exists(
 | 
						|
        self, ort_session: mock.Mock, path: mock.Mock, mocker: MockerFixture
 | 
						|
    ) -> None:
 | 
						|
        onnx = mocker.patch("immich_ml.models.facial_recognition.recognition.onnx", autospec=True)
 | 
						|
        update_dims = mocker.patch(
 | 
						|
            "immich_ml.models.facial_recognition.recognition.update_inputs_outputs_dims", autospec=True
 | 
						|
        )
 | 
						|
        mocker.patch("immich_ml.models.base.InferenceModel.download")
 | 
						|
        mocker.patch("immich_ml.models.facial_recognition.recognition.ArcFaceONNX")
 | 
						|
        path.return_value.__truediv__.return_value.__truediv__.return_value.suffix = ".onnx"
 | 
						|
 | 
						|
        inputs = [SimpleNamespace(name="input.1", shape=("batch", 3, 224, 224))]
 | 
						|
        outputs = [SimpleNamespace(name="output.1", shape=("batch", 800))]
 | 
						|
        ort_session.return_value.get_inputs.return_value = inputs
 | 
						|
        ort_session.return_value.get_outputs.return_value = outputs
 | 
						|
 | 
						|
        face_recognizer = FaceRecognizer("buffalo_s", cache_dir=path)
 | 
						|
        face_recognizer.load()
 | 
						|
 | 
						|
        assert face_recognizer.batch_size is None
 | 
						|
        update_dims.assert_not_called()
 | 
						|
        onnx.load.assert_not_called()
 | 
						|
        onnx.save.assert_not_called()
 | 
						|
 | 
						|
    def test_recognition_does_not_add_batch_axis_for_armnn(
 | 
						|
        self, ann_session: mock.Mock, path: mock.Mock, mocker: MockerFixture
 | 
						|
    ) -> None:
 | 
						|
        onnx = mocker.patch("immich_ml.models.facial_recognition.recognition.onnx", autospec=True)
 | 
						|
        update_dims = mocker.patch(
 | 
						|
            "immich_ml.models.facial_recognition.recognition.update_inputs_outputs_dims", autospec=True
 | 
						|
        )
 | 
						|
        mocker.patch("immich_ml.models.base.InferenceModel.download")
 | 
						|
        mocker.patch("immich_ml.models.facial_recognition.recognition.ArcFaceONNX")
 | 
						|
        path.return_value.__truediv__.return_value.__truediv__.return_value.suffix = ".armnn"
 | 
						|
 | 
						|
        inputs = [SimpleNamespace(name="input.1", shape=("batch", 3, 224, 224))]
 | 
						|
        outputs = [SimpleNamespace(name="output.1", shape=("batch", 800))]
 | 
						|
        ann_session.return_value.get_inputs.return_value = inputs
 | 
						|
        ann_session.return_value.get_outputs.return_value = outputs
 | 
						|
 | 
						|
        face_recognizer = FaceRecognizer("buffalo_s", model_format=ModelFormat.ARMNN, cache_dir=path)
 | 
						|
        face_recognizer.load()
 | 
						|
 | 
						|
        assert face_recognizer.batch_size == 1
 | 
						|
        update_dims.assert_not_called()
 | 
						|
        onnx.load.assert_not_called()
 | 
						|
        onnx.save.assert_not_called()
 | 
						|
 | 
						|
    def test_recognition_does_not_add_batch_axis_for_openvino(
 | 
						|
        self, ort_session: mock.Mock, path: mock.Mock, mocker: MockerFixture
 | 
						|
    ) -> None:
 | 
						|
        onnx = mocker.patch("immich_ml.models.facial_recognition.recognition.onnx", autospec=True)
 | 
						|
        update_dims = mocker.patch(
 | 
						|
            "immich_ml.models.facial_recognition.recognition.update_inputs_outputs_dims", autospec=True
 | 
						|
        )
 | 
						|
        mocker.patch("immich_ml.models.base.InferenceModel.download")
 | 
						|
        mocker.patch("immich_ml.models.facial_recognition.recognition.ArcFaceONNX")
 | 
						|
        path.return_value.__truediv__.return_value.__truediv__.return_value.suffix = ".onnx"
 | 
						|
 | 
						|
        inputs = [SimpleNamespace(name="input.1", shape=("batch", 3, 224, 224))]
 | 
						|
        outputs = [SimpleNamespace(name="output.1", shape=("batch", 800))]
 | 
						|
        ort_session.return_value.get_inputs.return_value = inputs
 | 
						|
        ort_session.return_value.get_outputs.return_value = outputs
 | 
						|
 | 
						|
        face_recognizer = FaceRecognizer(
 | 
						|
            "buffalo_s", model_format=ModelFormat.ARMNN, cache_dir=path, providers=["OpenVINOExecutionProvider"]
 | 
						|
        )
 | 
						|
        face_recognizer.load()
 | 
						|
 | 
						|
        assert face_recognizer.batch_size == 1
 | 
						|
        update_dims.assert_not_called()
 | 
						|
        onnx.load.assert_not_called()
 | 
						|
        onnx.save.assert_not_called()
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.asyncio
 | 
						|
class TestCache:
 | 
						|
    async def test_caches(self, mock_get_model: mock.Mock) -> None:
 | 
						|
        model_cache = ModelCache()
 | 
						|
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION)
 | 
						|
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION)
 | 
						|
        assert len(model_cache.cache._cache) == 1
 | 
						|
        mock_get_model.assert_called_once()
 | 
						|
 | 
						|
    async def test_kwargs_used(self, mock_get_model: mock.Mock) -> None:
 | 
						|
        model_cache = ModelCache()
 | 
						|
        await model_cache.get(
 | 
						|
            "test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, cache_dir="test_cache"
 | 
						|
        )
 | 
						|
        mock_get_model.assert_called_once_with(
 | 
						|
            "test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, cache_dir="test_cache"
 | 
						|
        )
 | 
						|
 | 
						|
    async def test_different_clip(self, mock_get_model: mock.Mock) -> None:
 | 
						|
        model_cache = ModelCache()
 | 
						|
        await model_cache.get("test_model_name", ModelType.VISUAL, ModelTask.SEARCH)
 | 
						|
        await model_cache.get("test_model_name", ModelType.TEXTUAL, ModelTask.SEARCH)
 | 
						|
        mock_get_model.assert_has_calls(
 | 
						|
            [
 | 
						|
                mock.call("test_model_name", ModelType.VISUAL, ModelTask.SEARCH),
 | 
						|
                mock.call("test_model_name", ModelType.TEXTUAL, ModelTask.SEARCH),
 | 
						|
            ]
 | 
						|
        )
 | 
						|
        assert len(model_cache.cache._cache) == 2
 | 
						|
 | 
						|
    @mock.patch("immich_ml.models.cache.OptimisticLock", autospec=True)
 | 
						|
    async def test_model_ttl(self, mock_lock_cls: mock.Mock, mock_get_model: mock.Mock) -> None:
 | 
						|
        model_cache = ModelCache()
 | 
						|
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
 | 
						|
        mock_lock_cls.return_value.__aenter__.return_value.cas.assert_called_with(mock.ANY, ttl=100)
 | 
						|
 | 
						|
    @mock.patch("immich_ml.models.cache.SimpleMemoryCache.expire")
 | 
						|
    async def test_revalidate_get(self, mock_cache_expire: mock.Mock, mock_get_model: mock.Mock) -> None:
 | 
						|
        model_cache = ModelCache(revalidate=True)
 | 
						|
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
 | 
						|
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
 | 
						|
        mock_cache_expire.assert_called_once_with(mock.ANY, 100)
 | 
						|
 | 
						|
    async def test_profiling(self, mock_get_model: mock.Mock) -> None:
 | 
						|
        model_cache = ModelCache(profiling=True)
 | 
						|
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
 | 
						|
        profiling = await model_cache.get_profiling()
 | 
						|
        assert isinstance(profiling, dict)
 | 
						|
        assert profiling == model_cache.cache.profiling
 | 
						|
 | 
						|
    async def test_loads_mclip(self) -> None:
 | 
						|
        model_cache = ModelCache()
 | 
						|
 | 
						|
        model = await model_cache.get("XLM-Roberta-Large-Vit-B-32", ModelType.TEXTUAL, ModelTask.SEARCH)
 | 
						|
 | 
						|
        assert isinstance(model, MClipTextualEncoder)
 | 
						|
        assert model.model_name == "XLM-Roberta-Large-Vit-B-32"
 | 
						|
 | 
						|
    async def test_raises_exception_if_invalid_model_type(self) -> None:
 | 
						|
        invalid: Any = SimpleNamespace(value="invalid")
 | 
						|
        model_cache = ModelCache()
 | 
						|
 | 
						|
        with pytest.raises(ValueError):
 | 
						|
            await model_cache.get("XLM-Roberta-Large-Vit-B-32", ModelType.TEXTUAL, invalid)
 | 
						|
 | 
						|
    async def test_raises_exception_if_unknown_model_name(self) -> None:
 | 
						|
        model_cache = ModelCache()
 | 
						|
 | 
						|
        with pytest.raises(ValueError):
 | 
						|
            await model_cache.get("test_model_name", ModelType.TEXTUAL, ModelTask.SEARCH)
 | 
						|
 | 
						|
    async def test_preloads_clip_models(self, monkeypatch: MonkeyPatch, mock_get_model: mock.Mock) -> None:
 | 
						|
        os.environ["MACHINE_LEARNING_PRELOAD__CLIP__TEXTUAL"] = "ViT-B-32__openai"
 | 
						|
        os.environ["MACHINE_LEARNING_PRELOAD__CLIP__VISUAL"] = "ViT-B-32__openai"
 | 
						|
 | 
						|
        settings = Settings()
 | 
						|
        assert settings.preload is not None
 | 
						|
        assert settings.preload.clip.textual == "ViT-B-32__openai"
 | 
						|
        assert settings.preload.clip.visual == "ViT-B-32__openai"
 | 
						|
 | 
						|
        model_cache = ModelCache()
 | 
						|
        monkeypatch.setattr("immich_ml.main.model_cache", model_cache)
 | 
						|
 | 
						|
        await preload_models(settings.preload)
 | 
						|
        mock_get_model.assert_has_calls(
 | 
						|
            [
 | 
						|
                mock.call("ViT-B-32__openai", ModelType.TEXTUAL, ModelTask.SEARCH),
 | 
						|
                mock.call("ViT-B-32__openai", ModelType.VISUAL, ModelTask.SEARCH),
 | 
						|
            ],
 | 
						|
            any_order=True,
 | 
						|
        )
 | 
						|
 | 
						|
    async def test_preloads_facial_recognition_models(
 | 
						|
        self, monkeypatch: MonkeyPatch, mock_get_model: mock.Mock
 | 
						|
    ) -> None:
 | 
						|
        os.environ["MACHINE_LEARNING_PRELOAD__FACIAL_RECOGNITION__DETECTION"] = "buffalo_s"
 | 
						|
        os.environ["MACHINE_LEARNING_PRELOAD__FACIAL_RECOGNITION__RECOGNITION"] = "buffalo_s"
 | 
						|
 | 
						|
        settings = Settings()
 | 
						|
        assert settings.preload is not None
 | 
						|
        assert settings.preload.facial_recognition.detection == "buffalo_s"
 | 
						|
        assert settings.preload.facial_recognition.recognition == "buffalo_s"
 | 
						|
 | 
						|
        model_cache = ModelCache()
 | 
						|
        monkeypatch.setattr("immich_ml.main.model_cache", model_cache)
 | 
						|
 | 
						|
        await preload_models(settings.preload)
 | 
						|
        mock_get_model.assert_has_calls(
 | 
						|
            [
 | 
						|
                mock.call("buffalo_s", ModelType.DETECTION, ModelTask.FACIAL_RECOGNITION),
 | 
						|
                mock.call("buffalo_s", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION),
 | 
						|
            ],
 | 
						|
            any_order=True,
 | 
						|
        )
 | 
						|
 | 
						|
    async def test_preloads_all_models(self, monkeypatch: MonkeyPatch, mock_get_model: mock.Mock) -> None:
 | 
						|
        os.environ["MACHINE_LEARNING_PRELOAD__CLIP__TEXTUAL"] = "ViT-B-32__openai"
 | 
						|
        os.environ["MACHINE_LEARNING_PRELOAD__CLIP__VISUAL"] = "ViT-B-32__openai"
 | 
						|
        os.environ["MACHINE_LEARNING_PRELOAD__FACIAL_RECOGNITION__RECOGNITION"] = "buffalo_s"
 | 
						|
        os.environ["MACHINE_LEARNING_PRELOAD__FACIAL_RECOGNITION__DETECTION"] = "buffalo_s"
 | 
						|
 | 
						|
        settings = Settings()
 | 
						|
        assert settings.preload is not None
 | 
						|
        assert settings.preload.clip.visual == "ViT-B-32__openai"
 | 
						|
        assert settings.preload.clip.textual == "ViT-B-32__openai"
 | 
						|
        assert settings.preload.facial_recognition.recognition == "buffalo_s"
 | 
						|
        assert settings.preload.facial_recognition.detection == "buffalo_s"
 | 
						|
 | 
						|
        model_cache = ModelCache()
 | 
						|
        monkeypatch.setattr("immich_ml.main.model_cache", model_cache)
 | 
						|
 | 
						|
        await preload_models(settings.preload)
 | 
						|
        mock_get_model.assert_has_calls(
 | 
						|
            [
 | 
						|
                mock.call("ViT-B-32__openai", ModelType.TEXTUAL, ModelTask.SEARCH),
 | 
						|
                mock.call("ViT-B-32__openai", ModelType.VISUAL, ModelTask.SEARCH),
 | 
						|
                mock.call("buffalo_s", ModelType.DETECTION, ModelTask.FACIAL_RECOGNITION),
 | 
						|
                mock.call("buffalo_s", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION),
 | 
						|
            ],
 | 
						|
            any_order=True,
 | 
						|
        )
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.asyncio
 | 
						|
class TestLoad:
 | 
						|
    async def test_load(self) -> None:
 | 
						|
        mock_model = mock.Mock(spec=InferenceModel)
 | 
						|
        mock_model.loaded = False
 | 
						|
        mock_model.load_attempts = 0
 | 
						|
 | 
						|
        res = await load(mock_model)
 | 
						|
 | 
						|
        assert res is mock_model
 | 
						|
        mock_model.load.assert_called_once()
 | 
						|
        mock_model.clear_cache.assert_not_called()
 | 
						|
 | 
						|
    async def test_load_returns_model_if_loaded(self) -> None:
 | 
						|
        mock_model = mock.Mock(spec=InferenceModel)
 | 
						|
        mock_model.loaded = True
 | 
						|
 | 
						|
        res = await load(mock_model)
 | 
						|
 | 
						|
        assert res is mock_model
 | 
						|
        mock_model.load.assert_not_called()
 | 
						|
 | 
						|
    async def test_load_clears_cache_and_retries_if_os_error(self) -> None:
 | 
						|
        mock_model = mock.Mock(spec=InferenceModel)
 | 
						|
        mock_model.model_name = "test_model_name"
 | 
						|
        mock_model.model_type = ModelType.VISUAL
 | 
						|
        mock_model.model_task = ModelTask.SEARCH
 | 
						|
        mock_model.load.side_effect = [OSError, None]
 | 
						|
        mock_model.loaded = False
 | 
						|
        mock_model.load_attempts = 0
 | 
						|
 | 
						|
        res = await load(mock_model)
 | 
						|
 | 
						|
        assert res is mock_model
 | 
						|
        mock_model.clear_cache.assert_called_once()
 | 
						|
        assert mock_model.load.call_count == 2
 | 
						|
 | 
						|
    async def test_load_raises_if_os_error_and_already_retried(self) -> None:
 | 
						|
        mock_model = mock.Mock(spec=InferenceModel)
 | 
						|
        mock_model.model_name = "test_model_name"
 | 
						|
        mock_model.model_type = ModelType.VISUAL
 | 
						|
        mock_model.model_task = ModelTask.SEARCH
 | 
						|
        mock_model.loaded = False
 | 
						|
        mock_model.load_attempts = 2
 | 
						|
 | 
						|
        with pytest.raises(HTTPException):
 | 
						|
            await load(mock_model)
 | 
						|
 | 
						|
        mock_model.clear_cache.assert_not_called()
 | 
						|
        mock_model.load.assert_not_called()
 | 
						|
 | 
						|
    async def test_falls_back_to_onnx_if_other_format_does_not_exist(self, warning: mock.Mock) -> None:
 | 
						|
        mock_model = mock.Mock(spec=InferenceModel)
 | 
						|
        mock_model.model_name = "test_model_name"
 | 
						|
        mock_model.model_type = ModelType.VISUAL
 | 
						|
        mock_model.model_task = ModelTask.SEARCH
 | 
						|
        mock_model.model_format = ModelFormat.ARMNN
 | 
						|
        mock_model.loaded = False
 | 
						|
        mock_model.load_attempts = 0
 | 
						|
        error = FileNotFoundError()
 | 
						|
        mock_model.load.side_effect = [error, None]
 | 
						|
 | 
						|
        await load(mock_model)
 | 
						|
 | 
						|
        mock_model.clear_cache.assert_not_called()
 | 
						|
        assert mock_model.load.call_count == 2
 | 
						|
        warning.assert_called_once_with(
 | 
						|
            "ARMNN is available, but model 'test_model_name' does not support it.", exc_info=error
 | 
						|
        )
 | 
						|
        mock_model.model_format = ModelFormat.ONNX
 | 
						|
 | 
						|
 | 
						|
def test_root_endpoint(deployed_app: TestClient) -> None:
 | 
						|
    response = deployed_app.get("http://localhost:3003")
 | 
						|
 | 
						|
    body = response.json()
 | 
						|
    assert response.status_code == 200
 | 
						|
    assert body == {"message": "Immich ML"}
 | 
						|
 | 
						|
 | 
						|
def test_ping_endpoint(deployed_app: TestClient) -> None:
 | 
						|
    response = deployed_app.get("http://localhost:3003/ping")
 | 
						|
 | 
						|
    assert response.status_code == 200
 | 
						|
    assert response.text == "pong"
 | 
						|
 | 
						|
 | 
						|
@pytest.mark.skipif(
 | 
						|
    not settings.test_full,
 | 
						|
    reason="More time-consuming since it deploys the app and loads models.",
 | 
						|
)
 | 
						|
class TestPredictionEndpoints:
 | 
						|
    def test_clip_image_endpoint(
 | 
						|
        self, pil_image: Image.Image, responses: dict[str, Any], deployed_app: TestClient
 | 
						|
    ) -> None:
 | 
						|
        byte_image = BytesIO()
 | 
						|
        pil_image.save(byte_image, format="jpeg")
 | 
						|
        expected = responses["clip"]["image"]
 | 
						|
 | 
						|
        response = deployed_app.post(
 | 
						|
            "http://localhost:3003/predict",
 | 
						|
            data={"entries": json.dumps({"clip": {"visual": {"modelName": "ViT-B-32__openai"}}})},
 | 
						|
            files={"image": byte_image.getvalue()},
 | 
						|
        )
 | 
						|
 | 
						|
        actual = response.json()
 | 
						|
        assert response.status_code == 200
 | 
						|
        assert isinstance(actual, dict)
 | 
						|
        embedding = actual.get("clip", None)
 | 
						|
        assert isinstance(embedding, str)
 | 
						|
        parsed_embedding = orjson.loads(embedding)
 | 
						|
        assert np.allclose(expected, parsed_embedding)
 | 
						|
 | 
						|
    def test_clip_text_endpoint(self, responses: dict[str, Any], deployed_app: TestClient) -> None:
 | 
						|
        expected = responses["clip"]["text"]
 | 
						|
 | 
						|
        response = deployed_app.post(
 | 
						|
            "http://localhost:3003/predict",
 | 
						|
            data={
 | 
						|
                "entries": json.dumps(
 | 
						|
                    {
 | 
						|
                        "clip": {"textual": {"modelName": "ViT-B-32__openai"}},
 | 
						|
                    },
 | 
						|
                ),
 | 
						|
                "text": "test search query",
 | 
						|
            },
 | 
						|
        )
 | 
						|
 | 
						|
        actual = response.json()
 | 
						|
        assert response.status_code == 200
 | 
						|
        assert isinstance(actual, dict)
 | 
						|
        embedding = actual.get("clip", None)
 | 
						|
        assert isinstance(embedding, str)
 | 
						|
        parsed_embedding = orjson.loads(embedding)
 | 
						|
        assert np.allclose(expected, parsed_embedding)
 | 
						|
 | 
						|
    def test_face_endpoint(self, pil_image: Image.Image, responses: dict[str, Any], deployed_app: TestClient) -> None:
 | 
						|
        byte_image = BytesIO()
 | 
						|
        pil_image.save(byte_image, format="jpeg")
 | 
						|
 | 
						|
        response = deployed_app.post(
 | 
						|
            "http://localhost:3003/predict",
 | 
						|
            data={
 | 
						|
                "entries": json.dumps(
 | 
						|
                    {
 | 
						|
                        "facial-recognition": {
 | 
						|
                            "detection": {"modelName": "buffalo_l", "options": {"minScore": 0.034}},
 | 
						|
                            "recognition": {"modelName": "buffalo_l"},
 | 
						|
                        }
 | 
						|
                    }
 | 
						|
                )
 | 
						|
            },
 | 
						|
            files={"image": byte_image.getvalue()},
 | 
						|
        )
 | 
						|
 | 
						|
        actual = response.json()
 | 
						|
        assert response.status_code == 200
 | 
						|
        assert isinstance(actual, dict)
 | 
						|
        assert actual.get("imageHeight", None) == responses["imageHeight"]
 | 
						|
        assert actual.get("imageWidth", None) == responses["imageWidth"]
 | 
						|
        assert "facial-recognition" in actual and isinstance(actual["facial-recognition"], list)
 | 
						|
        assert len(actual["facial-recognition"]) == len(responses["facial-recognition"])
 | 
						|
 | 
						|
        for expected_face, actual_face in zip(responses["facial-recognition"], actual["facial-recognition"]):
 | 
						|
            assert expected_face["boundingBox"] == actual_face["boundingBox"]
 | 
						|
            embedding = actual_face.get("embedding", None)
 | 
						|
            assert isinstance(embedding, str)
 | 
						|
            parsed_embedding = orjson.loads(embedding)
 | 
						|
            assert np.allclose(expected_face["embedding"], parsed_embedding)
 | 
						|
            assert np.allclose(expected_face["score"], actual_face["score"])
 |