Skip to content
Snippets Groups Projects
test_end2end_bnn_pynq.py 24.68 KiB
# Copyright (c) 2020, Xilinx
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of FINN nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os
import pytest

import numpy as np

# as of Feb'20 there is a bug that segfaults ONNX shape inference if we
# import pytorch before onnx, so we make sure to import onnx first
import onnx  # NOQA
import torch
import brevitas.onnx as bo

import finn.transformation.fpgadataflow.convert_to_hls_layers as to_hls
import finn.transformation.streamline.absorb as absorb
from finn.core.onnx_exec import execute_onnx
from finn.custom_op.registry import getCustomOp
from finn.transformation.bipolar_to_xnor import ConvertBipolarMatMulToXnorPopcount
from finn.transformation.fold_constants import FoldConstants

from finn.transformation.fpgadataflow.create_dataflow_partition import (
    CreateDataflowPartition,
)
from finn.transformation.fpgadataflow.make_deployment import DeployToPYNQ
from finn.transformation.general import (
    RemoveUnusedTensors,
    RemoveStaticGraphInputs,
    GiveReadableTensorNames,
    GiveUniqueNodeNames,
)
from finn.transformation.infer_datatypes import InferDataTypes
from finn.transformation.infer_shapes import InferShapes
from finn.transformation.streamline import Streamline
from finn.util.test import (
    get_build_env,
    load_test_checkpoint_or_skip,
    get_example_input,
    get_trained_network_and_ishape,
    execute_parent,
    get_topk,
)
from finn.transformation.fpgadataflow.annotate_resources import AnnotateResources
from finn.transformation.infer_data_layouts import InferDataLayouts
from finn.transformation.move_reshape import RemoveCNVtoFCFlatten
from finn.transformation.lower_convs_to_matmul import LowerConvsToMatMul
from finn.transformation.streamline.reorder import (
    MakeMaxPoolNHWC,
    MoveScalarLinearPastInvariants,
)
import warnings
from finn.transformation.fpgadataflow.prepare_ip import PrepareIP
from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP
from finn.transformation.fpgadataflow.prepare_cppsim import PrepareCppSim
from finn.transformation.fpgadataflow.compile_cppsim import CompileCppSim
from finn.transformation.fpgadataflow.set_exec_mode import SetExecMode
from finn.transformation.fpgadataflow.create_stitched_ip import CreateStitchedIP
from finn.transformation.fpgadataflow.prepare_rtlsim import PrepareRTLSim
from finn.transformation.fpgadataflow.insert_dwc import InsertDWC
from finn.transformation.fpgadataflow.insert_fifo import InsertFIFO
from finn.transformation.fpgadataflow.annotate_cycles import AnnotateCycles
from finn.analysis.fpgadataflow.dataflow_performance import dataflow_performance
from finn.core.modelwrapper import ModelWrapper
from scipy.stats import linregress
from finn.core.throughput_test import throughput_test_remote, throughput_test_rtlsim
from finn.util.pytorch import ToTensor
from finn.transformation.merge_onnx_models import MergeONNXModels
from finn.transformation.insert_topk import InsertTopK
from finn.core.datatype import DataType
import mnist

build_dir = "/tmp/" + os.environ["FINN_INST_NAME"]
target_clk_ns = 10
mem_mode = "decoupled"
rtlsim_trace = False
mnist_test_imgs = mnist.test_images()
mnist_test_labels = mnist.test_labels()


def get_checkpoint_name(topology, wbits, abits, step):
    return build_dir + "/end2end_%s_w%da%d_%s.onnx" % (topology, wbits, abits, step)


def fold_tfc(model):
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # (PE, SIMD, in_fifo_depth, out_fifo_depth, ramstyle) for each layer
    config = [
        (16, 49, 16, 64, "block"),
        (8, 8, 64, 64, "auto"),
        (8, 8, 64, 64, "auto"),
        (10, 8, 64, 10, "distributed"),
    ]
    for fcl, (pe, simd, ififo, ofifo, ramstyle) in zip(fc_layers, config):
        fcl_inst = getCustomOp(fcl)
        fcl_inst.set_nodeattr("PE", pe)
        fcl_inst.set_nodeattr("SIMD", simd)
        fcl_inst.set_nodeattr("inFIFODepth", ififo)
        fcl_inst.set_nodeattr("outFIFODepth", ofifo)
        fcl_inst.set_nodeattr("ram_style", ramstyle)
    # set parallelism for input quantizer to be same as first layer's SIMD
    inp_qnt_node = model.get_nodes_by_op_type("Thresholding_Batch")[0]
    inp_qnt = getCustomOp(inp_qnt_node)
    inp_qnt.set_nodeattr("PE", 49)
    return model


def fold_cnv_large(model):
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # each tuple is (PE, SIMD, in_fifo_depth) for a layer
    folding = [
        (16, 3, 256),
        (32, 32, 256),
        (16, 32, 256),
        (16, 32, 256),
        (4, 32, 214),
        (1, 32, 2),
        (1, 4, 126),
        (1, 8, 62),
        (5, 1, 6),
    ]
    for fcl, (pe, simd, ififodepth) in zip(fc_layers, folding):
        fcl_inst = getCustomOp(fcl)
        fcl_inst.set_nodeattr("PE", pe)
        fcl_inst.set_nodeattr("SIMD", simd)
        fcl_inst.set_nodeattr("inFIFODepth", ififodepth)

    swg_layers = model.get_nodes_by_op_type("ConvolutionInputGenerator")
    swg_idepth = [2, 51, 9, 106, 2, 2]
    for i in range(len(swg_layers)):
        swg_inst = getCustomOp(swg_layers[i])
        simd = folding[i][1]
        swg_inst.set_nodeattr("SIMD", simd)
        swg_inst.set_nodeattr("inFIFODepth", swg_idepth[i])
    return model


def fold_cnv_small(model):
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # each tuple is (PE, SIMD, in_fifo_depth) for a layer
    folding = [
        (8, 3, 256, "auto"),
        (16, 16, 256, "auto"),
        (8, 16, 256, "auto"),
        (8, 16, 256, "block"),
        (4, 8, 214, "auto"),
        (1, 8, 2, "auto"),
        (1, 2, 126, "distributed"),
        (2, 2, 62, "block"),
        (5, 1, 6, "distributed"),
    ]
    for fcl, (pe, simd, ififodepth, ramstyle) in zip(fc_layers, folding):
        fcl_inst = getCustomOp(fcl)
        fcl_inst.set_nodeattr("PE", pe)
        fcl_inst.set_nodeattr("SIMD", simd)
        fcl_inst.set_nodeattr("inFIFODepth", ififodepth)
        fcl_inst.set_nodeattr("ram_style", ramstyle)

    swg_layers = model.get_nodes_by_op_type("ConvolutionInputGenerator")
    swg_idepth = [2, 51, 9, 106, 2, 2]
    for i in range(len(swg_layers)):
        swg_inst = getCustomOp(swg_layers[i])
        simd = folding[i][1]
        swg_inst.set_nodeattr("SIMD", simd)
        swg_inst.set_nodeattr("inFIFODepth", swg_idepth[i])
    return model


def get_folding_function(topology, wbits, abits):
    if "tfc" in topology:
        return fold_tfc
    elif "cnv" in topology:
        if wbits == 1 and abits == 1:
            return fold_cnv_large
        else:
            return fold_cnv_small
    else:
        raise Exception("Unknown topology/quantization combo for predefined folding")


def get_golden_io_pair(topology, wbits, abits, preproc=ToTensor(), return_topk=None):
    (model, ishape) = get_trained_network_and_ishape(topology, wbits, abits)
    input_tensor_npy = get_example_input(topology)
    input_tensor_torch = torch.from_numpy(input_tensor_npy).float()
    if preproc is not None:
        input_tensor_torch = preproc.forward(input_tensor_torch).detach()
    output_tensor_npy = model.forward(input_tensor_torch).detach().numpy()
    if return_topk is not None:
        output_tensor_npy = get_topk(output_tensor_npy, k=return_topk)
    return (input_tensor_npy, output_tensor_npy)


@pytest.mark.parametrize("wbits", [1, 2])
@pytest.mark.parametrize("abits", [1, 2])
@pytest.mark.parametrize("topology", ["tfc", "cnv"])
class TestEnd2End:
    def test_export(self, topology, wbits, abits):
        if wbits > abits:
            pytest.skip("No wbits > abits end2end network configs for now")
        (model, ishape) = get_trained_network_and_ishape(topology, wbits, abits)
        chkpt_name = get_checkpoint_name(topology, wbits, abits, "export")
        bo.export_finn_onnx(model, ishape, chkpt_name)
        assert os.path.isfile(chkpt_name)

    def test_import_and_tidy(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "export")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        model = model.transform(InferShapes())
        model = model.transform(FoldConstants())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(GiveReadableTensorNames())
        model = model.transform(InferDataTypes())
        model = model.transform(RemoveStaticGraphInputs())
        model.save(get_checkpoint_name(topology, wbits, abits, "import_and_tidy"))

    def test_add_pre_and_postproc(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "import_and_tidy")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        global_inp_name = model.graph.input[0].name
        ishape = model.get_tensor_shape(global_inp_name)
        # preprocessing: torchvision's ToTensor divides uint8 inputs by 255
        totensor_pyt = ToTensor()
        chkpt_preproc_name = get_checkpoint_name(topology, wbits, abits, "preproc")
        bo.export_finn_onnx(totensor_pyt, ishape, chkpt_preproc_name)
        assert os.path.isfile(chkpt_preproc_name)
        # join preprocessing and core model
        pre_model = ModelWrapper(chkpt_preproc_name)
        model = model.transform(MergeONNXModels(pre_model))
        # add input quantization annotation: UINT8 for all BNN-PYNQ models
        global_inp_name = model.graph.input[0].name
        model.set_tensor_datatype(global_inp_name, DataType.UINT8)
        # postprocessing: insert Top-1 node at the end
        model = model.transform(InsertTopK(k=1))
        chkpt_name = get_checkpoint_name(topology, wbits, abits, "pre_post")
        model.save(chkpt_name)
        assert os.path.isfile(chkpt_name)

    def test_streamline(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "pre_post")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        # move past any reshapes to be able to streamline input scaling
        model = model.transform(MoveScalarLinearPastInvariants())
        model = model.transform(Streamline())
        if "fc" not in topology:
            model = model.transform(LowerConvsToMatMul())
            model = model.transform(MakeMaxPoolNHWC())
            model = model.transform(absorb.AbsorbTransposeIntoMultiThreshold())
        model = model.transform(ConvertBipolarMatMulToXnorPopcount())
        model = model.transform(Streamline())
        # absorb final add-mul nodes into TopK
        model = model.transform(absorb.AbsorbScalarMulAddIntoTopK())
        model = model.transform(InferDataLayouts())
        model = model.transform(RemoveUnusedTensors())
        model.save(get_checkpoint_name(topology, wbits, abits, "streamline"))

    def test_convert_to_hls_layers(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "streamline")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        # needed for bipolar MatMul layers
        model = model.transform(to_hls.InferBinaryStreamingFCLayer(mem_mode))
        # needed for non-bipolar MatMul layers
        model = model.transform(to_hls.InferQuantizedStreamingFCLayer(mem_mode))
        # TopK to LabelSelect
        model = model.transform(to_hls.InferLabelSelectLayer())
        # input quantization (if any) to standalone thresholding
        model = model.transform(to_hls.InferThresholdingLayer())
        # needed for convolutions
        if "fc" not in topology:
            model = model.transform(to_hls.InferConvInpGen())
            model = model.transform(to_hls.InferStreamingMaxPool())
            model = model.transform(RemoveCNVtoFCFlatten())
        # get rid of Tranpose -> Tranpose identity seq
        model = model.transform(absorb.AbsorbConsecutiveTransposes())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(InferDataLayouts())
        model.save(get_checkpoint_name(topology, wbits, abits, "convert_to_hls_layers"))

    def test_create_dataflow_partition(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "convert_to_hls_layers"
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        parent_model = model.transform(CreateDataflowPartition())
        parent_model_chkpt = get_checkpoint_name(
            topology, wbits, abits, "dataflow_parent"
        )
        parent_model.save(parent_model_chkpt)
        sdp_node = parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
        sdp_node = getCustomOp(sdp_node)
        dataflow_model_filename = sdp_node.get_nodeattr("model")
        dataflow_model = load_test_checkpoint_or_skip(dataflow_model_filename)
        dataflow_model_chkpt = get_checkpoint_name(
            topology, wbits, abits, "dataflow_model"
        )
        dataflow_model.save(dataflow_model_chkpt)

    def test_fold(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "dataflow_model")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        folding_fxn = get_folding_function(topology, wbits, abits)
        model = folding_fxn(model)
        model.save(get_checkpoint_name(topology, wbits, abits, "fold"))

    @pytest.mark.slow
    @pytest.mark.vivado
    def test_cppsim(self, topology, wbits, abits):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "fold")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        model = model.transform(PrepareCppSim())
        model = model.transform(CompileCppSim())
        model = model.transform(SetExecMode("cppsim"))
        cppsim_chkpt = get_checkpoint_name(topology, wbits, abits, "cppsim")
        model.save(cppsim_chkpt)
        parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        (input_tensor_npy, output_tensor_npy) = get_golden_io_pair(
            topology, wbits, abits, return_topk=1
        )
        y = execute_parent(parent_chkpt, cppsim_chkpt, input_tensor_npy)
        assert np.isclose(y, output_tensor_npy).all()

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_ipgen(self, topology, wbits, abits, kind):
        if kind == "alveo" and ("VITIS_PATH" not in os.environ):
            pytest.skip("VITIS_PATH not set")
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "fold")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        test_fpga_part = get_build_env(kind, target_clk_ns)["part"]
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
        model = model.transform(HLSSynthIP())
        model.save(get_checkpoint_name(topology, wbits, abits, "ipgen_" + kind))

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_ipstitch_rtlsim(self, topology, wbits, abits, kind):
        if kind == "alveo" and ("VITIS_PATH" not in os.environ):
            pytest.skip("VITIS_PATH not set")
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "ipgen_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        test_fpga_part = get_build_env(kind, target_clk_ns)["part"]
        model = model.transform(InsertDWC())
        model = model.transform(InsertFIFO())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(AnnotateCycles())
        perf = model.analysis(dataflow_performance)
        latency = perf["critical_path_cycles"]
        model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
        model = model.transform(HLSSynthIP())
        model = model.transform(CreateStitchedIP(test_fpga_part, target_clk_ns))
        model = model.transform(PrepareRTLSim())
        model.set_metadata_prop("exec_mode", "rtlsim")
        os.environ["LIVENESS_THRESHOLD"] = str(int(latency * 1.1))
        if rtlsim_trace:
            model.set_metadata_prop(
                "rtlsim_trace", "%s_w%da%d.vcd" % (topology, wbits, abits)
            )
            os.environ["RTLSIM_TRACE_DEPTH"] = "3"
        rtlsim_chkpt = get_checkpoint_name(
            topology, wbits, abits, "ipstitch_rtlsim_" + kind
        )
        model.save(rtlsim_chkpt)
        parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        (input_tensor_npy, output_tensor_npy) = get_golden_io_pair(
            topology, wbits, abits, return_topk=1
        )
        y = execute_parent(parent_chkpt, rtlsim_chkpt, input_tensor_npy)
        model = ModelWrapper(rtlsim_chkpt)
        perf["cycles_rtlsim"] = model.get_metadata_prop("cycles_rtlsim")
        warnings.warn("Estimated & rtlsim performance: " + str(perf))
        assert np.isclose(y, output_tensor_npy).all()

    @pytest.mark.slow
    @pytest.mark.parametrize("kind", ["zynq"])
    def test_rtlsim_top1(self, topology, wbits, abits, kind):
        if "fc" not in topology:
            pytest.skip("Top-1 rtlsim test currently for MNIST only")
        rtlsim_chkpt = get_checkpoint_name(
            topology, wbits, abits, "ipstitch_rtlsim_" + kind
        )
        parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        load_test_checkpoint_or_skip(rtlsim_chkpt)
        ok = 0
        nok = 0
        for i in range(10000):
            tdata = mnist_test_imgs[i].reshape(1, 1, 28, 28).astype(np.float32)
            exp = mnist_test_labels[i].item()
            y = execute_parent(parent_chkpt, rtlsim_chkpt, tdata)
            ret = y.item()
            if ret == exp:
                ok += 1
            else:
                nok += 1
        acc_top1 = ok * 100.0 / (ok + nok)
        warnings.warn("Final OK %d NOK %d top-1 %f" % (ok, nok, acc_top1))
        assert acc_top1 > 90.0

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_throughput_rtlsim(self, topology, wbits, abits, kind):
        if kind == "alveo" and ("VITIS_PATH" not in os.environ):
            pytest.skip("VITIS_PATH not set")
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "ipstitch_rtlsim_" + kind
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        n_nodes = len(model.graph.node)
        perf_est = model.analysis(dataflow_performance)
        latency = int(model.get_metadata_prop("cycles_rtlsim"))
        cycles_per_sample_est = perf_est["max_cycles"]
        batchsize = 2 * n_nodes
        ret = throughput_test_rtlsim(model, batchsize=batchsize)
        res_cycles = ret["cycles"]
        est_cycles = latency + cycles_per_sample_est * batchsize
        assert (abs(res_cycles - est_cycles) / res_cycles) < 0.15

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.vitis
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_build(self, topology, wbits, abits, kind):
        if kind == "alveo" and ("VITIS_PATH" not in os.environ):
            pytest.skip("VITIS_PATH not set")
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "ipgen_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        cfg = get_build_env(kind, target_clk_ns)
        model = model.transform(cfg["build_fxn"])
        model = model.transform(AnnotateResources("synth"))
        warnings.warn(
            "Post-synthesis resources (excluding shell): "
            + model.get_metadata_prop("res_total_synth")
        )
        warnings.warn(
            "Post-synthesis resources (all inclusive): "
            + model.get_metadata_prop("res_total_top_synth")
        )
        model.save(get_checkpoint_name(topology, wbits, abits, "build_" + kind))

    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_deploy(self, topology, wbits, abits, kind):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "build_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        cfg = get_build_env(kind, target_clk_ns)
        if cfg["ip"] == "":
            pytest.skip("PYNQ board IP address not specified")
        model = model.transform(
            DeployToPYNQ(
                cfg["ip"],
                cfg["port"],
                cfg["username"],
                cfg["password"],
                cfg["target_dir"],
            )
        )
        # save the model to be able to link it to the parent
        model.save(get_checkpoint_name(topology, wbits, abits, "deploy_" + kind))

    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_run_on_pynq(self, topology, wbits, abits, kind):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "deploy_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)  # NOQA
        cfg = get_build_env(kind, target_clk_ns)
        if cfg["ip"] == "":
            pytest.skip("PYNQ board IP address not specified")
        (input_tensor_npy, output_tensor_npy) = get_golden_io_pair(
            topology, wbits, abits, return_topk=1
        )
        parent_model = load_test_checkpoint_or_skip(
            get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        )
        iname = parent_model.graph.input[0].name
        oname = parent_model.graph.output[0].name
        sdp_node = parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
        sdp_node = getCustomOp(sdp_node)
        sdp_node.set_nodeattr("model", prev_chkpt_name)
        ret = execute_onnx(parent_model, {iname: input_tensor_npy}, True)
        y = ret[oname]
        assert np.isclose(y, output_tensor_npy).all()

    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_throughput_hw(self, topology, wbits, abits, kind):
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "deploy_" + kind)
        end2end_example = "%s_w%da%d_%s" % (topology, wbits, abits, kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)  # NOQA
        cfg = get_build_env(kind, target_clk_ns)
        if cfg["ip"] == "":
            pytest.skip("PYNQ board IP address not specified")
        ret = dict()
        # try a range of batch sizes, some may fail due to insufficient DMA
        # buffers
        bsize_range_in = [8 ** i for i in range(5)]
        bsize_range = []
        for bsize in bsize_range_in:
            res = throughput_test_remote(model, bsize)
            if res is not None:
                ret[bsize] = res
                bsize_range.append(bsize)
            else:
                # assume we reached largest possible N
                break
        y = [ret[key]["runtime[ms]"] for key in bsize_range]
        lrret = linregress(bsize_range, y)
        ret_str = ""
        ret_str += "\n" + "%s Throughput Test Results" % end2end_example
        ret_str += "\n" + "-----------------------------"
        ret_str += "\n" + "From linear regression:"
        ret_str += "\n" + "Invocation overhead: %f ms" % lrret.intercept
        ret_str += "\n" + "Time per sample: %f ms" % lrret.slope
        ret_str += "\n" + "Raw data:"

        ret_str += "\n" + "{:<8} {:<16} {:<16} {:<16} {:<16} {:<16}".format(
            "N", "runtime[ms]", "fclk[mhz]", "fps", "DRAM rd[Mb/s]", "DRAM wr[Mb/s]"
        )
        for k in bsize_range:
            v = ret[k]
            ret_str += "\n" + "{:<8} {:<16} {:<16} {:<16} {:<16} {:<16}".format(
                k,
                np.round(v["runtime[ms]"], 4),
                v["fclk[mhz]"],
                np.round(v["throughput[images/s]"], 2),
                np.round(v["DRAM_in_bandwidth[Mb/s]"], 2),
                np.round(v["DRAM_out_bandwidth[Mb/s]"], 2),
            )
        ret_str += "\n" + "-----------------------------"
        warnings.warn(ret_str)