"""Provide the EnergyEstimationMetric postprocessing module based on Lemaire et al., 2022."""
from __future__ import annotations
import dataclasses
import functools
import logging
import math
import sys
from dataclasses import dataclass
from typing import Any, Callable, Final, Literal, NamedTuple, cast
import torch
from qualia_core.learningmodel.pytorch.layers import Add
from qualia_core.learningmodel.pytorch.layers.GlobalSumPool1d import GlobalSumPool1d
from qualia_core.learningmodel.pytorch.layers.GlobalSumPool2d import GlobalSumPool2d
from qualia_core.learningmodel.pytorch.layers.quantized_layers import QuantizedIdentity, QuantizedLinear
from qualia_core.learningmodel.pytorch.layers.quantized_layers1d import QuantizedBatchNorm1d, QuantizedConv1d
from qualia_core.learningmodel.pytorch.layers.quantized_layers2d import QuantizedBatchNorm2d, QuantizedConv2d
from qualia_core.learningmodel.pytorch.layers.QuantizedAdd import QuantizedAdd
from qualia_core.learningmodel.pytorch.layers.QuantizedGlobalSumPool1d import QuantizedGlobalSumPool1d
from qualia_core.learningmodel.pytorch.layers.QuantizedGlobalSumPool2d import QuantizedGlobalSumPool2d
from qualia_core.learningmodel.pytorch.layers.Quantizer import Quantizer
from qualia_core.postprocessing.PostProcessing import PostProcessing
from qualia_core.typing import TYPE_CHECKING, ModelConfigDict
from qualia_core.utils.logger import Logger
from qualia_core.utils.logger.CSVFormatter import CSVFormatter
from torch import nn
from qualia_plugin_snn.learningframework.SpikingJelly import SpikingJelly
from qualia_plugin_snn.learningmodel.pytorch.layers.spikingjelly.Add import Add as SNNAdd
from qualia_plugin_snn.learningmodel.pytorch.layers.spikingjelly.GlobalSumPool1d import GlobalSumPool1d as SNNGlobalSumPool1d
from qualia_plugin_snn.learningmodel.pytorch.layers.spikingjelly.GlobalSumPool2d import GlobalSumPool2d as SNNGlobalSumPool2d
from qualia_plugin_snn.learningmodel.pytorch.layers.spikingjelly.QuantizedAdd import QuantizedAdd as SNNQuantizedAdd
from qualia_plugin_snn.learningmodel.pytorch.layers.spikingjelly.QuantizedGlobalSumPool1d import (
QuantizedGlobalSumPool1d as SNNQuantizedGlobalSumPool1d,
)
from qualia_plugin_snn.learningmodel.pytorch.layers.spikingjelly.QuantizedGlobalSumPool2d import (
QuantizedGlobalSumPool2d as SNNQuantizedGlobalSumPool2d,
)
from qualia_plugin_snn.learningmodel.pytorch.SNN import SNN
# We are inside a TYPE_CHECKING block but our custom TYPE_CHECKING constant triggers TCH001-TCH003 so ignore them
if TYPE_CHECKING:
from collections.abc import Generator
from qualia_codegen_core.graph import ModelGraph
from qualia_codegen_core.graph.layers import TAddLayer, TBaseLayer, TConvLayer, TDenseLayer
from qualia_core.datamodel.RawDataModel import RawData
from qualia_core.qualia import TrainResult
from torch.types import Number
if sys.version_info >= (3, 12):
from typing import override
else:
from typing_extensions import override
logger = logging.getLogger(__name__)
[docs]
@dataclass
class SpikeCounter:
"""Holds the statistics and properties of an input or output tensor of a layer after inference.
:param spike_count: Number of spikes recorded
:param tensor_sum: Sum of elements of the tensor
:param size: Number of elements in the tensor
:param binary: True if all elements are binary (0 or 1), False otherwise
:param sample_count: Number of times the tensor has been updated
"""
spike_count: Number
tensor_sum: Number
size: int
binary: bool
sample_count: int
[docs]
class EnergyEstimationMetricLoggerFields(NamedTuple):
"""Interface object for CSV logging.
Should contain the same fields as :class:`EnergyMetrics` and returned by :meth:`EnergyMetrics.asnamedtuple`.
:param name: Layer name
:param mem_pot: Energy for potential memory read/write
:param mem_weights: Energy for weights memory read
:param mem_bias: Energy for bias memory read
:param mem_io: Energy for input/output memory read/write
:param ops: Energy for synaptic operations
:param addr: Energy for event address computation
:param input_spikerate: Average input spike rate per timestep
:param output_spikerate: Average output spike rate per timestep
:param input_is_binary: If input tensor only contains binary values, i.e., spikes
:param output_is_binary: If output tensor only contains binary values, i.e., spikes
:param is_sj: If the layer is a SpikingJelly layer and has been processed as part of a Spiking Neural Network
"""
#: Layer name
name: str
#: Energy for potential memory read/write
mem_pot: float
#: Energy for weights memory read
mem_weights: float
#: Energy for bias memory read
mem_bias: float
#: Energy for input/output memory read/write
mem_io: float
#: Energy for potential, weights, bias and input/output read/write, i.e., Erdram + Ewrram
mem_total: float
#: Energy for synaptic operations
ops: float
#: Energy for event address computation
addr: float
#: Energy for synaptic operations and event address computation, i.e., Eops+addr
opsaddr: float
#: Total energy, i.e., energy for memory read/write, synaptic operations and event adress computation
total: float
#: Average input spike rate per timestep
input_spikerate: float
#: Average output spike rate per timestep
output_spikerate: float
#: Input count per timestep
input_count: Number
#: Output count per timestep
output_count: Number
#: If input tensor only contains binary values, i.e., spikes
input_is_binary: bool
#: If output tensor only contains binary values, i.e., spikes
output_is_binary: bool
#: If the layer is a SpikingJelly layer and has been processed as part of a Spiking Neural Network
is_sj: bool | Literal['Hybrid']
[docs]
@dataclass
class EnergyMetrics:
"""Holds the computed average energy per inference for each layer.
:param name: Layer name
:param mem_pot: Energy for potential memory read/write
:param mem_weights: Energy for weights memory read
:param mem_bias: Energy for bias memory read
:param mem_io: Energy for input/output memory read/write
:param ops: Energy for synaptic operations
:param addr: Energy for event address computation
:param input_spikerate: Average input spike rate per timestep
:param output_spikerate: Average output spike rate per timestep
:param input_count: Input count per timestep
:param output_count: Output count per timestep
:param input_is_binary: If input tensor only contains binary values, i.e., spikes
:param output_is_binary: If output tensor only contains binary values, i.e., spikes
:param is_sj: If the layer is a SpikingJelly layer and has been processed as part of a Spiking Neural Network
"""
#: Layer name
name: str
#: Energy for potential memory read/write
mem_pot: float
#: Energy for weights memory read
mem_weights: float
#: Energy for bias memory read
mem_bias: float
#: Energy for input/output memory read/write
mem_io: float
#: Energy for synaptic operations
ops: float
#: Energy for event address computation
addr: float
#: Average input spike rate per timestep
input_spikerate: float | None
#: Average output spike rate per timestep
output_spikerate: float | None
#: Input count per timestep
input_count: Number | None
#: Output count per timestep
output_count: Number | None
#: If input tensor only contains binary values, i.e., spikes
input_is_binary: bool
#: If output tensor only contains binary values, i.e., spikes
output_is_binary: bool
#: If the layer is a SpikingJelly layer and has been processed as part of a Spiking Neural Network
is_sj: bool | Literal['Hybrid']
@property
def mem_total(self) -> float:
"""Energy for potential, weights, bias and input/output read/write. Erdram + Ewrram."""
return self.mem_pot + self.mem_weights + self.mem_bias + self.mem_io
@property
def opsaddr(self) -> float:
"""Energy for synaptic operations and event address computation. Eops+addr."""
return self.ops + self.addr
@property
def total(self) -> float:
"""Total energy. Energy for memory read/write, synaptic operations and event adress computation."""
return self.mem_total + self.opsaddr
[docs]
def asnamedtuple(self) -> EnergyEstimationMetricLoggerFields:
"""Return the data from this class as a NamedTuple for use with the CSV logger.
Instanciate a :class:`EnergyEstimationMetricLoggerFields` object with
all of this class fields and properties and return it.
:return: the :class:`EnergyEstimationMetricLoggerFields` with all data from this object copied into it
"""
return EnergyEstimationMetricLoggerFields(**dataclasses.asdict(self),
mem_total=self.mem_total,
opsaddr=self.opsaddr,
total=self.total)
[docs]
class EnergyEstimationMetric(PostProcessing[nn.Module]):
r"""Analytical energy estimation metric.
From `An Analytical Estimation of Spiking Neural Networks Energy Efficiency <https://arxiv.org/abs/2210.13107>`_,
Lemaire et al. ICONIP2022.
.. code-block:: bibtex
@inproceedings{EnergyEstimationMetricICONIP2022,
title = {An Analytical Estimation of Spiking Neural Networks Energy Efficiency},
author = {Lemaire, Edgar and Cordone, Loïc and Castagnetti, Andrea
and Novac, Pierre-Emmanuel and Courtois, Jonathan and Miramond, Benoît},
booktitle = {Proceedings of the 29th International Conference on Neural Information Processing},
pages = {574--587},
year = {2023},
doi = {10.1007/978-3-031-30105-6_48},
series = {ICONIP},
}
Supports sequential (non-residual) formal and spiking convolutional neural networks with the following layers:
* :class:`torch.nn.Conv1d`
* :class:`torch.nn.Conv2d`
* :class:`torch.nn.Linear`
* :class:`torch.nn.ReLU` for formal neural networks
* :class:`spikingjelly.activation_based.neuron.IFNode` for spiking neural networks
* :class:`spikingjelly.activation_based.neuron.LIFNode` for spiking neural networks
"""
energy_values: Final[dict[int, dict[str, float]]] = {
8: {'add': 0.03, 'mul': 0.2}, # values for 8-bit
32: {'add': 0.1, 'mul': 3.1}, # values for 32-bit
}
"""
Energy values for different bit widths and operations.
Default from `Computing’s Energy Problem (and what we can do about it) <https://gwern.net/doc/cs/hardware/2014-horowitz-2.pdf>`_,
Mark Horowitz, ISSCC 2014.
:meta public:
""" # noqa: RUF001
[docs]
def __init__(self,
mem_width: int,
fifo_size: int = 0,
total_spikerate_exclude_nonbinary: bool = True, # noqa: FBT001, FBT002
op_estimation_type: dict[str, str] | None = None,
sram_estimation_type: str | None = None) -> None:
"""Construct :class:`qualia_plugin_snn.postprocessing.EnergyEstimationMetric.EnergyEstimationMetric`.
:param mem_width: Memory access size in bits, e.g. 16 for 16-bit quantization
:param fifo_size: Size of the input/output FIFOs for each layer in SPLEAT
:param total_spikerate_exclude_nonbinary: If True, exclude non-binary inputs/outputs from total spikerate computation
:param op_estimation_type: Optional estimation type for the energy values, one of 'ICONIP', 'saturation', 'linear',
'quadratic', defaults to 'ICONIP', see :meth:`_set_energy_values` and :meth:`_set_op_estimation_type`
:param sram_estimation_type: Optional SRAM estimation algorithm, 'old' (ICONIP2022) or 'new' (T. Louis), defaults to
'old', see :meth:``_e_ram`
"""
super().__init__()
self._mem_width = mem_width
self._fifo_size = fifo_size
self._total_spikerate_exclude_nonbinary = total_spikerate_exclude_nonbinary
self._op_estimation_type = op_estimation_type
self._set_energy_values(mem_width, op_estimation_type)
self._sram_estimation_type = sram_estimation_type
[docs]
def _set_op_estimation_type(self, bit_width: int, op_type: str, op_estimation_type: str | int) -> float:
"""Set the estimation type for the energy values.
If op_estimation_type is not in ['ICONIP', 'saturation', 'linear', 'quadratic'], raise ValueError.
If op_estimation_type is ''ICONIP', use the energy values from the ICONIP 2022 paper (i.e. 32-bit values).
If op_estimation_type is 'saturation', use self.energy_values[8][type] for 8-bit and below, and
self.energy_values[32][type] for bit widths between 9 and 32.
If op_estimation_type is 'linear', use self.energy_values[8][type] and self.energy_values[32][type]
to estimate the energy values by solving a linear equation: y = m*bit_width + c.
If op_estimation_type is 'quadratic', use self.energy_values[8][type] and self.energy_values[32][type]
to estimate the energy values by solving a quadratic equation: y = a*bit_width^2 + b*bit_width + c.
:meta public:
:param bit_width: Bit width to compute energy for
:param op_type: Operation type, e.g., 'add' or 'mul'
:param op_estimation_type: The estimation type for the energy values, one of 'ICONIP', 'saturation', 'linear', 'quadratic'
:return: Energy for the given bit width, operation and estimation type
:raise ValueError: When ``op_estimation_type`` is invalid,
or ``bit_width`` is out of bounds for 'saturation' ``op_estimation_type``
"""
# Check for valid op_estimation_type
if op_estimation_type not in ['ICONIP','saturation', 'linear', 'quadratic']:
logger.error("Invalid op_estimation_type. Must be one of ['ICONIP','saturation', 'linear', 'quadratic']")
raise ValueError
# Get the 8-bit and 32-bit energy values for the given type
energy_8bit = self.energy_values[8][op_type]
energy_32bit = self.energy_values[32][op_type]
if op_estimation_type == 'ICONIP':
# For ICONIP, return the 32-bit value for bit_width <= 32
energy = energy_32bit
elif op_estimation_type == 'saturation':
# For saturation, return value for the next closest bitwidth
for target_bitwidth, energy_value in self.energy_values.items():
if bit_width <= target_bitwidth:
return energy_value[op_type]
logger.error('Bit width out of supported range (1-32) for saturation estimation.')
raise ValueError
elif op_estimation_type == 'linear':
# For linear, solve the equation y = m*bit_width + c
# Use (x1, y1) = (8, energy_8bit) and (x2, y2) = (32, energy_32bit) to find m and c
x1, y1 = 8, energy_8bit
x2, y2 = 32, energy_32bit
# Solve for slope (m) and intercept (c)
m = (y2 - y1) / (x2 - x1)
c = y1 - m * x1
# Estimate energy for the given bit_width
energy = m * bit_width + c
elif op_estimation_type == 'quadratic':
# For quadratic, solve the equation y = a*bit_width^2 + b*bit_width + c
# We assume two points (8, energy_8bit) and (32, energy_32bit), plus assume c = 0 (or another known value)
x1, y1 = 8, energy_8bit
x2, y2 = 32, energy_32bit
# We need a third point to solve a quadratic equation. We assume (bit_width=0, energy=0) as a reference point.
x0, y0 = 0, 0 # This assumes no energy consumption at 0 bits (can be modified if you have another reference)
import numpy as np
# Solve the system of equations for m_a * (a, b, c) = m_b to find a, b, and c
m_a = np.array([
[x0**2, x0, 1],
[x1**2, x1, 1],
[x2**2, x2, 1],
], dtype=np.float32)
m_b = np.array([y0, y1, y2], dtype=np.float32)
# Solve for [a, b, c]
a, b, c = np.linalg.solve(m_a, m_b)
# Estimate energy for the given bit_width using quadratic equation
energy = a * bit_width**2 + b * bit_width + c
else:
logger.error("Invalid op_estimation_type. Must be one of ['ICONIP', 'saturation', 'linear', 'quadratic']")
raise ValueError
return energy
[docs]
def _set_energy_values(self, bit_width: int, op_estimation_type: dict[str, str] | None) -> None:
"""Set the operation energy values for the given bit width.
If ``op_estimation_type`` is set, use :meth:`_set_op_estimation_type` to infer the values according to the wanted bit width
and type.
Otherwise, if ``bit_width`` is in :attr:`energy_values` use the predefined energy values in :attr:`energy_values`.
Otherwise, defaults to using predefined 32-bit values from :attr:`energy_values`.
:meta public:
:param bit_width: The bit width for the energy values.
:param op_estimation_type: The estimation type for the energy values.
"""
if bit_width in self.energy_values and op_estimation_type is None:
self._m_e_add = self.energy_values[bit_width]['add']
self._m_e_mul = self.energy_values[bit_width]['mul']
logger.info('Using %s-bit energy values from predefined values in Mark Horowitz, ISSCC 2014', bit_width)
elif op_estimation_type is not None:
if 'add' in op_estimation_type and 'mul' in op_estimation_type:
self._m_e_add = self._set_op_estimation_type(bit_width, 'add', op_estimation_type['add'])
self._m_e_mul = self._set_op_estimation_type(bit_width, 'mul', op_estimation_type['mul'])
logger.info('Using %s-bit energy values estimated using %s.', bit_width, op_estimation_type)
else:
logger.error("op_estimation_type must contain 'add' and 'mul' keys.")
raise ValueError
else:
self._m_e_add = self.energy_values[32]['add']
self._m_e_mul = self.energy_values[32]['mul']
#######
# FNN #
#######
[docs]
def _rdin_conv_fnn(self, layer: TConvLayer) -> int:
"""Count number of read operations for the input of a convolutional layer in a formal neural network.
Eq. 3: Cin × Cout × Hout × Wout × Wkernel × Hkernel.
:meta public:
:param layer: A convolutional layer
:return: Number of read operations for the input of a convolutional layer in a formal neural network
""" # noqa: RUF002
return layer.input_shape[0][-1] * math.prod(layer.output_shape[0][1:]) * math.prod(layer.kernel_size)
[docs]
def _e_rdin_conv_fnn(self, layer: TConvLayer, e_rdram: Callable[[int], float]) -> float:
"""Compute energy for read operations for the input of a convolution layer in a formal neural network.
:meta public:
:param layer: A convolutional layer
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Energy for read operations for the input of a convolution layer in a formal neural network
"""
return self._rdin_conv_fnn(layer) * e_rdram(math.prod(layer.input_shape[0][1:]))
[docs]
def _rdin_fc_fnn(self, layer: TDenseLayer) -> int:
"""Count number of read operations for the input of a fully-connected layer in a formal neural network.
Eq. 4: Nin.
:meta public:
:param layer: A fully-connected layer
:return: Number of read operations for the input of a fully-connected layer in a formal neural network
"""
return layer.input_shape[0][-1]
[docs]
def _e_rdin_fc_fnn(self, layer: TDenseLayer, e_rdram: Callable[[int], float]) -> float:
"""Compute energy for read operations for the input of a fully-connected layer in a formal neural network.
:meta public:
:param layer: A fully-connected layer
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Energy for read operations for the input of a fully-connected layer in a formal neural network
"""
return self._rdin_fc_fnn(layer) * e_rdram(math.prod(layer.input_shape[0][1:]))
[docs]
def _rdin_add_fnn(self, layer: TAddLayer) -> int:
"""Count number of read operations for the input of an add layer in a formal neural network.
#InLayers × Nin.
:meta public:
:param layer: An add layer
:return: Number of read operations for the input of an add layer in a formal neural network
""" # noqa: RUF002
return sum(sum(input_shape[1:]) for input_shape in layer.input_shape)
[docs]
def _e_rdin_add_fnn(self, layer: TAddLayer, e_rdram: Callable[[int], float]) -> float:
"""Compute energy for read operations for the input of an add layer in a formal neural network.
:meta public:
:param layer: An add layer
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Energy for read operations for the input of an add layer in a formal neural network
"""
return self._rdin_add_fnn(layer) * e_rdram(math.prod(layer.input_shape[0][1:]))
[docs]
def _rdweights_conv_fnn(self, layer: TConvLayer) -> int:
"""Count number of read operations for the weights of a convolutional layer (excluding bias) in a formal neural network.
Eq. 6.1f: Cin × Wkernel × Hkernel × Cout × Wout × Hout.
:meta public:
:param layer: A convolutional layer
:return: Number of read operations for the weights of a convolutional layer (excluding bias) in a formal neural network
""" # noqa: RUF002
return layer.input_shape[0][-1] * math.prod(layer.kernel_size) * math.prod(layer.output_shape[0][1:])
[docs]
def _e_rdweights_conv_fnn(self, layer: TConvLayer, e_rdram: Callable[[int], float]) -> float:
"""Compute energy for read operations for the weights of a convolutional layer (excluding bias) in a formal neural network.
:meta public:
:param layer: A convolutional layer
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Energy for read operations for the weights of a convolutional layer (excluding bias) in a formal neural network
"""
return self._rdweights_conv_fnn(layer) * e_rdram(layer.kernel.size)
[docs]
def _rdbias_conv_fnn(self, layer: TConvLayer) -> int:
"""Count number of read operations for the biases of a convolutional layer in a formal neural network.
Eq. 6.1s: Cout × Wout × Hout.
:meta public:
:param layer: A convolutional layer
:return: Number of read operations for the biases of a convolutional layer in a formal neural network
""" # noqa: RUF002
return math.prod(layer.output_shape[0][1:])
[docs]
def _e_rdbias_conv_fnn(self, layer: TConvLayer, e_rdram: Callable[[int], float]) -> float:
"""Compute energy for read operations for the biases of a convolutional layer in a formal neural network.
:meta public:
:param layer: A convolutional layer
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Energy for read operations for the biases of a convolutional layer in a formal neural network or 0 if the layer
does not use biases
"""
if layer.use_bias:
return self._rdbias_conv_fnn(layer) * e_rdram(layer.bias.size)
return 0
[docs]
def _rdweights_fc_fnn(self, layer: TDenseLayer) -> int:
"""Count number of read operations for the weights of a fully-connected layer (excluding bias) in a formal neural network.
Eq. 6.2f: Nin × Nout.
:meta public:
:param layer: A fully-connected layer
:return: Number of read operations for the weights of a fully-connected layer (excluding bias) in a formal neural network
""" # noqa: RUF002
return layer.input_shape[0][-1] * layer.output_shape[0][-1]
[docs]
def _e_rdweights_fc_fnn(self, layer: TDenseLayer, e_rdram: Callable[[int], float]) -> float:
"""Compute energy for read operations for weights of a fully-connected layer (excluding bias) in a formal neural network.
:meta public:
:param layer: A fully-connected layer
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Energy for read operations for the weights of a fully-connected layer (excluding bias) in a formal neural network
"""
return self._rdweights_fc_fnn(layer) * e_rdram(layer.kernel.size)
[docs]
def _rdbias_fc_fnn(self, layer: TDenseLayer) -> int:
"""Count number of read operations for the biases of a fully-connected layer in a formal neural network.
Eq. 6.2s: Nout.
:meta public:
:param layer: A fully-connected layer
:return: Number of read operations for the biases of a fully-connected layer in a formal neural network
"""
return layer.output_shape[0][-1]
[docs]
def _e_rdbias_fc_fnn(self, layer: TDenseLayer, e_rdram: Callable[[int], float]) -> float:
"""Compute energy for read operations for the biases of a fully-connected layer in a formal neural network.
:meta public:
:param layer: A fully-connected layer
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Energy for read operations for the biases of a fully-connected layer in a formal neural network or 0 if the layer
does not use biases
"""
if layer.use_bias:
return self._rdbias_fc_fnn(layer) * e_rdram(layer.bias.size)
return 0
[docs]
def _wrout_conv_fnn(self, layer: TConvLayer) -> int:
"""Count number of write operations for the output of a convolutional layer in a formal neural network.
Eq. 11: Cout × Hout × Wout.
:meta public:
:param layer: A convolutional layer
:return: Number of write operations for the output of a convolutional layer in a formal neural network
""" # noqa: RUF002
return math.prod(layer.output_shape[0][1:])
[docs]
def _e_wrout_conv_fnn(self, layer: TConvLayer, e_wrram: Callable[[int], float]) -> float:
"""Compute energy for write operations for the output of a convolutional layer in a formal neural network.
:meta public:
:param layer: A convolutional layer
:param e_rdram: Function to compute memory write access energy for a given memory size
:return: Energy for write operations for the output of a convolutional layer in a formal neural network
"""
return self._wrout_conv_fnn(layer) * e_wrram(math.prod(layer.output_shape[0][1:]))
[docs]
def _wrout_fc_fnn(self, layer: TDenseLayer) -> int:
"""Count number of write operations for the output of a fully-connected layer in a formal neural network.
Eq. 12: Nout.
:meta public:
:param layer: A fully-connected layer
:return: Number of write operations for the output of a fully-connected layer in a formal neural network
"""
return layer.output_shape[0][-1]
[docs]
def _e_wrout_fc_fnn(self, layer: TDenseLayer, e_wrram: Callable[[int], float]) -> float:
"""Compute energy for write operations for the output of a fully-connected layer in a formal neural network.
:meta public:
:param layer: A fully-connected layer
:param e_rdram: Function to compute memory write access energy for a given memory size
:return: Energy for write operations for the output of a fully-connected layer in a formal neural network
"""
return self._wrout_fc_fnn(layer) * e_wrram(math.prod(layer.output_shape[0][1:]))
[docs]
def _wrout_add_fnn(self, layer: TAddLayer) -> int:
"""Count number of write operations for the output of an add layer in a formal neural network.
Nout.
:meta public:
:param layer: An add layer
:return: Number of write operations for the output of an add layer in a formal neural network
"""
return layer.output_shape[0][-1]
[docs]
def _e_wrout_add_fnn(self, layer: TAddLayer, e_wrram: Callable[[int], float]) -> float:
"""Compute energy for write operations for the output of an add layer in a formal neural network.
:meta public:
:param layer: An add layer
:param e_rdram: Function to compute memory write access energy for a given memory size
:return: Energy for write operations for the output of an add layer in a formal neural network
"""
return self._wrout_add_fnn(layer) * e_wrram(math.prod(layer.output_shape[0][1:]))
[docs]
def _mac_ops_conv_fnn(self, layer: TConvLayer) -> int:
"""Count number of multiply-accumulate operations inside a convolutional layer in a formal neural network.
Eq. 1.1: Cout × Hout × Wout × Cin × Hkernel × Wkernel.
:meta public:
:param layer: A convolutional layer
:return: Number of multiply-accumulate operations inside a convolutional layer in a formal neural network.
""" # noqa: RUF002
return math.prod(layer.output_shape[0][1:]) * layer.input_shape[0][-1] * math.prod(layer.kernel_size)
def _acc_ops_conv_fnn(self, layer: TConvLayer) -> int:
"""Count number of accumulate operations inside a convolutional layer in a formal neural network.
Eq. 1.2: Cout × Hout × Wout.
:meta public
:param layer: A convolutional layer
:return: Number of accumulate operations inside a convolutional layer in a formal neural network.
""" # noqa: RUF002
return math.prod(layer.output_shape[0][1:])
[docs]
def _e_ops_conv_fnn(self, layer: TConvLayer) -> float:
"""Compute energy for multiply-accumulate and accumulate operations inside convolutional layer in a formal neural network.
:meta public:
:param layer: A convolutional layer
:return: Energy for multiply-accumulate and accumulate operations inside a convolutional layer in a formal neural network.
"""
return self._mac_ops_conv_fnn(layer) * (self._e_mul + self._e_add) + self._acc_ops_conv_fnn(layer) * self._e_add
[docs]
def _mac_ops_fc_fnn(self, layer: TDenseLayer) -> int:
"""Count number of multiply-accumulate operations inside a fully-connected layer in a formal neural network.
Eq. 2.1: Nin × Nout.
:meta public:
:param layer: A fully-connected layer
:return: Number of multiply-accumulate operations inside a fully-connected layer in a formal neural network.
""" # noqa: RUF002
return layer.input_shape[0][-1] * layer.output_shape[0][-1]
[docs]
def _acc_ops_fc_fnn(self, layer: TDenseLayer) -> int:
"""Count number of accumulate operations inside a fully-connected layer in a formal neural network.
Eq. 2.2: Nout.
:meta public:
:param layer: A fully-connected layer
:return: Number of accumulate operations inside a fully-connected layer in a formal neural network.
"""
return layer.output_shape[0][-1]
[docs]
def _e_ops_fc_fnn(self, layer: TDenseLayer) -> float:
"""Compute energy for multiply-accumulate and accumulate operations inside fully-connected layer in formal neural network.
:meta public:
:param layer: A fully-connected layer
:return: Energy for multiply-accumulate and accumulate operations inside a fully-connected layer in formal neural network.
"""
return self._mac_ops_fc_fnn(layer) * (self._e_mul + self._e_add) + self._acc_ops_fc_fnn(layer) * self._e_add
[docs]
def _mac_ops_add_fnn(self, _: TAddLayer) -> int:
"""Count number of multiply-accumulate operations inside an add layer in a formal neural network.
0.
:meta public:
:param layer: An add layer
:return: 0.
"""
return 0
[docs]
def _acc_ops_add_fnn(self, layer: TAddLayer) -> int:
"""Count number of accumulate operations inside an add layer in a formal neural network.
(#InLayers -1 ) × Nin
Nout.
:meta public:
:param layer: An add layer
:return: Number of accumulate operations inside an add layer in a formal neural network.
""" # noqa: RUF002
return min(sum(input_shape[1:]) for input_shape in layer.input_shape) * (len(layer.input_shape) - 1)
[docs]
def _e_ops_add_fnn(self, layer: TAddLayer) -> float:
"""Compute energy for multiply-accumulate and accumulate operations inside an add layer in formal neural network.
:meta public:
:param layer: A fully-connected layer
:return: Energy for multiply-accumulate and accumulate operations inside an add layer in formal neural network.
"""
return self._mac_ops_add_fnn(layer) * (self._e_mul + self._e_add) + self._acc_ops_add_fnn(layer) * self._e_add
[docs]
def _mac_addr_conv_fnn(self, _: TConvLayer) -> int:
"""Count number of multiply-accumulate operations for addressing a convolutional layer in a formal neural network.
0
:meta public:
:return: 0
"""
return 0
[docs]
def _acc_addr_conv_fnn(self, layer: TConvLayer) -> int:
"""Count number of accumulate operations for addressing a convolutional layer in a formal neural network.
Cin × Hin × Win + Cout × Hout × Wout + Cout × Hkernel × Wkernel.
:meta public:
:param layer: A convolutional layer
:return: Number of accumulate operations for addressing a convolutional layer in a formal neural network
""" # noqa: RUF002
return (math.prod(layer.input_shape[0][1:])
+ math.prod(layer.output_shape[0][1:])
+ layer.output_shape[0][-1] * math.prod(layer.kernel_size))
[docs]
def _e_addr_conv_fnn(self, layer: TConvLayer) -> float:
"""Compute energy for addressing a convolutional layer in a formal neural network.
:meta public:
:param layer: A convolutional layer
:return: Energy for addressing a convolutional layer in a formal neural network.
"""
return self._mac_addr_conv_fnn(layer) * (self._e_mul + self._e_add) + self._acc_addr_conv_fnn(layer) * self._e_add
[docs]
def _mac_addr_fc_fnn(self, _: TDenseLayer) -> int:
"""Count number of multiply-accumulate operations for addressing a fully-connected layer in a formal neural network.
0
:meta public:
:return: 0
"""
return 0
[docs]
def _acc_addr_fc_fnn(self, layer: TDenseLayer) -> int:
"""Count number of accumulate operations for addressing a fully-connected layer in a formal neural network.
Nin × Nout.
:meta public:
:param layer: A fully-connected layer
:return: Number of accumulate operations for addressing a fully-connected layer in a formal neural network
""" # noqa: RUF002
return layer.input_shape[0][-1] * layer.output_shape[0][-1]
[docs]
def _e_addr_fc_fnn(self, layer: TDenseLayer) -> float:
"""Compute energy for addressing a fully-connected layer in a formal neural network.
:meta public:
:param layer: A fully-connected layer
:return: Energy for addressing a fully-connected layer in a formal neural network.
"""
return self._mac_addr_fc_fnn(layer) * (self._e_mul + self._e_add) + self._acc_addr_fc_fnn(layer) * self._e_add
[docs]
def _mac_addr_add_fnn(self, _: TAddLayer) -> int:
"""Count number of multiply-accumulate operations for addressing an add layer in a formal neural network.
0
:meta public:
:return: 0
"""
return 0
[docs]
def _acc_addr_add_fnn(self, layer: TAddLayer) -> int:
"""Count number of accumulate operations for addressing an add layer in a formal neural network.
Nin.
:meta public:
:param layer: An add layer
:return: Number of accumulate operations for addressing an add layer in a formal neural network
"""
return min(sum(input_shape[1:]) for input_shape in layer.input_shape)
[docs]
def _e_addr_add_fnn(self, layer: TAddLayer) -> float:
"""Compute energy for addressing an add layer in a formal neural network.
:meta public:
:param layer: An add layer
:return: Energy for addressing an add layer in a formal neural network.
"""
return self._mac_addr_add_fnn(layer) * (self._e_mul + self._e_add) + self._acc_addr_add_fnn(layer) * self._e_add
[docs]
def _compute_model_energy_fnn(self,
modelgraph: ModelGraph,
e_rdram: Callable[[int], float],
e_wrram: Callable[[int], float]) -> list[EnergyMetrics]:
"""Compute the energy per inference for each layer of a formal neural network.
Supports the following layers:
* :class:`qualia_codegen_core.graph.layers.TConvLayer.TConvLayer`
* :class:`qualia_codegen_core.graph.layers.TDenseLayer.TDenseLayer`
* :class:`qualia_codegen_core.graph.layers.TAddLayer.TAddLayer`
:meta public:
:param modelgraph: Model to compute energy on
:param e_rdram: Function to compute memory read energy for a given memory size
:param e_wrram: Function to compute memory write energy for a given memory size
:return: A list of EnergyMetrics for each layer and a total with fields populated with energy estimation
"""
from qualia_codegen_core.graph.layers import TAddLayer, TConvLayer, TDenseLayer, TFlattenLayer
ems: list[EnergyMetrics] = []
for node in modelgraph.nodes:
if isinstance(node.layer, TFlattenLayer):
# Flatten is assumed to not do anything for on-target inference
em = EnergyMetrics(name=node.layer.name,
mem_pot=0,
mem_weights=0,
mem_bias=0,
mem_io=0,
ops=0,
addr=0,
input_spikerate=None,
output_spikerate=None,
input_count=None,
output_count=None,
input_is_binary=False,
output_is_binary=False,
is_sj=False)
elif isinstance(node.layer, TConvLayer):
em = EnergyMetrics(name=node.layer.name,
mem_pot=0,
mem_weights=self._e_rdweights_conv_fnn(node.layer, e_rdram),
mem_bias=self._e_rdbias_conv_fnn(node.layer, e_rdram),
mem_io=self._e_rdin_conv_fnn(node.layer, e_rdram) + self._e_wrout_conv_fnn(node.layer, e_wrram),
ops=self._e_ops_conv_fnn(node.layer),
addr=self._e_addr_conv_fnn(node.layer),
input_spikerate=None,
output_spikerate=None,
input_count=None,
output_count=None,
input_is_binary=False,
output_is_binary=False,
is_sj=False)
elif isinstance(node.layer, TDenseLayer):
em = EnergyMetrics(name=node.layer.name,
mem_pot=0,
mem_weights=self._e_rdweights_fc_fnn(node.layer, e_rdram),
mem_bias=self._e_rdbias_fc_fnn(node.layer, e_rdram),
mem_io=self._e_rdin_fc_fnn(node.layer, e_rdram) + self._e_wrout_fc_fnn(node.layer, e_wrram),
ops=self._e_ops_fc_fnn(node.layer),
addr=self._e_addr_fc_fnn(node.layer),
input_spikerate=None,
output_spikerate=None,
input_count=None,
output_count=None,
input_is_binary=False,
output_is_binary=False,
is_sj=False)
elif isinstance(node.layer, TAddLayer):
# Assume element-wise addition of inputs, meaning we need to read inputs and write outputs
em = EnergyMetrics(name=node.layer.name,
mem_pot=0, # No potentials for add layer
mem_weights=0, # No weights for add layer
mem_bias=0, # No biases for add layer
mem_io=self._e_rdin_add_fnn(node.layer, e_rdram) + self._e_wrout_add_fnn(node.layer, e_wrram),
ops=self._e_ops_add_fnn(node.layer),
addr=self._e_addr_add_fnn(node.layer),
input_spikerate=None,
output_spikerate=None,
input_count=None,
output_count=None,
input_is_binary=False,
output_is_binary=False,
is_sj=False)
else:
logger.warning('%s skipped, result may be inaccurate', node.layer.name)
continue
ems.append(em)
em_total = EnergyMetrics(name='Total',
mem_pot=sum(em.mem_pot for em in ems),
mem_weights=sum(em.mem_weights for em in ems),
mem_bias=sum(em.mem_bias for em in ems),
mem_io=sum(em.mem_io for em in ems),
ops=sum(em.ops for em in ems),
addr=sum(em.addr for em in ems),
input_spikerate=None,
output_spikerate=None,
input_count=None,
output_count=None,
input_is_binary=False,
output_is_binary=False,
is_sj=False)
ems.append(em_total)
return ems
#######
# SNN #
#######
[docs]
def _rdin_snn(self, layer: TBaseLayer, input_spikerate: float) -> float:
"""Count average number of read operations for the input of a layer in a spiking neural network.
Eq. 5: θl-1.
:meta public:
:param layer: A convolutional or fully-connected layer
:param input_spikerate: Average spike per input per inference
:return: Average number of read operations for the input of a layer in a spiking neural network
"""
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return theta_in # noqa: RET504
[docs]
def _e_rdin_snn(self, layer: TBaseLayer, input_spikerate: float, e_rdram: Callable[[int], float]) -> float:
"""Compute average energy for read operations for the input of a layer in a spiking neural network.
:meta public:
:param layer: A convolutional or fully-connected layer
:param input_spikerate: Average spike per input per inference
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Average energy for read operations for the input of a layer in a spiking neural network
"""
return self._rdin_snn(layer, input_spikerate) * e_rdram(self._fifo_size)
[docs]
def _rdweights_conv_snn(self, layer: TConvLayer, input_spikerate: float) -> float:
"""Count average number of read operations for weights of convolutional layer (excluding bias) in spiking neural network.
Eq. 7f: θl-1 × Cout × Wkernel × Hkernel.
:meta public:
:param layer: A convolutional layer
:param input_spikerate: Average spike per input per inference
:return: Average number of read operations for weights of convolutional layer (excluding bias) in spiking neural network
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return theta_in * layer.output_shape[0][-1] * math.prod(layer.kernel_size)
[docs]
def _e_rdweights_conv_snn(self, layer: TConvLayer, input_spikerate: float, e_rdram: Callable[[int], float]) -> float:
"""Compute average energy for read operations for convolutional layer weights (excluding bias) in spiking neural network.
:meta public:
:param layer: A convolutional layer
:param input_spikerate: Average spike per input per inference
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Average energy for read operations for weights of convolutional layer (excluding bias) in spiking neural network
"""
return self._rdweights_conv_snn(layer, input_spikerate) * e_rdram(layer.kernel.size)
[docs]
def _rdbias_conv_snn(self, layer: TConvLayer, timesteps: int) -> int:
"""Count number of read operations for the biases of a convolutional layer in a spiking neural network.
Eq. 7s: Cout × Wout × Hout.
:meta public:
:param layer: A convolutional layer
:param timesteps: Number of timesteps
:return: Number of read operations for the biases of a convolutional layer in a spiking neural network
""" # noqa: RUF002
return math.prod(layer.output_shape[0][1:]) * timesteps
[docs]
def _e_rdbias_conv_snn(self, layer: TConvLayer, timesteps: int, e_rdram: Callable[[int], float]) -> float:
"""Compute energy for read operations for the biases of a convolutional layer in a spiking neural network.
:meta public:
:param layer: A convolutional layer
:param timesteps: Number of timesteps
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Energy for read operations for the biases of a convolutional layer in a spiking neural network or 0 if the layer
does not use biases
"""
if layer.use_bias:
return self._rdbias_conv_snn(layer, timesteps) * e_rdram(layer.bias.size)
return 0
[docs]
def _rdweights_fc_snn(self, layer: TDenseLayer, input_spikerate: float) -> float:
"""Count average number of read operations for weights of fully-connected layer (excluding bias) in spiking neural network.
Eq. 8f: θl-1 × Nout.
:meta public:
:param layer: A fully-connected layer
:param input_spikerate: Average spike per input per inference
:return: Average number of read operations for weights of fully-connected layer (excluding bias) in spiking neural network
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return theta_in * layer.output_shape[0][-1]
[docs]
def _e_rdweights_fc_snn(self, layer: TDenseLayer, input_spikerate: float, e_rdram: Callable[[int], float]) -> float:
"""Compute average energy for weights read operations of fully-connected layer (excluding bias) in spiking neural network.
:meta public:
:param layer: A fully-connected layer
:param input_spikerate: Average spike per input per inference
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Average energy for read operations for weights of fully-connected layer (excluding bias) in spiking neural network
"""
return self._rdweights_fc_snn(layer, input_spikerate) * e_rdram(layer.kernel.size)
[docs]
def _rdbias_fc_snn(self, layer: TDenseLayer, timesteps: int) -> int:
"""Count number of read operations for the biases of a fully-connected layer in a spiking neural network.
Eq. 6.2s: Nout.
:meta public:
:param layer: A fully-connected layer
:param timesteps: Number of timesteps
:return: Number of read operations for the biases of a fully-connected layer in a spiking neural network
"""
return layer.output_shape[0][-1] * timesteps
[docs]
def _e_rdbias_fc_snn(self, layer: TDenseLayer, timesteps: int, e_rdram: Callable[[int], float]) -> float:
"""Compute energy for read operations for the biases of a fully-connected layer in a spiking neural network.
:meta public:
:param layer: A fully-connected layer
:param timesteps: Number of timesteps
:param e_rdram: Function to compute memory read access energy for a given memory size
:return: Energy for read operations for the biases of a fully-connected layer in a spiking neural network or 0 if the layer
does not use biases
"""
if layer.use_bias:
return self._rdbias_fc_snn(layer, timesteps) * e_rdram(layer.bias.size)
return 0
[docs]
def _wrout_snn(self, layer: TBaseLayer, output_spikerate: float) -> float:
"""Count average number of write operations for the output of a layer in a spiking neural network.
Eq. 13: Noutput.
:meta public:
:param layer: A convolutional or fully-connected layer
:param output_spikerate: Average spike per output per inference
:return: Average number of write operations for the output of a layer in a spiking neural network
"""
theta_out = output_spikerate * math.prod(layer.output_shape[0][1:])
return theta_out # noqa: RET504
[docs]
def _e_wrout_snn(self, layer: TBaseLayer, output_spikerate: float, e_wrram: Callable[[int], float]) -> float:
"""Compute average energy for write operations for the output of a layer in a spiking neural network.
:meta public:
:param layer: A convolutional or fully-connected layer
:param output_spikerate: Average spike per output per inference
:param e_rdram: Function to compute memory write access energy for a given memory size
:return: Average energy for write operations for the output of a layer in a spiking neural network
"""
return self._wrout_snn(layer, output_spikerate) * e_wrram(self._fifo_size)
[docs]
def _wrpot_conv_snn(self, layer: TConvLayer, input_spikerate: float, timesteps: int) -> float:
"""Count average number of write operations for the potentials of a convolutional layer in a spiking neural network.
Eq 14: θl-1 × Cout × Wkernel × Hkernel + Cout × Hout × Wout.
:meta public:
:param layer: A convolutional layer
:param timesteps: Number of timesteps
:param input_spikerate: Average spike per input per inference
:return: Average number of write operations for the potentials of a convolutional layer in a spiking neural network
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return (theta_in * layer.output_shape[0][-1] * math.prod(layer.kernel_size)
+ math.prod(layer.output_shape[0][1:]) * timesteps)
[docs]
def _e_wrpot_conv_snn(self,
layer: TConvLayer,
input_spikerate: float,
timesteps: int,
e_wrram: Callable[[int], float]) -> float:
"""Compute average energy for write operations for the potentials of a convolutional layer in a spiking neural network.
:meta public:
:param layer: A convolutional layer
:param timesteps: Number of timesteps
:param input_spikerate: Average spike per input per inference
:param e_rdram: Function to compute memory write access energy for a given memory size
:return: Average energy for write operations for the potentials of a convolutional layer in a spiking neural network
"""
return self._wrpot_conv_snn(layer, input_spikerate, timesteps) * e_wrram(math.prod(layer.output_shape[0][1:]))
[docs]
def _wrpot_fc_snn(self, layer: TDenseLayer, input_spikerate: float, timesteps: int) -> float:
"""Count average number of write operations for the potentials of a fully-connected layer in a spiking neural network.
Eq 15: θl-1 × Nout + Nout.
:meta public:
:param layer: A fully-connected layer
:param timesteps: Number of timesteps
:param input_spikerate: Average spike per input per inference
:return: Average number of write operations for the potentials of a fully-connected layer in a spiking neural network
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return theta_in * layer.output_shape[0][-1] + layer.output_shape[0][-1] * timesteps
[docs]
def _e_wrpot_fc_snn(self,
layer: TDenseLayer,
input_spikerate: float,
timesteps: int,
e_wrram: Callable[[int], float]) -> float:
"""Compute average energy for write operations for the potentials of a fully-connected layer in a spiking neural network.
:meta public:
:param layer: A fully-connected layer
:param timesteps: Number of timesteps
:param input_spikerate: Average spike per input per inference
:param e_rdram: Function to compute memory write access energy for a given memory size
:return: Average energy for write operations for the potentials of a fully-connected layer in a spiking neural network
"""
return self._wrpot_fc_snn(layer, input_spikerate, timesteps) * e_wrram(math.prod(layer.output_shape[0][1:]))
[docs]
def _rdpot_conv_snn(self, layer: TConvLayer, input_spikerate: float, timesteps: int) -> float:
"""Count average number of read operations for the potentials of a convolutional layer in a spiking neural network.
Eq 9: θl-1 × Cout × Wkernel × Hkernel + Cout × Hout × Wout.
:meta public:
:param layer: A convolutional layer
:param timesteps: Number of timesteps
:param input_spikerate: Average spike per input per inference
:return: Average number of read operations for the potentials of a convolutional layer in a spiking neural network
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return (theta_in * layer.output_shape[0][-1] * math.prod(layer.kernel_size)
+ math.prod(layer.output_shape[0][1:]) * timesteps)
[docs]
def _e_rdpot_conv_snn(self,
layer: TConvLayer,
input_spikerate: float,
timesteps: int,
e_rdram: Callable[[int], float]) -> float:
"""Compute average energy for write operations for the potentials of a convolutional layer in a spiking neural network.
:meta public:
:param layer: A convolutional layer
:param timesteps: Number of timesteps
:param input_spikerate: Average spike per input per inference
:param e_rdram: Function to compute memory write access energy for a given memory size
:return: Average energy for read operations for the potentials of a convolutional layer in a spiking neural network
"""
return self._rdpot_conv_snn(layer, input_spikerate, timesteps) * e_rdram(math.prod(layer.output_shape[0][1:]))
[docs]
def _rdpot_fc_snn(self, layer: TDenseLayer, input_spikerate: float, timesteps: int) -> float:
"""Count average number of read operations for the potentials of a fully-connected layer in a spiking neural network.
Eq 10: θl-1 × Nout + Nout.
:meta public:
:param layer: A fully-connected layer
:param timesteps: Number of timesteps
:param input_spikerate: Average spike per input per inference
:return: Average number of read operations for the potentials of a fully-connected layer in a spiking neural network
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return theta_in * layer.output_shape[0][-1] + layer.output_shape[0][-1] * timesteps
[docs]
def _e_rdpot_fc_snn(self,
layer: TDenseLayer,
input_spikerate: float,
timesteps: int,
e_wrram: Callable[[int], float]) -> float:
"""Compute average energy for read operations for the potentials of a fully-connected layer in a spiking neural network.
:meta public:
:param layer: A fully-connected layer
:param timesteps: Number of timesteps
:param input_spikerate: Average spike per input per inference
:param e_rdram: Function to compute memory write access energy for a given memory size
:return: Average energy for read operations for the potentials of a fully-connected layer in a spiking neural network
"""
return self._rdpot_fc_snn(layer, input_spikerate, timesteps) * e_wrram(math.prod(layer.output_shape[0][1:]))
[docs]
def _mac_ops_conv_snn(self, layer: TConvLayer, timesteps: int, leak: bool) -> int: # noqa: FBT001
"""Count number of multiply-accumulate operations inside a convolutional layer in a spiking neural network.
Eq. 1.3: T × Cout × Hout × Wout.
:meta public:
:param layer: A convolutional layer
:param timesteps: Number of timesteps
:param leak: Whether the neuron has leak (LIF) or not (IF)
:return: Number of multiply-accumulate operations inside a convolutional layer in a spiking neural network, 0 if no leak
""" # noqa: RUF002
if not leak:
return 0
return timesteps * math.prod(layer.output_shape[0][1:])
[docs]
def _acc_ops_conv_snn(self, layer: TConvLayer, input_spikerate: float, output_spikerate: float, timesteps: int) -> float:
"""Count average number of accumulate operations inside a convolutional layer in a spiking neural network.
Eq. 1.4: θl-1 × ceil(Hkernel / S) × ceil(Wkernel / S) × Cout + T × Cout × Hout × Wout + θl.
:meta public:
:param layer: A convolutional layer
:param input_spikerate: Average spike per input per inference
:param output_spikerate: Average spike per output per inference
:param timesteps: Number of timesteps
:return: Average number of accumulate operations inside a convolutional layer in a spiking neural network.
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
theta_out = output_spikerate * math.prod(layer.output_shape[0][1:])
return (theta_in
* math.prod(math.ceil(k / s) for k, s in zip(layer.kernel_size, layer.strides))
* layer.output_shape[0][-1]
+ timesteps * math.prod(layer.output_shape[0][1:])
+ theta_out)
[docs]
def _e_ops_conv_snn(self,
layer: TConvLayer,
input_spikerate: float,
output_spikerate: float,
timesteps: int,
leak: bool) -> float: # noqa: FBT001
"""Compute average energy for multiply-accumulate and accumulate inside a convolutional layer in spiking neural network.
:meta public:
:param layer: A convolutional layer
:param input_spikerate: Average spike per input per inference
:param output_spikerate: Average spike per output per inference
:param timesteps: Number of timesteps
:param leak: Whether the neuron has leak (LIF) or not (IF)
:return: Average energy for multiply-accumulate and accumulate inside a convolutional layer in spiking neural network.
"""
return (self._mac_ops_conv_snn(layer, timesteps, leak) * (self._e_mul + self._e_add)
+ self._acc_ops_conv_snn(layer, input_spikerate, output_spikerate, timesteps) * self._e_add)
[docs]
def _mac_ops_fc_snn(self, layer: TDenseLayer, timesteps: int, leak: bool) -> int: # noqa: FBT001
"""Count number of multiply-accumulate operations inside a fully-connected layer in a spiking neural network.
Eq. 2.3: T × Nout.
:meta public:
:param layer: A fully-connected layer
:param timesteps: Number of timesteps
:param leak: Whether the neuron has leak (LIF) or not (IF)
:return: Number of multiply-accumulate operations inside a fully-connected layer in a spiking neural network, 0 if no leak
""" # noqa: RUF002
if not leak:
return 0
return timesteps * layer.output_shape[0][-1]
[docs]
def _acc_ops_fc_snn(self, layer: TDenseLayer, input_spikerate: float, output_spikerate: float, timesteps: int) -> float:
"""Count average number of accumulate operations inside a fully-connected layer in a spiking neural network.
Eq. 2.4: θl-1 × Nout + T × Nout + θl.
**Warning 1**: the article counts Nin times too many as 1 input spike triggers only Nout MAC (each output neuron).
**Warning 2**: the article is missing the third term which corresponds to the reset.
Original equation (not used here):
Eq. 2.4: θl-1 × Nin × Nout + T × Nout.
:meta public:
:param layer: A fully-connected layer
:param input_spikerate: Average spike per input per inference
:param output_spikerate: Average spike per output per inference
:param timesteps: Number of timesteps
:return: Average number of accumulate operations inside a fully-connected layer in a spiking neural network.
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return (theta_in * layer.output_shape[0][-1]
+ timesteps * layer.output_shape[0][-1]
+ output_spikerate * math.prod(layer.output_shape[0][1:]))
[docs]
def _e_ops_fc_snn(self,
layer: TDenseLayer,
input_spikerate: float,
output_spikerate: float,
timesteps: int,
leak: bool) -> float: # noqa: FBT001
"""Compute average energy for multiply-accumulate and accumulate inside fully-connected layer in spiking neural network.
:meta public:
:param layer: A fully-connected layer
:param input_spikerate: Average spike per input per inference
:param output_spikerate: Average spike per output per inference
:param timesteps: Number of timesteps
:param leak: Whether the neuron has leak (LIF) or not (IF)
:return: Average energy for multiply-accumulate and accumulate inside a fully-connected layer in spiking neural network.
"""
return (self._mac_ops_fc_snn(layer, timesteps, leak) * (self._e_mul + self._e_add)
+ self._acc_ops_fc_snn(layer, input_spikerate, output_spikerate, timesteps) * self._e_add)
[docs]
def _mac_addr_conv_snn(self, layer: TConvLayer, input_spikerate: float) -> float:
"""Count average number of multiply-accumulate operations for addressing a convolutional layer in a spiking neural network.
Eq. 16.2: θl-1 × 2.
:meta public:
:param layer: A convolutional layer
:param input_spikerate: Average spike per input per inference
:return: average number of multiply-accumulate operations for addressing a convolutional layer in a spiking neural network.
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return theta_in * 2
[docs]
def _acc_addr_conv_snn(self, layer: TConvLayer, input_spikerate: float) -> float:
"""Count average number of accumulate operations for addressing a convolutional layer in a spiking neural network.
Eq 16.3: θl-1 × Cout × Hkernel × Wkernel.
:meta public:
:param layer: A convolutional layer
:param input_spikerate: Average spike per input per inference
:return: Average number of accumulate operations for addressing a convolutional layer in a spiking neural network
""" # noqa: RUF002
theta_in = input_spikerate * math.prod(layer.input_shape[0][1:])
return theta_in * math.prod(layer.kernel_size)
[docs]
def _e_addr_conv_snn(self, layer: TConvLayer, input_spikerate: float) -> float:
"""Compute average energy for addressing a convolutional layer in a spiking neural network.
:meta public:
:param layer: A convolutional layer
:param input_spikerate: Average spike per input per inference
:return: Average energy for addressing a convolutional layer in a spiking neural network.
"""
return (self._mac_addr_conv_snn(layer, input_spikerate) * (self._e_mul + self._e_add)
+ self._acc_addr_conv_snn(layer, input_spikerate) * self._e_add)
[docs]
def _mac_addr_fc_snn(self, _: TDenseLayer) -> int:
"""Count number of multiply-accumulate operations for addressing a fully-connected layer in a spiking neural network.
0
:meta public:
:return: 0
"""
return 0
[docs]
def _acc_addr_fc_snn(self, layer: TDenseLayer, input_spikerate: float) -> float:
"""Count average number of accumulate operations for addressing a fully-connected layer in a spiking neural network.
Eq. 17.2: θl-1 × Nout.
:meta public:
:param layer: A fully-connected layer
:param input_spikerate: Average spike per input per inference
:return: Average number of accumulate operations for addressing a fully-connected layer in a spiking neural network
""" # noqa: RUF002
return input_spikerate * layer.output_shape[0][-1]
[docs]
def _e_addr_fc_snn(self, layer: TDenseLayer, input_spikerate: float) -> float:
"""Compute average energy for addressing a fully-connected layer in a spiking neural network.
:meta public:
:param layer: A fully-connected layer
:param input_spikerate: Average spike per input per inference
:return: Average energy for addressing a fully-connected layer in a spiking neural network.
"""
return (self._mac_addr_fc_snn(layer) * (self._e_mul + self._e_add)
+ self._acc_addr_fc_snn(layer, input_spikerate) * self._e_add)
[docs]
def _compute_model_energy_snn(self, # noqa: PLR0913, C901, PLR0912, PLR0915
modelgraph: ModelGraph,
input_spikerates: dict[str, float],
output_spikerates: dict[str, float],
input_is_binary: dict[str, bool],
output_is_binary: dict[str, bool],
input_counts: dict[str, Number],
output_counts: dict[str, Number],
is_module_sj: dict[str, bool],
timesteps: int,
e_rdram: Callable[[int], float],
e_wrram: Callable[[int], float]) -> list[EnergyMetrics]:
"""Compute the energy per inference for each layer of a spiking neural network.
Supports the following layers:
* :class:`qualia_codegen_core.graph.layers.TConvLayer.TConvLayer`
* :class:`qualia_codegen_core.graph.layers.TDenseLayer.TDenseLayer`
Input spike rates are per-timestep, this function multiplies by the number of timesteps to get the spike rates per infernce
which are used by the operation count and energy computation functions.
:meta public:
:param modelgraph: Model to compute energy on
:param input_spikerate: Dict of layer names and average spike per input per timestep for the layer
:param output_spikerate: Dict of layer names and average spike per output per timestep for the layer
:param input_is_binary: Dict of layer names and whether its input is binary (spike) or not
:param output_is_binary: Dict of layer names and whether its output is binary (spike) or not
:param input_counts: Dict of layer names and number of inputs for the layer
:param output_counts: Dict of layer names and number of outputs for the layer
:param timesteps: Number of timesteps
:param e_rdram: Function to compute memory read energy for a given memory size
:param e_wrram: Function to compute memory write energy for a given memory size
:return: A list of EnergyMetrics for each layer and a total with fields populated with energy estimation
"""
from qualia_codegen_core.graph.layers import TAddLayer, TConvLayer, TDenseLayer, TFlattenLayer, TInputLayer
from qualia_codegen_plugin_snn.graph.layers import TIfLayer, TLifLayer
ems: list[EnergyMetrics] = []
for node in modelgraph.nodes:
# Skip dummy input layer
if isinstance(node.layer, TInputLayer):
continue
# TIfLayer is completely hidden in case the previous layer is Conv/Dense since it already contains the required info
if isinstance(node.layer, TIfLayer) and len(node.innodes) > 0 and isinstance(node.innodes[0].layer, (TConvLayer,
TDenseLayer)):
continue
# Account for timesteps here since the spikerate has been averaged over timesteps
input_spikerate = input_spikerates[node.layer.name] * timesteps
# If no If activation, no output spikes are generated so no reset operation or writing to the output queue
output_spikerate = (output_spikerates[node.layer.name] * timesteps
if len(node.outnodes) > 0 and isinstance(node.outnodes[0].layer, TIfLayer)
else 0)
leak = len(node.outnodes) > 0 and isinstance(node.outnodes[0].layer, TLifLayer)
em: EnergyMetrics | None = None
if isinstance(node.layer, TFlattenLayer):
# Flatten is assumed to not do anything for on-target inference
em = EnergyMetrics(name=node.layer.name,
mem_pot=0,
mem_weights=0,
mem_bias=0,
mem_io=0,
ops=0,
addr=0,
input_spikerate=input_spikerates[node.layer.name],
output_spikerate=output_spikerates[node.layer.name],
input_count=input_counts[node.layer.name],
output_count=output_counts[node.layer.name],
input_is_binary=input_is_binary[node.layer.name],
output_is_binary=output_is_binary[node.layer.name],
is_sj=is_module_sj[node.layer.name])
elif is_module_sj[node.layer.name]:
is_sj: bool | Literal['Hybrid']
if isinstance(node.layer, TConvLayer):
if not input_is_binary[node.layer.name]: # Non-binary dense input:
# Computed as sparse input over a single timestep but with MAC operations for membrane potentials increment
e_mem_io = (self._e_rdin_snn(node.layer, input_spikerates[node.layer.name], e_rdram)
+ self._e_wrout_snn(node.layer, output_spikerate, e_wrram))
e_ops = ((self._mac_ops_conv_fnn(node.layer)
* input_spikerates[node.layer.name] * (self._e_mul + self._e_add)) # Input * Weight MACs
+ self._acc_ops_conv_fnn(node.layer) * self._e_add # Bias
+ (math.prod(node.layer.output_shape[0][1:]) * self._e_mul if leak else 0) # Leak
+ output_spikerate * math.prod(node.layer.output_shape[0][1:]) * self._e_add) # Reset
e_addr = self._e_addr_conv_snn(node.layer, input_spikerates[node.layer.name])
mem_weights = self._e_rdweights_conv_snn(node.layer, input_spikerates[node.layer.name], e_rdram)
is_sj = 'Hybrid'
else:
e_mem_io = (self._e_rdin_snn(node.layer, input_spikerate, e_rdram)
+ self._e_wrout_snn(node.layer, output_spikerate, e_wrram))
e_ops = self._e_ops_conv_snn(node.layer, input_spikerate, output_spikerate, timesteps, leak)
e_addr = self._e_addr_conv_snn(node.layer, input_spikerate)
mem_weights = self._e_rdweights_conv_snn(node.layer, input_spikerate, e_rdram)
is_sj = True
em = EnergyMetrics(name=node.layer.name,
mem_pot=self._e_wrpot_conv_snn(node.layer, input_spikerate, timesteps, e_wrram)
+ self._e_rdpot_conv_snn(node.layer, input_spikerate, timesteps, e_rdram),
mem_weights=mem_weights,
mem_bias=self._e_rdbias_conv_snn(node.layer, timesteps, e_rdram),
mem_io=e_mem_io,
ops=e_ops,
addr=e_addr,
input_spikerate=input_spikerates[node.layer.name],
output_spikerate=output_spikerates[node.layer.name],
input_count=input_counts[node.layer.name],
output_count=output_counts[node.layer.name],
input_is_binary=input_is_binary[node.layer.name],
output_is_binary=output_is_binary[node.layer.name],
is_sj=is_sj,
)
elif isinstance(node.layer, TDenseLayer):
if not input_is_binary[node.layer.name]: # Non-binary dense input:
# Computed as sparse input over a single timestep but with MAC operations for membrane potentials increment
e_mem_io = (self._e_rdin_snn(node.layer, input_spikerates[node.layer.name], e_rdram)
+ self._e_wrout_snn(node.layer, output_spikerate, e_wrram))
e_ops = ((self._mac_ops_fc_fnn(node.layer)
* input_spikerates[node.layer.name] * (self._e_mul + self._e_add)) # Input * Weight MACs
+ self._acc_ops_fc_fnn(node.layer) * self._e_add # Bias
+ (math.prod(node.layer.output_shape[0][1:]) * self._e_mul if leak else 0) # Leak
+ output_spikerate * math.prod(node.layer.output_shape[0][1:]) * self._e_add) # Reset
e_addr = self._e_addr_fc_snn(node.layer, input_spikerates[node.layer.name])
mem_weights = self._e_rdweights_fc_snn(node.layer, input_spikerates[node.layer.name], e_rdram)
is_sj = 'Hybrid'
else:
e_mem_io = (self._e_rdin_snn(node.layer, input_spikerate, e_rdram)
+ self._e_wrout_snn(node.layer, output_spikerate, e_wrram))
e_ops = self._e_ops_fc_snn(node.layer, input_spikerate, output_spikerate, timesteps, leak)
e_addr = self._e_addr_fc_snn(node.layer, input_spikerate)
mem_weights = self._e_rdweights_fc_snn(node.layer, input_spikerate, e_rdram)
is_sj = True
em = EnergyMetrics(name=node.layer.name,
mem_pot=self._e_wrpot_fc_snn(node.layer, input_spikerate, timesteps, e_wrram)
+ self._e_rdpot_fc_snn(node.layer, input_spikerate, timesteps, e_wrram),
mem_weights=mem_weights,
mem_bias=self._e_rdbias_fc_snn(node.layer, timesteps, e_rdram),
mem_io=e_mem_io,
ops=e_ops,
addr=e_addr,
input_spikerate=input_spikerates[node.layer.name],
output_spikerate=output_spikerates[node.layer.name],
input_count=input_counts[node.layer.name],
output_count=output_counts[node.layer.name],
input_is_binary=input_is_binary[node.layer.name],
output_is_binary=output_is_binary[node.layer.name],
is_sj=is_sj,
)
elif isinstance(node.layer, TAddLayer):
em = EnergyMetrics(name=node.layer.name,
mem_pot=0,
mem_weights=0,
mem_bias=0,
mem_io=0,
ops=0,
addr=0,
input_spikerate=input_spikerates[node.layer.name],
output_spikerate=output_spikerates[node.layer.name],
input_count=input_counts[node.layer.name],
output_count=output_counts[node.layer.name],
input_is_binary=input_is_binary[node.layer.name],
output_is_binary=output_is_binary[node.layer.name],
is_sj=is_module_sj[node.layer.name],
)
else: # noqa: PLR5501 keep separate if for clarity and consistency
if isinstance(node.layer, TConvLayer):
em = EnergyMetrics(name=node.layer.name,
mem_pot=0,
mem_weights=self._e_rdweights_conv_fnn(node.layer, e_rdram),
mem_bias=self._e_rdbias_conv_fnn(node.layer, e_rdram),
mem_io=self._e_rdin_conv_fnn(node.layer, e_rdram)
+ self._e_wrout_conv_fnn(node.layer, e_wrram),
ops=self._e_ops_conv_fnn(node.layer),
addr=self._e_addr_conv_fnn(node.layer),
input_spikerate=input_spikerates[node.layer.name],
output_spikerate=output_spikerates[node.layer.name],
input_count=input_counts[node.layer.name],
output_count=output_counts[node.layer.name],
input_is_binary=input_is_binary[node.layer.name],
output_is_binary=output_is_binary[node.layer.name],
is_sj=is_module_sj[node.layer.name],
)
elif isinstance(node.layer, TDenseLayer):
em = EnergyMetrics(name=node.layer.name,
mem_pot=0,
mem_weights=self._e_rdweights_fc_fnn(node.layer, e_rdram),
mem_bias=self._e_rdbias_fc_fnn(node.layer, e_rdram),
mem_io=self._e_rdin_fc_fnn(node.layer, e_rdram) + self._e_wrout_fc_fnn(node.layer, e_wrram),
ops=self._e_ops_fc_fnn(node.layer),
addr=self._e_addr_fc_fnn(node.layer),
input_spikerate=input_spikerates[node.layer.name],
output_spikerate=output_spikerates[node.layer.name],
input_count=input_counts[node.layer.name],
output_count=output_counts[node.layer.name],
input_is_binary=input_is_binary[node.layer.name],
output_is_binary=output_is_binary[node.layer.name],
is_sj=is_module_sj[node.layer.name],
)
elif isinstance(node.layer, TAddLayer):
em = EnergyMetrics(name=node.layer.name,
mem_pot=0,
mem_weights=0,
mem_bias=0,
mem_io=self._e_rdin_add_fnn(node.layer, e_rdram)
+ self._e_wrout_add_fnn(node.layer, e_wrram),
ops=self._e_ops_add_fnn(node.layer),
addr=self._e_addr_add_fnn(node.layer),
input_spikerate=input_spikerates[node.layer.name],
output_spikerate=output_spikerates[node.layer.name],
input_count=input_counts[node.layer.name],
output_count=output_counts[node.layer.name],
input_is_binary=input_is_binary[node.layer.name],
output_is_binary=output_is_binary[node.layer.name],
is_sj=is_module_sj[node.layer.name],
)
if em is None: # We do not know how to handle this layer, set energy values to 0
logger.warning('%s not handled, result may be inaccurate', node.layer.name)
em = EnergyMetrics(name=node.layer.name,
mem_pot=0,
mem_weights=0,
mem_bias=0,
mem_io=0,
ops=0,
addr=0,
input_spikerate=input_spikerates[node.layer.name],
output_spikerate=output_spikerates[node.layer.name],
input_count=input_counts[node.layer.name],
output_count=output_counts[node.layer.name],
input_is_binary=input_is_binary[node.layer.name],
output_is_binary=output_is_binary[node.layer.name],
is_sj=is_module_sj[node.layer.name])
ems.append(em)
total_is_sj: bool | Literal['Hybrid'] = (True if all(is_sj is True for is_sj in is_module_sj.values()) else
False if all(not is_sj for is_sj in is_module_sj.values()) else
'Hybrid')
em_total = EnergyMetrics(name='Total',
mem_pot=sum(em.mem_pot for em in ems),
mem_weights=sum(em.mem_weights for em in ems),
mem_bias=sum(em.mem_bias for em in ems),
mem_io=sum(em.mem_io for em in ems),
ops=sum(em.ops for em in ems),
addr=sum(em.addr for em in ems),
input_spikerate=input_spikerates['__TOTAL__'],
output_spikerate=output_spikerates['__TOTAL__'],
input_count=input_counts['__TOTAL__'],
output_count=output_counts['__TOTAL__'],
input_is_binary=input_is_binary[modelgraph.nodes[1].layer.name], # First layer after 'input'
output_is_binary=output_is_binary[modelgraph.nodes[-1].layer.name],
is_sj=total_is_sj,
)
ems.append(em_total)
return ems
[docs]
def _energy_summary(self, ems: list[EnergyMetrics]) -> str:
"""Generate a human-friendly text summary of the energy metrics per layer.
:meta public:
:param ems: List of EnergyMetrics per layer and the total
:return: The text summary
"""
pad = 11
pad_name = max(len(em.name) for em in ems)
header = f'{"Layer": <{pad_name}} | {"EMemPot": <{pad}} | {"EMemWeights": <{pad}} | {"EMemBias": <{pad}} |'
header += f' {"EMemIO": <{pad}} | {"EMemTotal": <{pad}} | {"EOps": <{pad}} | {"EAddr": <{pad}} | {"EOpsAddr": <{pad}} |'
header += f' {"ETotal": <{pad}} | {"SNN": <{pad}} | {"Input Spike Rate": <{pad}} | {"Output Spike Rate": <{pad}}\n'
s = '—' * len(header) + '\n'
s += header
s += '—' * len(header) + '\n'
for i, em in enumerate(ems):
# Print in nJ, original values are in pJ
s += f'{em.name: <{pad_name}.{pad_name}} | {em.mem_pot/1000: <{pad}.4} | {em.mem_weights/1000: <{pad}.4} |'
s += f' {em.mem_bias/1000: <{pad}.4} | {em.mem_io/1000: <{pad}.4} | {em.mem_total/1000: <{pad}.4} |'
s += f' {em.ops/1000: <{pad}.4} | {em.addr/1000: <{pad}.4} |'
s += f' {em.opsaddr/1000: <{pad}.4} | {em.total/1000: <{pad}.4} |'
s += f' {em.is_sj!s: <{pad}} |'
input_spikerate_pad = 16 if em.input_is_binary else 11
if em.input_spikerate is not None:
s += f' {em.input_spikerate: <{input_spikerate_pad}.4}'
else:
s += f' {"N/A": <{input_spikerate_pad}.16}'
if not em.input_is_binary:
s += ' (NB)'
s += ' |'
output_spikerate_pad = 16 if em.output_is_binary else 11
if em.output_spikerate is not None:
s += f' {em.output_spikerate: <{output_spikerate_pad}.4}'
else:
s += f' {"N/A": <{output_spikerate_pad}.16}'
if not em.output_is_binary:
s += ' (NB)'
s += '\n'
s += ('-' if i < len(ems) - 2 else '—') * len(header) + '\n'
s += ' NB = Non binary data'
return s
[docs]
def _record_spike_count(self,
trainresult: TrainResult,
framework: SpikingJelly,
model: SNN,
dataset: RawData) -> tuple[dict[str, bool],
dict[str, SpikeCounter],
dict[str, SpikeCounter]]:
"""Compute spike rate averaged over each neuron and timestep.
A forward hook is added to each layer in order to record its input and output during inference performed over the given
dataset.
The layer name is computed using the internal :mod:`torch.fx` functions :meth:`torch.fx.graph._Namespace.create_name`
and :meth:`torch.fx.graph._snake_case` so that it is compatible with
:class:`qualia_codegen_core.graph.ModelGraph.ModelGraph`.
:meta public:
:param trainresult: TrainResult containing configuration for :class:`pytorch_lightning.Trainer` such as
:attr:`microai_core.microai.TrainResult.batch_size`
and :attr:`microai_core.microai.TrainResult.dataaugmentations`
:param framework: A :class:`microai_plugin_snn.learningframework.SpikingJelly.SpikingJelly` instance
:param model: A model to evaluate spike rate on
:param dataset: The data to use for inference
:return: A tuple of 4 dicts, first 2 dicts map a layer name to its input and output spike rates per timestep respectively,
the other 2 dicts map a layer name to ``True`` if the input or outputs are binary respectively,
``False`` otherwise.
"""
import spikingjelly.activation_based.base as sjb # type: ignore[import-untyped]
import torch
from torch.fx.graph import _Namespace, _snake_case
namespace = _Namespace() # type: ignore[no-untyped-call]
if_inputs_spike_count_and_size: dict[str, SpikeCounter] = {}
if_outputs_spike_count_and_size: dict[str, SpikeCounter] = {}
is_module_sj: dict[str, bool] = {} # Is this module a SpikingJelly module and therefore we should compute as an SNN layer
def is_binary(a: torch.Tensor) -> bool:
return torch.equal(a, (a > 0).float())
def is_sj(m: nn.Module) -> bool:
# Module is assumed to be a SpikingJelly module if it inherits from StepModule
return isinstance(m, sjb.StepModule)
def hook(layername: str, module: nn.Module, x: torch.Tensor, output: torch.Tensor) -> None:
# Concatenate in last dim to handle Add layer
input_cat = (torch.cat(cast('tuple[torch.Tensor]', x), dim=-1) if isinstance(x, tuple) else x)
inputnp = input_cat.count_nonzero().item()
outputnp = output.count_nonzero().item()
# Used for special case of ResNet where spikes are not binary
input_sum = input_cat.sum().item()
output_sum = output.sum().item()
# In case of multi-step mode, apply number of timesteps here to compute average over timestep
# since we do not loop multiple times over the sample unlike single-step mode
nb_sample = (math.prod(input_cat.shape[0:2])
if isinstance(module, sjb.StepModule) and module.step_mode == 'm'
else input_cat.shape[0])
is_module_sj[layername] = is_sj(module)
if layername not in if_inputs_spike_count_and_size:
if_inputs_spike_count_and_size[layername] = SpikeCounter(spike_count=inputnp,
tensor_sum=input_sum,
size=input_cat.numel(),
binary=is_binary(input_cat),
sample_count=nb_sample)
else:
if_inputs_spike_count_and_size[layername].spike_count += inputnp
if_inputs_spike_count_and_size[layername].tensor_sum += input_sum
if_inputs_spike_count_and_size[layername].size += input_cat.numel()
if_inputs_spike_count_and_size[layername].binary &= is_binary(input_cat)
if_inputs_spike_count_and_size[layername].sample_count += nb_sample
if layername not in if_outputs_spike_count_and_size:
if_outputs_spike_count_and_size[layername] = SpikeCounter(spike_count=outputnp,
tensor_sum=output_sum,
size=output.numel(),
binary=is_binary(output),
sample_count=nb_sample)
else:
if_outputs_spike_count_and_size[layername].spike_count += outputnp
if_outputs_spike_count_and_size[layername].tensor_sum += output_sum
if_outputs_spike_count_and_size[layername].size += output.numel()
if_outputs_spike_count_and_size[layername].binary &= is_binary(output)
if_outputs_spike_count_and_size[layername].sample_count += nb_sample
handles = [layer.register_forward_hook(functools.partial(hook,
namespace.create_name(_snake_case(layername), layer)))
for layername, layer in cast('Generator[tuple[str, nn.Module], None, None]', model.named_modules())
if not isinstance(layer, Quantizer)] # No hook to register for Quantizer module
# Implement framework.predict()
_ = framework.predict(model=model,
dataset=dataset,
batch_size=trainresult.batch_size,
dataaugmentations=trainresult.dataaugmentations,
experimenttracking=None)
for handle in handles:
handle.remove()
return is_module_sj, if_inputs_spike_count_and_size, if_outputs_spike_count_and_size
def _compute_layer_spikerates(self,
if_inputs_spike_count_and_size: dict[str, SpikeCounter],
if_outputs_spike_count_and_size: dict[str, SpikeCounter],
timesteps: int) -> tuple[dict[str, float],
dict[str, float],
dict[str, bool],
dict[str, bool],
dict[str, Number],
dict[str, Number]]:
input_spikerates = {n: sc.spike_count / sc.size for n, sc in if_inputs_spike_count_and_size.items()}
output_spikerates = {n: sc.spike_count / sc.size for n, sc in if_outputs_spike_count_and_size.items()}
# Dict for the input counts of each layers divided by the batch size
input_counts = {n: timesteps * sc.spike_count / sc.sample_count
for n, sc in if_inputs_spike_count_and_size.items()}
output_counts = {n: timesteps * sc.spike_count / sc.sample_count
for n, sc in if_outputs_spike_count_and_size.items()}
input_is_binary = {n: sc.binary for n, sc in if_inputs_spike_count_and_size.items()}
output_is_binary = {n: sc.binary for n, sc in if_outputs_spike_count_and_size.items()}
return (input_spikerates,
output_spikerates,
input_is_binary,
output_is_binary,
input_counts,
output_counts)
def _compute_total_spikerate(self,
if_inputs_spike_count_and_size: dict[str, SpikeCounter],
if_outputs_spike_count_and_size: dict[str, SpikeCounter],
timesteps: int,
total_exclude_nonbinary: bool = True) -> tuple[float, float, float, float]: # noqa: FBT001, FBT002
# Filter out non-binary inputs/outputs if total_exclude_binary is True
if total_exclude_nonbinary:
logger.warning('Non-binary inputs/outputs are excluded from the total spike rate computation.')
else:
logger.warning('Non-binary inputs/outputs are included in the total spike rate computation.')
filetered_if_inputs_spike_count_and_size = {n: sc for n, sc in if_inputs_spike_count_and_size.items()
if not total_exclude_nonbinary or sc.binary}
filetered_if_outputs_spike_count_and_size = {n: sc for n, sc in if_outputs_spike_count_and_size.items()
if not total_exclude_nonbinary or sc.binary}
# Special value account for the total spike rate across the whole network
input_total_spikerate = (sum(sc.spike_count for sc in filetered_if_inputs_spike_count_and_size.values())
/ sum(sc.size for sc in filetered_if_inputs_spike_count_and_size.values())
if len(filetered_if_inputs_spike_count_and_size) > 0 else 0.0)
output_total_spikerate = (sum(sc.spike_count for sc in filetered_if_outputs_spike_count_and_size.values())
/ sum(sc.size for sc in filetered_if_outputs_spike_count_and_size.values())
if len(filetered_if_outputs_spike_count_and_size) > 0 else 0.0)
# Special value account for the total count across the whole network
input_total_count = sum(timesteps * sc.spike_count / sc.sample_count
for sc in filetered_if_inputs_spike_count_and_size.values())
output_total_count = sum(timesteps * sc.spike_count / sc.sample_count
for sc in filetered_if_outputs_spike_count_and_size.values())
return input_total_spikerate, output_total_spikerate, input_total_count, output_total_count
[docs]
def _process_model(self, # noqa: C901, PLR0915
trainresult: TrainResult) -> tuple[ModelGraph | None,
dict[str, float] | None,
dict[str, float] | None,
dict[str, bool] | None,
dict[str, bool] | None,
dict[str, Number] | None,
dict[str, Number] | None,
dict[str, bool] | None]:
"""Process the model to generate layer graph and compute activity in case of SNN.
Use Qualia CodeGen in order to build a :class:`qualia_codegen_core.graph.ModelGraph.ModelGraph` that is easier to parse.
Execute inference pass on dataset using :meth:`_record_spike_count` to collect statistics of spike activity in case of SNN.
:meta public:
:param trainresult: TrainResult containing the SNN or FNN model, the dataset and the training configuration
:return: A tuple of Qualia-CodeGen's ModelGraph and various SNN metrics as dicts of layer name and associated value, in
order: input spike rate, output spike rate, input is binary, output is binary, input spike count, output spike count,
module is sj
"""
import spikingjelly.activation_based.functional as sjf # type: ignore[import-untyped]
import spikingjelly.activation_based.layer as sjl # type: ignore[import-untyped]
import spikingjelly.activation_based.neuron as sjn # type: ignore[import-untyped]
from qualia_codegen_core.graph.layers import TAddLayer, TSumLayer
from qualia_codegen_plugin_snn.graph import TorchModelGraph
from qualia_codegen_plugin_snn.graph.layers import TIfLayer
from torch import nn
import qualia_plugin_snn.learningmodel.pytorch.layers.quantized_SNN_layers as qsjn
import qualia_plugin_snn.learningmodel.pytorch.layers.spikingjelly.quantized_layers as qsjl
import qualia_plugin_snn.learningmodel.pytorch.layers.spikingjelly.quantized_layers1d as qsjl1d
import qualia_plugin_snn.learningmodel.pytorch.layers.spikingjelly.quantized_layers2d as qsjl2d
model = trainresult.model
# SpikingJelly wrapped layers
custom_layers: dict[type[nn.Module],
Callable[[nn.Module, TBaseLayer], tuple[type[TBaseLayer], list[Any]]]] = {
sjl.AvgPool1d: TorchModelGraph.MODULE_MAPPING[nn.AvgPool1d],
sjl.AvgPool2d: TorchModelGraph.MODULE_MAPPING[nn.AvgPool2d],
sjl.BatchNorm1d: TorchModelGraph.MODULE_MAPPING[nn.BatchNorm1d],
sjl.BatchNorm2d: TorchModelGraph.MODULE_MAPPING[nn.BatchNorm2d],
sjl.Conv1d: TorchModelGraph.MODULE_MAPPING[nn.Conv1d],
sjl.Conv2d: TorchModelGraph.MODULE_MAPPING[nn.Conv2d],
sjl.Dropout: TorchModelGraph.MODULE_MAPPING[nn.Dropout],
sjl.Flatten: TorchModelGraph.MODULE_MAPPING[nn.Flatten],
sjl.Linear: TorchModelGraph.MODULE_MAPPING[nn.Linear],
sjl.MaxPool1d: TorchModelGraph.MODULE_MAPPING[nn.MaxPool1d],
sjl.MaxPool2d: TorchModelGraph.MODULE_MAPPING[nn.MaxPool2d],
Add: lambda *_: (TAddLayer, []),
SNNAdd: lambda *_: (TAddLayer, []),
GlobalSumPool1d: lambda *_: (TSumLayer, [(-1,)]),
SNNGlobalSumPool1d: lambda *_: (TSumLayer, [(-1,)]),
GlobalSumPool2d: lambda *_: (TSumLayer, [(-2, -1)]),
SNNGlobalSumPool2d: lambda *_: (TSumLayer, [(-2, -1)]),
QuantizedConv1d: TorchModelGraph.MODULE_MAPPING[nn.Conv1d],
QuantizedConv2d: TorchModelGraph.MODULE_MAPPING[nn.Conv2d],
QuantizedBatchNorm1d: TorchModelGraph.MODULE_MAPPING[nn.BatchNorm1d],
QuantizedBatchNorm2d: TorchModelGraph.MODULE_MAPPING[nn.BatchNorm2d],
QuantizedLinear: TorchModelGraph.MODULE_MAPPING[nn.Linear],
QuantizedIdentity: TorchModelGraph.MODULE_MAPPING[nn.Identity],
qsjl.QuantizedLinear: TorchModelGraph.MODULE_MAPPING[nn.Linear],
SNNQuantizedAdd: lambda *_: (TAddLayer, []),
qsjl1d.QuantizedBatchNorm1d: TorchModelGraph.MODULE_MAPPING[nn.BatchNorm1d],
qsjl2d.QuantizedBatchNorm2d: TorchModelGraph.MODULE_MAPPING[nn.BatchNorm2d],
qsjl1d.QuantizedConv1d: TorchModelGraph.MODULE_MAPPING[nn.Conv1d],
qsjl2d.QuantizedConv2d: TorchModelGraph.MODULE_MAPPING[nn.Conv2d],
qsjl1d.QuantizedMaxPool1d: TorchModelGraph.MODULE_MAPPING[nn.MaxPool1d],
qsjl2d.QuantizedMaxPool2d: TorchModelGraph.MODULE_MAPPING[nn.MaxPool2d],
qsjn.QuantizedIFNode: TorchModelGraph.MODULE_MAPPING[sjn.IFNode],
qsjn.QuantizedLIFNode: TorchModelGraph.MODULE_MAPPING[sjn.LIFNode],
QuantizedAdd: lambda *_: (TAddLayer, []),
QuantizedGlobalSumPool1d: lambda *_: (TSumLayer, [(-1,)]),
SNNQuantizedGlobalSumPool1d: lambda *_: (TSumLayer, [(-1,)]),
QuantizedGlobalSumPool2d: lambda *_: (TSumLayer, [(-2, -1)]),
SNNQuantizedGlobalSumPool2d: lambda *_: (TSumLayer, [(-2, -1)]),
}
orig_step_mode: str = getattr(model, 'step_mode', '')
if (getattr(model, 'is_snn', False) or isinstance(model, SNN)) and model.step_mode != 's':
logger.warning("Setting model.step_mode to 's' for tracing by Qualia-CodeGen")
sjf.set_step_mode(model, step_mode='s')
modelgraph = TorchModelGraph(model).convert(custom_layers=custom_layers)
if modelgraph is None:
logger.error('Model graph conversion failed')
return None, None, None, None, None, None, None, None
logger.info('ModelGraph:\n%s', modelgraph)
if getattr(model, 'step_mode', '') != orig_step_mode:
logger.info("Reverting back to original step_mode='%s'", orig_step_mode)
sjf.set_step_mode(model, step_mode=orig_step_mode)
if getattr(model, 'is_snn', False) or isinstance(model, SNN):
if not isinstance(trainresult.framework, SpikingJelly):
logger.error('LearningFramework must be SpikingJelly or a derived class, got: %s', type(trainresult.framework))
raise TypeError
if not self._fifo_size:
logger.error('fifo_size is required for SNN models')
raise ValueError
# Tuple unpacking
(is_module_sj,
if_inputs_spike_count_and_size,
if_outputs_spike_count_and_size) = self._record_spike_count(trainresult,
framework=trainresult.framework,
model=model,
dataset=trainresult.testset)
# SNN model post-processing
for node in modelgraph.nodes:
# Copy spikerate from IF activation layer to previous layer since it is the one energy is computed on
# Also set previous layer to be computed as an SNN layer
# And rename the previous layer to include the IF layer name
if isinstance(node.layer, TIfLayer):
in_name = node.innodes[0].layer.name
new_in_name = in_name + node.layer.name
if_outputs_spike_count_and_size[new_in_name] = if_outputs_spike_count_and_size[node.layer.name]
# Rename previous layer
if_inputs_spike_count_and_size[new_in_name] = if_inputs_spike_count_and_size[in_name]
is_module_sj[new_in_name] = is_module_sj[in_name]
node.innodes[0].layer.name = new_in_name
if isinstance(node.layer, TAddLayer): # noqa: SIM102 if statements separated for clarity and documentation
# Special case for Add to handle SResNet, only for Add layer with full "binary" inputs
# Includes Add layers that may have had their input connected to a previous Add layer
# that switched to "binary" during this very same process.
if if_inputs_spike_count_and_size[node.layer.name].binary:
# If inputs are binary, output is defined as binary even with accumulated spikes
output_spikecounter = if_outputs_spike_count_and_size[node.layer.name]
output_spikecounter.spike_count = output_spikecounter.tensor_sum
output_spikecounter.binary = True
for outnode in node.outnodes:
# Check for special case of Add that can branch into another Add
# with multiple inputs that may or may not all be spikes
# If they are all spikes the "last" Add input would trigger the change to assuming binary input
if all(if_outputs_spike_count_and_size[outnodeinnode.layer.name].binary
for outnodeinnode in outnode.innodes):
input_spikecounter = if_inputs_spike_count_and_size[outnode.layer.name]
# Non-binary integers translate to multiple spikes, so use sum of tensor
input_spikecounter.spike_count = input_spikecounter.tensor_sum
# But still assume input is spike
input_spikecounter.binary = True
(input_spikerates,
output_spikerates,
input_is_binary,
output_is_binary,
input_counts,
output_counts) = self._compute_layer_spikerates(if_inputs_spike_count_and_size,
if_outputs_spike_count_and_size,
timesteps=model.timesteps)
# Compute total only after the post-processing steps as it changes the layer spikerates
(input_spikerates['__TOTAL__'],
output_spikerates['__TOTAL__'],
input_counts['__TOTAL__'],
output_counts['__TOTAL__']) = self._compute_total_spikerate(if_inputs_spike_count_and_size,
if_outputs_spike_count_and_size,
timesteps=model.timesteps,
total_exclude_nonbinary=self._total_spikerate_exclude_nonbinary)
return (modelgraph,
input_spikerates,
output_spikerates,
input_is_binary,
output_is_binary,
input_counts,
output_counts,
is_module_sj)
return (modelgraph,
None,
None,
None,
None,
None,
None,
None)
[docs]
@override
def __call__(self,
trainresult: TrainResult,
model_conf: ModelConfigDict) -> tuple[TrainResult, ModelConfigDict]:
"""Compute energy estimation metric from Lemaire et al, 2022.
Uses Qualia CodeGen in order to build a :class:`qualia_codegen_core.graph.ModelGraph.ModelGraph` that is easier to parse.
Call either :meth:`_compute_model_energy_snn` or :meth:`_compute_model_energy_fnn` depending on whether the model is an
SNN or an FNN.
Print the resulting metrics and log them to a CSV file inside the `logs/<bench.name>/EnergyEstimationMetric` directory.
:meta public:
:param trainresult: TrainResult containing the SNN or FNN model, the dataset and the training configuration
:param model_conf: Unused
:return: The unmodified trainresult
"""
(modelgraph,
input_spikerates,
output_spikerates,
input_is_binary,
output_is_binary,
input_counts,
output_counts,
is_module_sj) = self._process_model(trainresult=trainresult)
if modelgraph is None:
return trainresult, model_conf
def e_rdram(x: int) -> float:
return self._e_ram(x, self._mem_width)
def e_wrram(x: int) -> float:
return self._e_ram(x, self._mem_width)
logger.info('Memory width set to %s bits', self._mem_width)
if getattr(trainresult.model, 'is_snn', False) or isinstance(trainresult.model, SNN):
if (input_spikerates is None
or output_spikerates is None
or input_is_binary is None
or output_is_binary is None
or input_counts is None
or output_counts is None
or is_module_sj is None):
logger.error('SNN model detected but one of the SNN metric could not be computed')
raise RuntimeError
ems = self._compute_model_energy_snn(modelgraph,
input_spikerates,
output_spikerates,
input_is_binary,
output_is_binary,
input_counts,
output_counts,
is_module_sj,
trainresult.model.timesteps,
e_rdram,
e_wrram)
else:
ems = self._compute_model_energy_fnn(modelgraph,
e_rdram,
e_wrram)
# Initialize CSV suffix with the memory width, self_m_e_add, self_m_e_mul, op_estimation_type and sram_estimation_type
# without "{", "}", ":", ",", "'" or " " characters
suffix = f'_{trainresult.name}_mem{self._mem_width}bit_{self._m_e_add}_{self._m_e_mul}'
if self._op_estimation_type is not None:
suffix += f'_{self._op_estimation_type}'
else:
suffix += '_ICONIP'
if self._sram_estimation_type is not None:
suffix += f'_sram{self._sram_estimation_type}'
else:
suffix += '_sramICONIP'
suffix = suffix.replace('{', '').replace('}', '').replace(':', '').replace(',', '').replace("'", '').replace(' ', '')
csvlogger: Logger[EnergyEstimationMetricLoggerFields] = Logger(name='EnergyEstimationMetric',
suffix=suffix + '.csv',
formatter=CSVFormatter())
csvlogger.fields = EnergyEstimationMetricLoggerFields
for em in ems:
csvlogger(em.asnamedtuple())
logger.info(('Estimated model energy consumption for one inference on 45nm ASIC (nJ)'
' and spike rate per neuron per timestep:\n%s'),
self._energy_summary(ems))
return trainresult, model_conf
[docs]
def _e_ram(self, mem_params: int, bits: int) -> float:
"""Energy for a single RAM access (read or write).
If :attr:`_sram_estimation_type` is 'new', use T. Louis method with multiple data packed over a single 10pJ 64-bit access,
regardless of total memory size.
Otherwise, use ICONIP2022 method with a single access for each data with energy proportional to total memory size, computed
with a linear regression over Horowitz 2014 values of 8KiB, 32KiB and 1MiB SRAM.
:meta public:
:param mem_params: Number of element stored in this memory
:param bits: Data width in bits
:return: Energy for a single read or write access in this memory
"""
# Case where we use 8KiB SRAM blocks with a 64-bit data bus
if self._sram_estimation_type == 'new':
# Data bus width (64 bits)
bus_width = 64
# Energy consumption per access (10 pJ for 8KiB/65536 bits SRAM blocks)
conso_per_bus_pj = 10
# cost for one access
return conso_per_bus_pj / (bus_width // bits)
# Simple linear formula
# From Computation_cost_metric.ipynb 45nm SRAM.
return 1.09 * 10**(-5) * mem_params * bits + 13.2
@property
def _e_add(self) -> float:
"""Energy for a single add operation.
:meta public:
"""
return self._m_e_add
@property
def _e_mul(self) -> float:
"""Energy for a single mul operation.
:meta public:
"""
return self._m_e_mul