Source code for qualia_codegen_core.Converter

# Copyright 2021 (c) Pierre-Emmanuel Novac <penovac@unice.fr> Université Côte d'Azur, CNRS, LEAT. All rights reserved.

from __future__ import annotations

import logging
import sys
from importlib.resources import files
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple, cast

import jinja2

from .Allocator import Allocator
from .DataConverter import DataConverter
from .graph import layers
from .graph.layers.TActivationLayer import TActivation, TActivationLayer
from .Quantizer import Quantizer
from .Validator import Validator

if TYPE_CHECKING:
    from .graph.LayerNode import LayerNode
    from .graph.layers.TBaseLayer import TBaseLayer
    from .graph.ModelGraph import ModelGraph

logger = logging.getLogger(__name__)

class NumberType(NamedTuple):
    number_type: type[int | float]
    width: int
    long_width: int
    min_val: int
    max_val: int
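# Illustrative only, not constructed at import time: assuming a node whose quantization
# carries width=8 and long_width=16, quantize_modelgraph() below would add the entry
#     NumberType(int, 8, 16, -(2 ** 7), 2 ** 7 - 1)
# to self.number_types.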
class Converter:
    layer_template_files: ClassVar[dict[type[TBaseLayer], str | None]] = {
        # Standard layers
        layers.TAvgPooling1DLayer: 'averagepool1d',
        layers.TAvgPooling2DLayer: 'averagepool2d',
        layers.TConv1DLayer: 'conv1d',
        layers.TConv2DLayer: 'conv2d',
        layers.TDenseLayer: 'fc',
        layers.TMaxPooling1DLayer: 'maxpool1d',
        layers.TMaxPooling2DLayer: 'maxpool2d',
        layers.TActivationLayer: 'activation',
        layers.TFlattenLayer: 'flatten',
        layers.TBatchNormalization1DLayer: 'batchnorm1d',
        layers.TBatchNormalization2DLayer: 'batchnorm2d',
        layers.TInputLayer: None,  # Nothing to generate for input layer

        # Custom Qualia layers
        layers.TAddLayer: 'add',
        layers.TSumLayer: 'sum',  # Global Sum Pooling
    }

    TEMPLATE_PATH = files('qualia_codegen_core.assets')

    def __init__(self, output_path: Path | None = None) -> None:
        super().__init__()
        self.validator = Validator()
        self.dataconverter = DataConverter()

        if output_path:
            self.output_path = output_path
            self.output_path_header = output_path / 'include'
            self.output_path_weights = output_path / 'weights'
            self.output_path.mkdir(parents=True, exist_ok=True)
            self.output_path_header.mkdir(parents=True, exist_ok=True)
            self.output_path_weights.mkdir(parents=True, exist_ok=True)
            self.write_file = True
        else:
            self.output_path = Path()
            self.output_path_header = Path()
            self.output_path_weights = Path()
            self.write_file = False

        self.number_types = {NumberType(int, 32, 64, -(2 ** (32 - 1)), 2 ** (32 - 1) - 1)}

        self._template_path: list[Path] | None = None
        if isinstance(Converter.TEMPLATE_PATH, Path):  # Already a Path object, no need for hackery
            self._template_path = [Converter.TEMPLATE_PATH]
        elif sys.version_info >= (3, 10):  # Python 3.10 may return MultiplexedPath
            from importlib.readers import MultiplexedPath
            if isinstance(Converter.TEMPLATE_PATH, MultiplexedPath):
                self._template_path = [Converter.TEMPLATE_PATH / '']  # / operator applies to underlying Path
    def weights2carray(self, node: LayerNode) -> dict[str, dict[str, str | tuple[int, ...]]]:
        return {name: self.dataconverter.tensor2carray(arr, f'{node.layer.name}_{name}')
                for name, arr in node.layer.weights.items()}
    def write_layer_function(self, template: str, node: LayerNode) -> str:
        return self.render_template('layers/' + template + '.cc',
                                    self.output_path / f'{node.layer.name}.c',
                                    node=node,
                                    qtype2ctype=self.dataconverter.qtype2ctype)
    def write_layer_header(self, template: str, node: LayerNode) -> str:
        return self.render_template('include/layers/' + template + '.hh',
                                    self.output_path_header / f'{node.layer.name}.h',
                                    node=node,
                                    qtype2ctype=self.dataconverter.qtype2ctype)
    def write_layer_weights(self, template: str, node: LayerNode) -> str:
        return self.render_template('layers/weights/' + template + '.cc',
                                    self.output_path_weights / f'{node.layer.name}.c',
                                    node=node,
                                    weights=self.weights2carray(node))
    def render_template(self, name: str, out: Path, **kwargs: Any) -> str:  # noqa: ANN401 We really want to be able to pass anything to the rendered template
        if self._template_path is None:
            return ''

        template = jinja2.Environment(
            loader=jinja2.FileSystemLoader(searchpath=self._template_path),
            autoescape=jinja2.select_autoescape(),
        ).get_template(name)
        rendered = template.render(**kwargs)

        if self.write_file:
            with out.open('w', encoding='utf-8') as f:
                _ = f.write(rendered)

        return rendered
    def write_model_header(self, modelgraph: ModelGraph) -> str:
        return self.render_template('include/model.hh',
                                    self.output_path_header / 'model.h',
                                    nodes=modelgraph.nodes,
                                    qtype2ctype=self.dataconverter.qtype2ctype)
    def write_model(self,
                    modelgraph: ModelGraph,
                    allocation: dict[str, list[list[LayerNode]] | dict[LayerNode, int]] | None) -> str:
        return self.render_template('model.cc',
                                    self.output_path / 'model.c',
                                    nodes=modelgraph.nodes,
                                    allocation=allocation,
                                    qtype2ctype=self.dataconverter.qtype2ctype)
    def write_numeric_header(self) -> str:
        return self.render_template('include/number.hh',
                                    self.output_path_header / 'number.h',
                                    number_types=self.number_types,
                                    qtype2ctype=self.dataconverter.qtype2ctype)
    def write_defines_header(self, modelgraph: ModelGraph) -> str:
        return self.render_template('include/defines.hh',
                                    self.output_path_header / 'defines.h',
                                    nodes=modelgraph.nodes)
    def combine_zeropadding(self, modelgraph: ModelGraph) -> ModelGraph | None:
        zeropaddingnodes = [node for node in modelgraph.nodes if isinstance(node.layer, layers.TZeroPaddingLayer)]
        for zeropaddingnode in zeropaddingnodes:
            for outnode in zeropaddingnode.outnodes:
                if not hasattr(outnode.layer, 'padding'):
                    logger.error('Cannot fuse padding: "%s" does not have a padding attribute', outnode.layer.name)
                    return None

                # Double check since set doesn't contain layer type
                if not isinstance(zeropaddingnode.layer, layers.TZeroPaddingLayer):
                    return None

                outnode.layer.padding = zeropaddingnode.layer.padding
                outnode.layer.input_shape = zeropaddingnode.layer.input_shape

            modelgraph.delete_node(zeropaddingnode)

        return modelgraph
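    # Sketch of the fusion performed above (shapes and values illustrative only): a
    # ZeroPadding node padding its input by, say, 2 on each side and feeding a Conv
    # node is removed from the graph; the Conv layer inherits that padding value and
    # the ZeroPadding layer's input_shape, so the generated convolution code pads
    # implicitly instead of emitting a separate padding layer.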
    def remove_dropout(self, modelgraph: ModelGraph) -> ModelGraph:
        dropoutnodes = [node for node in modelgraph.nodes if isinstance(node.layer, layers.TDropoutLayer)]
        for dropoutnode in dropoutnodes:
            modelgraph.delete_node(dropoutnode)
        return modelgraph
    def combine_relu(self, modelgraph: ModelGraph) -> ModelGraph | None:
        relunodes = [node for node in modelgraph.nodes
                     if isinstance(node.layer, layers.TActivationLayer)
                     and node.layer.activation in [TActivation.RELU, TActivation.RELU6]]
        for relunode in relunodes:
            for innode in relunode.innodes:  # warning: activations_range unsupported with multiple inputs to relu
                if not hasattr(innode.layer, 'activation'):
                    logger.error('Cannot fuse activation: "%s" does not have an activation attribute', innode.layer.name)
                    return None

                innode.layer.activation = cast(TActivationLayer, relunode.layer).activation
                innode.q.output_scale_factor = relunode.q.output_scale_factor

            modelgraph.delete_node(relunode)

        return modelgraph
    def remove_identity(self, modelgraph: ModelGraph) -> ModelGraph:
        identitynodes = [node for node in modelgraph.nodes if isinstance(node.layer, layers.TIdentityLayer)]
        for identitynode in identitynodes:
            modelgraph.delete_node(identitynode)
        return modelgraph
    # Operator layers (Add…) have names that are not valid C tokens
    def rename_operators(self, modelgraph: ModelGraph) -> ModelGraph:
        for node in modelgraph.nodes:
            node.layer.name = node.layer.name.replace('.', '')
        return modelgraph
    def optimize_modelgraph(self, modelgraph: ModelGraph) -> ModelGraph | None:
        # Remove Identity layers, useless
        modelgraph_no_identity = self.remove_identity(modelgraph)

        # Remove Dropout layers, useless during inference
        modelgraph_no_dropout = self.remove_dropout(modelgraph_no_identity)

        # Combine ZeroPadding with next layer (Conv1D)
        modelgraph_combined_zeropadding = self.combine_zeropadding(modelgraph_no_dropout)
        if modelgraph_combined_zeropadding is None:
            return None

        # Combine ReLU with previous layer (Conv1D/Dense), activations range must be copied to previous layer
        return self.combine_relu(modelgraph_combined_zeropadding)
    def preprocess_modelgraph(self, modelgraph: ModelGraph) -> ModelGraph | None:
        logger.info('ModelGraph:\n%s', modelgraph)

        optimized_modelgraph = self.optimize_modelgraph(modelgraph)
        if optimized_modelgraph is None:
            logger.error('Could not optimize ModelGraph')
            return None

        logger.info('ModelGraph after optimization:\n%s', optimized_modelgraph)

        graphviz = optimized_modelgraph.graphviz()
        if graphviz:
            logger.info('Graphviz: %s', graphviz)

        # Rename operator layers that are not valid identifiers for C
        return self.rename_operators(optimized_modelgraph)
    def validate_modelgraph(self, modelgraph: ModelGraph) -> bool:
        return all(self.validator.validate_node(node) for node in modelgraph.nodes)
    def quantize_modelgraph(self, modelgraph: ModelGraph) -> bool:
        for node in modelgraph.nodes:
            if node.q.number_type is None or node.q.width is None or node.q.long_width is None:
                logger.error('Missing quantization information for "%s"', node.layer.name)
                return False

            # Apply weights quantization for each fixed-point layer with weights
            if node.q.number_type == int and hasattr(node.layer, 'weights'):
                quantizer = Quantizer(width=node.q.width)
                if not quantizer.quantize_weights(node):
                    logger.error('Weights quantization failed for "%s"', node.layer.name)
                    return False

            # Add this layer's number type to the type list
            t = NumberType(node.q.number_type,
                           node.q.width,
                           node.q.long_width,
                           -(2 ** (node.q.width - 1)),
                           2 ** (node.q.width - 1) - 1)
            self.number_types.add(t)

        return True
    def generate_code(self,
                      modelgraph: ModelGraph,
                      allocation: dict[str, list[list[LayerNode]] | dict[LayerNode, int]]) -> str | None:
        # Used to ignore includes in generated files for the combined returned code
        rendered = '#define SINGLE_FILE\n'

        # Write defines.h global defines
        rendered += self.write_defines_header(modelgraph)

        # Write number.h numeric type configuration
        rendered += self.write_numeric_header()

        for node in modelgraph.nodes:
            template = self.layer_template_files[node.layer.__class__]

            # Skip layers with no code to generate
            if template is None:
                continue

            rendered += self.write_layer_header(template=template, node=node) + '\n'
            rendered += self.write_layer_function(template=template, node=node) + '\n'

            if hasattr(node.layer, 'weights') and len(node.layer.weights) > 0:
                rendered += self.write_layer_weights(template=template, node=node) + '\n'

        rendered += self.write_model_header(modelgraph=modelgraph) + '\n'
        rendered += self.write_model(modelgraph=modelgraph, allocation=allocation) + '\n'

        return rendered
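    # For reference, when write_file is enabled the calls above emit, relative to
    # output_path: include/defines.h, include/number.h, one <layer>.h / <layer>.c
    # pair per generated layer, weights/<layer>.c for layers with weights, then
    # include/model.h and model.c; the same content is also concatenated into the
    # returned string for single-file use (hence the SINGLE_FILE define).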
    def convert_model(self, modelgraph: ModelGraph) -> str | None:
        if self._template_path is None:
            logger.error('Could not discover template path from module')
            return None

        final_modelgraph = self.preprocess_modelgraph(modelgraph)
        if final_modelgraph is None:
            logger.error('Could not preprocess ModelGraph')
            return None

        if not self.validate_modelgraph(final_modelgraph):
            logger.error('ModelGraph validation failed')
            return None

        if not self.quantize_modelgraph(final_modelgraph):
            return None

        allocator = Allocator()
        allocation = allocator(modelgraph)
        if not allocation:
            logger.error('Allocation failed')
            return None

        return self.generate_code(final_modelgraph, allocation)
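# Minimal usage sketch (assumes a ModelGraph built elsewhere, e.g. by one of the
# qualia_codegen_core graph importers; 'modelgraph' and the output directory name
# are placeholders, not defined in this module):
#
#     from pathlib import Path
#     from qualia_codegen_core.Converter import Converter
#
#     converter = Converter(output_path=Path('out/model'))
#     rendered = converter.convert_model(modelgraph)
#     if rendered is None:
#         raise RuntimeError('Code generation failed; see logged errors')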