mirror of
https://github.com/immich-app/immich.git
synced 2025-06-23 15:30:51 -04:00
typing be happy.
This commit is contained in:
parent
ebdfe1b7b6
commit
2f7e44aa63
@ -13,8 +13,8 @@ from rknn.rknnpool import rknnPoolExecutor, soc_name
|
|||||||
from ..config import log, settings
|
from ..config import log, settings
|
||||||
|
|
||||||
|
|
||||||
def runInfrence(rknn_lite, input):
|
def runInfrence(rknn_lite: Any, input: NDArray[np.float32]) -> list[NDArray[np.float32]]:
|
||||||
outputs = rknn_lite.inference(inputs=[input], data_format="nchw")
|
outputs: list[NDArray[np.float32]] = rknn_lite.inference(inputs=[input], data_format="nchw")
|
||||||
|
|
||||||
return outputs
|
return outputs
|
||||||
|
|
||||||
@ -23,8 +23,6 @@ class RknnSession:
|
|||||||
def __init__(self, model_path: Path | str):
|
def __init__(self, model_path: Path | str):
|
||||||
self.model_path = Path(str(model_path).replace("model", soc_name))
|
self.model_path = Path(str(model_path).replace("model", soc_name))
|
||||||
self.ort_model_path = Path(str(self.model_path).replace(f"{soc_name}.rknn", "model.onnx"))
|
self.ort_model_path = Path(str(self.model_path).replace(f"{soc_name}.rknn", "model.onnx"))
|
||||||
self.inputs = None
|
|
||||||
self.outputs = None
|
|
||||||
|
|
||||||
if "textual" in str(self.model_path):
|
if "textual" in str(self.model_path):
|
||||||
self.tpe = settings.rknn_textual_threads
|
self.tpe = settings.rknn_textual_threads
|
||||||
@ -36,36 +34,38 @@ class RknnSession:
|
|||||||
log.info(f"Loading RKNN model from {self.model_path} with {self.tpe} threads.")
|
log.info(f"Loading RKNN model from {self.model_path} with {self.tpe} threads.")
|
||||||
self.rknnpool = rknnPoolExecutor(rknnModel=self.model_path.as_posix(), TPEs=self.tpe, func=runInfrence)
|
self.rknnpool = rknnPoolExecutor(rknnModel=self.model_path.as_posix(), TPEs=self.tpe, func=runInfrence)
|
||||||
|
|
||||||
def __del__(self):
|
def __del__(self) -> None:
|
||||||
self.rknnpool.release()
|
self.rknnpool.release()
|
||||||
|
|
||||||
|
def _load_ort_session(self) -> None:
|
||||||
|
self.ort_session = ort.InferenceSession(
|
||||||
|
self.ort_model_path.as_posix(),
|
||||||
|
)
|
||||||
|
self.inputs: list[SessionNode] = self.ort_session.get_inputs()
|
||||||
|
self.outputs: list[SessionNode] = self.ort_session.get_outputs()
|
||||||
|
del self.ort_session
|
||||||
|
|
||||||
def get_inputs(self) -> list[SessionNode]:
|
def get_inputs(self) -> list[SessionNode]:
|
||||||
if not self.inputs:
|
try:
|
||||||
self.ort_session = ort.InferenceSession(
|
return self.inputs
|
||||||
self.ort_model_path.as_posix(),
|
except AttributeError:
|
||||||
)
|
self._load_ort_session()
|
||||||
self.inputs = self.ort_session.get_inputs()
|
return self.inputs
|
||||||
self.outputs = self.ort_session.get_outputs()
|
|
||||||
del self.ort_session
|
|
||||||
return self.inputs
|
|
||||||
|
|
||||||
def get_outputs(self) -> list[SessionNode]:
|
def get_outputs(self) -> list[SessionNode]:
|
||||||
if not self.outputs:
|
try:
|
||||||
self.ort_session = ort.InferenceSession(
|
return self.outputs
|
||||||
self.ort_model_path.as_posix(),
|
except AttributeError:
|
||||||
)
|
self._load_ort_session()
|
||||||
self.inputs = self.ort_session.get_inputs()
|
return self.outputs
|
||||||
self.outputs = self.ort_session.get_outputs()
|
|
||||||
del self.ort_session
|
|
||||||
return self.outputs
|
|
||||||
|
|
||||||
def run(
|
def run(
|
||||||
self,
|
self,
|
||||||
output_names: list[str] | None,
|
output_names: list[str] | None,
|
||||||
input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]],
|
input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]],
|
||||||
run_options: Any = None,
|
run_options: Any = None,
|
||||||
):
|
) -> list[NDArray[np.float32]]:
|
||||||
input_data = [np.ascontiguousarray(v) for v in input_feed.values()][0]
|
input_data: NDArray[np.float32] = np.ascontiguousarray(list(input_feed.values())[0], dtype=np.float32)
|
||||||
self.rknnpool.put(input_data)
|
self.rknnpool.put(input_data)
|
||||||
outputs = self.rknnpool.get()[0]
|
outputs: list[NDArray[np.float32]] = self.rknnpool.get()[0]
|
||||||
return outputs
|
return outputs
|
||||||
|
@ -4,6 +4,8 @@
|
|||||||
import os
|
import os
|
||||||
from concurrent.futures import ThreadPoolExecutor
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
import numpy as np
|
||||||
|
from numpy.typing import NDArray
|
||||||
|
|
||||||
supported_socs = ["rk3562", "rk3566", "rk3568", "rk3576", "rk3588"]
|
supported_socs = ["rk3562", "rk3566", "rk3568", "rk3576", "rk3588"]
|
||||||
coremask_supported_socs = ["rk3576","rk3588"]
|
coremask_supported_socs = ["rk3576","rk3588"]
|
||||||
@ -63,7 +65,7 @@ def initRKNNs(rknnModel="./rknnModel/yolov5s.rknn", TPEs=1):
|
|||||||
|
|
||||||
|
|
||||||
class rknnPoolExecutor:
|
class rknnPoolExecutor:
|
||||||
def __init__(self, rknnModel, TPEs, func):
|
def __init__(self, rknnModel: str, TPEs: int, func):
|
||||||
self.TPEs = TPEs
|
self.TPEs = TPEs
|
||||||
self.queue = Queue()
|
self.queue = Queue()
|
||||||
self.rknnPool = initRKNNs(rknnModel, TPEs)
|
self.rknnPool = initRKNNs(rknnModel, TPEs)
|
||||||
@ -71,17 +73,17 @@ class rknnPoolExecutor:
|
|||||||
self.func = func
|
self.func = func
|
||||||
self.num = 0
|
self.num = 0
|
||||||
|
|
||||||
def put(self, frame):
|
def put(self, frame) -> None:
|
||||||
self.queue.put(self.pool.submit(self.func, self.rknnPool[self.num % self.TPEs], frame))
|
self.queue.put(self.pool.submit(self.func, self.rknnPool[self.num % self.TPEs], frame))
|
||||||
self.num += 1
|
self.num += 1
|
||||||
|
|
||||||
def get(self):
|
def get(self) -> list[list[NDArray[np.float32]], bool]:
|
||||||
if self.queue.empty():
|
if self.queue.empty():
|
||||||
return None, False
|
return None, False
|
||||||
fut = self.queue.get()
|
fut = self.queue.get()
|
||||||
return fut.result(), True
|
return fut.result(), True
|
||||||
|
|
||||||
def release(self):
|
def release(self) -> None:
|
||||||
self.pool.shutdown()
|
self.pool.shutdown()
|
||||||
for rknn_lite in self.rknnPool:
|
for rknn_lite in self.rknnPool:
|
||||||
rknn_lite.release()
|
rknn_lite.release()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user