forked from Cutlery/immich
		
	
		
			
				
	
	
		
			91 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			91 lines
		
	
	
		
			2.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from pathlib import Path
 | 
						|
from typing import Any
 | 
						|
 | 
						|
import cv2
 | 
						|
import numpy as np
 | 
						|
from insightface.model_zoo import ArcFaceONNX, RetinaFace
 | 
						|
from insightface.utils.face_align import norm_crop
 | 
						|
from numpy.typing import NDArray
 | 
						|
 | 
						|
from app.config import clean_name
 | 
						|
from app.schemas import Face, ModelType, is_ndarray
 | 
						|
 | 
						|
from .base import InferenceModel
 | 
						|
 | 
						|
 | 
						|
class FaceRecognizer(InferenceModel):
 | 
						|
    _model_type = ModelType.FACIAL_RECOGNITION
 | 
						|
 | 
						|
    def __init__(
 | 
						|
        self,
 | 
						|
        model_name: str,
 | 
						|
        min_score: float = 0.7,
 | 
						|
        cache_dir: Path | str | None = None,
 | 
						|
        **model_kwargs: Any,
 | 
						|
    ) -> None:
 | 
						|
        self.min_score = model_kwargs.pop("minScore", min_score)
 | 
						|
        super().__init__(clean_name(model_name), cache_dir, **model_kwargs)
 | 
						|
 | 
						|
    def _load(self) -> None:
 | 
						|
        self.det_model = RetinaFace(session=self._make_session(self.det_file))
 | 
						|
        self.rec_model = ArcFaceONNX(
 | 
						|
            self.rec_file.with_suffix(".onnx").as_posix(),
 | 
						|
            session=self._make_session(self.rec_file),
 | 
						|
        )
 | 
						|
 | 
						|
        self.det_model.prepare(
 | 
						|
            ctx_id=0,
 | 
						|
            det_thresh=self.min_score,
 | 
						|
            input_size=(640, 640),
 | 
						|
        )
 | 
						|
        self.rec_model.prepare(ctx_id=0)
 | 
						|
 | 
						|
    def _predict(self, image: NDArray[np.uint8] | bytes) -> list[Face]:
 | 
						|
        if isinstance(image, bytes):
 | 
						|
            decoded_image = cv2.imdecode(np.frombuffer(image, np.uint8), cv2.IMREAD_COLOR)
 | 
						|
        else:
 | 
						|
            decoded_image = image
 | 
						|
        assert is_ndarray(decoded_image, np.uint8)
 | 
						|
        bboxes, kpss = self.det_model.detect(decoded_image)
 | 
						|
        if bboxes.size == 0:
 | 
						|
            return []
 | 
						|
        assert is_ndarray(kpss, np.float32)
 | 
						|
 | 
						|
        scores = bboxes[:, 4].tolist()
 | 
						|
        bboxes = bboxes[:, :4].round().tolist()
 | 
						|
 | 
						|
        results = []
 | 
						|
        height, width, _ = decoded_image.shape
 | 
						|
        for (x1, y1, x2, y2), score, kps in zip(bboxes, scores, kpss):
 | 
						|
            cropped_img = norm_crop(decoded_image, kps)
 | 
						|
            embedding: NDArray[np.float32] = self.rec_model.get_feat(cropped_img)[0]
 | 
						|
            face: Face = {
 | 
						|
                "imageWidth": width,
 | 
						|
                "imageHeight": height,
 | 
						|
                "boundingBox": {
 | 
						|
                    "x1": x1,
 | 
						|
                    "y1": y1,
 | 
						|
                    "x2": x2,
 | 
						|
                    "y2": y2,
 | 
						|
                },
 | 
						|
                "score": score,
 | 
						|
                "embedding": embedding,
 | 
						|
            }
 | 
						|
            results.append(face)
 | 
						|
        return results
 | 
						|
 | 
						|
    @property
 | 
						|
    def cached(self) -> bool:
 | 
						|
        return self.det_file.is_file() and self.rec_file.is_file()
 | 
						|
 | 
						|
    @property
 | 
						|
    def det_file(self) -> Path:
 | 
						|
        return self.cache_dir / "detection" / f"model.{self.preferred_runtime}"
 | 
						|
 | 
						|
    @property
 | 
						|
    def rec_file(self) -> Path:
 | 
						|
        return self.cache_dir / "recognition" / f"model.{self.preferred_runtime}"
 | 
						|
 | 
						|
    def configure(self, **model_kwargs: Any) -> None:
 | 
						|
        self.det_model.det_thresh = model_kwargs.pop("minScore", self.det_model.det_thresh)
 |