# Source code for qualia_core.evaluation.STM32CubeAI

from __future__ import annotations

import logging
import re
import sys
from pathlib import Path
from typing import Any

import numpy as np

from qualia_core.evaluation.Stats import Stats
from qualia_core.typing import TYPE_CHECKING
from qualia_core.utils.process import subprocesstee

from .Evaluator import Evaluator

if TYPE_CHECKING:
    from qualia_core.dataaugmentation.DataAugmentation import DataAugmentation
    from qualia_core.datamodel.RawDataModel import RawDataModel
    from qualia_core.learningframework.LearningFramework import LearningFramework

if sys.version_info >= (3, 12):
    from typing import override
else:
    from typing_extensions import override

# Module-level logger, named after this module so records can be filtered per module.
logger = logging.getLogger(__name__)


class STM32CubeAI(Evaluator):
    """Evaluate a deployed model by running the X-CUBE-AI ``stm32ai validate`` command.

    The test set is exported to CSV files, the ``stm32ai`` command-line tool is run on
    them, and its textual output is parsed to extract the inference duration and the
    accuracy reported for the C model.
    """

    def __init__(self, stm32cubeai_args: tuple[str, ...] | None = None, mode: str = '') -> None:
        """Store the validation settings and the location of the ``stm32ai`` executable.

        :param stm32cubeai_args: Extra command-line arguments appended to the ``stm32ai validate`` call
        :param mode: Value passed to ``--mode``; also used to locate the accuracy line in the tool output
        """
        super().__init__()
        self.__mode = mode
        self.__stm32cubeai_args = stm32cubeai_args if stm32cubeai_args is not None else ()
        # Built-in project for 8.1.0
        self.__stm32cubeai_bin = (Path.home()
                                  / 'STM32Cube'
                                  / 'Repository'
                                  / 'Packs'
                                  / 'STMicroelectronics'
                                  / 'X-CUBE-AI'
                                  / '8.1.0'
                                  / 'Utilities'
                                  / 'linux'
                                  / 'stm32ai')

    @staticmethod
    def __dataset_to_csv(dataset: RawDataModel, outdir: Path, limit: int | None) -> None:
        """Write the test inputs and labels of ``dataset`` to ``testX.csv`` and ``testY.csv``.

        :param dataset: Dataset whose ``sets.test.x`` and ``sets.test.y`` arrays are exported
        :param outdir: Existing directory receiving the two CSV files
        :param limit: Keep only the first ``limit`` samples; a falsy value (``None`` or ``0``) keeps all of them
        """
        test_x = dataset.sets.test.x
        test_y = dataset.sets.test.y
        if limit:
            test_x = test_x[:limit]
            test_y = test_y[:limit]

        # One CSV row per sample: flatten every dimension after the first one.
        test_x = test_x.reshape((test_x.shape[0], -1))
        test_y = test_y.reshape((test_y.shape[0], -1))

        np.savetxt(outdir / 'testX.csv', test_x, delimiter=',', fmt='%f')
        np.savetxt(outdir / 'testY.csv', test_y, delimiter=',', fmt='%f')

    def __validate(self,  # noqa: PLR0913
                   test_x: Path,
                   test_y: Path,
                   modelpath: Path,
                   compression: int | None,
                   outdir: Path,
                   logdir: Path,
                   tag: str) -> tuple[int, dict[int, bytearray]]:
        """Run ``stm32ai validate`` on the exported CSV files and log its output to a file.

        The full command line followed by the tool's stdout and stderr is written to
        ``<logdir>/<tag>.txt``.

        :param test_x: CSV file passed to ``--valinput``
        :param test_y: CSV file passed to ``--valoutput``
        :param modelpath: Model file passed to ``--model``
        :param compression: Value passed to ``--compression``; a falsy value is sent as ``none``
        :param outdir: Directory under which the ``workspace`` and ``<tag>`` output directories are placed
        :param logdir: Existing directory receiving the log file
        :param tag: Name used for the output directory and the log file
        :return: The return code and captured outputs as returned by ``subprocesstee.run``
        """
        cmd = str(self.__stm32cubeai_bin)
        args = (
            'validate',
            '--name', 'network',
            '--model', str(modelpath),
            '--mode', self.__mode,
            '--compression', str(compression) if compression else 'none',
            '--valinput', str(test_x),
            '--valoutput', str(test_y),
            '--verbosity', '3',
            '--workspace', str(outdir / 'workspace'),
            '--output', str(outdir / tag),
            '--classifier',
            '--allocate-inputs',
            '--allocate-outputs',
            *self.__stm32cubeai_args,
        )
        logger.info('Running: %s %s', cmd, ' '.join(args))
        with (logdir / f'{tag}.txt').open('wb') as logfile:
            # Record the exact command line at the top of the log before the tool output.
            _ = logfile.write(' '.join([str(cmd), *args, '\n']).encode('utf-8'))
            returncode, outputs = subprocesstee.run(cmd, *args, files={sys.stdout: logfile, sys.stderr: logfile})
        return returncode, outputs

    def __parse_validate_stdout(self, s: str) -> tuple[float | None, float | None]:
        """Extract the duration and accuracy from the text printed by ``stm32ai validate``.

        :param s: Decoded output of the tool
        :return: ``(duration, accuracy)`` where ``duration`` is the reported milliseconds divided by 1000
            and ``accuracy`` is the reported percentage divided by 100; each is ``None`` if its line is not found
        """
        duration_match = re.search(r'[\r,\n]\s*duration\s*:\s*([.\d]+)\s*ms.*$', s, re.MULTILINE)
        duration = float(duration_match.group(1)) / 1000 if duration_match is not None else None

        # The mode is caller-supplied text: escape it so that it is always matched literally.
        accuracy_match = re.search(rf'[\r,\n]\s*{re.escape(self.__mode)}\ c-model\ #1\s+([.\d]+)%.*$', s, re.MULTILINE)
        accuracy = float(accuracy_match.group(1)) / 100 if accuracy_match is not None else None

        return duration, accuracy

    @override
    def evaluate(self,
                 framework: LearningFramework[Any],
                 model_kind: str,
                 dataset: RawDataModel,
                 target: str,
                 tag: str,
                 limit: int | None = None,
                 dataaugmentations: list[DataAugmentation] | None = None) -> Stats | None:
        """Validate the deployed model on the test set and return the parsed statistics.

        The model is read from ``out/deploy/stm32cubeai/<tag>.tflite``, the CSV files and the
        log are written to ``out/evaluate/<target>`` and the tool outputs go under
        ``out/deploy/<target>``.

        :param framework: Unused by this evaluator
        :param model_kind: Unused by this evaluator
        :param dataset: Dataset providing the test set
        :param target: Name of the target, used to build the output and log directories
        :param tag: Name of the model, used for the model file, output directory and log file
        :param limit: Maximum number of test samples to use; a falsy value uses the whole test set
        :param dataaugmentations: Not supported, must be empty or ``None``
        :return: Statistics holding the average time and the accuracy, or ``None`` if the tool failed
        :raise ValueError: If ``dataaugmentations`` is not empty
        """
        if dataaugmentations:
            logger.error('dataaugmentations not supported for %s', self.__class__.__name__)
            raise ValueError(f'dataaugmentations not supported for {self.__class__.__name__}')

        outdir = Path('out') / 'deploy' / target
        logdir = Path('out') / 'evaluate' / target
        logdir.mkdir(parents=True, exist_ok=True)

        self.__dataset_to_csv(dataset, logdir, limit)
        return_code, outputs = self.__validate(test_x=logdir / 'testX.csv',
                                               test_y=logdir / 'testY.csv',
                                               modelpath=Path('out') / 'deploy' / 'stm32cubeai' / f'{tag}.tflite',
                                               compression=None,
                                               outdir=outdir,
                                               logdir=logdir,
                                               tag=tag)
        if return_code != 0:
            # Do not fail silently: point the user at the log written by the validation step.
            logger.error('%s failed with return code %d, see %s',
                         self.__stm32cubeai_bin,
                         return_code,
                         logdir / f'{tag}.txt')
            return None

        # NOTE(review): key 1 is presumably the captured stdout stream — confirm against subprocesstee.run.
        # Undecodable bytes are replaced so that stray tool output cannot abort the evaluation.
        duration, accuracy = self.__parse_validate_stdout(outputs[1].decode(errors='replace'))
        return Stats(avg_time=duration, accuracy=accuracy)