# Copyright (c) 2020, Xilinx
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of FINN nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import subprocess
import warnings

import pytest

import numpy as np

# as of Feb'20 there is a bug that segfaults ONNX shape inference if we
# import pytorch before onnx, so we make sure to import onnx first
import onnx  # NOQA
import torch
import brevitas.onnx as bo

import finn.transformation.fpgadataflow.convert_to_hls_layers as to_hls
import finn.transformation.streamline.absorb as absorb
from finn.core.onnx_exec import execute_onnx
from finn.custom_op.registry import getCustomOp
from finn.transformation.bipolar_to_xnor import ConvertBipolarMatMulToXnorPopcount
from finn.transformation.fold_constants import FoldConstants

from finn.transformation.fpgadataflow.create_dataflow_partition import (
    CreateDataflowPartition,
)
from finn.transformation.fpgadataflow.make_deployment import DeployToPYNQ
from finn.transformation.general import (
    RemoveUnusedTensors,
    RemoveStaticGraphInputs,
    GiveReadableTensorNames,
    GiveUniqueNodeNames,
)
from finn.transformation.infer_datatypes import InferDataTypes
from finn.transformation.infer_shapes import InferShapes
from finn.transformation.streamline import Streamline
from finn.util.test import (
    get_build_env,
    load_test_checkpoint_or_skip,
    get_example_input,
    get_trained_network_and_ishape,
    execute_parent,
    get_topk,
)
from finn.transformation.fpgadataflow.annotate_resources import AnnotateResources
from finn.transformation.infer_data_layouts import InferDataLayouts
from finn.transformation.move_reshape import RemoveCNVtoFCFlatten
from finn.transformation.lower_convs_to_matmul import LowerConvsToMatMul
from finn.transformation.streamline.reorder import (
    MakeMaxPoolNHWC,
    MoveScalarLinearPastInvariants,
)
from finn.transformation.fpgadataflow.prepare_ip import PrepareIP
from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP
from finn.transformation.fpgadataflow.prepare_cppsim import PrepareCppSim
from finn.transformation.fpgadataflow.compile_cppsim import CompileCppSim
from finn.transformation.fpgadataflow.set_exec_mode import SetExecMode
from finn.transformation.fpgadataflow.create_stitched_ip import CreateStitchedIP
from finn.transformation.fpgadataflow.prepare_rtlsim import PrepareRTLSim
from finn.transformation.fpgadataflow.insert_dwc import InsertDWC
from finn.transformation.fpgadataflow.annotate_cycles import AnnotateCycles
from finn.transformation.fpgadataflow.set_fifo_depths import InsertAndSetFIFODepths
from finn.analysis.fpgadataflow.dataflow_performance import dataflow_performance
from finn.core.modelwrapper import ModelWrapper
from scipy.stats import linregress
from finn.core.throughput_test import throughput_test_remote, throughput_test_rtlsim
from finn.util.pytorch import ToTensor
from finn.transformation.merge_onnx_models import MergeONNXModels
from finn.transformation.insert_topk import InsertTopK
from finn.core.datatype import DataType
from dataset_loading import mnist, cifar
from datetime import datetime
from finn.util.gdrive import upload_to_end2end_dashboard
from collections import OrderedDict


build_dir = os.environ["FINN_BUILD_DIR"]
target_clk_ns = 10
mem_mode = "decoupled"
rtlsim_trace = False


def get_checkpoint_name(topology, wbits, abits, step):
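    # every pipeline step saves its intermediate model as an ONNX checkpoint
    # under FINN_BUILD_DIR, named by topology, bitwidths and step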
    return build_dir + "/end2end_%s_w%da%d_%s.onnx" % (topology, wbits, abits, step)


def get_dashboard_data(topology, wbits, abits):
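    # the stats file holds a repr()'d OrderedDict of results; read it back if present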
    stats_file = build_dir + "/end2end_%s_w%da%d.txt" % (topology, wbits, abits)
    stats_dict = OrderedDict()
    if os.path.isfile(stats_file):
        with open(stats_file, "r") as f:
            stats_dict_txt = f.read()
        stats_dict = eval(stats_dict_txt)
    return stats_dict


def update_dashboard_data(topology, wbits, abits, key, val):
    stats_dict = get_dashboard_data(topology, wbits, abits)
    stats_dict[key] = val
    stats_file = build_dir + "/end2end_%s_w%da%d.txt" % (topology, wbits, abits)
    with open(stats_file, "w") as f:
        f.write(str(stats_dict))


def fold_tfc(model):
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # (PE, SIMD, ramstyle) for each layer
    config = [(16, 49, "block"), (8, 8, "auto"), (8, 8, "auto"), (10, 8, "distributed")]
    for fcl, (pe, simd, ramstyle) in zip(fc_layers, config):
        fcl_inst = getCustomOp(fcl)
        fcl_inst.set_nodeattr("PE", pe)
        fcl_inst.set_nodeattr("SIMD", simd)
        fcl_inst.set_nodeattr("ram_style", ramstyle)
    # set parallelism for input quantizer to be same as first layer's SIMD
    inp_qnt_node = model.get_nodes_by_op_type("Thresholding_Batch")[0]
    inp_qnt = getCustomOp(inp_qnt_node)
    inp_qnt.set_nodeattr("PE", 49)
    inp_qnt.set_nodeattr("mem_mode", "decoupled")
    inp_qnt.set_nodeattr("runtime_writeable_weights", 1)
def fold_lfc(model):
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # (PE, SIMD, ramstyle) for each layer
    config = [
        (32, 49, "block"),
        (64, 32, "auto"),
        (32, 64, "auto"),
        (10, 8, "distributed"),
    ]
    for fcl, (pe, simd, ramstyle) in zip(fc_layers, config):
        fcl_inst = getCustomOp(fcl)
        fcl_inst.set_nodeattr("PE", pe)
        fcl_inst.set_nodeattr("SIMD", simd)
        fcl_inst.set_nodeattr("ram_style", ramstyle)
        fcl_inst.set_nodeattr("runtime_writeable_weights", 1)
    # set parallelism for input quantizer to be same as first layer's SIMD
    inp_qnt_node = model.get_nodes_by_op_type("Thresholding_Batch")[0]
    inp_qnt = getCustomOp(inp_qnt_node)
    inp_qnt.set_nodeattr("PE", 49)
    return model


def fold_cnv_large(model):
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # each tuple is (PE, SIMD) for a layer
    folding = [
        (16, 3),
        (32, 32),
        (16, 32),
        (16, 32),
        (4, 32),
        (1, 32),
        (1, 4),
        (1, 8),
        (5, 1),
    ]
    for fcl, (pe, simd) in zip(fc_layers, folding):
        fcl_inst = getCustomOp(fcl)
        fcl_inst.set_nodeattr("PE", pe)
        fcl_inst.set_nodeattr("SIMD", simd)

    swg_layers = model.get_nodes_by_op_type("ConvolutionInputGenerator")
    for i in range(len(swg_layers)):
        swg_inst = getCustomOp(swg_layers[i])
        simd = folding[i][1]
        swg_inst.set_nodeattr("SIMD", simd)
    return model


def fold_cnv_small(model):
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # each tuple is (PE, SIMD, ramstyle) for a layer
    folding = [
        (8, 3, "auto"),
        (16, 16, "auto"),
        (8, 16, "auto"),
        (8, 16, "block"),
        (4, 8, "auto"),
        (1, 8, "auto"),
        (1, 2, "distributed"),
        (2, 2, "block"),
        (5, 1, "distributed"),
    ]
    for fcl, (pe, simd, ramstyle) in zip(fc_layers, folding):
        fcl_inst = getCustomOp(fcl)
        fcl_inst.set_nodeattr("PE", pe)
        fcl_inst.set_nodeattr("SIMD", simd)
        fcl_inst.set_nodeattr("ram_style", ramstyle)

    swg_layers = model.get_nodes_by_op_type("ConvolutionInputGenerator")
    for i in range(len(swg_layers)):
        swg_inst = getCustomOp(swg_layers[i])
        simd = folding[i][1]
        swg_inst.set_nodeattr("SIMD", simd)
    return model


def get_folding_function(topology, wbits, abits):
    if "tfc" in topology:
        return fold_tfc
    elif "lfc" in topology:
        return fold_lfc
    elif "cnv" in topology:
        if wbits == 1 and abits == 1:
            return fold_cnv_large
        else:
            return fold_cnv_small
    else:
        raise Exception("Unknown topology/quantization combo for predefined folding")


def get_golden_io_pair(topology, wbits, abits, preproc=ToTensor(), return_topk=None):
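    # run the trained Brevitas network on one example input to obtain a golden
    # (input, output) reference pair; optionally reduce the output to its top-k indices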
    (model, ishape) = get_trained_network_and_ishape(topology, wbits, abits)
    input_tensor_npy = get_example_input(topology)
    input_tensor_torch = torch.from_numpy(input_tensor_npy).float()
    if preproc is not None:
        input_tensor_torch = preproc.forward(input_tensor_torch).detach()
    output_tensor_npy = model.forward(input_tensor_torch).detach().numpy()
    if return_topk is not None:
        output_tensor_npy = get_topk(output_tensor_npy, k=return_topk)
    return (input_tensor_npy, output_tensor_npy)


def measure_top1_accuracy(model_chkpt, dataset, parent_chkpt=None):
    if dataset == "cifar10":
        trainx, trainy, testx, testy, valx, valy = cifar.load_cifar_data(
            "/workspace/finn/dataset", download=True, one_hot=False
        )
    elif dataset == "mnist":
        trainx, trainy, testx, testy, valx, valy = mnist.load_mnist_data(
            "/workspace/finn/dataset", download=True, one_hot=False
        )
    else:
        raise Exception("Unrecognized dataset")
    # move from dataset_loader layout to ONNX layout: NHWC -> NCHW
    testx = testx.transpose(0, 3, 1, 2)
    model = ModelWrapper(model_chkpt)
    iname = model.graph.input[0].name
    oname = model.graph.output[0].name
    if parent_chkpt is None:
        ishape = model.get_tensor_shape(iname)
    else:
        parent_model = ModelWrapper(parent_chkpt)
        parent_iname = parent_model.graph.input[0].name
        ishape = parent_model.get_tensor_shape(parent_iname)
    ok = 0
    nok = 0
    n_batches = testx.shape[0]
    for i in range(n_batches):
        tdata = testx[i].reshape(ishape).astype(np.float32)
        exp = testy[i].item()
        if parent_chkpt is not None:
            y = execute_parent(parent_chkpt, model_chkpt, tdata)
        else:
            y = execute_onnx(model, {iname: tdata}, False)[oname]
        ret = y.item()
        if ret == exp:
            ok += 1
        else:
            nok += 1
        if i % 10 == 0:
            print("%d : OK %d NOK %d " % (i, ok, nok))
    acc_top1 = ok * 100.0 / (ok + nok)
    warnings.warn("Final OK %d NOK %d top-1 %f" % (ok, nok, acc_top1))
    return acc_top1


def topology2dataset(topology):
    if "fc" in topology:
        return "mnist"
    elif "cnv" in topology:
        return "cifar10"
    else:
        raise Exception("Unrecognized topology")


@pytest.mark.parametrize("wbits", [1, 2])
@pytest.mark.parametrize("abits", [1, 2])
@pytest.mark.parametrize("topology", ["lfc", "tfc", "cnv"])
class TestEnd2End:
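    # the tests below form a pipeline: each step loads the checkpoint saved by
    # the previous step (skipping if it is missing) and saves its own checkpoint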
    def test_export(self, topology, wbits, abits):
        if wbits > abits:
            pytest.skip("No wbits > abits end2end network configs for now")
        if topology == "lfc" and not (wbits == 1 and abits == 1):
            pytest.skip("Skipping certain lfc configs")
        (model, ishape) = get_trained_network_and_ishape(topology, wbits, abits)
        chkpt_name = get_checkpoint_name(topology, wbits, abits, "export")
        bo.export_finn_onnx(model, ishape, chkpt_name)
        nname = "%s_w%da%d" % (topology, wbits, abits)
        update_dashboard_data(topology, wbits, abits, "network", nname)
        dtstr = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        update_dashboard_data(topology, wbits, abits, "datetime", dtstr)
        finn_commit = subprocess.check_output(
            ["git", "rev-parse", "HEAD"], cwd="/workspace/finn"
        )
        finn_commit = finn_commit.decode("utf-8").strip()
        update_dashboard_data(topology, wbits, abits, "finn-commit", finn_commit)
        assert os.path.isfile(chkpt_name)

    def test_import_and_tidy(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "export")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        model = model.transform(InferShapes())
        model = model.transform(FoldConstants())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(GiveReadableTensorNames())
        model = model.transform(InferDataTypes())
        model = model.transform(RemoveStaticGraphInputs())
        chkpt = get_checkpoint_name(topology, wbits, abits, "import_and_tidy")
        model.save(chkpt)

    def test_add_pre_and_postproc(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "import_and_tidy")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        global_inp_name = model.graph.input[0].name
        ishape = model.get_tensor_shape(global_inp_name)
        # preprocessing: torchvision's ToTensor divides uint8 inputs by 255
        totensor_pyt = ToTensor()
        chkpt_preproc_name = get_checkpoint_name(topology, wbits, abits, "preproc")
        bo.export_finn_onnx(totensor_pyt, ishape, chkpt_preproc_name)
        assert os.path.isfile(chkpt_preproc_name)
        # join preprocessing and core model
        pre_model = ModelWrapper(chkpt_preproc_name)
        pre_model = pre_model.transform(InferShapes())
        pre_model = pre_model.transform(FoldConstants())
        model = model.transform(MergeONNXModels(pre_model))
        # add input quantization annotation: UINT8 for all BNN-PYNQ models
        global_inp_name = model.graph.input[0].name
        model.set_tensor_datatype(global_inp_name, DataType.UINT8)
        # postprocessing: insert Top-1 node at the end
        model = model.transform(InsertTopK(k=1))
        chkpt_name = get_checkpoint_name(topology, wbits, abits, "pre_post")
        # tidy-up again
        model = model.transform(InferShapes())
        model = model.transform(FoldConstants())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(GiveReadableTensorNames())
        model = model.transform(InferDataTypes())
        model = model.transform(RemoveStaticGraphInputs())
        model.save(chkpt_name)
        assert os.path.isfile(chkpt_name)

    def test_streamline(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "pre_post")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        model = model.transform(absorb.AbsorbSignBiasIntoMultiThreshold())
        # move past any reshapes to be able to streamline input scaling
        model = model.transform(MoveScalarLinearPastInvariants())
        model = model.transform(Streamline())
        if "fc" not in topology:
            model = model.transform(LowerConvsToMatMul())
            model = model.transform(MakeMaxPoolNHWC())
            model = model.transform(absorb.AbsorbTransposeIntoMultiThreshold())
        model = model.transform(ConvertBipolarMatMulToXnorPopcount())
        model = model.transform(Streamline())
        # absorb final add-mul nodes into TopK
        model = model.transform(absorb.AbsorbScalarMulAddIntoTopK())
        model = model.transform(InferDataLayouts())
        model = model.transform(RemoveUnusedTensors())
        model.save(get_checkpoint_name(topology, wbits, abits, "streamline"))

    def test_convert_to_hls_layers(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "streamline")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        if topology == "tfc" and wbits == 1 and abits == 1:
            # use standalone thresholds for tfc-w1a1 to also exercise that option
            model = model.transform(to_hls.InferThresholdingLayer())
        # needed for bipolar MatMul layers
        model = model.transform(to_hls.InferBinaryStreamingFCLayer(mem_mode))
        # needed for non-bipolar MatMul layers
        model = model.transform(to_hls.InferQuantizedStreamingFCLayer(mem_mode))
        # TopK to LabelSelect
        model = model.transform(to_hls.InferLabelSelectLayer())
        # input quantization (if any) to standalone thresholding
        model = model.transform(to_hls.InferThresholdingLayer())
        # needed for convolutions
        if "fc" not in topology:
            model = model.transform(to_hls.InferConvInpGen())
            model = model.transform(to_hls.InferStreamingMaxPool())
            model = model.transform(RemoveCNVtoFCFlatten())
        # get rid of Transpose -> Transpose identity seq
        model = model.transform(absorb.AbsorbConsecutiveTransposes())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(InferDataLayouts())
        model.save(get_checkpoint_name(topology, wbits, abits, "convert_to_hls_layers"))

    def test_create_dataflow_partition(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "convert_to_hls_layers"
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        parent_model = model.transform(CreateDataflowPartition())
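        # CreateDataflowPartition moves the HLS layers into a separate child model,
        # referenced via the "model" attribute of a StreamingDataflowPartition node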
        parent_model_chkpt = get_checkpoint_name(
            topology, wbits, abits, "dataflow_parent"
        )
        parent_model.save(parent_model_chkpt)
        sdp_node = parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
        sdp_node = getCustomOp(sdp_node)
        dataflow_model_filename = sdp_node.get_nodeattr("model")
        dataflow_model = load_test_checkpoint_or_skip(dataflow_model_filename)
        dataflow_model_chkpt = get_checkpoint_name(
            topology, wbits, abits, "dataflow_model"
        )
        dataflow_model.save(dataflow_model_chkpt)

    def test_fold(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "dataflow_model")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        folding_fxn = get_folding_function(topology, wbits, abits)
        model = folding_fxn(model)
        model.save(get_checkpoint_name(topology, wbits, abits, "fold"))

    @pytest.mark.slow
    @pytest.mark.vivado
    def test_cppsim(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "fold")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        model = model.transform(PrepareCppSim())
        model = model.transform(CompileCppSim())
        model = model.transform(SetExecMode("cppsim"))
        cppsim_chkpt = get_checkpoint_name(topology, wbits, abits, "cppsim")
        model.save(cppsim_chkpt)
        parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        (input_tensor_npy, output_tensor_npy) = get_golden_io_pair(
            topology, wbits, abits, return_topk=1
        )
        y = execute_parent(parent_chkpt, cppsim_chkpt, input_tensor_npy)
        assert np.isclose(y, output_tensor_npy).all()

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_ipgen(self, topology, wbits, abits, kind):
        if kind == "alveo" and ("VITIS_PATH" not in os.environ):
            pytest.skip("VITIS_PATH not set")
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "fold")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        test_fpga_part = get_build_env(kind, target_clk_ns)["part"]
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
        model = model.transform(HLSSynthIP())
        model.save(get_checkpoint_name(topology, wbits, abits, "ipgen_" + kind))
    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_set_fifo_depths(self, topology, wbits, abits, kind):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "ipgen_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        test_fpga_part = get_build_env(kind, target_clk_ns)["part"]
        model = model.transform(InsertAndSetFIFODepths(test_fpga_part, target_clk_ns))
        fifo_layers = model.get_nodes_by_op_type("StreamingFIFO")
        assert len(fifo_layers) > 0
        hls_layers = model.get_finn_nodes()
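        # InsertAndSetFIFODepths should have transferred all per-node FIFO depth
        # settings into explicit StreamingFIFO nodes, zeroing the node attributes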
        for node in hls_layers:
            if node.op_type != "StreamingFIFO":
                op_inst = getCustomOp(node)
                assert op_inst.get_nodeattr("inFIFODepth") == 0
                assert op_inst.get_nodeattr("outFIFODepth") == 0
        model.save(get_checkpoint_name(topology, wbits, abits, "fifodepth_" + kind))

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq"])
    def test_ipstitch_rtlsim(self, topology, wbits, abits, kind):
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "fifodepth_" + kind
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        test_fpga_part = get_build_env(kind, target_clk_ns)["part"]
        model = model.transform(InsertDWC())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(AnnotateCycles())
        perf = model.analysis(dataflow_performance)
        latency = perf["critical_path_cycles"]
        # rtlsim only supports impl_style=rtl for StreamingFIFO, ensure that
        for fifo_layer in model.get_nodes_by_op_type("StreamingFIFO"):
            getCustomOp(fifo_layer).set_nodeattr("impl_style", "rtl")
        model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
        model = model.transform(HLSSynthIP())
        model = model.transform(CreateStitchedIP(test_fpga_part, target_clk_ns))
        model = model.transform(PrepareRTLSim())
        model.set_metadata_prop("exec_mode", "rtlsim")
        os.environ["LIVENESS_THRESHOLD"] = str(int(latency * 1.1))
        if rtlsim_trace:
            model.set_metadata_prop(
                "rtlsim_trace", "%s_w%da%d.vcd" % (topology, wbits, abits)
            )
            os.environ["RTLSIM_TRACE_DEPTH"] = "3"
        rtlsim_chkpt = get_checkpoint_name(
            topology, wbits, abits, "ipstitch_rtlsim_" + kind
        )
        model.save(rtlsim_chkpt)
        parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        (input_tensor_npy, output_tensor_npy) = get_golden_io_pair(
            topology, wbits, abits, return_topk=1
        )
        y = execute_parent(parent_chkpt, rtlsim_chkpt, input_tensor_npy)
        model = ModelWrapper(rtlsim_chkpt)
        perf["cycles_rtlsim"] = model.get_metadata_prop("cycles_rtlsim")
        # warnings.warn("Estimated & rtlsim performance: " + str(perf))
        # for (k, v) in perf.items():
        #    update_dashboard_data(topology, wbits, abits, k, v)
        update_dashboard_data(
            topology, wbits, abits, "cycles_rtlsim", perf["cycles_rtlsim"]
        )
        assert np.isclose(y, output_tensor_npy).all()

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq"])
    def test_throughput_rtlsim(self, topology, wbits, abits, kind):
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "ipstitch_rtlsim_" + kind
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        n_nodes = len(model.graph.node)
        perf_est = model.analysis(dataflow_performance)
        latency = int(model.get_metadata_prop("cycles_rtlsim"))
        cycles_per_sample_est = perf_est["max_cycles"]
        batchsize = 2 * n_nodes
        ret = throughput_test_rtlsim(model, batchsize=batchsize)
        res_cycles = ret["cycles"]
        est_cycles = latency + cycles_per_sample_est * batchsize
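        # expected cycles = pipeline fill latency + steady-state cycles per sample
        # times batch size; allow up to 15% deviation from the measured cycles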
        assert (abs(res_cycles - est_cycles) / res_cycles) < 0.15

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq"])
    def test_validate_top1(self, topology, wbits, abits, kind):
        if "TEST_END2END_VALIDATE_TOP1" not in os.environ:
            pytest.skip("TEST_END2END_VALIDATE_TOP1 not set")
        prepostproc_chkpt = get_checkpoint_name(topology, wbits, abits, "pre_post")
        streamline_chkpt = get_checkpoint_name(topology, wbits, abits, "streamline")
        parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        cppsim_chkpt = get_checkpoint_name(topology, wbits, abits, "cppsim")
        rtlsim_chkpt = get_checkpoint_name(
            topology, wbits, abits, "ipstitch_rtlsim_" + kind
        )
        dataset = topology2dataset(topology)
        assert measure_top1_accuracy(prepostproc_chkpt, dataset) > 80
        assert measure_top1_accuracy(streamline_chkpt, dataset) > 80
        assert measure_top1_accuracy(cppsim_chkpt, dataset, parent_chkpt) > 80
        assert measure_top1_accuracy(rtlsim_chkpt, dataset, parent_chkpt) > 80

    @pytest.mark.vivado
    @pytest.mark.vitis
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_build(self, topology, wbits, abits, kind):
        if kind == "alveo" and ("VITIS_PATH" not in os.environ):
            pytest.skip("VITIS_PATH not set")
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "fifodepth_" + kind
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        cfg = get_build_env(kind, target_clk_ns)
        model = model.transform(cfg["build_fxn"])
        model = model.transform(AnnotateResources("synth"))
        synth_dct = eval(model.get_metadata_prop("res_total_top_synth"))
        for (k, v) in synth_dct.items():
            update_dashboard_data(topology, wbits, abits, k, v)
        update_dashboard_data(topology, wbits, abits, "board", cfg["board"])
        model.save(get_checkpoint_name(topology, wbits, abits, "build_" + kind))
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_deploy(self, topology, wbits, abits, kind):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "build_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        cfg = get_build_env(kind, target_clk_ns)
        if cfg["ip"] == "":
            pytest.skip("PYNQ board IP address not specified")
        model = model.transform(
            DeployToPYNQ(
                cfg["ip"],
                cfg["port"],
                cfg["username"],
                cfg["password"],
                cfg["target_dir"],
            )
        )
        # save the model to be able to link it to the parent
        model.save(get_checkpoint_name(topology, wbits, abits, "deploy_" + kind))
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_run_on_hw(self, topology, wbits, abits, kind):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "deploy_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)  # NOQA
        cfg = get_build_env(kind, target_clk_ns)
        if cfg["ip"] == "":
            pytest.skip("PYNQ board IP address not specified")
        (input_tensor_npy, output_tensor_npy) = get_golden_io_pair(
            topology, wbits, abits, return_topk=1
        )
        parent_model = load_test_checkpoint_or_skip(
            get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        )
        iname = parent_model.graph.input[0].name
        oname = parent_model.graph.output[0].name
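        # point the parent graph's dataflow partition at the deployed model so that
        # executing the parent runs the partition on the remote board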
        sdp_node = parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
        sdp_node = getCustomOp(sdp_node)
        sdp_node.set_nodeattr("model", prev_chkpt_name)
        ret = execute_onnx(parent_model, {iname: input_tensor_npy}, True)
        y = ret[oname]
        assert np.isclose(y, output_tensor_npy).all()

    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_throughput_hw(self, topology, wbits, abits, kind):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "deploy_" + kind)
        end2end_example = "%s_w%da%d_%s" % (topology, wbits, abits, kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)  # NOQA
        cfg = get_build_env(kind, target_clk_ns)
        if cfg["ip"] == "":
            pytest.skip("PYNQ board IP address not specified")
        ret = dict()
        # try a range of batch sizes, some may fail due to insufficient DMA
        # buffers
        bsize_range_in = [8 ** i for i in range(5)]
        bsize_range = []
        for bsize in bsize_range_in:
            res = throughput_test_remote(model, bsize)
            if res is not None:
                ret[bsize] = res
                bsize_range.append(bsize)
            else:
                # assume we reached largest possible N
                break
        y = [ret[key]["runtime[ms]"] for key in bsize_range]
        lrret = linregress(bsize_range, y)
        ret_str = ""
        ret_str += "\n" + "%s Throughput Test Results" % end2end_example
        ret_str += "\n" + "-----------------------------"
        ret_str += "\n" + "From linear regression:"
        ret_str += "\n" + "Invocation overhead: %f ms" % lrret.intercept
        ret_str += "\n" + "Time per sample: %f ms" % lrret.slope
        ret_str += "\n" + "Raw data:"

        ret_str += "\n" + "{:<8} {:<16} {:<16} {:<16} {:<16} {:<16}".format(
            "N", "runtime[ms]", "fclk[mhz]", "fps", "DRAM rd[Mb/s]", "DRAM wr[Mb/s]"
        )
        for k in bsize_range:
            v = ret[k]
            ret_str += "\n" + "{:<8} {:<16} {:<16} {:<16} {:<16} {:<16}".format(
                k,
                np.round(v["runtime[ms]"], 4),
                v["fclk[mhz]"],
                np.round(v["throughput[images/s]"], 2),
                np.round(v["DRAM_in_bandwidth[Mb/s]"], 2),
                np.round(v["DRAM_out_bandwidth[Mb/s]"], 2),
            )
        ret_str += "\n" + "-----------------------------"
        warnings.warn(ret_str)
        largest_bsize = bsize_range[-1]
        update_dashboard_data(
            topology, wbits, abits, "fclk[mhz]", ret[largest_bsize]["fclk[mhz]"]
        )
        update_dashboard_data(
            topology,
            wbits,
            abits,
            "throughput[images/s]",
            ret[largest_bsize]["throughput[images/s]"],
        )

    def test_upload_results_to_dashboard(self, topology, wbits, abits):
        dashboard_data = get_dashboard_data(topology, wbits, abits)
        if len(dashboard_data.keys()) > 0:
            upload_to_end2end_dashboard(dashboard_data)
        else:
            pytest.skip("No data to upload to dashboard")