Source code for qualia_core.dataset.EllcieHAR

from __future__ import annotations

import sys
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Final

from qualia_core.datamodel import TimeSample
from qualia_core.datamodel.har import Activities, Activity, HARDataModel, Subject
from qualia_core.datamodel.sensor import Accelerometer, Barometer, Gyroscope
from qualia_core.utils.file import CSVReader, DirectoryReader

from .HARDataset import HARDataset

if TYPE_CHECKING:
    from qualia_core.datamodel.sensor.Sensor import Sensor

if sys.version_info >= (3, 12):
    from typing import override
else:
    from typing_extensions import override

[docs] class EllcieHAR(HARDataset):
[docs] @dataclass class Row: T: str Ax: str Ay: str Az: str Gx: str Gy: str Gz: str P: str CLASS: str
activitylist: Final[dict[str, Activities]] = { 'STANDING': Activities.STANDING, 'STAND_TO_SIT': Activities.STAND_TO_SIT, 'SITTING': Activities.SITTING, 'SIT_TO_STAND': Activities.SIT_TO_STAND, 'WALKING': Activities.WALKING, 'SIT_TO_LIE': Activities.SIT_TO_LIE, 'LYING': Activities.LYING, 'LIE_TO_SIT': Activities.LIE_TO_SIT, 'WALKING_UPSTAIRS': Activities.WALKING_UPSTAIRS, 'WALKING_DOWNSTAIRS': Activities.WALKING_DOWNSTAIRS, 'RUNNING': Activities.RUNNING, 'DRINKING': Activities.DRINKING, 'DRIVING': Activities.DRIVING, } def __init__(self, path: str = '', variant: str = '1', files: list[str] | None = None) -> None: super().__init__() self.__dr = DirectoryReader() self.__cr: CSVReader[EllcieHAR.Row] = CSVReader() self.__path = Path(path) self.__variant = variant self.__files = files if self.__variant not in ['PACK-2', 'UCA-EHAR']: raise ValueError('Unsupported variant ' + variant) self.sets.remove('valid') @override def __call__(self) -> HARDataModel: files = self.__dr.read(self.__path, ext='.csv') filesd = [self.__cr.read(f, delimiter=';', labels=EllcieHAR.Row) for f in files] subjects: dict[str, Subject] = {} for fd in filesd: name = fd.filename if self.__files is not None and not any(s.lower() in name.name.lower() for s in self.__files): continue data = fd.content subject_name = name.stem if self.__variant == 'PACK-2': subject_name = subject_name.split('_')[-2] elif self.__variant == 'UCA-EHAR': subject_name = subject_name.split('_')[1] if subject_name not in subjects: subjects[subject_name] = Subject(subject_name, []) activities = subjects.setdefault(subject_name, Subject(subject_name, [])).activities lastact = data[0].CLASS firstactt = float(data[0].T) samples: list[TimeSample] = [] for d in data: if lastact != d.CLASS: if lastact != '': # Drop unlabelled samples activities.append(Activity(self.activitylist[lastact], samples)) samples = [] lastact = d.CLASS firstactt = float(d.T) absolutet = float(d.T) sensors: list[Sensor] = [] if d.Ax != '': # acc/gyro should be synced sensors.append(Accelerometer(float(d.Ax), float(d.Ay), float(d.Az))) sensors.append(Gyroscope(float(d.Gx), float(d.Gy), float(d.Gz))) if d.P != '': sensors.append(Barometer(float(d.P))) sample = TimeSample(t=absolutet - firstactt, sensors=sensors) samples.append(sample) if d.CLASS != '': # Drop unlabelled samples activities.append(Activity(self.activitylist[d.CLASS], samples)) return HARDataModel(sets=HARDataModel.Sets(train=list(subjects.values())), name=self.name) @property @override def name(self) -> str: return f'{super().name}_{self.__variant}'