mirror of
				https://github.com/immich-app/immich.git
				synced 2025-11-03 19:29:32 -05:00 
			
		
		
		
	* remove image tagging * updated lock * fixed tests, improved logging * be nice * fixed tests
		
			
				
	
	
		
			152 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			152 lines
		
	
	
		
			5.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import annotations
 | 
						|
 | 
						|
import pickle
 | 
						|
from abc import ABC, abstractmethod
 | 
						|
from pathlib import Path
 | 
						|
from shutil import rmtree
 | 
						|
from typing import Any
 | 
						|
 | 
						|
import onnxruntime as ort
 | 
						|
from huggingface_hub import snapshot_download
 | 
						|
from typing_extensions import Buffer
 | 
						|
 | 
						|
from ..config import get_cache_dir, get_hf_model_name, log, settings
 | 
						|
from ..schemas import ModelType
 | 
						|
 | 
						|
 | 
						|
class InferenceModel(ABC):
 | 
						|
    _model_type: ModelType
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        model_name: str,
 | 
						|
        cache_dir: Path | str | None = None,
 | 
						|
        inter_op_num_threads: int = settings.model_inter_op_threads,
 | 
						|
        intra_op_num_threads: int = settings.model_intra_op_threads,
 | 
						|
        **model_kwargs: Any,
 | 
						|
    ) -> None:
 | 
						|
        self.model_name = model_name
 | 
						|
        self.loaded = False
 | 
						|
        self._cache_dir = Path(cache_dir) if cache_dir is not None else None
 | 
						|
        self.providers = model_kwargs.pop("providers", ["CPUExecutionProvider"])
 | 
						|
        #  don't pre-allocate more memory than needed
 | 
						|
        self.provider_options = model_kwargs.pop(
 | 
						|
            "provider_options", [{"arena_extend_strategy": "kSameAsRequested"}] * len(self.providers)
 | 
						|
        )
 | 
						|
        log.debug(
 | 
						|
            (
 | 
						|
                f"Setting '{self.model_name}' execution providers to {self.providers} "
 | 
						|
                "in descending order of preference"
 | 
						|
            ),
 | 
						|
        )
 | 
						|
        log.debug(f"Setting execution provider options to {self.provider_options}")
 | 
						|
        self.sess_options = PicklableSessionOptions()
 | 
						|
        # avoid thread contention between models
 | 
						|
        if inter_op_num_threads > 1:
 | 
						|
            self.sess_options.execution_mode = ort.ExecutionMode.ORT_PARALLEL
 | 
						|
 | 
						|
        log.debug(f"Setting execution_mode to {self.sess_options.execution_mode.name}")
 | 
						|
        log.debug(f"Setting inter_op_num_threads to {inter_op_num_threads}")
 | 
						|
        log.debug(f"Setting intra_op_num_threads to {intra_op_num_threads}")
 | 
						|
        self.sess_options.inter_op_num_threads = inter_op_num_threads
 | 
						|
        self.sess_options.intra_op_num_threads = intra_op_num_threads
 | 
						|
        self.sess_options.enable_cpu_mem_arena = False
 | 
						|
 | 
						|
    def download(self) -> None:
 | 
						|
        if not self.cached:
 | 
						|
            log.info(
 | 
						|
                f"Downloading {self.model_type.replace('-', ' ')} model '{self.model_name}'. This may take a while."
 | 
						|
            )
 | 
						|
            self._download()
 | 
						|
 | 
						|
    def load(self) -> None:
 | 
						|
        if self.loaded:
 | 
						|
            return
 | 
						|
        self.download()
 | 
						|
        log.info(f"Loading {self.model_type.replace('-', ' ')} model '{self.model_name}' to memory")
 | 
						|
        self._load()
 | 
						|
        self.loaded = True
 | 
						|
 | 
						|
    def predict(self, inputs: Any, **model_kwargs: Any) -> Any:
 | 
						|
        self.load()
 | 
						|
        if model_kwargs:
 | 
						|
            self.configure(**model_kwargs)
 | 
						|
        return self._predict(inputs)
 | 
						|
 | 
						|
    @abstractmethod
 | 
						|
    def _predict(self, inputs: Any) -> Any:
 | 
						|
        ...
 | 
						|
 | 
						|
    def configure(self, **model_kwargs: Any) -> None:
 | 
						|
        pass
 | 
						|
 | 
						|
    def _download(self) -> None:
 | 
						|
        snapshot_download(
 | 
						|
            get_hf_model_name(self.model_name),
 | 
						|
            cache_dir=self.cache_dir,
 | 
						|
            local_dir=self.cache_dir,
 | 
						|
            local_dir_use_symlinks=False,
 | 
						|
        )
 | 
						|
 | 
						|
    @abstractmethod
 | 
						|
    def _load(self) -> None:
 | 
						|
        ...
 | 
						|
 | 
						|
    @property
 | 
						|
    def model_type(self) -> ModelType:
 | 
						|
        return self._model_type
 | 
						|
 | 
						|
    @property
 | 
						|
    def cache_dir(self) -> Path:
 | 
						|
        return self._cache_dir if self._cache_dir is not None else get_cache_dir(self.model_name, self.model_type)
 | 
						|
 | 
						|
    @cache_dir.setter
 | 
						|
    def cache_dir(self, cache_dir: Path) -> None:
 | 
						|
        self._cache_dir = cache_dir
 | 
						|
 | 
						|
    @property
 | 
						|
    def cached(self) -> bool:
 | 
						|
        return self.cache_dir.exists() and any(self.cache_dir.iterdir())
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def from_model_type(cls, model_type: ModelType, model_name: str, **model_kwargs: Any) -> InferenceModel:
 | 
						|
        subclasses = {subclass._model_type: subclass for subclass in cls.__subclasses__()}
 | 
						|
        if model_type not in subclasses:
 | 
						|
            raise ValueError(f"Unsupported model type: {model_type}")
 | 
						|
 | 
						|
        return subclasses[model_type](model_name, **model_kwargs)
 | 
						|
 | 
						|
    def clear_cache(self) -> None:
 | 
						|
        if not self.cache_dir.exists():
 | 
						|
            log.warn(
 | 
						|
                f"Attempted to clear cache for model '{self.model_name}', but cache directory does not exist",
 | 
						|
            )
 | 
						|
            return
 | 
						|
        if not rmtree.avoids_symlink_attacks:
 | 
						|
            raise RuntimeError("Attempted to clear cache, but rmtree is not safe on this platform")
 | 
						|
 | 
						|
        if self.cache_dir.is_dir():
 | 
						|
            log.info(f"Cleared cache directory for model '{self.model_name}'.")
 | 
						|
            rmtree(self.cache_dir)
 | 
						|
        else:
 | 
						|
            log.warn(
 | 
						|
                (
 | 
						|
                    f"Encountered file instead of directory at cache path "
 | 
						|
                    f"for '{self.model_name}'. Removing file and replacing with a directory."
 | 
						|
                ),
 | 
						|
            )
 | 
						|
            self.cache_dir.unlink()
 | 
						|
        self.cache_dir.mkdir(parents=True, exist_ok=True)
 | 
						|
 | 
						|
 | 
						|
# HF deep copies configs, so we need to make session options picklable
 | 
						|
class PicklableSessionOptions(ort.SessionOptions):  # type: ignore[misc]
 | 
						|
    def __getstate__(self) -> bytes:
 | 
						|
        return pickle.dumps([(attr, getattr(self, attr)) for attr in dir(self) if not callable(getattr(self, attr))])
 | 
						|
 | 
						|
    def __setstate__(self, state: Buffer) -> None:
 | 
						|
        self.__init__()  # type: ignore[misc]
 | 
						|
        attrs: list[tuple[str, Any]] = pickle.loads(state)
 | 
						|
        for attr, val in attrs:
 | 
						|
            setattr(self, attr, val)
 |