1
0
forked from Cutlery/immich

Compare commits

...

1 Commits

Author SHA1 Message Date
mertalev
c6a1d7b4f7
modularize model classes 2024-03-31 23:51:02 -04:00
17 changed files with 674 additions and 473 deletions

View File

@ -12,8 +12,6 @@ from rich.logging import RichHandler
from uvicorn import Server from uvicorn import Server
from uvicorn.workers import UvicornWorker from uvicorn.workers import UvicornWorker
from .schemas import ModelType
class PreloadModelData(BaseModel): class PreloadModelData(BaseModel):
clip: str | None clip: str | None
@ -21,7 +19,7 @@ class PreloadModelData(BaseModel):
class Settings(BaseSettings): class Settings(BaseSettings):
cache_folder: str = "/cache" cache_folder: Path = Path("/cache")
model_ttl: int = 300 model_ttl: int = 300
model_ttl_poll_s: int = 10 model_ttl_poll_s: int = 10
host: str = "0.0.0.0" host: str = "0.0.0.0"
@ -55,14 +53,6 @@ def clean_name(model_name: str) -> str:
return model_name.split("/")[-1].translate(_clean_name) return model_name.split("/")[-1].translate(_clean_name)
def get_cache_dir(model_name: str, model_type: ModelType) -> Path:
return Path(settings.cache_folder) / model_type.value / clean_name(model_name)
def get_hf_model_name(model_name: str) -> str:
return f"immich-app/{clean_name(model_name)}"
LOG_LEVELS: dict[str, int] = { LOG_LEVELS: dict[str, int] = {
"critical": logging.ERROR, "critical": logging.ERROR,
"error": logging.ERROR, "error": logging.ERROR,

View File

@ -6,22 +6,21 @@ import threading
import time import time
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from functools import partial
from typing import Any, AsyncGenerator, Callable, Iterator from typing import Any, AsyncGenerator, Callable, Iterator
from zipfile import BadZipFile
import orjson import orjson
from fastapi import Depends, FastAPI, Form, HTTPException, UploadFile from fastapi import Depends, FastAPI, Form, HTTPException, UploadFile
from fastapi.responses import ORJSONResponse from fastapi.responses import ORJSONResponse
from onnxruntime.capi.onnxruntime_pybind11_state import InvalidProtobuf, NoSuchFile
from starlette.formparsers import MultiPartParser from starlette.formparsers import MultiPartParser
from app.models.base import InferenceModel
from .config import PreloadModelData, log, settings from .config import PreloadModelData, log, settings
from .models.cache import ModelCache from .models.cache import ModelCache
from .schemas import ( from .schemas import (
MessageResponse, MessageResponse,
ModelTask,
ModelType, ModelType,
Predictor,
TextResponse, TextResponse,
) )
@ -63,12 +62,21 @@ async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
gc.collect() gc.collect()
async def preload_models(preload_models: PreloadModelData) -> None: async def preload_models(preload: PreloadModelData) -> None:
log.info(f"Preloading models: {preload_models}") log.info(f"Preloading models: {preload}")
if preload_models.clip is not None: if preload.clip is not None:
await load(await model_cache.get(preload_models.clip, ModelType.CLIP)) model = await model_cache.get(preload.clip, ModelType.TEXTUAL, ModelTask.SEARCH)
if preload_models.facial_recognition is not None: await load(model)
await load(await model_cache.get(preload_models.facial_recognition, ModelType.FACIAL_RECOGNITION))
model = await model_cache.get(preload.clip, ModelType.VISUAL, ModelTask.SEARCH)
await load(model)
if preload.facial_recognition is not None:
model = await model_cache.get(preload.facial_recognition, ModelType.DETECTION, ModelTask.FACIAL_RECOGNITION)
await load(model)
model = await model_cache.get(preload.facial_recognition, ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION)
await load(model)
def update_state() -> Iterator[None]: def update_state() -> Iterator[None]:
@ -98,6 +106,7 @@ def ping() -> str:
async def predict( async def predict(
model_name: str = Form(alias="modelName"), model_name: str = Form(alias="modelName"),
model_type: ModelType = Form(alias="modelType"), model_type: ModelType = Form(alias="modelType"),
model_task: ModelTask = Form(alias="modelTask"),
options: str = Form(default="{}"), options: str = Form(default="{}"),
text: str | None = Form(default=None), text: str | None = Form(default=None),
image: UploadFile | None = None, image: UploadFile | None = None,
@ -113,39 +122,30 @@ async def predict(
except orjson.JSONDecodeError: except orjson.JSONDecodeError:
raise HTTPException(400, f"Invalid options JSON: {options}") raise HTTPException(400, f"Invalid options JSON: {options}")
model = await load(await model_cache.get(model_name, model_type, ttl=settings.model_ttl, **kwargs)) model = await model_cache.get(model_name, model_type, model_task, ttl=settings.model_ttl, **kwargs)
model.configure(**kwargs) model = await load(model)
outputs = await run(model.predict, inputs) outputs = await run(model.predict, inputs, **kwargs)
return ORJSONResponse(outputs) return ORJSONResponse(outputs)
async def run(func: Callable[..., Any], inputs: Any) -> Any: async def run(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
if thread_pool is None: if thread_pool is None:
return func(inputs) return func(*args, **kwargs)
return await asyncio.get_running_loop().run_in_executor(thread_pool, func, inputs) partial_func = partial(func, *args, **kwargs)
return await asyncio.get_running_loop().run_in_executor(thread_pool, partial_func)
async def load(model: InferenceModel) -> InferenceModel: async def load(model: Predictor) -> Predictor:
if model.loaded: if model.loaded:
return model return model
def _load(model: InferenceModel) -> None: def _load(model: Predictor) -> Predictor:
with lock: with lock:
model.load() model.load()
return model
try: await run(_load, model)
await run(_load, model) return model
return model
except (OSError, InvalidProtobuf, BadZipFile, NoSuchFile):
log.warning(
(
f"Failed to load {model.model_type.replace('_', ' ')} model '{model.model_name}'."
"Clearing cache and retrying."
)
)
model.clear_cache()
await run(_load, model)
return model
async def idle_shutdown_task() -> None: async def idle_shutdown_task() -> None:

View File

@ -1,24 +1,31 @@
from typing import Any from typing import Any
from app.schemas import ModelType from app.models.clip.textual import MClipTextualEncoder, OpenClipTextualEncoder
from app.models.clip.visual import OpenClipVisualEncoder
from app.schemas import ModelSource, ModelTask, ModelType, Predictor
from .base import InferenceModel from .constants import get_model_source
from .clip import MCLIPEncoder, OpenCLIPEncoder from .facial_recognition.detection import FaceDetector
from .constants import is_insightface, is_mclip, is_openclip from .facial_recognition.recognition import FaceRecognizer
from .facial_recognition import FaceRecognizer
def from_model_type(model_type: ModelType, model_name: str, **model_kwargs: Any) -> InferenceModel: def from_model_type(model_name: str, model_type: ModelType, model_task: ModelTask, **model_kwargs: Any) -> Predictor:
match model_type: source = get_model_source(model_name)
case ModelType.CLIP: match source, model_type, model_task:
if is_openclip(model_name): case ModelSource.OPENCLIP | ModelSource.MCLIP, ModelType.VISUAL, ModelTask.SEARCH:
return OpenCLIPEncoder(model_name, **model_kwargs) return OpenClipVisualEncoder(model_name, **model_kwargs)
elif is_mclip(model_name):
return MCLIPEncoder(model_name, **model_kwargs) case ModelSource.OPENCLIP, ModelType.TEXTUAL, ModelTask.SEARCH:
case ModelType.FACIAL_RECOGNITION: return OpenClipTextualEncoder(model_name, **model_kwargs)
if is_insightface(model_name):
return FaceRecognizer(model_name, **model_kwargs) case ModelSource.MCLIP, ModelType.TEXTUAL, ModelTask.SEARCH:
return MClipTextualEncoder(model_name, **model_kwargs)
case ModelSource.INSIGHTFACE, ModelType.DETECTION, ModelTask.FACIAL_RECOGNITION:
return FaceDetector(model_name, **model_kwargs)
case ModelSource.INSIGHTFACE, ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION:
return FaceRecognizer(model_name, **model_kwargs)
case _: case _:
raise ValueError(f"Unknown model type {model_type}") raise ValueError(f"Unknown model combination: {source}, {model_type}, {model_task}")
raise ValueError(f"Unknown {model_type} model {model_name}")

View File

@ -3,21 +3,24 @@ from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from pathlib import Path from pathlib import Path
from shutil import rmtree from shutil import rmtree
from typing import Any from typing import Any, ClassVar
from zipfile import BadZipFile
import onnxruntime as ort import onnxruntime as ort
from onnxruntime.capi.onnxruntime_pybind11_state import InvalidProtobuf, NoSuchFile
from huggingface_hub import snapshot_download from huggingface_hub import snapshot_download
import ann.ann import ann.ann
from app.models.constants import SUPPORTED_PROVIDERS from app.models.constants import SUPPORTED_PROVIDERS
from ..config import get_cache_dir, get_hf_model_name, log, settings from ..config import clean_name, log, settings
from ..schemas import ModelRuntime, ModelType from ..schemas import ModelFormat, ModelSession, ModelTask, ModelType
from .ann import AnnSession from .ann import AnnSession
class InferenceModel(ABC): class InferenceModel(ABC):
_model_type: ModelType _model_task: ClassVar[ModelTask]
_model_type: ClassVar[ModelType]
def __init__( def __init__(
self, self,
@ -26,16 +29,16 @@ class InferenceModel(ABC):
providers: list[str] | None = None, providers: list[str] | None = None,
provider_options: list[dict[str, Any]] | None = None, provider_options: list[dict[str, Any]] | None = None,
sess_options: ort.SessionOptions | None = None, sess_options: ort.SessionOptions | None = None,
preferred_runtime: ModelRuntime | None = None, preferred_format: ModelFormat | None = None,
**model_kwargs: Any, **model_kwargs: Any,
) -> None: ) -> None:
self.loaded = False self.loaded = False
self.model_name = model_name self.model_name = clean_name(model_name)
self.cache_dir = Path(cache_dir) if cache_dir is not None else self.cache_dir_default self.cache_dir = Path(cache_dir) if cache_dir is not None else self.cache_dir_default
self.providers = providers if providers is not None else self.providers_default self.providers = providers if providers is not None else self.providers_default
self.provider_options = provider_options if provider_options is not None else self.provider_options_default self.provider_options = provider_options if provider_options is not None else self.provider_options_default
self.sess_options = sess_options if sess_options is not None else self.sess_options_default self.sess_options = sess_options if sess_options is not None else self.sess_options_default
self.preferred_runtime = preferred_runtime if preferred_runtime is not None else self.preferred_runtime_default self.preferred_runtime = preferred_format if preferred_format is not None else self.preferred_runtime_default
def download(self) -> None: def download(self) -> None:
if not self.cached: if not self.cached:
@ -47,35 +50,47 @@ class InferenceModel(ABC):
def load(self) -> None: def load(self) -> None:
if self.loaded: if self.loaded:
return return
self.download()
log.info(f"Loading {self.model_type.replace('-', ' ')} model '{self.model_name}' to memory") try:
self._load() self.download()
log.info(f"Loading {self.model_type.replace('-', ' ')} model '{self.model_name}' to memory")
self.session = self._load()
except (OSError, InvalidProtobuf, BadZipFile, NoSuchFile):
log.warning(
(
f"Failed to load {self.model_type.replace('_', ' ')} model '{self.model_name}'."
"Clearing cache and retrying."
)
)
self.clear_cache()
self.download()
self.session = self._load()
self.loaded = True self.loaded = True
def predict(self, inputs: Any, **model_kwargs: Any) -> Any: def predict(self, inputs: Any, **model_kwargs: Any) -> Any:
self.load() self.load()
if model_kwargs: if model_kwargs:
self.configure(**model_kwargs) self.configure(**model_kwargs)
return self._predict(inputs) return self._predict(inputs, **model_kwargs)
@abstractmethod @abstractmethod
def _predict(self, inputs: Any) -> Any: ... def _predict(self, inputs: Any, **model_kwargs: Any) -> Any: ...
def configure(self, **model_kwargs: Any) -> None: def configure(self, **kwargs: Any) -> None:
pass pass
def _download(self) -> None: def _download(self) -> None:
ignore_patterns = [] if self.preferred_runtime == ModelRuntime.ARMNN else ["*.armnn"] ignore_patterns = [] if self.preferred_runtime == ModelFormat.ARMNN else ["*.armnn"]
snapshot_download( snapshot_download(
get_hf_model_name(self.model_name), f"immich-app/{clean_name(self.model_name)}",
cache_dir=self.cache_dir, cache_dir=self.cache_dir,
local_dir=self.cache_dir, local_dir=self.cache_dir,
local_dir_use_symlinks=False, local_dir_use_symlinks=False,
ignore_patterns=ignore_patterns, ignore_patterns=ignore_patterns,
) )
@abstractmethod def _load(self) -> ModelSession:
def _load(self) -> None: ... return self._make_session(self.model_path)
def clear_cache(self) -> None: def clear_cache(self) -> None:
if not self.cache_dir.exists(): if not self.cache_dir.exists():
@ -99,7 +114,7 @@ class InferenceModel(ABC):
self.cache_dir.unlink() self.cache_dir.unlink()
self.cache_dir.mkdir(parents=True, exist_ok=True) self.cache_dir.mkdir(parents=True, exist_ok=True)
def _make_session(self, model_path: Path) -> AnnSession | ort.InferenceSession: def _make_session(self, model_path: Path) -> ModelSession:
if not model_path.is_file(): if not model_path.is_file():
onnx_path = model_path.with_suffix(".onnx") onnx_path = model_path.with_suffix(".onnx")
if not onnx_path.is_file(): if not onnx_path.is_file():
@ -124,6 +139,14 @@ class InferenceModel(ABC):
raise ValueError(f"Unsupported model file type: {model_path.suffix}") raise ValueError(f"Unsupported model file type: {model_path.suffix}")
return session return session
@property
def model_path(self) -> Path:
return self.cache_dir / self.model_type.value / f"model.{self.preferred_runtime}"
@property
def model_task(self) -> ModelTask:
return self._model_task
@property @property
def model_type(self) -> ModelType: def model_type(self) -> ModelType:
return self._model_type return self._model_type
@ -138,11 +161,11 @@ class InferenceModel(ABC):
@property @property
def cache_dir_default(self) -> Path: def cache_dir_default(self) -> Path:
return get_cache_dir(self.model_name, self.model_type) return settings.cache_folder / self.model_task.value / self.model_name
@property @property
def cached(self) -> bool: def cached(self) -> bool:
return self.cache_dir.is_dir() and any(self.cache_dir.iterdir()) return self.model_path.is_file()
@property @property
def providers(self) -> list[str]: def providers(self) -> list[str]:
@ -226,14 +249,14 @@ class InferenceModel(ABC):
return sess_options return sess_options
@property @property
def preferred_runtime(self) -> ModelRuntime: def preferred_runtime(self) -> ModelFormat:
return self._preferred_runtime return self._preferred_runtime
@preferred_runtime.setter @preferred_runtime.setter
def preferred_runtime(self, preferred_runtime: ModelRuntime) -> None: def preferred_runtime(self, preferred_runtime: ModelFormat) -> None:
log.debug(f"Setting preferred runtime to {preferred_runtime}") log.debug(f"Setting preferred runtime to {preferred_runtime}")
self._preferred_runtime = preferred_runtime self._preferred_runtime = preferred_runtime
@property @property
def preferred_runtime_default(self) -> ModelRuntime: def preferred_runtime_default(self) -> ModelFormat:
return ModelRuntime.ARMNN if ann.ann.is_available and settings.ann else ModelRuntime.ONNX return ModelFormat.ARMNN if ann.ann.is_available and settings.ann else ModelFormat.ONNX

View File

@ -5,9 +5,9 @@ from aiocache.lock import OptimisticLock
from aiocache.plugins import TimingPlugin from aiocache.plugins import TimingPlugin
from app.models import from_model_type from app.models import from_model_type
from app.models.facial_recognition.pipeline import FacialRecognitionPipeline
from ..schemas import ModelType, has_profiling from ..schemas import ModelTask, ModelType, Predictor, has_profiling
from .base import InferenceModel
class ModelCache: class ModelCache:
@ -31,11 +31,13 @@ class ModelCache:
if profiling: if profiling:
plugins.append(TimingPlugin()) plugins.append(TimingPlugin())
self.revalidate_enable = revalidate self.should_revalidate = revalidate
self.cache = SimpleMemoryCache(timeout=timeout, plugins=plugins, namespace=None) self.cache = SimpleMemoryCache(timeout=timeout, plugins=plugins, namespace=None)
async def get(self, model_name: str, model_type: ModelType, **model_kwargs: Any) -> InferenceModel: async def get(
self, model_name: str, model_type: ModelType, model_task: ModelTask, **model_kwargs: Any
) -> Predictor:
""" """
Args: Args:
model_name: Name of model in the model hub used for the task. model_name: Name of model in the model hub used for the task.
@ -45,17 +47,38 @@ class ModelCache:
model: The requested model. model: The requested model.
""" """
key = f"{model_name}{model_type.value}{model_kwargs.get('mode', '')}" key = f"{model_name}{model_type.value}{model_task.value}"
async with OptimisticLock(self.cache, key) as lock: async with OptimisticLock(self.cache, key) as lock:
model: InferenceModel | None = await self.cache.get(key) model: Predictor | None = await self.cache.get(key)
if model is None: if model is None:
model = from_model_type(model_type, model_name, **model_kwargs) if model_type == ModelType.PIPELINE:
model = await self._get_pipeline(model_name, model_task, **model_kwargs)
else:
model = from_model_type(model_name, model_type, model_task, **model_kwargs)
await lock.cas(model, ttl=model_kwargs.get("ttl", None)) await lock.cas(model, ttl=model_kwargs.get("ttl", None))
elif self.revalidate_enable: elif self.should_revalidate:
await self.revalidate(key, model_kwargs.get("ttl", None)) await self.revalidate(key, model_kwargs.get("ttl", None))
return model return model
async def _get_pipeline(self, model_name: str, model_task: ModelTask, **model_kwargs: Any) -> Predictor:
"""
Args:
model_name: Name of model in the model hub used for the task.
model_type: Model type or task, which determines which model zoo is used.
Returns:
model: The requested model.
"""
match model_task:
case ModelTask.FACIAL_RECOGNITION:
det_model: Any = await self.get(model_name, ModelType.DETECTION, model_task, **model_kwargs)
rec_model: Any = await self.get(model_name, ModelType.RECOGNITION, model_task, **model_kwargs)
return FacialRecognitionPipeline(det_model, rec_model)
case _:
raise ValueError(f"Unknown model task: {model_task}")
async def get_profiling(self) -> dict[str, float] | None: async def get_profiling(self) -> dict[str, float] | None:
if not has_profiling(self.cache): if not has_profiling(self.cache):
return None return None

View File

@ -1,189 +0,0 @@
import json
from abc import abstractmethod
from functools import cached_property
from io import BytesIO
from pathlib import Path
from typing import Any, Literal
import numpy as np
from numpy.typing import NDArray
from PIL import Image
from tokenizers import Encoding, Tokenizer
from app.config import clean_name, log
from app.models.transforms import crop, get_pil_resampling, normalize, resize, to_numpy
from app.schemas import ModelType
from .base import InferenceModel
class BaseCLIPEncoder(InferenceModel):
_model_type = ModelType.CLIP
def __init__(
self,
model_name: str,
cache_dir: Path | str | None = None,
mode: Literal["text", "vision"] | None = None,
**model_kwargs: Any,
) -> None:
self.mode = mode
super().__init__(model_name, cache_dir, **model_kwargs)
def _load(self) -> None:
if self.mode == "text" or self.mode is None:
log.debug(f"Loading clip text model '{self.model_name}'")
self.text_model = self._make_session(self.textual_path)
log.debug(f"Loaded clip text model '{self.model_name}'")
if self.mode == "vision" or self.mode is None:
log.debug(f"Loading clip vision model '{self.model_name}'")
self.vision_model = self._make_session(self.visual_path)
log.debug(f"Loaded clip vision model '{self.model_name}'")
def _predict(self, image_or_text: Image.Image | str) -> NDArray[np.float32]:
if isinstance(image_or_text, bytes):
image_or_text = Image.open(BytesIO(image_or_text))
match image_or_text:
case Image.Image():
if self.mode == "text":
raise TypeError("Cannot encode image as text-only model")
outputs: NDArray[np.float32] = self.vision_model.run(None, self.transform(image_or_text))[0][0]
case str():
if self.mode == "vision":
raise TypeError("Cannot encode text as vision-only model")
outputs = self.text_model.run(None, self.tokenize(image_or_text))[0][0]
case _:
raise TypeError(f"Expected Image or str, but got: {type(image_or_text)}")
return outputs
@abstractmethod
def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
pass
@abstractmethod
def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
pass
@property
def textual_dir(self) -> Path:
return self.cache_dir / "textual"
@property
def visual_dir(self) -> Path:
return self.cache_dir / "visual"
@property
def model_cfg_path(self) -> Path:
return self.cache_dir / "config.json"
@property
def textual_path(self) -> Path:
return self.textual_dir / f"model.{self.preferred_runtime}"
@property
def visual_path(self) -> Path:
return self.visual_dir / f"model.{self.preferred_runtime}"
@property
def tokenizer_file_path(self) -> Path:
return self.textual_dir / "tokenizer.json"
@property
def tokenizer_cfg_path(self) -> Path:
return self.textual_dir / "tokenizer_config.json"
@property
def preprocess_cfg_path(self) -> Path:
return self.visual_dir / "preprocess_cfg.json"
@property
def cached(self) -> bool:
return self.textual_path.is_file() and self.visual_path.is_file()
@cached_property
def model_cfg(self) -> dict[str, Any]:
log.debug(f"Loading model config for CLIP model '{self.model_name}'")
model_cfg: dict[str, Any] = json.load(self.model_cfg_path.open())
log.debug(f"Loaded model config for CLIP model '{self.model_name}'")
return model_cfg
@cached_property
def tokenizer_file(self) -> dict[str, Any]:
log.debug(f"Loading tokenizer file for CLIP model '{self.model_name}'")
tokenizer_file: dict[str, Any] = json.load(self.tokenizer_file_path.open())
log.debug(f"Loaded tokenizer file for CLIP model '{self.model_name}'")
return tokenizer_file
@cached_property
def tokenizer_cfg(self) -> dict[str, Any]:
log.debug(f"Loading tokenizer config for CLIP model '{self.model_name}'")
tokenizer_cfg: dict[str, Any] = json.load(self.tokenizer_cfg_path.open())
log.debug(f"Loaded tokenizer config for CLIP model '{self.model_name}'")
return tokenizer_cfg
@cached_property
def preprocess_cfg(self) -> dict[str, Any]:
log.debug(f"Loading visual preprocessing config for CLIP model '{self.model_name}'")
preprocess_cfg: dict[str, Any] = json.load(self.preprocess_cfg_path.open())
log.debug(f"Loaded visual preprocessing config for CLIP model '{self.model_name}'")
return preprocess_cfg
class OpenCLIPEncoder(BaseCLIPEncoder):
def __init__(
self,
model_name: str,
cache_dir: Path | str | None = None,
mode: Literal["text", "vision"] | None = None,
**model_kwargs: Any,
) -> None:
super().__init__(clean_name(model_name), cache_dir, mode, **model_kwargs)
def _load(self) -> None:
super()._load()
self._load_tokenizer()
size: list[int] | int = self.preprocess_cfg["size"]
self.size = size[0] if isinstance(size, list) else size
self.resampling = get_pil_resampling(self.preprocess_cfg["interpolation"])
self.mean = np.array(self.preprocess_cfg["mean"], dtype=np.float32)
self.std = np.array(self.preprocess_cfg["std"], dtype=np.float32)
def _load_tokenizer(self) -> Tokenizer:
log.debug(f"Loading tokenizer for CLIP model '{self.model_name}'")
text_cfg: dict[str, Any] = self.model_cfg["text_cfg"]
context_length: int = text_cfg.get("context_length", 77)
pad_token: str = self.tokenizer_cfg["pad_token"]
self.tokenizer: Tokenizer = Tokenizer.from_file(self.tokenizer_file_path.as_posix())
pad_id: int = self.tokenizer.token_to_id(pad_token)
self.tokenizer.enable_padding(length=context_length, pad_token=pad_token, pad_id=pad_id)
self.tokenizer.enable_truncation(max_length=context_length)
log.debug(f"Loaded tokenizer for CLIP model '{self.model_name}'")
def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
tokens: Encoding = self.tokenizer.encode(text)
return {"text": np.array([tokens.ids], dtype=np.int32)}
def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
image = resize(image, self.size)
image = crop(image, self.size)
image_np = to_numpy(image)
image_np = normalize(image_np, self.mean, self.std)
return {"image": np.expand_dims(image_np.transpose(2, 0, 1), 0)}
class MCLIPEncoder(OpenCLIPEncoder):
def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
tokens: Encoding = self.tokenizer.encode(text)
return {
"input_ids": np.array([tokens.ids], dtype=np.int32),
"attention_mask": np.array([tokens.attention_mask], dtype=np.int32),
}

View File

@ -0,0 +1,111 @@
import json
from abc import abstractmethod
from functools import cached_property
from pathlib import Path
from typing import Any
import numpy as np
from numpy.typing import NDArray
from tokenizers import Encoding, Tokenizer
from app.config import log
from app.schemas import ModelSession, ModelTask, ModelType
from app.models.base import InferenceModel
class BaseCLIPTextualEncoder(InferenceModel):
_model_task = ModelTask.SEARCH
_model_type = ModelType.TEXTUAL
def _predict(self, inputs: str, **kwargs: Any) -> NDArray[np.float32]:
res: NDArray[np.float32] = self.session.run(None, self.tokenize(inputs))[0][0]
return res
def _load(self) -> ModelSession:
log.debug(f"Loading tokenizer for CLIP model '{self.model_name}'")
self.tokenizer = self._load_tokenizer()
log.debug(f"Loaded tokenizer for CLIP model '{self.model_name}'")
return super()._load()
@abstractmethod
def _load_tokenizer(self) -> Tokenizer:
pass
@abstractmethod
def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
pass
@property
def model_dir(self) -> Path:
return self.cache_dir / "textual"
@property
def model_cfg_path(self) -> Path:
return self.cache_dir / "config.json"
@property
def model_path(self) -> Path:
return self.model_dir / f"model.{self.preferred_runtime}"
@property
def tokenizer_file_path(self) -> Path:
return self.model_dir / "tokenizer.json"
@property
def tokenizer_cfg_path(self) -> Path:
return self.model_dir / "tokenizer_config.json"
@property
def cached(self) -> bool:
return self.model_path.is_file()
@cached_property
def model_cfg(self) -> dict[str, Any]:
log.debug(f"Loading model config for CLIP model '{self.model_name}'")
model_cfg: dict[str, Any] = json.load(self.model_cfg_path.open())
log.debug(f"Loaded model config for CLIP model '{self.model_name}'")
return model_cfg
@cached_property
def tokenizer_file(self) -> dict[str, Any]:
log.debug(f"Loading tokenizer file for CLIP model '{self.model_name}'")
tokenizer_file: dict[str, Any] = json.load(self.tokenizer_file_path.open())
log.debug(f"Loaded tokenizer file for CLIP model '{self.model_name}'")
return tokenizer_file
@cached_property
def tokenizer_cfg(self) -> dict[str, Any]:
log.debug(f"Loading tokenizer config for CLIP model '{self.model_name}'")
tokenizer_cfg: dict[str, Any] = json.load(self.tokenizer_cfg_path.open())
log.debug(f"Loaded tokenizer config for CLIP model '{self.model_name}'")
return tokenizer_cfg
class OpenClipTextualEncoder(BaseCLIPTextualEncoder):
def _load_tokenizer(self) -> Tokenizer:
text_cfg: dict[str, Any] = self.model_cfg["text_cfg"]
context_length: int = text_cfg.get("context_length", 77)
pad_token: str = self.tokenizer_cfg["pad_token"]
tokenizer: Tokenizer = Tokenizer.from_file(self.tokenizer_file_path.as_posix())
pad_id: int = tokenizer.token_to_id(pad_token)
tokenizer.enable_padding(length=context_length, pad_token=pad_token, pad_id=pad_id)
tokenizer.enable_truncation(max_length=context_length)
return tokenizer
def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
tokens: Encoding = self.tokenizer.encode(text)
return {"text": np.array([tokens.ids], dtype=np.int32)}
class MClipTextualEncoder(OpenClipTextualEncoder):
def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
tokens: Encoding = self.tokenizer.encode(text)
return {
"input_ids": np.array([tokens.ids], dtype=np.int32),
"attention_mask": np.array([tokens.attention_mask], dtype=np.int32),
}

View File

@ -0,0 +1,84 @@
import json
from abc import abstractmethod
from functools import cached_property
from io import BytesIO
from pathlib import Path
from typing import Any
import numpy as np
from numpy.typing import NDArray
from PIL import Image
from app.config import log
from app.models.transforms import crop_pil, get_pil_resampling, normalize, resize_pil, to_numpy
from app.schemas import ModelSession, ModelTask, ModelType
from app.models.base import InferenceModel
class BaseCLIPVisualEncoder(InferenceModel):
_model_task = ModelTask.SEARCH
_model_type = ModelType.VISUAL
def _predict(self, inputs: Image.Image | bytes, **kwargs: Any) -> NDArray[np.float32]:
if isinstance(inputs, bytes):
inputs = Image.open(BytesIO(inputs))
res: NDArray[np.float32] = self.session.run(None, self.transform(inputs))[0][0]
return res
@abstractmethod
def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
pass
@property
def model_dir(self) -> Path:
return self.cache_dir / "visual"
@property
def model_cfg_path(self) -> Path:
return self.cache_dir / "config.json"
@property
def model_path(self) -> Path:
return self.model_dir / f"model.{self.preferred_runtime}"
@property
def preprocess_cfg_path(self) -> Path:
return self.model_dir / "preprocess_cfg.json"
@property
def cached(self) -> bool:
return self.model_path.is_file()
@cached_property
def model_cfg(self) -> dict[str, Any]:
log.debug(f"Loading model config for CLIP model '{self.model_name}'")
model_cfg: dict[str, Any] = json.load(self.model_cfg_path.open())
log.debug(f"Loaded model config for CLIP model '{self.model_name}'")
return model_cfg
@cached_property
def preprocess_cfg(self) -> dict[str, Any]:
log.debug(f"Loading visual preprocessing config for CLIP model '{self.model_name}'")
preprocess_cfg: dict[str, Any] = json.load(self.preprocess_cfg_path.open())
log.debug(f"Loaded visual preprocessing config for CLIP model '{self.model_name}'")
return preprocess_cfg
class OpenClipVisualEncoder(BaseCLIPVisualEncoder):
def _load(self) -> ModelSession:
size: list[int] | int = self.preprocess_cfg["size"]
self.size = size[0] if isinstance(size, list) else size
self.resampling = get_pil_resampling(self.preprocess_cfg["interpolation"])
self.mean = np.array(self.preprocess_cfg["mean"], dtype=np.float32)
self.std = np.array(self.preprocess_cfg["std"], dtype=np.float32)
return super()._load()
def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
image = resize_pil(image, self.size)
image = crop_pil(image, self.size)
image_np = to_numpy(image)
image_np = normalize(image_np, self.mean, self.std)
return {"image": np.expand_dims(image_np.transpose(2, 0, 1), 0)}

View File

@ -1,4 +1,5 @@
from app.config import clean_name from app.config import clean_name
from app.schemas import ModelSource
_OPENCLIP_MODELS = { _OPENCLIP_MODELS = {
"RN50__openai", "RN50__openai",
@ -54,13 +55,16 @@ _INSIGHTFACE_MODELS = {
SUPPORTED_PROVIDERS = ["CUDAExecutionProvider", "OpenVINOExecutionProvider", "CPUExecutionProvider"] SUPPORTED_PROVIDERS = ["CUDAExecutionProvider", "OpenVINOExecutionProvider", "CPUExecutionProvider"]
def is_openclip(model_name: str) -> bool: def get_model_source(model_name: str) -> ModelSource | None:
return clean_name(model_name) in _OPENCLIP_MODELS cleaned_name = clean_name(model_name)
if cleaned_name in _INSIGHTFACE_MODELS:
return ModelSource.INSIGHTFACE
def is_mclip(model_name: str) -> bool: if cleaned_name in _MCLIP_MODELS:
return clean_name(model_name) in _MCLIP_MODELS return ModelSource.MCLIP
if cleaned_name in _OPENCLIP_MODELS:
return ModelSource.OPENCLIP
def is_insightface(model_name: str) -> bool: return None
return clean_name(model_name) in _INSIGHTFACE_MODELS

View File

@ -1,90 +0,0 @@
from pathlib import Path
from typing import Any
import cv2
import numpy as np
from insightface.model_zoo import ArcFaceONNX, RetinaFace
from insightface.utils.face_align import norm_crop
from numpy.typing import NDArray
from app.config import clean_name
from app.schemas import Face, ModelType, is_ndarray
from .base import InferenceModel
class FaceRecognizer(InferenceModel):
_model_type = ModelType.FACIAL_RECOGNITION
def __init__(
self,
model_name: str,
min_score: float = 0.7,
cache_dir: Path | str | None = None,
**model_kwargs: Any,
) -> None:
self.min_score = model_kwargs.pop("minScore", min_score)
super().__init__(clean_name(model_name), cache_dir, **model_kwargs)
def _load(self) -> None:
self.det_model = RetinaFace(session=self._make_session(self.det_file))
self.rec_model = ArcFaceONNX(
self.rec_file.with_suffix(".onnx").as_posix(),
session=self._make_session(self.rec_file),
)
self.det_model.prepare(
ctx_id=0,
det_thresh=self.min_score,
input_size=(640, 640),
)
self.rec_model.prepare(ctx_id=0)
def _predict(self, image: NDArray[np.uint8] | bytes) -> list[Face]:
if isinstance(image, bytes):
decoded_image = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)
else:
decoded_image = image
assert is_ndarray(decoded_image, np.uint8)
bboxes, kpss = self.det_model.detect(decoded_image)
if bboxes.size == 0:
return []
assert is_ndarray(kpss, np.float32)
scores = bboxes[:, 4].tolist()
bboxes = bboxes[:, :4].round().tolist()
results = []
height, width, _ = decoded_image.shape
for (x1, y1, x2, y2), score, kps in zip(bboxes, scores, kpss):
cropped_img = norm_crop(decoded_image, kps)
embedding: NDArray[np.float32] = self.rec_model.get_feat(cropped_img)[0]
face: Face = {
"imageWidth": width,
"imageHeight": height,
"boundingBox": {
"x1": x1,
"y1": y1,
"x2": x2,
"y2": y2,
},
"score": score,
"embedding": embedding,
}
results.append(face)
return results
@property
def cached(self) -> bool:
return self.det_file.is_file() and self.rec_file.is_file()
@property
def det_file(self) -> Path:
return self.cache_dir / "detection" / f"model.{self.preferred_runtime}"
@property
def rec_file(self) -> Path:
return self.cache_dir / "recognition" / f"model.{self.preferred_runtime}"
def configure(self, **model_kwargs: Any) -> None:
self.det_model.det_thresh = model_kwargs.pop("minScore", self.det_model.det_thresh)

View File

@ -0,0 +1,60 @@
from pathlib import Path
from typing import Any
import cv2
import numpy as np
from insightface.model_zoo import RetinaFace
from numpy.typing import NDArray
from app.schemas import DetectedFace, ModelSession, ModelTask, ModelType, is_ndarray
from app.models.base import InferenceModel
class FaceDetector(InferenceModel):
_model_task = ModelTask.FACIAL_RECOGNITION
_model_type = ModelType.DETECTION
def __init__(
self,
model_name: str,
min_score: float = 0.7,
cache_dir: Path | str | None = None,
**model_kwargs: Any,
) -> None:
self.min_score = model_kwargs.pop("minScore", min_score)
super().__init__(model_name, cache_dir, **model_kwargs)
def _load(self) -> ModelSession:
session = self._make_session(self.model_path)
self.det_model = RetinaFace(session=session)
self.det_model.prepare(ctx_id=0, det_thresh=self.min_score, input_size=(640, 640))
return session
def _predict(self, inputs: NDArray[np.uint8] | bytes, **kwargs: Any) -> list[DetectedFace]:
if isinstance(inputs, bytes):
decoded_image = cv2.imdecode(np.frombuffer(inputs, np.uint8), cv2.IMREAD_COLOR)
else:
decoded_image = inputs
assert is_ndarray(decoded_image, np.uint8)
bboxes, landmarks = self.det_model.detect(decoded_image)
assert is_ndarray(bboxes, np.float32)
assert is_ndarray(landmarks, np.float32)
if bboxes.size == 0:
return []
scores: list[float] = bboxes[:, 4].tolist()
bboxes_list: list[list[int]] = bboxes[:, :4].round().tolist()
results: list[DetectedFace] = [
{"box": {"x1": x1, "y1": y1, "x2": x2, "y2": y2}, "score": score, "landmarks": face_landmarks}
for (x1, y1, x2, y2), score, face_landmarks in zip(bboxes_list, scores, landmarks)
]
return results
def configure(self, **kwargs: Any) -> None:
self.det_model.det_thresh = kwargs.pop("minScore", self.det_model.det_thresh)

View File

@ -0,0 +1,31 @@
from typing import Any
import cv2
import numpy as np
from numpy.typing import NDArray
from app.models.facial_recognition.detection import FaceDetector
from app.models.facial_recognition.recognition import FaceRecognizer
from app.schemas import RecognizedFace, is_ndarray
class FacialRecognitionPipeline:
def __init__(self, det_model: FaceDetector, rec_model: FaceRecognizer) -> None:
self.det_model = det_model
self.rec_model = rec_model
self.loaded = False
def load(self) -> None:
self.det_model.load()
self.rec_model.load()
self.loaded = True
def predict(self, inputs: NDArray[np.uint8] | bytes, **kwargs: Any) -> list[RecognizedFace]:
if isinstance(inputs, bytes):
decoded_image = cv2.imdecode(np.frombuffer(inputs, np.uint8), cv2.IMREAD_COLOR)
else:
decoded_image = inputs
assert is_ndarray(decoded_image, np.uint8)
faces = self.det_model.predict(decoded_image, **kwargs)
results: list[RecognizedFace] = self.rec_model.predict(decoded_image, faces=faces, **kwargs)
return results

View File

@ -0,0 +1,65 @@
from pathlib import Path
from typing import Any
import cv2
import numpy as np
from insightface.model_zoo import ArcFaceONNX
from insightface.utils.face_align import norm_crop
from numpy.typing import NDArray
from app.config import clean_name
from app.models.transforms import crop_np, crop_bounding_box, resize_np
from app.schemas import DetectedFace, ModelTask, RecognizedFace, ModelSession, ModelType, is_ndarray
from ..base import InferenceModel
class FaceRecognizer(InferenceModel):
_model_task = ModelTask.FACIAL_RECOGNITION
_model_type = ModelType.RECOGNITION
def __init__(
self,
model_name: str,
min_score: float = 0.7,
cache_dir: Path | str | None = None,
**model_kwargs: Any,
) -> None:
self.min_score = model_kwargs.pop("minScore", min_score)
super().__init__(clean_name(model_name), cache_dir, **model_kwargs)
def _load(self) -> ModelSession:
session = self._make_session(self.model_path)
self.model = ArcFaceONNX(
self.model_path.with_suffix(".onnx").as_posix(),
session=session,
)
return session
# def _predict(self, img: Any, **kwargs: Any) -> Any:
def _predict(
self, inputs: NDArray[np.uint8] | bytes, faces: list[DetectedFace] = [], **kwargs: Any
) -> list[RecognizedFace]:
if isinstance(inputs, bytes):
decoded_image = cv2.imdecode(np.frombuffer(inputs, np.uint8), cv2.IMREAD_COLOR)
else:
decoded_image = inputs
assert is_ndarray(decoded_image, np.float32)
results: list[RecognizedFace] = []
for detected_face in faces:
landmarks = detected_face.get("landmarks", None)
if landmarks is not None:
cropped_img = norm_crop(decoded_image, np.asarray(landmarks))
else:
cropped_img = crop_bounding_box(decoded_image, detected_face["box"])
cropped_img = crop_np(resize_np(cropped_img, 112), 112)
assert is_ndarray(cropped_img, np.uint8)
embedding = self.model.get_feat(cropped_img)[0]
assert is_ndarray(embedding, np.float32)
face: RecognizedFace = {"box": detected_face["box"], "embedding": embedding}
results.append(face)
return results

View File

View File

@ -1,19 +1,54 @@
import cv2
import numpy as np import numpy as np
from numpy.typing import NDArray from numpy.typing import NDArray
from PIL import Image from PIL import Image
from app.schemas import BoundingBox, is_ndarray
_PIL_RESAMPLING_METHODS = {resampling.name.lower(): resampling for resampling in Image.Resampling} _PIL_RESAMPLING_METHODS = {resampling.name.lower(): resampling for resampling in Image.Resampling}
def resize(img: Image.Image, size: int) -> Image.Image: def resize_pil(img: Image.Image, size: int) -> Image.Image:
if img.width < img.height: if img.width < img.height:
return img.resize((size, int((img.height / img.width) * size)), resample=Image.BICUBIC) return img.resize((size, int((img.height / img.width) * size)), resample=Image.BICUBIC)
else: else:
return img.resize((int((img.width / img.height) * size), size), resample=Image.BICUBIC) return img.resize((int((img.width / img.height) * size), size), resample=Image.BICUBIC)
def resize_np(img: NDArray[np.float32], size: int) -> NDArray[np.float32]:
height, width = img.shape[:2]
if width < height:
res = cv2.resize(img, (size, int((height / width) * size)), interpolation=cv2.INTER_CUBIC)
else:
res = cv2.resize(img, (int((width / height) * size), size), interpolation=cv2.INTER_CUBIC)
assert is_ndarray(res, np.float32)
return res
# ported from server
def crop_bounding_box(image: NDArray[np.float32], bbox: BoundingBox, scale: float = 1.0) -> NDArray[np.float32]:
middle_x = (bbox["x1"] + bbox["x2"]) // 2
middle_y = (bbox["y1"] + bbox["y2"]) // 2
target_half_size = int(max((bbox["x2"] - bbox["x1"]) / 2, (bbox["y2"] - bbox["y1"]) / 2) * scale)
new_half_size = min(
middle_x - max(0, middle_x - target_half_size),
middle_y - max(0, middle_y - target_half_size),
min(image.shape[1] - 1, middle_x + target_half_size) - middle_x,
min(image.shape[0] - 1, middle_y + target_half_size) - middle_y,
)
left = middle_x - new_half_size
top = middle_y - new_half_size
width = int(new_half_size * 2)
height = int(new_half_size * 2)
return image[top : top + height, left : left + width]
# https://stackoverflow.com/a/60883103 # https://stackoverflow.com/a/60883103
def crop(img: Image.Image, size: int) -> Image.Image: def crop_pil(img: Image.Image, size: int) -> Image.Image:
left = int((img.size[0] / 2) - (size / 2)) left = int((img.size[0] / 2) - (size / 2))
upper = int((img.size[1] / 2) - (size / 2)) upper = int((img.size[1] / 2) - (size / 2))
right = left + size right = left + size
@ -22,6 +57,16 @@ def crop(img: Image.Image, size: int) -> Image.Image:
return img.crop((left, upper, right, lower)) return img.crop((left, upper, right, lower))
def crop_np(img: NDArray[np.float32], size: int) -> NDArray[np.generic]:
height, width = img.shape[:2]
left = int((width / 2) - (size / 2))
upper = int((height / 2) - (size / 2))
right = left + size
lower = upper + size
return img[upper:lower, left:right]
def to_numpy(img: Image.Image) -> NDArray[np.float32]: def to_numpy(img: Image.Image) -> NDArray[np.float32]:
return np.asarray(img.convert("RGB")).astype(np.float32) / 255.0 return np.asarray(img.convert("RGB")).astype(np.float32) / 255.0

View File

@ -3,7 +3,7 @@ from typing import Any, Protocol, TypedDict, TypeGuard
import numpy as np import numpy as np
import numpy.typing as npt import numpy.typing as npt
from pydantic import BaseModel from pydantic import BaseModel, Field
class StrEnum(str, Enum): class StrEnum(str, Enum):
@ -28,26 +28,60 @@ class BoundingBox(TypedDict):
y2: int y2: int
class ModelType(StrEnum): class ModelTask(StrEnum):
CLIP = "clip"
FACIAL_RECOGNITION = "facial-recognition" FACIAL_RECOGNITION = "facial-recognition"
SEARCH = "clip"
class ModelRuntime(StrEnum): class ModelType(StrEnum):
ONNX = "onnx" DETECTION = "detection"
PIPELINE = "pipeline"
RECOGNITION = "recognition"
TEXTUAL = "textual"
VISUAL = "visual"
class ModelFormat(StrEnum):
ARMNN = "armnn" ARMNN = "armnn"
ONNX = "onnx"
class ModelSource(StrEnum):
INSIGHTFACE = "insightface"
MCLIP = "mclip"
OPENCLIP = "openclip"
class ModelSession(Protocol):
def run(
self,
output_names: list[str] | None,
input_feed: dict[str, npt.NDArray[np.float32]] | dict[str, npt.NDArray[np.int32]],
run_options: Any = None,
) -> list[npt.NDArray[np.float32]]: ...
class Predictor(Protocol):
loaded: bool
def load(self) -> None: ...
def predict(self, inputs: Any, **model_kwargs: Any) -> Any: ...
class HasProfiling(Protocol): class HasProfiling(Protocol):
profiling: dict[str, float] profiling: dict[str, float]
class Face(TypedDict): class DetectedFace(TypedDict):
boundingBox: BoundingBox box: BoundingBox
embedding: npt.NDArray[np.float32]
imageWidth: int
imageHeight: int
score: float score: float
landmarks: npt.NDArray[np.float32] | None
class RecognizedFace(TypedDict):
box: BoundingBox
embedding: npt.NDArray[np.float32]
def has_profiling(obj: Any) -> TypeGuard[HasProfiling]: def has_profiling(obj: Any) -> TypeGuard[HasProfiling]:

View File

@ -17,13 +17,14 @@ from pytest import MonkeyPatch
from pytest_mock import MockerFixture from pytest_mock import MockerFixture
from app.main import load, preload_models from app.main import load, preload_models
from app.models.clip.textual import MClipTextualEncoder, OpenClipTextualEncoder
from app.models.clip.visual import OpenClipVisualEncoder
from app.models.facial_recognition.recognition import FaceRecognizer
from .config import Settings, log, settings from .config import Settings, log, settings
from .models.base import InferenceModel from .models.base import InferenceModel
from .models.cache import ModelCache from .models.cache import ModelCache
from .models.clip import MCLIPEncoder, OpenCLIPEncoder from .schemas import ModelFormat, ModelTask, ModelType
from .models.facial_recognition import FaceRecognizer
from .schemas import ModelRuntime, ModelType
class TestBase: class TestBase:
@ -35,13 +36,13 @@ class TestBase:
@pytest.mark.providers(CPU_EP) @pytest.mark.providers(CPU_EP)
def test_sets_cpu_provider(self, providers: list[str]) -> None: def test_sets_cpu_provider(self, providers: list[str]) -> None:
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.providers == self.CPU_EP assert encoder.providers == self.CPU_EP
@pytest.mark.providers(CUDA_EP) @pytest.mark.providers(CUDA_EP)
def test_sets_cuda_provider_if_available(self, providers: list[str]) -> None: def test_sets_cuda_provider_if_available(self, providers: list[str]) -> None:
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.providers == self.CUDA_EP assert encoder.providers == self.CUDA_EP
@ -50,7 +51,7 @@ class TestBase:
mocked = mocker.patch("app.models.base.ort.capi._pybind_state") mocked = mocker.patch("app.models.base.ort.capi._pybind_state")
mocked.get_available_openvino_device_ids.return_value = ["GPU.0", "CPU"] mocked.get_available_openvino_device_ids.return_value = ["GPU.0", "CPU"]
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.providers == self.OV_EP assert encoder.providers == self.OV_EP
@ -59,25 +60,25 @@ class TestBase:
mocked = mocker.patch("app.models.base.ort.capi._pybind_state") mocked = mocker.patch("app.models.base.ort.capi._pybind_state")
mocked.get_available_openvino_device_ids.return_value = ["CPU"] mocked.get_available_openvino_device_ids.return_value = ["CPU"]
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.providers == self.CPU_EP assert encoder.providers == self.CPU_EP
@pytest.mark.providers(CUDA_EP_OUT_OF_ORDER) @pytest.mark.providers(CUDA_EP_OUT_OF_ORDER)
def test_sets_providers_in_correct_order(self, providers: list[str]) -> None: def test_sets_providers_in_correct_order(self, providers: list[str]) -> None:
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.providers == self.CUDA_EP assert encoder.providers == self.CUDA_EP
@pytest.mark.providers(TRT_EP) @pytest.mark.providers(TRT_EP)
def test_ignores_unsupported_providers(self, providers: list[str]) -> None: def test_ignores_unsupported_providers(self, providers: list[str]) -> None:
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.providers == self.CUDA_EP assert encoder.providers == self.CUDA_EP
def test_sets_provider_kwarg(self) -> None: def test_sets_provider_kwarg(self) -> None:
providers = ["CUDAExecutionProvider"] providers = ["CUDAExecutionProvider"]
encoder = OpenCLIPEncoder("ViT-B-32__openai", providers=providers) encoder = OpenClipTextualEncoder("ViT-B-32__openai", providers=providers)
assert encoder.providers == providers assert encoder.providers == providers
@ -85,7 +86,9 @@ class TestBase:
mocked = mocker.patch("app.models.base.ort.capi._pybind_state") mocked = mocker.patch("app.models.base.ort.capi._pybind_state")
mocked.get_available_openvino_device_ids.return_value = ["GPU.0", "CPU"] mocked.get_available_openvino_device_ids.return_value = ["GPU.0", "CPU"]
encoder = OpenCLIPEncoder("ViT-B-32__openai", providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"]) encoder = OpenClipTextualEncoder(
"ViT-B-32__openai", providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"]
)
assert encoder.provider_options == [ assert encoder.provider_options == [
{"device_type": "GPU_FP32", "cache_dir": (encoder.cache_dir / "openvino").as_posix()}, {"device_type": "GPU_FP32", "cache_dir": (encoder.cache_dir / "openvino").as_posix()},
@ -93,7 +96,7 @@ class TestBase:
] ]
def test_sets_provider_options_kwarg(self) -> None: def test_sets_provider_options_kwarg(self) -> None:
encoder = OpenCLIPEncoder( encoder = OpenClipTextualEncoder(
"ViT-B-32__openai", "ViT-B-32__openai",
providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"], providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"],
provider_options=[], provider_options=[],
@ -102,7 +105,7 @@ class TestBase:
assert encoder.provider_options == [] assert encoder.provider_options == []
def test_sets_default_sess_options(self) -> None: def test_sets_default_sess_options(self) -> None:
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.sess_options.execution_mode == ort.ExecutionMode.ORT_SEQUENTIAL assert encoder.sess_options.execution_mode == ort.ExecutionMode.ORT_SEQUENTIAL
assert encoder.sess_options.inter_op_num_threads == 1 assert encoder.sess_options.inter_op_num_threads == 1
@ -110,7 +113,9 @@ class TestBase:
assert encoder.sess_options.enable_cpu_mem_arena is False assert encoder.sess_options.enable_cpu_mem_arena is False
def test_sets_default_sess_options_does_not_set_threads_if_non_cpu_and_default_threads(self) -> None: def test_sets_default_sess_options_does_not_set_threads_if_non_cpu_and_default_threads(self) -> None:
encoder = OpenCLIPEncoder("ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"]) encoder = OpenClipTextualEncoder(
"ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"]
)
assert encoder.sess_options.inter_op_num_threads == 0 assert encoder.sess_options.inter_op_num_threads == 0
assert encoder.sess_options.intra_op_num_threads == 0 assert encoder.sess_options.intra_op_num_threads == 0
@ -120,14 +125,16 @@ class TestBase:
mock_settings.model_inter_op_threads = 2 mock_settings.model_inter_op_threads = 2
mock_settings.model_intra_op_threads = 4 mock_settings.model_intra_op_threads = 4
encoder = OpenCLIPEncoder("ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"]) encoder = OpenClipTextualEncoder(
"ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"]
)
assert encoder.sess_options.inter_op_num_threads == 2 assert encoder.sess_options.inter_op_num_threads == 2
assert encoder.sess_options.intra_op_num_threads == 4 assert encoder.sess_options.intra_op_num_threads == 4
def test_sets_sess_options_kwarg(self) -> None: def test_sets_sess_options_kwarg(self) -> None:
sess_options = ort.SessionOptions() sess_options = ort.SessionOptions()
encoder = OpenCLIPEncoder( encoder = OpenClipTextualEncoder(
"ViT-B-32__openai", "ViT-B-32__openai",
providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"], providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"],
provider_options=[], provider_options=[],
@ -137,13 +144,13 @@ class TestBase:
assert sess_options is encoder.sess_options assert sess_options is encoder.sess_options
def test_sets_default_cache_dir(self) -> None: def test_sets_default_cache_dir(self) -> None:
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.cache_dir == Path(settings.cache_folder) / "clip" / "ViT-B-32__openai" assert encoder.cache_dir == Path(settings.cache_folder) / "clip" / "ViT-B-32__openai"
def test_sets_cache_dir_kwarg(self) -> None: def test_sets_cache_dir_kwarg(self) -> None:
cache_dir = Path("/test_cache") cache_dir = Path("/test_cache")
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=cache_dir) encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=cache_dir)
assert encoder.cache_dir == cache_dir assert encoder.cache_dir == cache_dir
@ -151,29 +158,29 @@ class TestBase:
mocker.patch.object(settings, "ann", True) mocker.patch.object(settings, "ann", True)
mocker.patch("ann.ann.is_available", False) mocker.patch("ann.ann.is_available", False)
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.preferred_runtime == ModelRuntime.ONNX assert encoder.preferred_runtime == ModelFormat.ONNX
def test_sets_default_preferred_runtime_to_armnn_if_available(self, mocker: MockerFixture) -> None: def test_sets_default_preferred_runtime_to_armnn_if_available(self, mocker: MockerFixture) -> None:
mocker.patch.object(settings, "ann", True) mocker.patch.object(settings, "ann", True)
mocker.patch("ann.ann.is_available", True) mocker.patch("ann.ann.is_available", True)
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
assert encoder.preferred_runtime == ModelRuntime.ARMNN assert encoder.preferred_runtime == ModelFormat.ARMNN
def test_sets_preferred_runtime_kwarg(self, mocker: MockerFixture) -> None: def test_sets_preferred_runtime_kwarg(self, mocker: MockerFixture) -> None:
mocker.patch.object(settings, "ann", False) mocker.patch.object(settings, "ann", False)
mocker.patch("ann.ann.is_available", False) mocker.patch("ann.ann.is_available", False)
encoder = OpenCLIPEncoder("ViT-B-32__openai", preferred_runtime=ModelRuntime.ARMNN) encoder = OpenClipTextualEncoder("ViT-B-32__openai", preferred_runtime=ModelFormat.ARMNN)
assert encoder.preferred_runtime == ModelRuntime.ARMNN assert encoder.preferred_runtime == ModelFormat.ARMNN
def test_casts_cache_dir_string_to_path(self) -> None: def test_casts_cache_dir_string_to_path(self) -> None:
cache_dir = "/test_cache" cache_dir = "/test_cache"
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=cache_dir) encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=cache_dir)
assert encoder.cache_dir == Path(cache_dir) assert encoder.cache_dir == Path(cache_dir)
@ -186,7 +193,7 @@ class TestBase:
mocker.patch("app.models.base.Path", return_value=mock_cache_dir) mocker.patch("app.models.base.Path", return_value=mock_cache_dir)
info = mocker.spy(log, "info") info = mocker.spy(log, "info")
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir) encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
encoder.clear_cache() encoder.clear_cache()
mock_rmtree.assert_called_once_with(encoder.cache_dir) mock_rmtree.assert_called_once_with(encoder.cache_dir)
@ -201,7 +208,7 @@ class TestBase:
mocker.patch("app.models.base.Path", return_value=mock_cache_dir) mocker.patch("app.models.base.Path", return_value=mock_cache_dir)
warning = mocker.spy(log, "warning") warning = mocker.spy(log, "warning")
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir) encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
encoder.clear_cache() encoder.clear_cache()
mock_rmtree.assert_not_called() mock_rmtree.assert_not_called()
@ -215,7 +222,7 @@ class TestBase:
mock_cache_dir.is_dir.return_value = True mock_cache_dir.is_dir.return_value = True
mocker.patch("app.models.base.Path", return_value=mock_cache_dir) mocker.patch("app.models.base.Path", return_value=mock_cache_dir)
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir) encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
with pytest.raises(RuntimeError): with pytest.raises(RuntimeError):
encoder.clear_cache() encoder.clear_cache()
@ -230,7 +237,7 @@ class TestBase:
mocker.patch("app.models.base.Path", return_value=mock_cache_dir) mocker.patch("app.models.base.Path", return_value=mock_cache_dir)
warning = mocker.spy(log, "warning") warning = mocker.spy(log, "warning")
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir) encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
encoder.clear_cache() encoder.clear_cache()
mock_rmtree.assert_not_called() mock_rmtree.assert_not_called()
@ -245,7 +252,7 @@ class TestBase:
mock_model_path.with_suffix.return_value = mock_model_path mock_model_path.with_suffix.return_value = mock_model_path
mock_ann = mocker.patch("app.models.base.AnnSession") mock_ann = mocker.patch("app.models.base.AnnSession")
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
encoder._make_session(mock_model_path) encoder._make_session(mock_model_path)
mock_ann.assert_called_once() mock_ann.assert_called_once()
@ -263,7 +270,7 @@ class TestBase:
mock_ann = mocker.patch("app.models.base.AnnSession") mock_ann = mocker.patch("app.models.base.AnnSession")
mock_ort = mocker.patch("app.models.base.ort.InferenceSession") mock_ort = mocker.patch("app.models.base.ort.InferenceSession")
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
encoder._make_session(mock_armnn_path) encoder._make_session(mock_armnn_path)
mock_ort.assert_called_once() mock_ort.assert_called_once()
@ -277,7 +284,7 @@ class TestBase:
mock_ann = mocker.patch("app.models.base.AnnSession") mock_ann = mocker.patch("app.models.base.AnnSession")
mock_ort = mocker.patch("app.models.base.ort.InferenceSession") mock_ort = mocker.patch("app.models.base.ort.InferenceSession")
encoder = OpenCLIPEncoder("ViT-B-32__openai") encoder = OpenClipTextualEncoder("ViT-B-32__openai")
with pytest.raises(ValueError): with pytest.raises(ValueError):
encoder._make_session(mock_model_path) encoder._make_session(mock_model_path)
@ -287,7 +294,7 @@ class TestBase:
def test_download(self, mocker: MockerFixture) -> None: def test_download(self, mocker: MockerFixture) -> None:
mock_snapshot_download = mocker.patch("app.models.base.snapshot_download") mock_snapshot_download = mocker.patch("app.models.base.snapshot_download")
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir="/path/to/cache") encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="/path/to/cache")
encoder.download() encoder.download()
mock_snapshot_download.assert_called_once_with( mock_snapshot_download.assert_called_once_with(
@ -301,7 +308,7 @@ class TestBase:
def test_download_downloads_armnn_if_preferred_runtime(self, mocker: MockerFixture) -> None: def test_download_downloads_armnn_if_preferred_runtime(self, mocker: MockerFixture) -> None:
mock_snapshot_download = mocker.patch("app.models.base.snapshot_download") mock_snapshot_download = mocker.patch("app.models.base.snapshot_download")
encoder = OpenCLIPEncoder("ViT-B-32__openai", preferred_runtime=ModelRuntime.ARMNN) encoder = OpenClipTextualEncoder("ViT-B-32__openai", preferred_runtime=ModelFormat.ARMNN)
encoder.download() encoder.download()
mock_snapshot_download.assert_called_once_with( mock_snapshot_download.assert_called_once_with(
@ -323,21 +330,18 @@ class TestCLIP:
mocker: MockerFixture, mocker: MockerFixture,
clip_model_cfg: dict[str, Any], clip_model_cfg: dict[str, Any],
clip_preprocess_cfg: Callable[[Path], dict[str, Any]], clip_preprocess_cfg: Callable[[Path], dict[str, Any]],
clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
) -> None: ) -> None:
mocker.patch.object(OpenCLIPEncoder, "download") mocker.patch.object(OpenClipVisualEncoder, "download")
mocker.patch.object(OpenCLIPEncoder, "model_cfg", clip_model_cfg) mocker.patch.object(OpenClipVisualEncoder, "model_cfg", clip_model_cfg)
mocker.patch.object(OpenCLIPEncoder, "preprocess_cfg", clip_preprocess_cfg) mocker.patch.object(OpenClipVisualEncoder, "preprocess_cfg", clip_preprocess_cfg)
mocker.patch.object(OpenCLIPEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
mocked = mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value mocked = mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
mocked.run.return_value = [[self.embedding]] mocked.run.return_value = [[self.embedding]]
mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True) mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True)
clip_encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="vision") clip_encoder = OpenClipVisualEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="vision")
embedding = clip_encoder.predict(pil_image) embedding = clip_encoder.predict(pil_image)
assert clip_encoder.mode == "vision"
assert isinstance(embedding, np.ndarray) assert isinstance(embedding, np.ndarray)
assert embedding.shape[0] == clip_model_cfg["embed_dim"] assert embedding.shape[0] == clip_model_cfg["embed_dim"]
assert embedding.dtype == np.float32 assert embedding.dtype == np.float32
@ -347,22 +351,19 @@ class TestCLIP:
self, self,
mocker: MockerFixture, mocker: MockerFixture,
clip_model_cfg: dict[str, Any], clip_model_cfg: dict[str, Any],
clip_preprocess_cfg: Callable[[Path], dict[str, Any]],
clip_tokenizer_cfg: Callable[[Path], dict[str, Any]], clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
) -> None: ) -> None:
mocker.patch.object(OpenCLIPEncoder, "download") mocker.patch.object(OpenClipTextualEncoder, "download")
mocker.patch.object(OpenCLIPEncoder, "model_cfg", clip_model_cfg) mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
mocker.patch.object(OpenCLIPEncoder, "preprocess_cfg", clip_preprocess_cfg) mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
mocker.patch.object(OpenCLIPEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
mocked = mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value mocked = mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
mocked.run.return_value = [[self.embedding]] mocked.run.return_value = [[self.embedding]]
mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True) mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True)
clip_encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="text") clip_encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="text")
embedding = clip_encoder.predict("test search query") embedding = clip_encoder.predict("test search query")
assert clip_encoder.mode == "text"
assert isinstance(embedding, np.ndarray) assert isinstance(embedding, np.ndarray)
assert embedding.shape[0] == clip_model_cfg["embed_dim"] assert embedding.shape[0] == clip_model_cfg["embed_dim"]
assert embedding.dtype == np.float32 assert embedding.dtype == np.float32
@ -372,18 +373,16 @@ class TestCLIP:
self, self,
mocker: MockerFixture, mocker: MockerFixture,
clip_model_cfg: dict[str, Any], clip_model_cfg: dict[str, Any],
clip_preprocess_cfg: Callable[[Path], dict[str, Any]],
clip_tokenizer_cfg: Callable[[Path], dict[str, Any]], clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
) -> None: ) -> None:
mocker.patch.object(OpenCLIPEncoder, "download") mocker.patch.object(OpenClipTextualEncoder, "download")
mocker.patch.object(OpenCLIPEncoder, "model_cfg", clip_model_cfg) mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
mocker.patch.object(OpenCLIPEncoder, "preprocess_cfg", clip_preprocess_cfg) mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
mocker.patch.object(OpenCLIPEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
mock_tokenizer = mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True).return_value mock_tokenizer = mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True).return_value
mock_ids = [randint(0, 50000) for _ in range(77)] mock_ids = [randint(0, 50000) for _ in range(77)]
mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids) mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids)
clip_encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="text") clip_encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="text")
clip_encoder._load_tokenizer() clip_encoder._load_tokenizer()
tokens = clip_encoder.tokenize("test search query") tokens = clip_encoder.tokenize("test search query")
@ -397,19 +396,17 @@ class TestCLIP:
self, self,
mocker: MockerFixture, mocker: MockerFixture,
clip_model_cfg: dict[str, Any], clip_model_cfg: dict[str, Any],
clip_preprocess_cfg: Callable[[Path], dict[str, Any]],
clip_tokenizer_cfg: Callable[[Path], dict[str, Any]], clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
) -> None: ) -> None:
mocker.patch.object(OpenCLIPEncoder, "download") mocker.patch.object(MClipTextualEncoder, "download")
mocker.patch.object(OpenCLIPEncoder, "model_cfg", clip_model_cfg) mocker.patch.object(MClipTextualEncoder, "model_cfg", clip_model_cfg)
mocker.patch.object(OpenCLIPEncoder, "preprocess_cfg", clip_preprocess_cfg) mocker.patch.object(MClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
mocker.patch.object(OpenCLIPEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
mock_tokenizer = mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True).return_value mock_tokenizer = mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True).return_value
mock_ids = [randint(0, 50000) for _ in range(77)] mock_ids = [randint(0, 50000) for _ in range(77)]
mock_attention_mask = [randint(0, 1) for _ in range(77)] mock_attention_mask = [randint(0, 1) for _ in range(77)]
mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids, attention_mask=mock_attention_mask) mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids, attention_mask=mock_attention_mask)
clip_encoder = MCLIPEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="text") clip_encoder = MClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="text")
clip_encoder._load_tokenizer() clip_encoder._load_tokenizer()
tokens = clip_encoder.tokenize("test search query") tokens = clip_encoder.tokenize("test search query")
@ -440,12 +437,12 @@ class TestFaceRecognition:
score = np.array([[0.67]] * num_faces).astype(np.float32) score = np.array([[0.67]] * num_faces).astype(np.float32)
kpss = np.random.rand(num_faces, 5, 2).astype(np.float32) kpss = np.random.rand(num_faces, 5, 2).astype(np.float32)
det_model.detect.return_value = (np.concatenate([bbox, score], axis=-1), kpss) det_model.detect.return_value = (np.concatenate([bbox, score], axis=-1), kpss)
face_recognizer.det_model = det_model face_recognizer.model = det_model
rec_model = mock.Mock() rec_model = mock.Mock()
embedding = np.random.rand(num_faces, 512).astype(np.float32) embedding = np.random.rand(num_faces, 512).astype(np.float32)
rec_model.get_feat.return_value = embedding rec_model.get_feat.return_value = embedding
face_recognizer.rec_model = rec_model face_recognizer.model = rec_model
faces = face_recognizer.predict(cv_image) faces = face_recognizer.predict(cv_image)
@ -465,24 +462,28 @@ class TestFaceRecognition:
class TestCache: class TestCache:
async def test_caches(self, mock_get_model: mock.Mock) -> None: async def test_caches(self, mock_get_model: mock.Mock) -> None:
model_cache = ModelCache() model_cache = ModelCache()
await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION) await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION)
await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION) await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION)
assert len(model_cache.cache._cache) == 1 assert len(model_cache.cache._cache) == 1
mock_get_model.assert_called_once() mock_get_model.assert_called_once()
async def test_kwargs_used(self, mock_get_model: mock.Mock) -> None: async def test_kwargs_used(self, mock_get_model: mock.Mock) -> None:
model_cache = ModelCache() model_cache = ModelCache()
await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, cache_dir="test_cache") await model_cache.get(
mock_get_model.assert_called_once_with(ModelType.FACIAL_RECOGNITION, "test_model_name", cache_dir="test_cache") "test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, cache_dir="test_cache"
)
mock_get_model.assert_called_once_with(
ModelTask.FACIAL_RECOGNITION, ModelType.RECOGNITION, "test_model_name", cache_dir="test_cache"
)
async def test_different_clip(self, mock_get_model: mock.Mock) -> None: async def test_different_clip(self, mock_get_model: mock.Mock) -> None:
model_cache = ModelCache() model_cache = ModelCache()
await model_cache.get("test_image_model_name", ModelType.CLIP) await model_cache.get("test_image_model_name", ModelType.VISUAL, ModelTask.SEARCH)
await model_cache.get("test_text_model_name", ModelType.CLIP) await model_cache.get("test_text_model_name", ModelType.TEXTUAL, ModelTask.SEARCH)
mock_get_model.assert_has_calls( mock_get_model.assert_has_calls(
[ [
mock.call(ModelType.CLIP, "test_image_model_name"), mock.call(ModelTask.SEARCH, "test_image_model_name"),
mock.call(ModelType.CLIP, "test_text_model_name"), mock.call(ModelTask.SEARCH, "test_text_model_name"),
] ]
) )
assert len(model_cache.cache._cache) == 2 assert len(model_cache.cache._cache) == 2
@ -490,19 +491,19 @@ class TestCache:
@mock.patch("app.models.cache.OptimisticLock", autospec=True) @mock.patch("app.models.cache.OptimisticLock", autospec=True)
async def test_model_ttl(self, mock_lock_cls: mock.Mock, mock_get_model: mock.Mock) -> None: async def test_model_ttl(self, mock_lock_cls: mock.Mock, mock_get_model: mock.Mock) -> None:
model_cache = ModelCache() model_cache = ModelCache()
await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, ttl=100) await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
mock_lock_cls.return_value.__aenter__.return_value.cas.assert_called_with(mock.ANY, ttl=100) mock_lock_cls.return_value.__aenter__.return_value.cas.assert_called_with(mock.ANY, ttl=100)
@mock.patch("app.models.cache.SimpleMemoryCache.expire") @mock.patch("app.models.cache.SimpleMemoryCache.expire")
async def test_revalidate_get(self, mock_cache_expire: mock.Mock, mock_get_model: mock.Mock) -> None: async def test_revalidate_get(self, mock_cache_expire: mock.Mock, mock_get_model: mock.Mock) -> None:
model_cache = ModelCache(revalidate=True) model_cache = ModelCache(revalidate=True)
await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, ttl=100) await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, ttl=100) await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
mock_cache_expire.assert_called_once_with(mock.ANY, 100) mock_cache_expire.assert_called_once_with(mock.ANY, 100)
async def test_profiling(self, mock_get_model: mock.Mock) -> None: async def test_profiling(self, mock_get_model: mock.Mock) -> None:
model_cache = ModelCache(profiling=True) model_cache = ModelCache(profiling=True)
await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, ttl=100) await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
profiling = await model_cache.get_profiling() profiling = await model_cache.get_profiling()
assert isinstance(profiling, dict) assert isinstance(profiling, dict)
assert profiling == model_cache.cache.profiling assert profiling == model_cache.cache.profiling
@ -510,9 +511,9 @@ class TestCache:
async def test_loads_mclip(self) -> None: async def test_loads_mclip(self) -> None:
model_cache = ModelCache() model_cache = ModelCache()
model = await model_cache.get("XLM-Roberta-Large-Vit-B-32", ModelType.CLIP, mode="text") model = await model_cache.get("XLM-Roberta-Large-Vit-B-32", ModelType.TEXTUAL, ModelTask.SEARCH)
assert isinstance(model, MCLIPEncoder) assert isinstance(model, MClipTextualEncoder)
assert model.model_name == "XLM-Roberta-Large-Vit-B-32" assert model.model_name == "XLM-Roberta-Large-Vit-B-32"
async def test_raises_exception_if_invalid_model_type(self) -> None: async def test_raises_exception_if_invalid_model_type(self) -> None:
@ -520,13 +521,13 @@ class TestCache:
model_cache = ModelCache() model_cache = ModelCache()
with pytest.raises(ValueError): with pytest.raises(ValueError):
await model_cache.get("XLM-Roberta-Large-Vit-B-32", invalid, mode="text") await model_cache.get("XLM-Roberta-Large-Vit-B-32", ModelType.TEXTUAL, invalid)
async def test_raises_exception_if_unknown_model_name(self) -> None: async def test_raises_exception_if_unknown_model_name(self) -> None:
model_cache = ModelCache() model_cache = ModelCache()
with pytest.raises(ValueError): with pytest.raises(ValueError):
await model_cache.get("test_model_name", ModelType.CLIP, mode="text") await model_cache.get("test_model_name", ModelType.TEXTUAL, ModelTask.SEARCH)
async def test_preloads_models(self, monkeypatch: MonkeyPatch, mock_get_model: mock.Mock) -> None: async def test_preloads_models(self, monkeypatch: MonkeyPatch, mock_get_model: mock.Mock) -> None:
os.environ["MACHINE_LEARNING_PRELOAD__CLIP"] = "ViT-B-32__openai" os.environ["MACHINE_LEARNING_PRELOAD__CLIP"] = "ViT-B-32__openai"
@ -541,11 +542,12 @@ class TestCache:
monkeypatch.setattr("app.main.model_cache", model_cache) monkeypatch.setattr("app.main.model_cache", model_cache)
await preload_models(settings.preload) await preload_models(settings.preload)
assert len(model_cache.cache._cache) == 2 assert len(model_cache.cache._cache) == 3
assert mock_get_model.call_count == 2 assert mock_get_model.call_count == 3
await model_cache.get("ViT-B-32__openai", ModelType.CLIP, ttl=100) await model_cache.get("ViT-B-32__openai", ModelType.TEXTUAL, ModelTask.SEARCH, ttl=100)
await model_cache.get("buffalo_s", ModelType.FACIAL_RECOGNITION, ttl=100) await model_cache.get("ViT-B-32__openai", ModelType.VISUAL, ModelTask.SEARCH, ttl=100)
assert mock_get_model.call_count == 2 await model_cache.get("buffalo_s", ModelType.PIPELINE, ModelTask.FACIAL_RECOGNITION, ttl=100)
assert mock_get_model.call_count == 3
@pytest.mark.asyncio @pytest.mark.asyncio
@ -572,7 +574,8 @@ class TestLoad:
async def test_load_clears_cache_and_retries_if_os_error(self) -> None: async def test_load_clears_cache_and_retries_if_os_error(self) -> None:
mock_model = mock.Mock(spec=InferenceModel) mock_model = mock.Mock(spec=InferenceModel)
mock_model.model_name = "test_model_name" mock_model.model_name = "test_model_name"
mock_model.model_type = ModelType.CLIP mock_model.model_type = ModelType.VISUAL
mock_model.model_task = ModelTask.SEARCH
mock_model.load.side_effect = [OSError, None] mock_model.load.side_effect = [OSError, None]
mock_model.loaded = False mock_model.loaded = False