forked from Cutlery/immich
Comparing main...refactor/m: 1 commit (c6a1d7b4f7)

machine-learning/app/config.py

@@ -12,8 +12,6 @@ from rich.logging import RichHandler
 from uvicorn import Server
 from uvicorn.workers import UvicornWorker
 
-from .schemas import ModelType
-
 
 class PreloadModelData(BaseModel):
     clip: str | None
@@ -21,7 +19,7 @@ class PreloadModelData(BaseModel):
 
 
 class Settings(BaseSettings):
-    cache_folder: str = "/cache"
+    cache_folder: Path = Path("/cache")
     model_ttl: int = 300
     model_ttl_poll_s: int = 10
     host: str = "0.0.0.0"
@@ -55,14 +53,6 @@ def clean_name(model_name: str) -> str:
     return model_name.split("/")[-1].translate(_clean_name)
 
 
-def get_cache_dir(model_name: str, model_type: ModelType) -> Path:
-    return Path(settings.cache_folder) / model_type.value / clean_name(model_name)
-
-
-def get_hf_model_name(model_name: str) -> str:
-    return f"immich-app/{clean_name(model_name)}"
-
-
 LOG_LEVELS: dict[str, int] = {
     "critical": logging.ERROR,
     "error": logging.ERROR,
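
With `cache_folder` typed as a `Path`, path math like `settings.cache_folder / ...` works without the `Path(...)` wrapping that `get_cache_dir` used to do; that helper's job moves into `InferenceModel.cache_dir_default` in `base.py` below. A rough sketch of what `clean_name` does; the translation table here is an assumption, only the `split`/`translate` line comes from the diff:

    # Hypothetical table: immich defines _clean_name earlier in config.py.
    _clean_name = str.maketrans(":\\/", "___", ".")

    def clean_name(model_name: str) -> str:
        return model_name.split("/")[-1].translate(_clean_name)

    assert clean_name("immich-app/ViT-B-32__openai") == "ViT-B-32__openai"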

machine-learning/app/main.py

@@ -6,22 +6,21 @@ import threading
 import time
 from concurrent.futures import ThreadPoolExecutor
 from contextlib import asynccontextmanager
+from functools import partial
 from typing import Any, AsyncGenerator, Callable, Iterator
 from zipfile import BadZipFile
 
 import orjson
 from fastapi import Depends, FastAPI, Form, HTTPException, UploadFile
 from fastapi.responses import ORJSONResponse
 from onnxruntime.capi.onnxruntime_pybind11_state import InvalidProtobuf, NoSuchFile
 from starlette.formparsers import MultiPartParser
 
-from app.models.base import InferenceModel
-
 from .config import PreloadModelData, log, settings
 from .models.cache import ModelCache
 from .schemas import (
     MessageResponse,
+    ModelTask,
     ModelType,
+    Predictor,
     TextResponse,
 )
@@ -63,12 +62,21 @@ async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]:
         gc.collect()
 
 
-async def preload_models(preload_models: PreloadModelData) -> None:
-    log.info(f"Preloading models: {preload_models}")
-    if preload_models.clip is not None:
-        await load(await model_cache.get(preload_models.clip, ModelType.CLIP))
-    if preload_models.facial_recognition is not None:
-        await load(await model_cache.get(preload_models.facial_recognition, ModelType.FACIAL_RECOGNITION))
+async def preload_models(preload: PreloadModelData) -> None:
+    log.info(f"Preloading models: {preload}")
+    if preload.clip is not None:
+        model = await model_cache.get(preload.clip, ModelType.TEXTUAL, ModelTask.SEARCH)
+        await load(model)
+
+        model = await model_cache.get(preload.clip, ModelType.VISUAL, ModelTask.SEARCH)
+        await load(model)
+
+    if preload.facial_recognition is not None:
+        model = await model_cache.get(preload.facial_recognition, ModelType.DETECTION, ModelTask.FACIAL_RECOGNITION)
+        await load(model)
+
+        model = await model_cache.get(preload.facial_recognition, ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION)
+        await load(model)
 
 
 def update_state() -> Iterator[None]:
@@ -98,6 +106,7 @@ def ping() -> str:
 async def predict(
     model_name: str = Form(alias="modelName"),
     model_type: ModelType = Form(alias="modelType"),
+    model_task: ModelTask = Form(alias="modelTask"),
     options: str = Form(default="{}"),
     text: str | None = Form(default=None),
     image: UploadFile | None = None,
@@ -113,37 +122,28 @@ async def predict(
     except orjson.JSONDecodeError:
         raise HTTPException(400, f"Invalid options JSON: {options}")
 
-    model = await load(await model_cache.get(model_name, model_type, ttl=settings.model_ttl, **kwargs))
-    model.configure(**kwargs)
-    outputs = await run(model.predict, inputs)
+    model = await model_cache.get(model_name, model_type, model_task, ttl=settings.model_ttl, **kwargs)
+    model = await load(model)
+    outputs = await run(model.predict, inputs, **kwargs)
     return ORJSONResponse(outputs)
 
 
-async def run(func: Callable[..., Any], inputs: Any) -> Any:
+async def run(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
     if thread_pool is None:
-        return func(inputs)
-    return await asyncio.get_running_loop().run_in_executor(thread_pool, func, inputs)
+        return func(*args, **kwargs)
+    partial_func = partial(func, *args, **kwargs)
+    return await asyncio.get_running_loop().run_in_executor(thread_pool, partial_func)
 
 
-async def load(model: InferenceModel) -> InferenceModel:
+async def load(model: Predictor) -> Predictor:
     if model.loaded:
         return model
 
-    def _load(model: InferenceModel) -> None:
+    def _load(model: Predictor) -> Predictor:
         with lock:
             model.load()
 
     try:
         await run(_load, model)
         return model
     except (OSError, InvalidProtobuf, BadZipFile, NoSuchFile):
         log.warning(
             (
                 f"Failed to load {model.model_type.replace('_', ' ')} model '{model.model_name}'."
                 "Clearing cache and retrying."
             )
         )
         model.clear_cache()
 
         await run(_load, model)
         return model
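
`run` previously forwarded exactly one positional `inputs` value; `loop.run_in_executor` accepts only positional arguments, so keyword arguments such as `predict`'s per-request options could not pass through. Binding them with `functools.partial` first removes that limit. A minimal self-contained sketch of the same pattern (names here are illustrative, not from the diff):

    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    from functools import partial

    thread_pool = ThreadPoolExecutor(max_workers=2)

    async def run(func, *args, **kwargs):
        # run_in_executor only forwards positional arguments, so keyword
        # arguments must be bound into the callable beforehand.
        partial_func = partial(func, *args, **kwargs)
        return await asyncio.get_running_loop().run_in_executor(thread_pool, partial_func)

    async def main() -> None:
        print(await run(pow, 2, 10))  # 1024, computed on a worker thread

    asyncio.run(main())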

machine-learning/app/models/__init__.py

@@ -1,24 +1,31 @@
 from typing import Any
 
-from app.schemas import ModelType
+from app.models.clip.textual import MClipTextualEncoder, OpenClipTextualEncoder
+from app.models.clip.visual import OpenClipVisualEncoder
+from app.schemas import ModelSource, ModelTask, ModelType, Predictor
 
-from .base import InferenceModel
-from .clip import MCLIPEncoder, OpenCLIPEncoder
-from .constants import is_insightface, is_mclip, is_openclip
-from .facial_recognition import FaceRecognizer
+from .constants import get_model_source
+from .facial_recognition.detection import FaceDetector
+from .facial_recognition.recognition import FaceRecognizer
 
 
-def from_model_type(model_type: ModelType, model_name: str, **model_kwargs: Any) -> InferenceModel:
-    match model_type:
-        case ModelType.CLIP:
-            if is_openclip(model_name):
-                return OpenCLIPEncoder(model_name, **model_kwargs)
-            elif is_mclip(model_name):
-                return MCLIPEncoder(model_name, **model_kwargs)
-        case ModelType.FACIAL_RECOGNITION:
-            if is_insightface(model_name):
-                return FaceRecognizer(model_name, **model_kwargs)
-        case _:
-            raise ValueError(f"Unknown model type {model_type}")
-
-    raise ValueError(f"Unknown {model_type} model {model_name}")
+def from_model_type(model_name: str, model_type: ModelType, model_task: ModelTask, **model_kwargs: Any) -> Predictor:
+    source = get_model_source(model_name)
+    match source, model_type, model_task:
+        case ModelSource.OPENCLIP | ModelSource.MCLIP, ModelType.VISUAL, ModelTask.SEARCH:
+            return OpenClipVisualEncoder(model_name, **model_kwargs)
+
+        case ModelSource.OPENCLIP, ModelType.TEXTUAL, ModelTask.SEARCH:
+            return OpenClipTextualEncoder(model_name, **model_kwargs)
+
+        case ModelSource.MCLIP, ModelType.TEXTUAL, ModelTask.SEARCH:
+            return MClipTextualEncoder(model_name, **model_kwargs)
+
+        case ModelSource.INSIGHTFACE, ModelType.DETECTION, ModelTask.FACIAL_RECOGNITION:
+            return FaceDetector(model_name, **model_kwargs)
+
+        case ModelSource.INSIGHTFACE, ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION:
+            return FaceRecognizer(model_name, **model_kwargs)
+
+        case _:
+            raise ValueError(f"Unknown model combination: {source}, {model_type}, {model_task}")
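
Dispatch now keys on the `(source, type, task)` triple instead of `ModelType` alone, so one hub name can map to several runnable models. A usage sketch (assuming the enums and encoders above are importable):

    # Resolves to OpenClipTextualEncoder: ViT-B-32__openai is an OpenCLIP model.
    textual = from_model_type("ViT-B-32__openai", ModelType.TEXTUAL, ModelTask.SEARCH)

    # Same name, different triple: the visual half of the CLIP pair.
    visual = from_model_type("ViT-B-32__openai", ModelType.VISUAL, ModelTask.SEARCH)

    # An unsupported combination falls through to the wildcard case:
    # from_model_type("ViT-B-32__openai", ModelType.DETECTION, ModelTask.SEARCH)  # ValueError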

machine-learning/app/models/base.py

@@ -3,21 +3,24 @@ from __future__ import annotations
 from abc import ABC, abstractmethod
 from pathlib import Path
 from shutil import rmtree
-from typing import Any
+from typing import Any, ClassVar
 from zipfile import BadZipFile
 
 import onnxruntime as ort
 from onnxruntime.capi.onnxruntime_pybind11_state import InvalidProtobuf, NoSuchFile
 from huggingface_hub import snapshot_download
 
 import ann.ann
 from app.models.constants import SUPPORTED_PROVIDERS
 
-from ..config import get_cache_dir, get_hf_model_name, log, settings
-from ..schemas import ModelRuntime, ModelType
+from ..config import clean_name, log, settings
+from ..schemas import ModelFormat, ModelSession, ModelTask, ModelType
 from .ann import AnnSession
 
 
 class InferenceModel(ABC):
-    _model_type: ModelType
+    _model_task: ClassVar[ModelTask]
+    _model_type: ClassVar[ModelType]
 
     def __init__(
         self,
@@ -26,16 +29,16 @@ class InferenceModel(ABC):
         providers: list[str] | None = None,
         provider_options: list[dict[str, Any]] | None = None,
         sess_options: ort.SessionOptions | None = None,
-        preferred_runtime: ModelRuntime | None = None,
+        preferred_format: ModelFormat | None = None,
         **model_kwargs: Any,
     ) -> None:
         self.loaded = False
-        self.model_name = model_name
+        self.model_name = clean_name(model_name)
         self.cache_dir = Path(cache_dir) if cache_dir is not None else self.cache_dir_default
         self.providers = providers if providers is not None else self.providers_default
         self.provider_options = provider_options if provider_options is not None else self.provider_options_default
         self.sess_options = sess_options if sess_options is not None else self.sess_options_default
-        self.preferred_runtime = preferred_runtime if preferred_runtime is not None else self.preferred_runtime_default
+        self.preferred_runtime = preferred_format if preferred_format is not None else self.preferred_runtime_default
 
     def download(self) -> None:
         if not self.cached:
@@ -47,35 +50,47 @@ class InferenceModel(ABC):
     def load(self) -> None:
         if self.loaded:
             return
 
         try:
             self.download()
             log.info(f"Loading {self.model_type.replace('-', ' ')} model '{self.model_name}' to memory")
-            self._load()
+            self.session = self._load()
         except (OSError, InvalidProtobuf, BadZipFile, NoSuchFile):
             log.warning(
                 (
                     f"Failed to load {self.model_type.replace('_', ' ')} model '{self.model_name}'."
                     "Clearing cache and retrying."
                 )
             )
             self.clear_cache()
             self.download()
-            self._load()
+            self.session = self._load()
         self.loaded = True
 
     def predict(self, inputs: Any, **model_kwargs: Any) -> Any:
         self.load()
         if model_kwargs:
             self.configure(**model_kwargs)
-        return self._predict(inputs)
+        return self._predict(inputs, **model_kwargs)
 
     @abstractmethod
-    def _predict(self, inputs: Any) -> Any: ...
+    def _predict(self, inputs: Any, **model_kwargs: Any) -> Any: ...
 
-    def configure(self, **model_kwargs: Any) -> None:
+    def configure(self, **kwargs: Any) -> None:
         pass
 
     def _download(self) -> None:
-        ignore_patterns = [] if self.preferred_runtime == ModelRuntime.ARMNN else ["*.armnn"]
+        ignore_patterns = [] if self.preferred_runtime == ModelFormat.ARMNN else ["*.armnn"]
         snapshot_download(
-            get_hf_model_name(self.model_name),
+            f"immich-app/{clean_name(self.model_name)}",
             cache_dir=self.cache_dir,
             local_dir=self.cache_dir,
             local_dir_use_symlinks=False,
             ignore_patterns=ignore_patterns,
         )
 
-    @abstractmethod
-    def _load(self) -> None: ...
+    def _load(self) -> ModelSession:
+        return self._make_session(self.model_path)
 
     def clear_cache(self) -> None:
         if not self.cache_dir.exists():
@@ -99,7 +114,7 @@ class InferenceModel(ABC):
             self.cache_dir.unlink()
         self.cache_dir.mkdir(parents=True, exist_ok=True)
 
-    def _make_session(self, model_path: Path) -> AnnSession | ort.InferenceSession:
+    def _make_session(self, model_path: Path) -> ModelSession:
         if not model_path.is_file():
             onnx_path = model_path.with_suffix(".onnx")
             if not onnx_path.is_file():
@@ -124,6 +139,14 @@ class InferenceModel(ABC):
             raise ValueError(f"Unsupported model file type: {model_path.suffix}")
         return session
 
+    @property
+    def model_path(self) -> Path:
+        return self.cache_dir / self.model_type.value / f"model.{self.preferred_runtime}"
+
+    @property
+    def model_task(self) -> ModelTask:
+        return self._model_task
+
     @property
     def model_type(self) -> ModelType:
         return self._model_type
@@ -138,11 +161,11 @@ class InferenceModel(ABC):
 
     @property
     def cache_dir_default(self) -> Path:
-        return get_cache_dir(self.model_name, self.model_type)
+        return settings.cache_folder / self.model_task.value / self.model_name
 
     @property
     def cached(self) -> bool:
-        return self.cache_dir.is_dir() and any(self.cache_dir.iterdir())
+        return self.model_path.is_file()
 
     @property
     def providers(self) -> list[str]:
@@ -226,14 +249,14 @@ class InferenceModel(ABC):
         return sess_options
 
     @property
-    def preferred_runtime(self) -> ModelRuntime:
+    def preferred_runtime(self) -> ModelFormat:
         return self._preferred_runtime
 
     @preferred_runtime.setter
-    def preferred_runtime(self, preferred_runtime: ModelRuntime) -> None:
+    def preferred_runtime(self, preferred_runtime: ModelFormat) -> None:
         log.debug(f"Setting preferred runtime to {preferred_runtime}")
         self._preferred_runtime = preferred_runtime
 
     @property
-    def preferred_runtime_default(self) -> ModelRuntime:
-        return ModelRuntime.ARMNN if ann.ann.is_available and settings.ann else ModelRuntime.ONNX
+    def preferred_runtime_default(self) -> ModelFormat:
+        return ModelFormat.ARMNN if ann.ann.is_available and settings.ann else ModelFormat.ONNX
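
Taken together, `cache_dir_default` and the new `model_path` property give an on-disk layout of `<cache_folder>/<task>/<model_name>/<type>/model.<format>`. For example (values illustrative; the enum string values come from `schemas.py` below):

    # settings.cache_folder == Path("/cache"), model name "ViT-B-32__openai":
    #   cache_dir_default -> /cache/clip/ViT-B-32__openai             (ModelTask.SEARCH.value == "clip")
    #   model_path        -> /cache/clip/ViT-B-32__openai/textual/model.onnx
    #                        (ModelType.TEXTUAL, ModelFormat.ONNX)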

machine-learning/app/models/cache.py

@@ -5,9 +5,9 @@ from aiocache.lock import OptimisticLock
 from aiocache.plugins import TimingPlugin
 
 from app.models import from_model_type
+from app.models.facial_recognition.pipeline import FacialRecognitionPipeline
 
-from ..schemas import ModelType, has_profiling
-from .base import InferenceModel
+from ..schemas import ModelTask, ModelType, Predictor, has_profiling
 
 
 class ModelCache:
@@ -31,11 +31,13 @@ class ModelCache:
         if profiling:
             plugins.append(TimingPlugin())
 
-        self.revalidate_enable = revalidate
+        self.should_revalidate = revalidate
 
         self.cache = SimpleMemoryCache(timeout=timeout, plugins=plugins, namespace=None)
 
-    async def get(self, model_name: str, model_type: ModelType, **model_kwargs: Any) -> InferenceModel:
+    async def get(
+        self, model_name: str, model_type: ModelType, model_task: ModelTask, **model_kwargs: Any
+    ) -> Predictor:
         """
         Args:
             model_name: Name of model in the model hub used for the task.
@@ -45,17 +47,38 @@ class ModelCache:
            model: The requested model.
        """
 
-        key = f"{model_name}{model_type.value}{model_kwargs.get('mode', '')}"
+        key = f"{model_name}{model_type.value}{model_task.value}"
 
         async with OptimisticLock(self.cache, key) as lock:
-            model: InferenceModel | None = await self.cache.get(key)
+            model: Predictor | None = await self.cache.get(key)
             if model is None:
-                model = from_model_type(model_type, model_name, **model_kwargs)
+                if model_type == ModelType.PIPELINE:
+                    model = await self._get_pipeline(model_name, model_task, **model_kwargs)
+                else:
+                    model = from_model_type(model_name, model_type, model_task, **model_kwargs)
                 await lock.cas(model, ttl=model_kwargs.get("ttl", None))
-            elif self.revalidate_enable:
+            elif self.should_revalidate:
                 await self.revalidate(key, model_kwargs.get("ttl", None))
         return model
 
+    async def _get_pipeline(self, model_name: str, model_task: ModelTask, **model_kwargs: Any) -> Predictor:
+        """
+        Args:
+            model_name: Name of model in the model hub used for the task.
+            model_type: Model type or task, which determines which model zoo is used.
+
+        Returns:
+            model: The requested model.
+        """
+        match model_task:
+            case ModelTask.FACIAL_RECOGNITION:
+                det_model: Any = await self.get(model_name, ModelType.DETECTION, model_task, **model_kwargs)
+                rec_model: Any = await self.get(model_name, ModelType.RECOGNITION, model_task, **model_kwargs)
+                return FacialRecognitionPipeline(det_model, rec_model)
+
+            case _:
+                raise ValueError(f"Unknown model task: {model_task}")
+
     async def get_profiling(self) -> dict[str, float] | None:
         if not has_profiling(self.cache):
             return None
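
Cache keys are now the concatenation of name, type value, and task value, so the textual and visual halves of one CLIP model occupy separate entries. A usage sketch (model name assumed to be a known InsightFace model; constructor arguments defaulted):

    cache = ModelCache()

    async def warm_face_models() -> None:
        # cache key is "buffalo_l" + "detection" + "facial-recognition"
        det = await cache.get("buffalo_l", ModelType.DETECTION, ModelTask.FACIAL_RECOGNITION)

        # PIPELINE is never handled by from_model_type; _get_pipeline composes
        # the detection and recognition entries already in the cache.
        pipeline = await cache.get("buffalo_l", ModelType.PIPELINE, ModelTask.FACIAL_RECOGNITION)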

machine-learning/app/models/clip.py (deleted, 189 lines; replaced by clip/textual.py and clip/visual.py below)

@@ -1,189 +0,0 @@
import json
from abc import abstractmethod
from functools import cached_property
from io import BytesIO
from pathlib import Path
from typing import Any, Literal

import numpy as np
from numpy.typing import NDArray
from PIL import Image
from tokenizers import Encoding, Tokenizer

from app.config import clean_name, log
from app.models.transforms import crop, get_pil_resampling, normalize, resize, to_numpy
from app.schemas import ModelType

from .base import InferenceModel


class BaseCLIPEncoder(InferenceModel):
    _model_type = ModelType.CLIP

    def __init__(
        self,
        model_name: str,
        cache_dir: Path | str | None = None,
        mode: Literal["text", "vision"] | None = None,
        **model_kwargs: Any,
    ) -> None:
        self.mode = mode
        super().__init__(model_name, cache_dir, **model_kwargs)

    def _load(self) -> None:
        if self.mode == "text" or self.mode is None:
            log.debug(f"Loading clip text model '{self.model_name}'")
            self.text_model = self._make_session(self.textual_path)
            log.debug(f"Loaded clip text model '{self.model_name}'")

        if self.mode == "vision" or self.mode is None:
            log.debug(f"Loading clip vision model '{self.model_name}'")
            self.vision_model = self._make_session(self.visual_path)
            log.debug(f"Loaded clip vision model '{self.model_name}'")

    def _predict(self, image_or_text: Image.Image | str) -> NDArray[np.float32]:
        if isinstance(image_or_text, bytes):
            image_or_text = Image.open(BytesIO(image_or_text))

        match image_or_text:
            case Image.Image():
                if self.mode == "text":
                    raise TypeError("Cannot encode image as text-only model")
                outputs: NDArray[np.float32] = self.vision_model.run(None, self.transform(image_or_text))[0][0]
            case str():
                if self.mode == "vision":
                    raise TypeError("Cannot encode text as vision-only model")
                outputs = self.text_model.run(None, self.tokenize(image_or_text))[0][0]
            case _:
                raise TypeError(f"Expected Image or str, but got: {type(image_or_text)}")

        return outputs

    @abstractmethod
    def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
        pass

    @abstractmethod
    def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
        pass

    @property
    def textual_dir(self) -> Path:
        return self.cache_dir / "textual"

    @property
    def visual_dir(self) -> Path:
        return self.cache_dir / "visual"

    @property
    def model_cfg_path(self) -> Path:
        return self.cache_dir / "config.json"

    @property
    def textual_path(self) -> Path:
        return self.textual_dir / f"model.{self.preferred_runtime}"

    @property
    def visual_path(self) -> Path:
        return self.visual_dir / f"model.{self.preferred_runtime}"

    @property
    def tokenizer_file_path(self) -> Path:
        return self.textual_dir / "tokenizer.json"

    @property
    def tokenizer_cfg_path(self) -> Path:
        return self.textual_dir / "tokenizer_config.json"

    @property
    def preprocess_cfg_path(self) -> Path:
        return self.visual_dir / "preprocess_cfg.json"

    @property
    def cached(self) -> bool:
        return self.textual_path.is_file() and self.visual_path.is_file()

    @cached_property
    def model_cfg(self) -> dict[str, Any]:
        log.debug(f"Loading model config for CLIP model '{self.model_name}'")
        model_cfg: dict[str, Any] = json.load(self.model_cfg_path.open())
        log.debug(f"Loaded model config for CLIP model '{self.model_name}'")
        return model_cfg

    @cached_property
    def tokenizer_file(self) -> dict[str, Any]:
        log.debug(f"Loading tokenizer file for CLIP model '{self.model_name}'")
        tokenizer_file: dict[str, Any] = json.load(self.tokenizer_file_path.open())
        log.debug(f"Loaded tokenizer file for CLIP model '{self.model_name}'")
        return tokenizer_file

    @cached_property
    def tokenizer_cfg(self) -> dict[str, Any]:
        log.debug(f"Loading tokenizer config for CLIP model '{self.model_name}'")
        tokenizer_cfg: dict[str, Any] = json.load(self.tokenizer_cfg_path.open())
        log.debug(f"Loaded tokenizer config for CLIP model '{self.model_name}'")
        return tokenizer_cfg

    @cached_property
    def preprocess_cfg(self) -> dict[str, Any]:
        log.debug(f"Loading visual preprocessing config for CLIP model '{self.model_name}'")
        preprocess_cfg: dict[str, Any] = json.load(self.preprocess_cfg_path.open())
        log.debug(f"Loaded visual preprocessing config for CLIP model '{self.model_name}'")
        return preprocess_cfg


class OpenCLIPEncoder(BaseCLIPEncoder):
    def __init__(
        self,
        model_name: str,
        cache_dir: Path | str | None = None,
        mode: Literal["text", "vision"] | None = None,
        **model_kwargs: Any,
    ) -> None:
        super().__init__(clean_name(model_name), cache_dir, mode, **model_kwargs)

    def _load(self) -> None:
        super()._load()
        self._load_tokenizer()

        size: list[int] | int = self.preprocess_cfg["size"]
        self.size = size[0] if isinstance(size, list) else size

        self.resampling = get_pil_resampling(self.preprocess_cfg["interpolation"])
        self.mean = np.array(self.preprocess_cfg["mean"], dtype=np.float32)
        self.std = np.array(self.preprocess_cfg["std"], dtype=np.float32)

    def _load_tokenizer(self) -> Tokenizer:
        log.debug(f"Loading tokenizer for CLIP model '{self.model_name}'")

        text_cfg: dict[str, Any] = self.model_cfg["text_cfg"]
        context_length: int = text_cfg.get("context_length", 77)
        pad_token: str = self.tokenizer_cfg["pad_token"]

        self.tokenizer: Tokenizer = Tokenizer.from_file(self.tokenizer_file_path.as_posix())

        pad_id: int = self.tokenizer.token_to_id(pad_token)
        self.tokenizer.enable_padding(length=context_length, pad_token=pad_token, pad_id=pad_id)
        self.tokenizer.enable_truncation(max_length=context_length)

        log.debug(f"Loaded tokenizer for CLIP model '{self.model_name}'")

    def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
        tokens: Encoding = self.tokenizer.encode(text)
        return {"text": np.array([tokens.ids], dtype=np.int32)}

    def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
        image = resize(image, self.size)
        image = crop(image, self.size)
        image_np = to_numpy(image)
        image_np = normalize(image_np, self.mean, self.std)
        return {"image": np.expand_dims(image_np.transpose(2, 0, 1), 0)}


class MCLIPEncoder(OpenCLIPEncoder):
    def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
        tokens: Encoding = self.tokenizer.encode(text)
        return {
            "input_ids": np.array([tokens.ids], dtype=np.int32),
            "attention_mask": np.array([tokens.attention_mask], dtype=np.int32),
        }

machine-learning/app/models/clip/textual.py (new file, 111 lines)

@@ -0,0 +1,111 @@
import json
from abc import abstractmethod
from functools import cached_property
from pathlib import Path
from typing import Any

import numpy as np
from numpy.typing import NDArray
from tokenizers import Encoding, Tokenizer

from app.config import log
from app.schemas import ModelSession, ModelTask, ModelType

from app.models.base import InferenceModel


class BaseCLIPTextualEncoder(InferenceModel):
    _model_task = ModelTask.SEARCH
    _model_type = ModelType.TEXTUAL

    def _predict(self, inputs: str, **kwargs: Any) -> NDArray[np.float32]:
        res: NDArray[np.float32] = self.session.run(None, self.tokenize(inputs))[0][0]
        return res

    def _load(self) -> ModelSession:
        log.debug(f"Loading tokenizer for CLIP model '{self.model_name}'")
        self.tokenizer = self._load_tokenizer()
        log.debug(f"Loaded tokenizer for CLIP model '{self.model_name}'")

        return super()._load()

    @abstractmethod
    def _load_tokenizer(self) -> Tokenizer:
        pass

    @abstractmethod
    def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
        pass

    @property
    def model_dir(self) -> Path:
        return self.cache_dir / "textual"

    @property
    def model_cfg_path(self) -> Path:
        return self.cache_dir / "config.json"

    @property
    def model_path(self) -> Path:
        return self.model_dir / f"model.{self.preferred_runtime}"

    @property
    def tokenizer_file_path(self) -> Path:
        return self.model_dir / "tokenizer.json"

    @property
    def tokenizer_cfg_path(self) -> Path:
        return self.model_dir / "tokenizer_config.json"

    @property
    def cached(self) -> bool:
        return self.model_path.is_file()

    @cached_property
    def model_cfg(self) -> dict[str, Any]:
        log.debug(f"Loading model config for CLIP model '{self.model_name}'")
        model_cfg: dict[str, Any] = json.load(self.model_cfg_path.open())
        log.debug(f"Loaded model config for CLIP model '{self.model_name}'")
        return model_cfg

    @cached_property
    def tokenizer_file(self) -> dict[str, Any]:
        log.debug(f"Loading tokenizer file for CLIP model '{self.model_name}'")
        tokenizer_file: dict[str, Any] = json.load(self.tokenizer_file_path.open())
        log.debug(f"Loaded tokenizer file for CLIP model '{self.model_name}'")
        return tokenizer_file

    @cached_property
    def tokenizer_cfg(self) -> dict[str, Any]:
        log.debug(f"Loading tokenizer config for CLIP model '{self.model_name}'")
        tokenizer_cfg: dict[str, Any] = json.load(self.tokenizer_cfg_path.open())
        log.debug(f"Loaded tokenizer config for CLIP model '{self.model_name}'")
        return tokenizer_cfg


class OpenClipTextualEncoder(BaseCLIPTextualEncoder):
    def _load_tokenizer(self) -> Tokenizer:
        text_cfg: dict[str, Any] = self.model_cfg["text_cfg"]
        context_length: int = text_cfg.get("context_length", 77)
        pad_token: str = self.tokenizer_cfg["pad_token"]

        tokenizer: Tokenizer = Tokenizer.from_file(self.tokenizer_file_path.as_posix())

        pad_id: int = tokenizer.token_to_id(pad_token)
        tokenizer.enable_padding(length=context_length, pad_token=pad_token, pad_id=pad_id)
        tokenizer.enable_truncation(max_length=context_length)

        return tokenizer

    def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
        tokens: Encoding = self.tokenizer.encode(text)
        return {"text": np.array([tokens.ids], dtype=np.int32)}


class MClipTextualEncoder(OpenClipTextualEncoder):
    def tokenize(self, text: str) -> dict[str, NDArray[np.int32]]:
        tokens: Encoding = self.tokenizer.encode(text)
        return {
            "input_ids": np.array([tokens.ids], dtype=np.int32),
            "attention_mask": np.array([tokens.attention_mask], dtype=np.int32),
        }
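
`_load_tokenizer` configures padding and truncation up front, so `tokenize` always returns a fixed-width `(1, context_length)` batch. A standalone sketch of the same `tokenizers` calls (file path and pad token are placeholders; the real values come from the model's config files):

    from tokenizers import Tokenizer

    tokenizer = Tokenizer.from_file("tokenizer.json")  # placeholder path
    pad_token = "<end_of_text>"                        # placeholder; read from tokenizer_config.json
    pad_id = tokenizer.token_to_id(pad_token)
    tokenizer.enable_padding(length=77, pad_token=pad_token, pad_id=pad_id)
    tokenizer.enable_truncation(max_length=77)

    assert len(tokenizer.encode("a photo of a dog").ids) == 77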

machine-learning/app/models/clip/visual.py (new file, 84 lines)

@@ -0,0 +1,84 @@
import json
from abc import abstractmethod
from functools import cached_property
from io import BytesIO
from pathlib import Path
from typing import Any

import numpy as np
from numpy.typing import NDArray
from PIL import Image

from app.config import log
from app.models.transforms import crop_pil, get_pil_resampling, normalize, resize_pil, to_numpy
from app.schemas import ModelSession, ModelTask, ModelType

from app.models.base import InferenceModel


class BaseCLIPVisualEncoder(InferenceModel):
    _model_task = ModelTask.SEARCH
    _model_type = ModelType.VISUAL

    def _predict(self, inputs: Image.Image | bytes, **kwargs: Any) -> NDArray[np.float32]:
        if isinstance(inputs, bytes):
            inputs = Image.open(BytesIO(inputs))
        res: NDArray[np.float32] = self.session.run(None, self.transform(inputs))[0][0]
        return res

    @abstractmethod
    def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
        pass

    @property
    def model_dir(self) -> Path:
        return self.cache_dir / "visual"

    @property
    def model_cfg_path(self) -> Path:
        return self.cache_dir / "config.json"

    @property
    def model_path(self) -> Path:
        return self.model_dir / f"model.{self.preferred_runtime}"

    @property
    def preprocess_cfg_path(self) -> Path:
        return self.model_dir / "preprocess_cfg.json"

    @property
    def cached(self) -> bool:
        return self.model_path.is_file()

    @cached_property
    def model_cfg(self) -> dict[str, Any]:
        log.debug(f"Loading model config for CLIP model '{self.model_name}'")
        model_cfg: dict[str, Any] = json.load(self.model_cfg_path.open())
        log.debug(f"Loaded model config for CLIP model '{self.model_name}'")
        return model_cfg

    @cached_property
    def preprocess_cfg(self) -> dict[str, Any]:
        log.debug(f"Loading visual preprocessing config for CLIP model '{self.model_name}'")
        preprocess_cfg: dict[str, Any] = json.load(self.preprocess_cfg_path.open())
        log.debug(f"Loaded visual preprocessing config for CLIP model '{self.model_name}'")
        return preprocess_cfg


class OpenClipVisualEncoder(BaseCLIPVisualEncoder):
    def _load(self) -> ModelSession:
        size: list[int] | int = self.preprocess_cfg["size"]
        self.size = size[0] if isinstance(size, list) else size

        self.resampling = get_pil_resampling(self.preprocess_cfg["interpolation"])
        self.mean = np.array(self.preprocess_cfg["mean"], dtype=np.float32)
        self.std = np.array(self.preprocess_cfg["std"], dtype=np.float32)

        return super()._load()

    def transform(self, image: Image.Image) -> dict[str, NDArray[np.float32]]:
        image = resize_pil(image, self.size)
        image = crop_pil(image, self.size)
        image_np = to_numpy(image)
        image_np = normalize(image_np, self.mean, self.std)
        return {"image": np.expand_dims(image_np.transpose(2, 0, 1), 0)}

machine-learning/app/models/constants.py

@@ -1,4 +1,5 @@
 from app.config import clean_name
+from app.schemas import ModelSource
 
 _OPENCLIP_MODELS = {
     "RN50__openai",
@@ -54,13 +55,16 @@ _INSIGHTFACE_MODELS = {
 SUPPORTED_PROVIDERS = ["CUDAExecutionProvider", "OpenVINOExecutionProvider", "CPUExecutionProvider"]
 
 
-def is_openclip(model_name: str) -> bool:
-    return clean_name(model_name) in _OPENCLIP_MODELS
-
-
-def is_mclip(model_name: str) -> bool:
-    return clean_name(model_name) in _MCLIP_MODELS
-
-
-def is_insightface(model_name: str) -> bool:
-    return clean_name(model_name) in _INSIGHTFACE_MODELS
+def get_model_source(model_name: str) -> ModelSource | None:
+    cleaned_name = clean_name(model_name)
+
+    if cleaned_name in _INSIGHTFACE_MODELS:
+        return ModelSource.INSIGHTFACE
+
+    if cleaned_name in _MCLIP_MODELS:
+        return ModelSource.MCLIP
+
+    if cleaned_name in _OPENCLIP_MODELS:
+        return ModelSource.OPENCLIP
+
+    return None
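
Checking InsightFace, then M-CLIP, then OpenCLIP means each name resolves to at most one source. Expected behavior, assuming `buffalo_l` and `ViT-B-32__openai` are in the respective membership sets (both names appear elsewhere in this codebase):

    assert get_model_source("buffalo_l") == ModelSource.INSIGHTFACE
    assert get_model_source("immich-app/ViT-B-32__openai") == ModelSource.OPENCLIP  # clean_name strips the prefix
    assert get_model_source("some-unknown-model") is None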

machine-learning/app/models/facial_recognition.py (deleted, 90 lines; replaced by facial_recognition/detection.py, pipeline.py, and recognition.py below)

@@ -1,90 +0,0 @@
from pathlib import Path
from typing import Any

import cv2
import numpy as np
from insightface.model_zoo import ArcFaceONNX, RetinaFace
from insightface.utils.face_align import norm_crop
from numpy.typing import NDArray

from app.config import clean_name
from app.schemas import Face, ModelType, is_ndarray

from .base import InferenceModel


class FaceRecognizer(InferenceModel):
    _model_type = ModelType.FACIAL_RECOGNITION

    def __init__(
        self,
        model_name: str,
        min_score: float = 0.7,
        cache_dir: Path | str | None = None,
        **model_kwargs: Any,
    ) -> None:
        self.min_score = model_kwargs.pop("minScore", min_score)
        super().__init__(clean_name(model_name), cache_dir, **model_kwargs)

    def _load(self) -> None:
        self.det_model = RetinaFace(session=self._make_session(self.det_file))
        self.rec_model = ArcFaceONNX(
            self.rec_file.with_suffix(".onnx").as_posix(),
            session=self._make_session(self.rec_file),
        )

        self.det_model.prepare(
            ctx_id=0,
            det_thresh=self.min_score,
            input_size=(640, 640),
        )
        self.rec_model.prepare(ctx_id=0)

    def _predict(self, image: NDArray[np.uint8] | bytes) -> list[Face]:
        if isinstance(image, bytes):
            decoded_image = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)
        else:
            decoded_image = image
        assert is_ndarray(decoded_image, np.uint8)
        bboxes, kpss = self.det_model.detect(decoded_image)
        if bboxes.size == 0:
            return []
        assert is_ndarray(kpss, np.float32)

        scores = bboxes[:, 4].tolist()
        bboxes = bboxes[:, :4].round().tolist()

        results = []
        height, width, _ = decoded_image.shape
        for (x1, y1, x2, y2), score, kps in zip(bboxes, scores, kpss):
            cropped_img = norm_crop(decoded_image, kps)
            embedding: NDArray[np.float32] = self.rec_model.get_feat(cropped_img)[0]
            face: Face = {
                "imageWidth": width,
                "imageHeight": height,
                "boundingBox": {
                    "x1": x1,
                    "y1": y1,
                    "x2": x2,
                    "y2": y2,
                },
                "score": score,
                "embedding": embedding,
            }
            results.append(face)
        return results

    @property
    def cached(self) -> bool:
        return self.det_file.is_file() and self.rec_file.is_file()

    @property
    def det_file(self) -> Path:
        return self.cache_dir / "detection" / f"model.{self.preferred_runtime}"

    @property
    def rec_file(self) -> Path:
        return self.cache_dir / "recognition" / f"model.{self.preferred_runtime}"

    def configure(self, **model_kwargs: Any) -> None:
        self.det_model.det_thresh = model_kwargs.pop("minScore", self.det_model.det_thresh)

machine-learning/app/models/facial_recognition/detection.py (new file, 60 lines)

@@ -0,0 +1,60 @@
from pathlib import Path
from typing import Any

import cv2
import numpy as np
from insightface.model_zoo import RetinaFace
from numpy.typing import NDArray

from app.schemas import DetectedFace, ModelSession, ModelTask, ModelType, is_ndarray

from app.models.base import InferenceModel


class FaceDetector(InferenceModel):
    _model_task = ModelTask.FACIAL_RECOGNITION
    _model_type = ModelType.DETECTION

    def __init__(
        self,
        model_name: str,
        min_score: float = 0.7,
        cache_dir: Path | str | None = None,
        **model_kwargs: Any,
    ) -> None:
        self.min_score = model_kwargs.pop("minScore", min_score)
        super().__init__(model_name, cache_dir, **model_kwargs)

    def _load(self) -> ModelSession:
        session = self._make_session(self.model_path)
        self.det_model = RetinaFace(session=session)
        self.det_model.prepare(ctx_id=0, det_thresh=self.min_score, input_size=(640, 640))

        return session

    def _predict(self, inputs: NDArray[np.uint8] | bytes, **kwargs: Any) -> list[DetectedFace]:
        if isinstance(inputs, bytes):
            decoded_image = cv2.imdecode(np.frombuffer(inputs, np.uint8), cv2.IMREAD_COLOR)
        else:
            decoded_image = inputs
        assert is_ndarray(decoded_image, np.uint8)

        bboxes, landmarks = self.det_model.detect(decoded_image)
        assert is_ndarray(bboxes, np.float32)
        assert is_ndarray(landmarks, np.float32)

        if bboxes.size == 0:
            return []

        scores: list[float] = bboxes[:, 4].tolist()
        bboxes_list: list[list[int]] = bboxes[:, :4].round().tolist()

        results: list[DetectedFace] = [
            {"box": {"x1": x1, "y1": y1, "x2": x2, "y2": y2}, "score": score, "landmarks": face_landmarks}
            for (x1, y1, x2, y2), score, face_landmarks in zip(bboxes_list, scores, landmarks)
        ]

        return results

    def configure(self, **kwargs: Any) -> None:
        self.det_model.det_thresh = kwargs.pop("minScore", self.det_model.det_thresh)

machine-learning/app/models/facial_recognition/pipeline.py (new file, 31 lines)

@@ -0,0 +1,31 @@
from typing import Any

import cv2
import numpy as np
from numpy.typing import NDArray

from app.models.facial_recognition.detection import FaceDetector
from app.models.facial_recognition.recognition import FaceRecognizer
from app.schemas import RecognizedFace, is_ndarray


class FacialRecognitionPipeline:
    def __init__(self, det_model: FaceDetector, rec_model: FaceRecognizer) -> None:
        self.det_model = det_model
        self.rec_model = rec_model
        self.loaded = False

    def load(self) -> None:
        self.det_model.load()
        self.rec_model.load()
        self.loaded = True

    def predict(self, inputs: NDArray[np.uint8] | bytes, **kwargs: Any) -> list[RecognizedFace]:
        if isinstance(inputs, bytes):
            decoded_image = cv2.imdecode(np.frombuffer(inputs, np.uint8), cv2.IMREAD_COLOR)
        else:
            decoded_image = inputs
        assert is_ndarray(decoded_image, np.uint8)

        faces = self.det_model.predict(decoded_image, **kwargs)
        results: list[RecognizedFace] = self.rec_model.predict(decoded_image, faces=faces, **kwargs)
        return results
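
The pipeline reproduces what the deleted monolithic `FaceRecognizer._predict` did in one pass: detect, then embed each detected face via the `faces` keyword. A usage sketch (model name and input path illustrative):

    det = FaceDetector("buffalo_l")
    rec = FaceRecognizer("buffalo_l")
    pipeline = FacialRecognitionPipeline(det, rec)
    pipeline.load()  # loads both underlying models

    with open("photo.jpg", "rb") as f:
        faces = pipeline.predict(f.read())  # -> list[RecognizedFace]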

machine-learning/app/models/facial_recognition/recognition.py (new file, 65 lines)

@@ -0,0 +1,65 @@
from pathlib import Path
from typing import Any

import cv2
import numpy as np
from insightface.model_zoo import ArcFaceONNX
from insightface.utils.face_align import norm_crop
from numpy.typing import NDArray

from app.config import clean_name
from app.models.transforms import crop_np, crop_bounding_box, resize_np
from app.schemas import DetectedFace, ModelTask, RecognizedFace, ModelSession, ModelType, is_ndarray

from ..base import InferenceModel


class FaceRecognizer(InferenceModel):
    _model_task = ModelTask.FACIAL_RECOGNITION
    _model_type = ModelType.RECOGNITION

    def __init__(
        self,
        model_name: str,
        min_score: float = 0.7,
        cache_dir: Path | str | None = None,
        **model_kwargs: Any,
    ) -> None:
        self.min_score = model_kwargs.pop("minScore", min_score)
        super().__init__(clean_name(model_name), cache_dir, **model_kwargs)

    def _load(self) -> ModelSession:
        session = self._make_session(self.model_path)
        self.model = ArcFaceONNX(
            self.model_path.with_suffix(".onnx").as_posix(),
            session=session,
        )
        return session

    def _predict(
        self, inputs: NDArray[np.uint8] | bytes, faces: list[DetectedFace] = [], **kwargs: Any
    ) -> list[RecognizedFace]:
        if isinstance(inputs, bytes):
            decoded_image = cv2.imdecode(np.frombuffer(inputs, np.uint8), cv2.IMREAD_COLOR)
        else:
            decoded_image = inputs
        assert is_ndarray(decoded_image, np.float32)

        results: list[RecognizedFace] = []
        for detected_face in faces:
            landmarks = detected_face.get("landmarks", None)
            if landmarks is not None:
                cropped_img = norm_crop(decoded_image, np.asarray(landmarks))
            else:
                cropped_img = crop_bounding_box(decoded_image, detected_face["box"])
                cropped_img = crop_np(resize_np(cropped_img, 112), 112)
            assert is_ndarray(cropped_img, np.uint8)

            embedding = self.model.get_feat(cropped_img)[0]
            assert is_ndarray(embedding, np.float32)

            face: RecognizedFace = {"box": detected_face["box"], "embedding": embedding}
            results.append(face)

        return results

machine-learning/app/models/session.py (new, empty file)

machine-learning/app/models/transforms.py

@@ -1,19 +1,54 @@
+import cv2
 import numpy as np
 from numpy.typing import NDArray
 from PIL import Image
 
+from app.schemas import BoundingBox, is_ndarray
+
 _PIL_RESAMPLING_METHODS = {resampling.name.lower(): resampling for resampling in Image.Resampling}
 
 
-def resize(img: Image.Image, size: int) -> Image.Image:
+def resize_pil(img: Image.Image, size: int) -> Image.Image:
     if img.width < img.height:
         return img.resize((size, int((img.height / img.width) * size)), resample=Image.BICUBIC)
     else:
         return img.resize((int((img.width / img.height) * size), size), resample=Image.BICUBIC)
 
 
+def resize_np(img: NDArray[np.float32], size: int) -> NDArray[np.float32]:
+    height, width = img.shape[:2]
+    if width < height:
+        res = cv2.resize(img, (size, int((height / width) * size)), interpolation=cv2.INTER_CUBIC)
+    else:
+        res = cv2.resize(img, (int((width / height) * size), size), interpolation=cv2.INTER_CUBIC)
+    assert is_ndarray(res, np.float32)
+    return res
+
+
+# ported from server
+def crop_bounding_box(image: NDArray[np.float32], bbox: BoundingBox, scale: float = 1.0) -> NDArray[np.float32]:
+    middle_x = (bbox["x1"] + bbox["x2"]) // 2
+    middle_y = (bbox["y1"] + bbox["y2"]) // 2
+
+    target_half_size = int(max((bbox["x2"] - bbox["x1"]) / 2, (bbox["y2"] - bbox["y1"]) / 2) * scale)
+
+    new_half_size = min(
+        middle_x - max(0, middle_x - target_half_size),
+        middle_y - max(0, middle_y - target_half_size),
+        min(image.shape[1] - 1, middle_x + target_half_size) - middle_x,
+        min(image.shape[0] - 1, middle_y + target_half_size) - middle_y,
+    )
+
+    left = middle_x - new_half_size
+    top = middle_y - new_half_size
+    width = int(new_half_size * 2)
+    height = int(new_half_size * 2)
+
+    return image[top : top + height, left : left + width]
+
+
 # https://stackoverflow.com/a/60883103
-def crop(img: Image.Image, size: int) -> Image.Image:
+def crop_pil(img: Image.Image, size: int) -> Image.Image:
     left = int((img.size[0] / 2) - (size / 2))
     upper = int((img.size[1] / 2) - (size / 2))
     right = left + size
@@ -22,6 +57,16 @@ def crop_pil(img: Image.Image, size: int) -> Image.Image:
     return img.crop((left, upper, right, lower))
 
 
+def crop_np(img: NDArray[np.float32], size: int) -> NDArray[np.generic]:
+    height, width = img.shape[:2]
+    left = int((width / 2) - (size / 2))
+    upper = int((height / 2) - (size / 2))
+    right = left + size
+    lower = upper + size
+
+    return img[upper:lower, left:right]
+
+
 def to_numpy(img: Image.Image) -> NDArray[np.float32]:
     return np.asarray(img.convert("RGB")).astype(np.float32) / 255.0
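
`crop_bounding_box` recenters on the box midpoint and shrinks the square's half-size so the crop stays inside the image. A worked example on a 100x100 image with a box near the right edge:

    import numpy as np

    image = np.zeros((100, 100, 3), dtype=np.float32)
    bbox = {"x1": 88, "y1": 40, "x2": 100, "y2": 64}

    # target_half_size = max(12 / 2, 24 / 2) = 12, but the midpoint (94, 52) is
    # only min(99, 94 + 12) - 94 = 5 pixels from the right border, so
    # new_half_size clamps to 5 and the crop is 10x10.
    crop = crop_bounding_box(image, bbox)
    assert crop.shape == (10, 10, 3)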

machine-learning/app/schemas.py

@@ -3,7 +3,7 @@ from typing import Any, Protocol, TypedDict, TypeGuard
 
 import numpy as np
 import numpy.typing as npt
-from pydantic import BaseModel
+from pydantic import BaseModel, Field
 
 
 class StrEnum(str, Enum):
@@ -28,26 +28,60 @@ class BoundingBox(TypedDict):
     y2: int
 
 
-class ModelType(StrEnum):
-    CLIP = "clip"
+class ModelTask(StrEnum):
     FACIAL_RECOGNITION = "facial-recognition"
+    SEARCH = "clip"
 
 
-class ModelRuntime(StrEnum):
-    ONNX = "onnx"
+class ModelType(StrEnum):
+    DETECTION = "detection"
+    PIPELINE = "pipeline"
+    RECOGNITION = "recognition"
+    TEXTUAL = "textual"
+    VISUAL = "visual"
+
+
+class ModelFormat(StrEnum):
     ARMNN = "armnn"
+    ONNX = "onnx"
 
 
+class ModelSource(StrEnum):
+    INSIGHTFACE = "insightface"
+    MCLIP = "mclip"
+    OPENCLIP = "openclip"
+
+
+class ModelSession(Protocol):
+    def run(
+        self,
+        output_names: list[str] | None,
+        input_feed: dict[str, npt.NDArray[np.float32]] | dict[str, npt.NDArray[np.int32]],
+        run_options: Any = None,
+    ) -> list[npt.NDArray[np.float32]]: ...
+
+
+class Predictor(Protocol):
+    loaded: bool
+
+    def load(self) -> None: ...
+
+    def predict(self, inputs: Any, **model_kwargs: Any) -> Any: ...
+
+
 class HasProfiling(Protocol):
     profiling: dict[str, float]
 
 
-class Face(TypedDict):
-    boundingBox: BoundingBox
-    embedding: npt.NDArray[np.float32]
-    imageWidth: int
-    imageHeight: int
+class DetectedFace(TypedDict):
+    box: BoundingBox
+    score: float
+    landmarks: npt.NDArray[np.float32] | None
+
+
+class RecognizedFace(TypedDict):
+    box: BoundingBox
+    embedding: npt.NDArray[np.float32]
 
 
 def has_profiling(obj: Any) -> TypeGuard[HasProfiling]:
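
`Predictor` is a `typing.Protocol`, so `InferenceModel` subclasses and `FacialRecognitionPipeline` satisfy it structurally; nothing has to inherit from it. A minimal illustration of the structural check (the `Dummy` class is hypothetical):

    from typing import Any

    class Dummy:
        loaded: bool = False

        def load(self) -> None:
            self.loaded = True

        def predict(self, inputs: Any, **model_kwargs: Any) -> Any:
            return inputs

    predictor: Predictor = Dummy()  # type-checks: structural match, no inheritance needed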

machine-learning/app/test_main.py

@@ -17,13 +17,14 @@ from pytest import MonkeyPatch
 from pytest_mock import MockerFixture
 
 from app.main import load, preload_models
+from app.models.clip.textual import MClipTextualEncoder, OpenClipTextualEncoder
+from app.models.clip.visual import OpenClipVisualEncoder
+from app.models.facial_recognition.recognition import FaceRecognizer
 
 from .config import Settings, log, settings
 from .models.base import InferenceModel
 from .models.cache import ModelCache
-from .models.clip import MCLIPEncoder, OpenCLIPEncoder
-from .models.facial_recognition import FaceRecognizer
-from .schemas import ModelRuntime, ModelType
+from .schemas import ModelFormat, ModelTask, ModelType
 
 
 class TestBase:
@ -35,13 +36,13 @@ class TestBase:
|
||||
|
||||
@pytest.mark.providers(CPU_EP)
|
||||
def test_sets_cpu_provider(self, providers: list[str]) -> None:
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.providers == self.CPU_EP
|
||||
|
||||
@pytest.mark.providers(CUDA_EP)
|
||||
def test_sets_cuda_provider_if_available(self, providers: list[str]) -> None:
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.providers == self.CUDA_EP
|
||||
|
||||
@ -50,7 +51,7 @@ class TestBase:
|
||||
mocked = mocker.patch("app.models.base.ort.capi._pybind_state")
|
||||
mocked.get_available_openvino_device_ids.return_value = ["GPU.0", "CPU"]
|
||||
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.providers == self.OV_EP
|
||||
|
||||
@ -59,25 +60,25 @@ class TestBase:
|
||||
mocked = mocker.patch("app.models.base.ort.capi._pybind_state")
|
||||
mocked.get_available_openvino_device_ids.return_value = ["CPU"]
|
||||
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.providers == self.CPU_EP
|
||||
|
||||
@pytest.mark.providers(CUDA_EP_OUT_OF_ORDER)
|
||||
def test_sets_providers_in_correct_order(self, providers: list[str]) -> None:
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.providers == self.CUDA_EP
|
||||
|
||||
@pytest.mark.providers(TRT_EP)
|
||||
def test_ignores_unsupported_providers(self, providers: list[str]) -> None:
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.providers == self.CUDA_EP
|
||||
|
||||
def test_sets_provider_kwarg(self) -> None:
|
||||
providers = ["CUDAExecutionProvider"]
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai", providers=providers)
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai", providers=providers)
|
||||
|
||||
assert encoder.providers == providers
|
||||
|
||||
@ -85,7 +86,9 @@ class TestBase:
|
||||
mocked = mocker.patch("app.models.base.ort.capi._pybind_state")
|
||||
mocked.get_available_openvino_device_ids.return_value = ["GPU.0", "CPU"]
|
||||
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai", providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"])
|
||||
encoder = OpenClipTextualEncoder(
|
||||
"ViT-B-32__openai", providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"]
|
||||
)
|
||||
|
||||
assert encoder.provider_options == [
|
||||
{"device_type": "GPU_FP32", "cache_dir": (encoder.cache_dir / "openvino").as_posix()},
|
||||
@ -93,7 +96,7 @@ class TestBase:
|
||||
]
|
||||
|
||||
def test_sets_provider_options_kwarg(self) -> None:
|
||||
encoder = OpenCLIPEncoder(
|
||||
encoder = OpenClipTextualEncoder(
|
||||
"ViT-B-32__openai",
|
||||
providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"],
|
||||
provider_options=[],
|
||||
@ -102,7 +105,7 @@ class TestBase:
|
||||
assert encoder.provider_options == []
|
||||
|
||||
def test_sets_default_sess_options(self) -> None:
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.sess_options.execution_mode == ort.ExecutionMode.ORT_SEQUENTIAL
|
||||
assert encoder.sess_options.inter_op_num_threads == 1
|
||||
@ -110,7 +113,9 @@ class TestBase:
|
||||
assert encoder.sess_options.enable_cpu_mem_arena is False
|
||||
|
||||
def test_sets_default_sess_options_does_not_set_threads_if_non_cpu_and_default_threads(self) -> None:
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"])
|
||||
encoder = OpenClipTextualEncoder(
|
||||
"ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"]
|
||||
)
|
||||
|
||||
assert encoder.sess_options.inter_op_num_threads == 0
|
||||
assert encoder.sess_options.intra_op_num_threads == 0
|
||||
@ -120,14 +125,16 @@ class TestBase:
|
||||
mock_settings.model_inter_op_threads = 2
|
||||
mock_settings.model_intra_op_threads = 4
|
||||
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"])
|
||||
encoder = OpenClipTextualEncoder(
|
||||
"ViT-B-32__openai", providers=["CUDAExecutionProvider", "CPUExecutionProvider"]
|
||||
)
|
||||
|
||||
assert encoder.sess_options.inter_op_num_threads == 2
|
||||
assert encoder.sess_options.intra_op_num_threads == 4
|
||||
|
||||
def test_sets_sess_options_kwarg(self) -> None:
|
||||
sess_options = ort.SessionOptions()
|
||||
encoder = OpenCLIPEncoder(
|
||||
encoder = OpenClipTextualEncoder(
|
||||
"ViT-B-32__openai",
|
||||
providers=["OpenVINOExecutionProvider", "CPUExecutionProvider"],
|
||||
provider_options=[],
|
||||
@ -137,13 +144,13 @@ class TestBase:
|
||||
assert sess_options is encoder.sess_options
|
||||
|
||||
def test_sets_default_cache_dir(self) -> None:
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.cache_dir == Path(settings.cache_folder) / "clip" / "ViT-B-32__openai"
|
||||
|
||||
def test_sets_cache_dir_kwarg(self) -> None:
|
||||
cache_dir = Path("/test_cache")
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=cache_dir)
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=cache_dir)
|
||||
|
||||
assert encoder.cache_dir == cache_dir
|
||||
|
||||
@ -151,29 +158,29 @@ class TestBase:
|
||||
mocker.patch.object(settings, "ann", True)
|
||||
mocker.patch("ann.ann.is_available", False)
|
||||
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.preferred_runtime == ModelRuntime.ONNX
|
||||
assert encoder.preferred_runtime == ModelFormat.ONNX
|
||||
|
||||
def test_sets_default_preferred_runtime_to_armnn_if_available(self, mocker: MockerFixture) -> None:
|
||||
mocker.patch.object(settings, "ann", True)
|
||||
mocker.patch("ann.ann.is_available", True)
|
||||
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai")
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai")
|
||||
|
||||
assert encoder.preferred_runtime == ModelRuntime.ARMNN
|
||||
assert encoder.preferred_runtime == ModelFormat.ARMNN
|
||||
|
||||
def test_sets_preferred_runtime_kwarg(self, mocker: MockerFixture) -> None:
|
||||
mocker.patch.object(settings, "ann", False)
|
||||
mocker.patch("ann.ann.is_available", False)
|
||||
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai", preferred_runtime=ModelRuntime.ARMNN)
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai", preferred_runtime=ModelFormat.ARMNN)
|
||||
|
||||
assert encoder.preferred_runtime == ModelRuntime.ARMNN
|
||||
assert encoder.preferred_runtime == ModelFormat.ARMNN
|
||||
|
||||
def test_casts_cache_dir_string_to_path(self) -> None:
|
||||
cache_dir = "/test_cache"
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=cache_dir)
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=cache_dir)
|
||||
|
||||
assert encoder.cache_dir == Path(cache_dir)
|
||||
|
||||
@ -186,7 +193,7 @@ class TestBase:
|
||||
mocker.patch("app.models.base.Path", return_value=mock_cache_dir)
|
||||
info = mocker.spy(log, "info")
|
||||
|
||||
encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
|
||||
encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
|
||||
encoder.clear_cache()
|
||||
|
||||
mock_rmtree.assert_called_once_with(encoder.cache_dir)
|
||||
@ -201,7 +208,7 @@ class TestBase:
        mocker.patch("app.models.base.Path", return_value=mock_cache_dir)
        warning = mocker.spy(log, "warning")

        encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
        encoder.clear_cache()

        mock_rmtree.assert_not_called()
@ -215,7 +222,7 @@ class TestBase:
        mock_cache_dir.is_dir.return_value = True
        mocker.patch("app.models.base.Path", return_value=mock_cache_dir)

        encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
        with pytest.raises(RuntimeError):
            encoder.clear_cache()

@ -230,7 +237,7 @@ class TestBase:
        mocker.patch("app.models.base.Path", return_value=mock_cache_dir)
        warning = mocker.spy(log, "warning")

        encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir=mock_cache_dir)
        encoder.clear_cache()

        mock_rmtree.assert_not_called()
@ -245,7 +252,7 @@ class TestBase:
        mock_model_path.with_suffix.return_value = mock_model_path
        mock_ann = mocker.patch("app.models.base.AnnSession")

        encoder = OpenCLIPEncoder("ViT-B-32__openai")
        encoder = OpenClipTextualEncoder("ViT-B-32__openai")
        encoder._make_session(mock_model_path)

        mock_ann.assert_called_once()
@ -263,7 +270,7 @@ class TestBase:
        mock_ann = mocker.patch("app.models.base.AnnSession")
        mock_ort = mocker.patch("app.models.base.ort.InferenceSession")

        encoder = OpenCLIPEncoder("ViT-B-32__openai")
        encoder = OpenClipTextualEncoder("ViT-B-32__openai")
        encoder._make_session(mock_armnn_path)

        mock_ort.assert_called_once()
@ -277,7 +284,7 @@ class TestBase:
        mock_ann = mocker.patch("app.models.base.AnnSession")
        mock_ort = mocker.patch("app.models.base.ort.InferenceSession")

        encoder = OpenCLIPEncoder("ViT-B-32__openai")
        encoder = OpenClipTextualEncoder("ViT-B-32__openai")
        with pytest.raises(ValueError):
            encoder._make_session(mock_model_path)

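Note: taken together, the three `_make_session` hunks above suggest a suffix-based selection: `.armnn` files go to `AnnSession` when usable, an unusable `.armnn` path falls back to `ort.InferenceSession`, and unknown suffixes raise `ValueError`. The exact conditions live in app/models/base.py; a self-contained sketch of that assumed control flow, with string labels standing in for the real session objects:

```python
from pathlib import Path


def pick_session(model_path: Path) -> str:
    """Hedged sketch of the dispatch the tests above exercise; the fallback
    condition is assumed from the mocked with_suffix/armnn-path setup."""
    if model_path.suffix == ".armnn":
        if model_path.is_file():
            return "AnnSession"
        return "ort.InferenceSession"  # fall back to the ONNX model
    if model_path.suffix == ".onnx":
        return "ort.InferenceSession"
    raise ValueError(f"Unsupported model file type: {model_path.suffix}")


assert pick_session(Path("missing.armnn")) == "ort.InferenceSession"
assert pick_session(Path("model.onnx")) == "ort.InferenceSession"
```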
@ -287,7 +294,7 @@ class TestBase:
    def test_download(self, mocker: MockerFixture) -> None:
        mock_snapshot_download = mocker.patch("app.models.base.snapshot_download")

        encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir="/path/to/cache")
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="/path/to/cache")
        encoder.download()

        mock_snapshot_download.assert_called_once_with(
@ -301,7 +308,7 @@ class TestBase:
    def test_download_downloads_armnn_if_preferred_runtime(self, mocker: MockerFixture) -> None:
        mock_snapshot_download = mocker.patch("app.models.base.snapshot_download")

        encoder = OpenCLIPEncoder("ViT-B-32__openai", preferred_runtime=ModelRuntime.ARMNN)
        encoder = OpenClipTextualEncoder("ViT-B-32__openai", preferred_runtime=ModelFormat.ARMNN)
        encoder.download()

        mock_snapshot_download.assert_called_once_with(
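Note: both download tests stub `snapshot_download`, the huggingface_hub call the service uses to fetch model files; the second test implies the preferred format changes which files are requested. The exact allow-list is outside this hunk, so this helper is an assumption, not the project's real code:

```python
# Assumed helper sketching how a preferred format could translate into
# snapshot_download file filters; the real patterns are not shown in this diff.
def allow_patterns(preferred_format: str) -> list[str]:
    patterns = ["*.json", "*.txt"]  # config/tokenizer files, assumed
    patterns.append("*.armnn" if preferred_format == "armnn" else "*.onnx")
    return patterns


assert "*.armnn" in allow_patterns("armnn")
```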
@ -323,21 +330,18 @@ class TestCLIP:
        mocker: MockerFixture,
        clip_model_cfg: dict[str, Any],
        clip_preprocess_cfg: Callable[[Path], dict[str, Any]],
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
    ) -> None:
        mocker.patch.object(OpenCLIPEncoder, "download")
        mocker.patch.object(OpenCLIPEncoder, "model_cfg", clip_model_cfg)
        mocker.patch.object(OpenCLIPEncoder, "preprocess_cfg", clip_preprocess_cfg)
        mocker.patch.object(OpenCLIPEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
        mocker.patch.object(OpenClipVisualEncoder, "download")
        mocker.patch.object(OpenClipVisualEncoder, "model_cfg", clip_model_cfg)
        mocker.patch.object(OpenClipVisualEncoder, "preprocess_cfg", clip_preprocess_cfg)

        mocked = mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
        mocked.run.return_value = [[self.embedding]]
        mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True)

        clip_encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="vision")
        clip_encoder = OpenClipVisualEncoder("ViT-B-32__openai", cache_dir="test_cache")
        embedding = clip_encoder.predict(pil_image)

        assert clip_encoder.mode == "vision"
        assert isinstance(embedding, np.ndarray)
        assert embedding.shape[0] == clip_model_cfg["embed_dim"]
        assert embedding.dtype == np.float32
@ -347,22 +351,19 @@ class TestCLIP:
        self,
        mocker: MockerFixture,
        clip_model_cfg: dict[str, Any],
        clip_preprocess_cfg: Callable[[Path], dict[str, Any]],
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
    ) -> None:
        mocker.patch.object(OpenCLIPEncoder, "download")
        mocker.patch.object(OpenCLIPEncoder, "model_cfg", clip_model_cfg)
        mocker.patch.object(OpenCLIPEncoder, "preprocess_cfg", clip_preprocess_cfg)
        mocker.patch.object(OpenCLIPEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
        mocker.patch.object(OpenClipTextualEncoder, "download")
        mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
        mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)

        mocked = mocker.patch.object(InferenceModel, "_make_session", autospec=True).return_value
        mocked.run.return_value = [[self.embedding]]
        mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True)

        clip_encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="text")
        clip_encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache")
        embedding = clip_encoder.predict("test search query")

        assert clip_encoder.mode == "text"
        assert isinstance(embedding, np.ndarray)
        assert embedding.shape[0] == clip_model_cfg["embed_dim"]
        assert embedding.dtype == np.float32
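Note: the net effect of the two hunks above is that the single `OpenCLIPEncoder` with a `mode` kwarg becomes two modality-specific classes, and the `mode` attribute asserts disappear. Usage implied by the updated tests (the module path is assumed from the `app.models.clip` patches above, and this needs the app package plus model files to actually run):

```python
from PIL import Image

from app.models.clip import OpenClipTextualEncoder, OpenClipVisualEncoder  # assumed module path

pil_image = Image.new("RGB", (224, 224))  # stand-in for the test fixture

visual = OpenClipVisualEncoder("ViT-B-32__openai", cache_dir="test_cache")
image_embedding = visual.predict(pil_image)  # ndarray of length embed_dim

textual = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache")
text_embedding = textual.predict("test search query")
```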
@ -372,18 +373,16 @@ class TestCLIP:
        self,
        mocker: MockerFixture,
        clip_model_cfg: dict[str, Any],
        clip_preprocess_cfg: Callable[[Path], dict[str, Any]],
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
    ) -> None:
        mocker.patch.object(OpenCLIPEncoder, "download")
        mocker.patch.object(OpenCLIPEncoder, "model_cfg", clip_model_cfg)
        mocker.patch.object(OpenCLIPEncoder, "preprocess_cfg", clip_preprocess_cfg)
        mocker.patch.object(OpenCLIPEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
        mocker.patch.object(OpenClipTextualEncoder, "download")
        mocker.patch.object(OpenClipTextualEncoder, "model_cfg", clip_model_cfg)
        mocker.patch.object(OpenClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
        mock_tokenizer = mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True).return_value
        mock_ids = [randint(0, 50000) for _ in range(77)]
        mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids)

        clip_encoder = OpenCLIPEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="text")
        clip_encoder = OpenClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache")
        clip_encoder._load_tokenizer()
        tokens = clip_encoder.tokenize("test search query")

@ -397,19 +396,17 @@ class TestCLIP:
        self,
        mocker: MockerFixture,
        clip_model_cfg: dict[str, Any],
        clip_preprocess_cfg: Callable[[Path], dict[str, Any]],
        clip_tokenizer_cfg: Callable[[Path], dict[str, Any]],
    ) -> None:
        mocker.patch.object(OpenCLIPEncoder, "download")
        mocker.patch.object(OpenCLIPEncoder, "model_cfg", clip_model_cfg)
        mocker.patch.object(OpenCLIPEncoder, "preprocess_cfg", clip_preprocess_cfg)
        mocker.patch.object(OpenCLIPEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
        mocker.patch.object(MClipTextualEncoder, "download")
        mocker.patch.object(MClipTextualEncoder, "model_cfg", clip_model_cfg)
        mocker.patch.object(MClipTextualEncoder, "tokenizer_cfg", clip_tokenizer_cfg)
        mock_tokenizer = mocker.patch("app.models.clip.Tokenizer.from_file", autospec=True).return_value
        mock_ids = [randint(0, 50000) for _ in range(77)]
        mock_attention_mask = [randint(0, 1) for _ in range(77)]
        mock_tokenizer.encode.return_value = SimpleNamespace(ids=mock_ids, attention_mask=mock_attention_mask)

        clip_encoder = MCLIPEncoder("ViT-B-32__openai", cache_dir="test_cache", mode="text")
        clip_encoder = MClipTextualEncoder("ViT-B-32__openai", cache_dir="test_cache")
        clip_encoder._load_tokenizer()
        tokens = clip_encoder.tokenize("test search query")

@ -440,12 +437,12 @@ class TestFaceRecognition:
        score = np.array([[0.67]] * num_faces).astype(np.float32)
        kpss = np.random.rand(num_faces, 5, 2).astype(np.float32)
        det_model.detect.return_value = (np.concatenate([bbox, score], axis=-1), kpss)
        face_recognizer.det_model = det_model
        face_recognizer.model = det_model

        rec_model = mock.Mock()
        embedding = np.random.rand(num_faces, 512).astype(np.float32)
        rec_model.get_feat.return_value = embedding
        face_recognizer.rec_model = rec_model
        face_recognizer.model = rec_model

        faces = face_recognizer.predict(cv_image)

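Note: the renames above (`det_model`/`rec_model` attributes becoming a generic `.model`) reflect detection and recognition being wrapped as separate models. The mocks also pin down the array shapes flowing through the two stages; a runnable sketch of those shapes (the bbox column order is assumed):

```python
import numpy as np

num_faces = 2
bbox = np.random.rand(num_faces, 4).astype(np.float32)          # x1, y1, x2, y2 (assumed order)
score = np.array([[0.67]] * num_faces).astype(np.float32)       # detection confidence
kpss = np.random.rand(num_faces, 5, 2).astype(np.float32)       # 5 facial landmarks, (x, y)
detections = np.concatenate([bbox, score], axis=-1)             # what detect() returns
embeddings = np.random.rand(num_faces, 512).astype(np.float32)  # what get_feat() returns

assert detections.shape == (num_faces, 5)
assert embeddings.shape == (num_faces, 512)
```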
@ -465,24 +462,28 @@ class TestFaceRecognition:
class TestCache:
    async def test_caches(self, mock_get_model: mock.Mock) -> None:
        model_cache = ModelCache()
        await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION)
        await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION)
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION)
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION)
        assert len(model_cache.cache._cache) == 1
        mock_get_model.assert_called_once()

    async def test_kwargs_used(self, mock_get_model: mock.Mock) -> None:
        model_cache = ModelCache()
        await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, cache_dir="test_cache")
        mock_get_model.assert_called_once_with(ModelType.FACIAL_RECOGNITION, "test_model_name", cache_dir="test_cache")
        await model_cache.get(
            "test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, cache_dir="test_cache"
        )
        mock_get_model.assert_called_once_with(
            ModelTask.FACIAL_RECOGNITION, ModelType.RECOGNITION, "test_model_name", cache_dir="test_cache"
        )

    async def test_different_clip(self, mock_get_model: mock.Mock) -> None:
        model_cache = ModelCache()
        await model_cache.get("test_image_model_name", ModelType.CLIP)
        await model_cache.get("test_text_model_name", ModelType.CLIP)
        await model_cache.get("test_image_model_name", ModelType.VISUAL, ModelTask.SEARCH)
        await model_cache.get("test_text_model_name", ModelType.TEXTUAL, ModelTask.SEARCH)
        mock_get_model.assert_has_calls(
            [
                mock.call(ModelType.CLIP, "test_image_model_name"),
                mock.call(ModelType.CLIP, "test_text_model_name"),
                mock.call(ModelTask.SEARCH, ModelType.VISUAL, "test_image_model_name"),
                mock.call(ModelTask.SEARCH, ModelType.TEXTUAL, "test_text_model_name"),
            ]
        )
        assert len(model_cache.cache._cache) == 2
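Note: these cache tests pin the new keying convention: `ModelCache.get` now takes `(name, ModelType, ModelTask)`, one cache entry exists per distinct triple, and extra kwargs are forwarded to the loader as `(task, type, name, **kwargs)`. A self-contained sketch of the keying idea (the real `ModelCache` wraps aiocache with TTL and locking on top of this, and the string values are assumed):

```python
# Minimal sketch of (name, type, task) keying; mirrors test_caches above.
cache: dict[tuple[str, str, str], object] = {}


def get(model_name: str, model_type: str, model_task: str) -> object:
    key = (model_name, model_type, model_task)
    if key not in cache:
        cache[key] = object()  # stand-in for loading the model
    return cache[key]


a = get("test_model_name", "recognition", "facial-recognition")
b = get("test_model_name", "recognition", "facial-recognition")
assert a is b and len(cache) == 1
```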
@ -490,19 +491,19 @@ class TestCache:
    @mock.patch("app.models.cache.OptimisticLock", autospec=True)
    async def test_model_ttl(self, mock_lock_cls: mock.Mock, mock_get_model: mock.Mock) -> None:
        model_cache = ModelCache()
        await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, ttl=100)
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
        mock_lock_cls.return_value.__aenter__.return_value.cas.assert_called_with(mock.ANY, ttl=100)

    @mock.patch("app.models.cache.SimpleMemoryCache.expire")
    async def test_revalidate_get(self, mock_cache_expire: mock.Mock, mock_get_model: mock.Mock) -> None:
        model_cache = ModelCache(revalidate=True)
        await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, ttl=100)
        await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, ttl=100)
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
        mock_cache_expire.assert_called_once_with(mock.ANY, 100)

    async def test_profiling(self, mock_get_model: mock.Mock) -> None:
        model_cache = ModelCache(profiling=True)
        await model_cache.get("test_model_name", ModelType.FACIAL_RECOGNITION, ttl=100)
        await model_cache.get("test_model_name", ModelType.RECOGNITION, ModelTask.FACIAL_RECOGNITION, ttl=100)
        profiling = await model_cache.get_profiling()
        assert isinstance(profiling, dict)
        assert profiling == model_cache.cache.profiling
@ -510,9 +511,9 @@ class TestCache:
    async def test_loads_mclip(self) -> None:
        model_cache = ModelCache()

        model = await model_cache.get("XLM-Roberta-Large-Vit-B-32", ModelType.CLIP, mode="text")
        model = await model_cache.get("XLM-Roberta-Large-Vit-B-32", ModelType.TEXTUAL, ModelTask.SEARCH)

        assert isinstance(model, MCLIPEncoder)
        assert isinstance(model, MClipTextualEncoder)
        assert model.model_name == "XLM-Roberta-Large-Vit-B-32"

    async def test_raises_exception_if_invalid_model_type(self) -> None:
@ -520,13 +521,13 @@ class TestCache:
        model_cache = ModelCache()

        with pytest.raises(ValueError):
            await model_cache.get("XLM-Roberta-Large-Vit-B-32", invalid, mode="text")
            await model_cache.get("XLM-Roberta-Large-Vit-B-32", ModelType.TEXTUAL, invalid)

    async def test_raises_exception_if_unknown_model_name(self) -> None:
        model_cache = ModelCache()

        with pytest.raises(ValueError):
            await model_cache.get("test_model_name", ModelType.CLIP, mode="text")
            await model_cache.get("test_model_name", ModelType.TEXTUAL, ModelTask.SEARCH)

    async def test_preloads_models(self, monkeypatch: MonkeyPatch, mock_get_model: mock.Mock) -> None:
        os.environ["MACHINE_LEARNING_PRELOAD__CLIP"] = "ViT-B-32__openai"
@ -541,11 +542,12 @@ class TestCache:
        monkeypatch.setattr("app.main.model_cache", model_cache)

        await preload_models(settings.preload)
        assert len(model_cache.cache._cache) == 2
        assert mock_get_model.call_count == 2
        await model_cache.get("ViT-B-32__openai", ModelType.CLIP, ttl=100)
        await model_cache.get("buffalo_s", ModelType.FACIAL_RECOGNITION, ttl=100)
        assert mock_get_model.call_count == 2
        assert len(model_cache.cache._cache) == 3
        assert mock_get_model.call_count == 3
        await model_cache.get("ViT-B-32__openai", ModelType.TEXTUAL, ModelTask.SEARCH, ttl=100)
        await model_cache.get("ViT-B-32__openai", ModelType.VISUAL, ModelTask.SEARCH, ttl=100)
        await model_cache.get("buffalo_s", ModelType.PIPELINE, ModelTask.FACIAL_RECOGNITION, ttl=100)
        assert mock_get_model.call_count == 3


@pytest.mark.asyncio
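Note: after `preload_models` runs, the later `get()` calls for already-preloaded triples hit the cache, so only additional lookups trigger loads. Preloading itself is driven by environment variables; the CLIP one appears verbatim in the test above, while the facial-recognition name is assumed by analogy with the `PreloadModelData` fields:

```python
import os

os.environ["MACHINE_LEARNING_PRELOAD__CLIP"] = "ViT-B-32__openai"            # verbatim from the test
os.environ["MACHINE_LEARNING_PRELOAD__FACIAL_RECOGNITION"] = "buffalo_s"     # assumed, by analogy
```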
@ -572,7 +574,8 @@ class TestLoad:
    async def test_load_clears_cache_and_retries_if_os_error(self) -> None:
        mock_model = mock.Mock(spec=InferenceModel)
        mock_model.model_name = "test_model_name"
        mock_model.model_type = ModelType.CLIP
        mock_model.model_type = ModelType.VISUAL
        mock_model.model_task = ModelTask.SEARCH
        mock_model.load.side_effect = [OSError, None]
        mock_model.loaded = False
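Note: this last hunk pins the recovery path: a first `load()` raising `OSError` clears the model's cache directory and retries once. A hedged sketch of that control flow, not the project's literal implementation (the real `load()` in app/main.py also handles the bad-download exception types it imports, such as `InvalidProtobuf`, `NoSuchFile`, and `BadZipFile`):

```python
def load_with_retry(model) -> None:
    """Mirrors the side_effect = [OSError, None] the test sets up."""
    try:
        model.load()
    except OSError:
        model.clear_cache()  # drop possibly corrupt model files
        model.load()         # second attempt re-downloads as needed


class _FlakyModel:
    attempts = 0

    def load(self) -> None:
        self.attempts += 1
        if self.attempts == 1:
            raise OSError

    def clear_cache(self) -> None:
        pass


m = _FlakyModel()
load_with_retry(m)
assert m.attempts == 2
```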