from __future__ import annotations

from pathlib import Path
from typing import Any, NamedTuple

import numpy as np
from numpy.typing import NDArray

from immich_ml.config import log, settings
from immich_ml.schemas import SessionNode

from .rknnpool import RknnPoolExecutor, is_available, soc_name

# RKNN is only usable when the runtime detected a supported SoC and it is enabled in settings.
is_available = is_available and settings.rknn
model_prefix = Path("rknpu") / soc_name if is_available and soc_name is not None else None


def run_inference(rknn_lite: Any, input: list[NDArray[np.float32]]) -> list[NDArray[np.float32]]:
    # Worker function for the pool: run a single inference on an RKNNLite instance.
    outputs: list[NDArray[np.float32]] = rknn_lite.inference(inputs=input, data_format="nchw")
    return outputs


# Tensor names and shapes for the supported detection and recognition models.
input_output_mapping: dict[str, dict[str, Any]] = {
    "detection": {
        "input": {"norm_tensor:0": (1, 3, 640, 640)},
        "output": {
            "norm_tensor:1": (12800, 1),
            "norm_tensor:2": (3200, 1),
            "norm_tensor:3": (800, 1),
            "norm_tensor:4": (12800, 4),
            "norm_tensor:5": (3200, 4),
            "norm_tensor:6": (800, 4),
            "norm_tensor:7": (12800, 10),
            "norm_tensor:8": (3200, 10),
            "norm_tensor:9": (800, 10),
        },
    },
    "recognition": {"input": {"norm_tensor:0": (1, 3, 112, 112)}, "output": {"norm_tensor:1": (1, 512)}},
}


class RknnSession:
    # Inference session backed by a pool of RKNN runtimes, exposing an
    # ONNX Runtime-style interface (get_inputs/get_outputs/run).
    def __init__(self, model_path: Path) -> None:
        self.model_type = "detection" if "detection" in model_path.parts else "recognition"
        self.tpe = settings.rknn_threads

        log.info(f"Loading RKNN model from {model_path} with {self.tpe} threads.")
        self.rknnpool = RknnPoolExecutor(model_path=model_path.as_posix(), tpes=self.tpe, func=run_inference)
        log.info(f"Loaded RKNN model from {model_path} with {self.tpe} threads.")

    def get_inputs(self) -> list[SessionNode]:
        return [RknnNode(name=k, shape=v) for k, v in input_output_mapping[self.model_type]["input"].items()]

    def get_outputs(self) -> list[SessionNode]:
        return [RknnNode(name=k, shape=v) for k, v in input_output_mapping[self.model_type]["output"].items()]

    def run(
        self,
        output_names: list[str] | None,
        input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]],
        run_options: Any = None,
    ) -> list[NDArray[np.float32]]:
        # Submit contiguous copies of the inputs to the pool and fetch the result.
        input_data: list[NDArray[np.float32]] = [np.ascontiguousarray(v) for v in input_feed.values()]
        self.rknnpool.put(input_data)
        res = self.rknnpool.get()
        if res is None:
            raise RuntimeError("RKNN inference failed!")
        return res


class RknnNode(NamedTuple):
    # Minimal SessionNode implementation carrying only a tensor name and shape.
    name: str | None
    shape: tuple[int, ...]


__all__ = ["RknnSession", "RknnNode", "is_available", "soc_name", "model_prefix"]
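

# --- Illustrative usage sketch (not part of the upstream module) ---
# A minimal example of how a caller might drive RknnSession on an RKNN-capable
# device. The cache directory and model filename below are hypothetical; in
# immich_ml the actual paths are resolved by the model loading code.
if __name__ == "__main__":
    if is_available and model_prefix is not None:
        model_path = Path("/cache") / model_prefix / "detection" / "model.rknn"  # hypothetical path
        session = RknnSession(model_path)
        print([(node.name, node.shape) for node in session.get_inputs()])

        # Feed a zero-filled tensor matching the detection input shape (1, 3, 640, 640).
        dummy = np.zeros((1, 3, 640, 640), dtype=np.float32)
        outputs = session.run(None, {"norm_tensor:0": dummy})
        print([o.shape for o in outputs])
    else:
        print("RKNN is not available on this host.")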