# Copyright (C) 2023, Advanced Micro Devices, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of FINN nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import pytest
import numpy as np
from typing import Optional, Union
from onnx import TensorProto, helper
from qonnx.core.datatype import DataType, IntType, BipolarType
from qonnx.core.modelwrapper import ModelWrapper
from qonnx.custom_op.registry import getCustomOp
from qonnx.util.basic import (
    gen_finn_dt_tensor,
    roundup_to_integer_multiple,
)

from finn.custom_op.fpgadataflow.vectorvectoractivation import VectorVectorActivation
from finn.custom_op.fpgadataflow.matrixvectoractivation import MatrixVectorActivation
from finn.transformation.fpgadataflow.minimize_weight_bit_width import MinimizeWeightBitWidth
from finn.transformation.fpgadataflow.minimize_accumulator_width import MinimizeAccumulatorWidth


def make_unit_test_model(wdt: DataType, idt: DataType, tdt: Optional[DataType] = None):
    """Creates a toy finn-onnx model for unit testing. The VVAU-MVAU pair is based
    on the first pair of MobileNetV1"""
    inp = helper.make_tensor_value_info("inp", TensorProto.FLOAT, [1, 32, 32, 288])
    outp = helper.make_tensor_value_info("outp", TensorProto.FLOAT, [1, 32, 32, 64])
    layer1 = helper.make_node(
        "VectorVectorActivation",
        ["inp", "params0", "thresh0"] if tdt is not None else ["inp", "params0"],
        ["hid"],
        domain="finn.custom_op.fpgadataflow",
        backend="fpgadataflow",
        PE=1,
        Channels=32,
        Dim=(32, 32),
        Kernel=(3, 3),
        inputDataType=idt.name,
        outputDataType=idt.name,
        weightDataType=wdt.name,
        ActVal=tdt.min() if tdt is not None else 0,
        noActivation=0 if tdt is not None else 1,
    )
    layer2 = helper.make_node(
        "MatrixVectorActivation",
        ["hid", "params1", "thresh1"] if tdt is not None else ["hid", "params1"],
        ["outp"],
        domain="finn.custom_op.fpgadataflow",
        backend="fpgadataflow",
        MW=32,  # matrix_width (num_inputs)
        MH=64,  # matrix_height (num_outputs)
        SIMD=1,
        PE=1,
        inputDataType=idt.name,
        outputDataType=idt.name,
        weightDataType=wdt.name,
        ActVal=tdt.min() if tdt is not None else 0,
        noActivation=0 if tdt is not None else 1,
        binaryXnorMode=0,
    )
    graph = helper.make_graph(
        nodes=[layer1, layer2], name="fclayer_graph", inputs=[inp], outputs=[outp]
    )

    model = helper.make_model(graph, producer_name="fclayer-model")
    model = ModelWrapper(model)

    model.set_tensor_datatype("inp", idt)
    model.set_tensor_datatype("outp", idt)
    model.set_tensor_datatype("hid", idt)
    model.set_tensor_datatype("params0", wdt)
    model.set_tensor_datatype("params1", wdt)
    model.set_initializer("params0",
        gen_finn_dt_tensor(wdt, (32, 1, 3, 3))
    )
    model.set_initializer("params1",
                          gen_finn_dt_tensor(wdt, (32, 64))
    )
    # if the threshold data type is specified, then we need to generate
    # some dummy threshold values
    if tdt is not None:
        model.set_tensor_datatype("thresh0", tdt)
        model.set_tensor_datatype("thresh1", tdt)
        # create random threshold tensors, sorted along each row so that the
        # threshold steps are non-decreasing
        n_steps: int = idt.get_num_possible_values() - 1
        thresholds = np.random.randint(
            tdt.min(), tdt.max() - 1, (32, n_steps)
        ).astype(np.float32)
        thresholds = np.sort(thresholds, axis=1)
        model.set_initializer("thresh0", thresholds)
        thresholds = np.random.randint(
            tdt.min(), tdt.max() - 1, (64, n_steps)
        ).astype(np.float32)
        thresholds = np.sort(thresholds, axis=1)
        model.set_initializer("thresh1", thresholds)
    return model
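
# A minimal usage sketch (illustrative, not executed by the tests): build a
# w4a8 model with INT16 thresholds, mirroring how the tests below construct
# their fixtures:
#
#   model = make_unit_test_model(DataType["INT4"], DataType["INT8"], DataType["INT16"])
#   assert len(model.graph.node) == 2  # a VVAU followed by an MVAU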


weight_data_types = [
    DataType["INT8"],
    DataType["UINT8"],
    DataType["INT7"],
    DataType["UINT7"],
    DataType["INT3"],
    DataType["UINT3"],
    DataType["BIPOLAR"],
    DataType["TERNARY"],
]


input_data_types = [
    DataType["INT8"],
    DataType["UINT8"],
    DataType["INT3"],
    DataType["UINT3"],
    DataType["BIPOLAR"],
    DataType["TERNARY"],
]


@pytest.mark.parametrize("wdt", weight_data_types)
@pytest.mark.parametrize("rww", [True, False])
def test_minimize_weight_bit_width(wdt: DataType, rww: bool):
    """Testing MinimizeWeightBitWidth for VVAU and MVAU.
    
    :param wdt: (DataType) The data type that we are testing for the weights
    :param rww: (bool) Whether or not to use runtime-writeable weights"""
    if isinstance(wdt, BipolarType):
        # MinimizeWeightBitWidth currently maps {-1, 1} weights to INT2; it
        # would need to check for the absence of 0 in the weights before it
        # could safely narrow them to BIPOLAR
        pytest.skip("Not well-supported for this optimization")

    # Create a w8a8 model
    def_wdt = DataType["UINT8"]
    model = make_unit_test_model(def_wdt, DataType["INT8"])

    # Create new weights for the model based on wdt
    params0 = gen_finn_dt_tensor(wdt, (32, 1, 3, 3))
    params1 = gen_finn_dt_tensor(wdt, (32, 64))
    model.set_initializer("params0", params0)
    model.set_initializer("params1", params1)

    # If runtime-writeable weights, specify as a node attribute
    for node in model.graph.node:
        inst = getCustomOp(node)
        if isinstance(inst, (MatrixVectorActivation, VectorVectorActivation)):
            inst.set_nodeattr("runtime_writeable_weights", int(rww))

    # Apply the optimization
    model = model.transform(MinimizeWeightBitWidth())

    # Iterate through each node to make sure the transformation functioned properly
    for node in model.graph.node:
        inst = getCustomOp(node)
        if isinstance(inst, (MatrixVectorActivation, VectorVectorActivation)):
            cur_wdt = DataType[inst.get_nodeattr("weightDataType")]
            exp_wdt = def_wdt if rww else wdt
            assert cur_wdt.bitwidth() == exp_wdt.bitwidth(), "Mismatched data types"


def calculate_accumulator_bit_width(
        inst: Union[MatrixVectorActivation, VectorVectorActivation],
        model: ModelWrapper
    ) -> Union[DataType, IntType]:
    """Calculate the accumulator bit width using the closed-form expressions
    derived in `Quantized Neural Networks for Low-Precision Accumulation
    with Guaranteed Overflow Avoidance` (2023) by I.Colbert, A. Pappalardo,
    J. Petri-Koenig

    :param inst: (HLSCustomOp) The instance of the MVAU or VVAU
    :param model: (ModelWrapper) The instance of the whole model
    """
    def phi(x: float) -> float:
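        # log2(1 + 2^-x) is the correction term from the closed-form accumulator
        # bound; it decays toward 0 as x grows, so it mainly matters for small
        # dot products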
        return np.log2(1 + pow(2, -x))

    weights = model.get_initializer(inst.onnx_node.input[1])
    # the calculation below uses the actual weight values, so in binary-XNOR
    # mode the {0, 1}-encoded weights must be mapped back to their bipolar
    # {-1, +1} form first
    if inst.get_nodeattr("binaryXnorMode"):
        weights = 2 * weights - 1
    # determine the size of the dot product (and reshape the weights for the
    # VVAU case) based on whether the node is an MVAU or a VVAU
    if isinstance(inst, MatrixVectorActivation):
        K = inst.get_nodeattr("MW")  # matrix_width = num_inputs
    elif isinstance(inst, VectorVectorActivation):
        k_h, k_w = inst.get_nodeattr("Kernel")
        K = k_h * k_w  # size of the kernel = num_inputs
        fm = inst.get_nodeattr("Channels")
        # put weights into the shape expected by calculate_matvec_accumulator_range
        weights = weights.reshape(fm, k_h * k_w).transpose()
    else:
        raise Exception("Considering only MVAU and VVAU currently")
    # collect attributes used to determine the accumulator bit width bound
    wdt = inst.get_weight_datatype()
    idt = inst.get_input_datatype()
    rww = inst.get_nodeattr("runtime_writeable_weights")
    # if runtime-writeable weights, then use the lower bound on the accumulator bit
    # width as determined by the input and weight data types and size of dot product
    if rww:
        alpha = np.log2(K) + idt.bitwidth() + wdt.bitwidth() - 1. - float(idt.signed())
        P = np.ceil(alpha + phi(alpha) + 1.)
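        # worked example (illustrative): for the VVAU above, K = 3 * 3 = 9 with
        # idt = INT8 (signed) and wdt = INT8 gives alpha = log2(9) + 8 + 8 - 1 - 1
        # ~= 17.17, so P = ceil(17.17 + phi(17.17) + 1) = 19 bits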
    # if not runtime-writable weights, then use the tighter bound on the accumulator
    # bit width as determined by the weight values themselves
    else:
        beta = np.log2(abs(weights).sum(axis=0).max()) + idt.bitwidth() - float(idt.signed())
        P = np.ceil(beta + phi(beta) + 1.)
    # if the node is the last in the graph, then round up to the nearest 8 bits
    if model.find_direct_successors(inst.onnx_node) is None:
        P = roundup_to_integer_multiple(P, 8)
    return DataType[f"INT{int(P)}"]


thresh_data_types = [
    None,
    DataType["INT32"],
    DataType["INT24"],
    DataType["INT16"],
]


@pytest.mark.parametrize("wdt", weight_data_types)
@pytest.mark.parametrize("idt", input_data_types)
@pytest.mark.parametrize("tdt", thresh_data_types)
@pytest.mark.parametrize("rww", [True, False])
def test_minimize_accumulator_width(wdt: DataType, idt: DataType, tdt: DataType, rww: bool):
    """Testing MinimizeAccumulatorWidth for VVAU and MVAU.
    
    :param wdt: (DataType) The data type that we are testing for the weights
    :param idt: (DataType) The data type that we are testing for the activations
    :param tdt: (DataType) The data type that we are testing for the thresholds
    :param rww: (bool) Whether or not to use runtime-writeable weights"""
    if (not wdt.signed()) or isinstance(wdt, BipolarType):
        pytest.skip("Closed-form accumulator calculation is designed to consider only signed weights")

    # Create uniform-precision model
    model = make_unit_test_model(wdt, idt, tdt)
    def_adt = DataType["INT32"]

    # If runtime-writeable weights, specify as a node attribute
    for node in model.graph.node:
        inst = getCustomOp(node)
        if isinstance(inst, (MatrixVectorActivation, VectorVectorActivation)):
            inst.set_nodeattr("runtime_writeable_weights", int(rww))
            cur_adt = DataType[inst.get_nodeattr("accDataType")]
            assert cur_adt.bitwidth() == def_adt.bitwidth(), "Default data type is incorrect"

    # Apply the optimization
    model = model.transform(MinimizeAccumulatorWidth())

    # Iterate through each node to make sure the transformation functioned properly
    for node in model.graph.node:
        inst = getCustomOp(node)
        if isinstance(inst, (MatrixVectorActivation, VectorVectorActivation)):
            cur_adt = DataType[inst.get_nodeattr("accDataType")]
            cur_odt = DataType[inst.get_nodeattr("outputDataType")]
            # Calculate the expected accumulator bit width using a closed-form
            # expression that slightly over-approximates the lower bound. The
            # accumulator bit width minimization logic in the MVAU and VVAU is
            # exact and should be less than or equal to this calculation
            exp_adt = calculate_accumulator_bit_width(inst, model)
            assert cur_adt.bitwidth() <= exp_adt.bitwidth(), "Mismatched accumulation data types"
            if model.find_direct_successors(inst.onnx_node) is None:
                assert (cur_adt.bitwidth() % 8) == 0, "bit width of last node needs to be divisible by 8"
                assert cur_adt.bitwidth() == cur_odt.bitwidth(), "outputDataType and accDataType should be equal"
            else:
                assert cur_odt.bitwidth() == idt.bitwidth(), "outputDataType should not be changed"