diff --git a/machine-learning/app/sessions/rknn.py b/machine-learning/app/sessions/rknn.py index 2f0e7cc0fd..f7fe9db8e6 100644 --- a/machine-learning/app/sessions/rknn.py +++ b/machine-learning/app/sessions/rknn.py @@ -6,7 +6,7 @@ from typing import Any, List import numpy as np import onnxruntime as ort from numpy.typing import NDArray -from rknnlite.api import RKNNLite +from rknn.rknnpool import rknnPoolExecutor from app.models.constants import SUPPORTED_PROVIDERS @@ -14,38 +14,40 @@ from app.schemas import SessionNode from ..config import log, settings +def runInfrence(rknn_lite, input): + outputs = rknn_lite.inference(inputs=[input], data_format='nchw') + + return outputs class RknnSession: def __init__(self, model_path: Path | str): + self.model_path = Path(model_path) - self.ort_model_path = str(self.model_path).replace(".rknn", ".onnx") + log.info(f"Loading RKNN model from {self.model_path} with {1 if 'textual' in str(self.model_path) else 2} threads.") + self.rknnpool = rknnPoolExecutor( + rknnModel=self.model_path.as_posix(), + TPEs= 1 if 'textual' in str(self.model_path) else 2, + func=runInfrence) - self.rknn = RKNNLite() - - log.info(f"Loading RKNN model from {self.model_path}") self.ort_session = ort.InferenceSession( self.ort_model_path, ) + self.inputs = self.ort_session.get_inputs() + self.outputs = self.ort_session.get_outputs() - # ret = self.rknn.load_onnx(self.model_path) - print('--> Load RKNN model') - ret = self.rknn.load_rknn(self.model_path.as_posix()) - if ret != 0: - raise RuntimeError("Failed to load RKNN model") + del self.ort_session - ret = self.rknn.init_runtime() - if ret != 0: - raise RuntimeError("Failed to initialize RKNN runtime") + + def __del__(self): + self.rknnpool.release() def get_inputs(self) -> list[SessionNode]: - inputs: list[SessionNode] = self.ort_session.get_inputs() - return inputs + return self.inputs def get_outputs(self) -> list[SessionNode]: - outputs: list[SessionNode] = self.ort_session.get_outputs() - return outputs + return self.outputs def run( self, @@ -55,15 +57,7 @@ class RknnSession: ): input_data = [np.ascontiguousarray(v) for v in input_feed.values()][0] - - - # log.info(f"Running inference on RKNN model") - outputs = self.rknn.inference(inputs=[input_data], data_format='nchw') - - # log.info("inputs:") - # log.info(input_data) - # log.info("outputs:") - # log.info(outputs) - # log.info("RKNN END") + self.rknnpool.put(input_data) + outputs = self.rknnpool.get()[0] return outputs diff --git a/machine-learning/rknn/rknnpool.py b/machine-learning/rknn/rknnpool.py new file mode 100644 index 0000000000..787cbb22cb --- /dev/null +++ b/machine-learning/rknn/rknnpool.py @@ -0,0 +1,53 @@ +# This code is from leafqycc/rknn-multi-threaded +# Following Apache License 2.0 + +from queue import Queue +from rknnlite.api import RKNNLite +from concurrent.futures import ThreadPoolExecutor, as_completed + + +def initRKNN(rknnModel="./rknnModel/yolov5s.rknn", id=0): + rknn_lite = RKNNLite() + ret = rknn_lite.load_rknn(rknnModel) + if ret != 0: + print("Load RKNN rknnModel failed") + exit(ret) + ret = rknn_lite.init_runtime() + if ret != 0: + print("Init runtime environment failed") + exit(ret) + print(rknnModel, "\t\tdone") + return rknn_lite + + +def initRKNNs(rknnModel="./rknnModel/yolov5s.rknn", TPEs=1): + rknn_list = [] + for i in range(TPEs): + rknn_list.append(initRKNN(rknnModel, i % 3)) + return rknn_list + + +class rknnPoolExecutor(): + def __init__(self, rknnModel, TPEs, func): + self.TPEs = TPEs + self.queue = Queue() + self.rknnPool = initRKNNs(rknnModel, TPEs) + self.pool = ThreadPoolExecutor(max_workers=TPEs) + self.func = func + self.num = 0 + + def put(self, frame): + self.queue.put(self.pool.submit( + self.func, self.rknnPool[self.num % self.TPEs], frame)) + self.num += 1 + + def get(self): + if self.queue.empty(): + return None, False + fut = self.queue.get() + return fut.result(), True + + def release(self): + self.pool.shutdown() + for rknn_lite in self.rknnPool: + rknn_lite.release() \ No newline at end of file