From 2f7e44aa6355b22f9b67c6e064db641183e4142a Mon Sep 17 00:00:00 2001 From: yoni13 Date: Mon, 13 Jan 2025 18:24:12 +0800 Subject: [PATCH] typing be happy. --- machine-learning/app/sessions/rknn.py | 48 +++++++++++++-------------- machine-learning/rknn/rknnpool.py | 10 +++--- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/machine-learning/app/sessions/rknn.py b/machine-learning/app/sessions/rknn.py index 421a6971f9..863a00f1a2 100644 --- a/machine-learning/app/sessions/rknn.py +++ b/machine-learning/app/sessions/rknn.py @@ -13,8 +13,8 @@ from rknn.rknnpool import rknnPoolExecutor, soc_name from ..config import log, settings -def runInfrence(rknn_lite, input): - outputs = rknn_lite.inference(inputs=[input], data_format="nchw") +def runInfrence(rknn_lite: Any, input: NDArray[np.float32]) -> list[NDArray[np.float32]]: + outputs: list[NDArray[np.float32]] = rknn_lite.inference(inputs=[input], data_format="nchw") return outputs @@ -23,8 +23,6 @@ class RknnSession: def __init__(self, model_path: Path | str): self.model_path = Path(str(model_path).replace("model", soc_name)) self.ort_model_path = Path(str(self.model_path).replace(f"{soc_name}.rknn", "model.onnx")) - self.inputs = None - self.outputs = None if "textual" in str(self.model_path): self.tpe = settings.rknn_textual_threads @@ -36,36 +34,38 @@ class RknnSession: log.info(f"Loading RKNN model from {self.model_path} with {self.tpe} threads.") self.rknnpool = rknnPoolExecutor(rknnModel=self.model_path.as_posix(), TPEs=self.tpe, func=runInfrence) - def __del__(self): + def __del__(self) -> None: self.rknnpool.release() + def _load_ort_session(self) -> None: + self.ort_session = ort.InferenceSession( + self.ort_model_path.as_posix(), + ) + self.inputs: list[SessionNode] = self.ort_session.get_inputs() + self.outputs: list[SessionNode] = self.ort_session.get_outputs() + del self.ort_session + def get_inputs(self) -> list[SessionNode]: - if not self.inputs: - self.ort_session = ort.InferenceSession( - self.ort_model_path.as_posix(), - ) - self.inputs = self.ort_session.get_inputs() - self.outputs = self.ort_session.get_outputs() - del self.ort_session - return self.inputs + try: + return self.inputs + except AttributeError: + self._load_ort_session() + return self.inputs def get_outputs(self) -> list[SessionNode]: - if not self.outputs: - self.ort_session = ort.InferenceSession( - self.ort_model_path.as_posix(), - ) - self.inputs = self.ort_session.get_inputs() - self.outputs = self.ort_session.get_outputs() - del self.ort_session - return self.outputs + try: + return self.outputs + except AttributeError: + self._load_ort_session() + return self.outputs def run( self, output_names: list[str] | None, input_feed: dict[str, NDArray[np.float32]] | dict[str, NDArray[np.int32]], run_options: Any = None, - ): - input_data = [np.ascontiguousarray(v) for v in input_feed.values()][0] + ) -> list[NDArray[np.float32]]: + input_data: NDArray[np.float32] = np.ascontiguousarray(list(input_feed.values())[0], dtype=np.float32) self.rknnpool.put(input_data) - outputs = self.rknnpool.get()[0] + outputs: list[NDArray[np.float32]] = self.rknnpool.get()[0] return outputs diff --git a/machine-learning/rknn/rknnpool.py b/machine-learning/rknn/rknnpool.py index 64e4af9f7c..293ef9bd8e 100644 --- a/machine-learning/rknn/rknnpool.py +++ b/machine-learning/rknn/rknnpool.py @@ -4,6 +4,8 @@ import os from concurrent.futures import ThreadPoolExecutor from queue import Queue +import numpy as np +from numpy.typing import NDArray supported_socs = ["rk3562", "rk3566", "rk3568", "rk3576", "rk3588"] coremask_supported_socs = ["rk3576","rk3588"] @@ -63,7 +65,7 @@ def initRKNNs(rknnModel="./rknnModel/yolov5s.rknn", TPEs=1): class rknnPoolExecutor: - def __init__(self, rknnModel, TPEs, func): + def __init__(self, rknnModel: str, TPEs: int, func): self.TPEs = TPEs self.queue = Queue() self.rknnPool = initRKNNs(rknnModel, TPEs) @@ -71,17 +73,17 @@ class rknnPoolExecutor: self.func = func self.num = 0 - def put(self, frame): + def put(self, frame) -> None: self.queue.put(self.pool.submit(self.func, self.rknnPool[self.num % self.TPEs], frame)) self.num += 1 - def get(self): + def get(self) -> list[list[NDArray[np.float32]], bool]: if self.queue.empty(): return None, False fut = self.queue.get() return fut.result(), True - def release(self): + def release(self) -> None: self.pool.shutdown() for rknn_lite in self.rknnPool: rknn_lite.release()