# Copyright (c) 2020, Xilinx
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of FINN nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import subprocess
import warnings
from collections import OrderedDict
from datetime import datetime

import pytest
import numpy as np
# as of Feb'20 there is a bug that segfaults ONNX shape inference if we
# import pytorch before onnx, so we make sure to import onnx first
import onnx  # NOQA
import torch
import brevitas.onnx as bo
from scipy.stats import linregress
from dataset_loading import mnist, cifar

import finn.transformation.fpgadataflow.convert_to_hls_layers as to_hls
import finn.transformation.streamline.absorb as absorb
from finn.analysis.fpgadataflow.dataflow_performance import dataflow_performance
from finn.core.datatype import DataType
from finn.core.modelwrapper import ModelWrapper
from finn.core.onnx_exec import execute_onnx
from finn.core.throughput_test import throughput_test_remote, throughput_test_rtlsim
from finn.custom_op.registry import getCustomOp
from finn.transformation.bipolar_to_xnor import ConvertBipolarMatMulToXnorPopcount
from finn.transformation.fold_constants import FoldConstants
from finn.transformation.fpgadataflow.annotate_cycles import AnnotateCycles
from finn.transformation.fpgadataflow.annotate_resources import AnnotateResources
from finn.transformation.fpgadataflow.compile_cppsim import CompileCppSim
from finn.transformation.fpgadataflow.create_dataflow_partition import (
    CreateDataflowPartition,
)
from finn.transformation.fpgadataflow.create_stitched_ip import CreateStitchedIP
from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP
from finn.transformation.fpgadataflow.insert_dwc import InsertDWC
from finn.transformation.fpgadataflow.make_deployment import DeployToPYNQ
from finn.transformation.fpgadataflow.prepare_cppsim import PrepareCppSim
from finn.transformation.fpgadataflow.prepare_ip import PrepareIP
from finn.transformation.fpgadataflow.prepare_rtlsim import PrepareRTLSim
from finn.transformation.fpgadataflow.set_exec_mode import SetExecMode
from finn.transformation.fpgadataflow.set_fifo_depths import InsertAndSetFIFODepths
from finn.transformation.general import (
    GiveReadableTensorNames,
    GiveUniqueNodeNames,
    RemoveStaticGraphInputs,
    RemoveUnusedTensors,
)
from finn.transformation.infer_data_layouts import InferDataLayouts
from finn.transformation.infer_datatypes import InferDataTypes
from finn.transformation.infer_shapes import InferShapes
from finn.transformation.insert_topk import InsertTopK
from finn.transformation.lower_convs_to_matmul import LowerConvsToMatMul
from finn.transformation.merge_onnx_models import MergeONNXModels
from finn.transformation.move_reshape import RemoveCNVtoFCFlatten
from finn.transformation.streamline import Streamline
from finn.transformation.streamline.reorder import (
    MakeMaxPoolNHWC,
    MoveScalarLinearPastInvariants,
)
from finn.util.gdrive import upload_to_end2end_dashboard
from finn.util.pytorch import ToTensor
from finn.util.test import (
    execute_parent,
    get_build_env,
    get_example_input,
    get_topk,
    get_trained_network_and_ishape,
    load_test_checkpoint_or_skip,
)
# all checkpoints/artifacts produced by the flow are written here;
# deliberately a hard KeyError if the env var is missing (misconfiguration)
build_dir = os.environ["FINN_BUILD_DIR"]
# target clock period (ns) used for IP generation / synthesis throughout
target_clk_ns = 10
# default weight memory mode for StreamingFCLayer instances
mem_mode = "decoupled"
# fix: rtlsim_trace is read in test_ipstitch_rtlsim but was never defined;
# set to True to dump .vcd waveform traces during stitched-IP rtlsim
rtlsim_trace = False
def get_checkpoint_name(topology, wbits, abits, step):
    """Return the .onnx checkpoint path for a given network config and flow step."""
    suffix = "/end2end_%s_w%da%d_%s.onnx" % (topology, wbits, abits, step)
    return build_dir + suffix
def get_dashboard_data(topology, wbits, abits):
    """Load previously recorded dashboard stats for this network config.

    Returns an empty OrderedDict when no stats file exists yet.
    """
    stats_file = build_dir + "/end2end_%s_w%da%d.txt" % (topology, wbits, abits)
    if not os.path.isfile(stats_file):
        return OrderedDict()
    with open(stats_file, "r") as f:
        # NOTE(review): the file is written via str(OrderedDict), so eval()
        # is needed to round-trip it; only trusted, locally generated files
        # should ever be read here.
        return eval(f.read())
def update_dashboard_data(topology, wbits, abits, key, val):
    """Insert/overwrite one (key, val) stat and persist the stats file.

    Bug fix: the previous version loaded the stats dict and wrote it back
    without ever storing (key, val), so every dashboard update was
    silently dropped.
    """
    stats_dict = get_dashboard_data(topology, wbits, abits)
    stats_dict[key] = val
    stats_file = build_dir + "/end2end_%s_w%da%d.txt" % (topology, wbits, abits)
    with open(stats_file, "w") as f:
        f.write(str(stats_dict))
def fold_tfc(model):
    """Apply the predefined folding config to a TFC dataflow model."""
    # (PE, SIMD, ramstyle) per StreamingFCLayer, first to last
    folding = [
        (16, 49, "block"),
        (8, 8, "auto"),
        (8, 8, "auto"),
        (10, 8, "distributed"),
    ]
    layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    for node, (pe, simd, ramstyle) in zip(layers, folding):
        inst = getCustomOp(node)
        inst.set_nodeattr("PE", pe)
        inst.set_nodeattr("SIMD", simd)
        inst.set_nodeattr("ram_style", ramstyle)
    # set parallelism for input quantizer to be same as first layer's SIMD;
    # also exercise decoupled mode with runtime-writeable weights here
    thres = getCustomOp(model.get_nodes_by_op_type("Thresholding_Batch")[0])
    thres.set_nodeattr("PE", 49)
    thres.set_nodeattr("mem_mode", "decoupled")
    thres.set_nodeattr("runtime_writeable_weights", 1)
    return model
def fold_lfc(model):
    """Apply the predefined folding config to an LFC dataflow model."""
    # (PE, SIMD, ramstyle) per StreamingFCLayer, first to last
    folding = [
        (32, 49, "block"),
        (64, 32, "auto"),
        (32, 64, "auto"),
        (10, 8, "distributed"),
    ]
    layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    for node, (pe, simd, ramstyle) in zip(layers, folding):
        inst = getCustomOp(node)
        inst.set_nodeattr("PE", pe)
        inst.set_nodeattr("SIMD", simd)
        inst.set_nodeattr("ram_style", ramstyle)
        inst.set_nodeattr("runtime_writeable_weights", 1)
    # set parallelism for input quantizer to be same as first layer's SIMD
    thres = getCustomOp(model.get_nodes_by_op_type("Thresholding_Batch")[0])
    thres.set_nodeattr("PE", 49)
    return model
def fold_cnv_large(model):
    """Apply the predefined folding config to the large (w1a1) CNV model.

    Bug fix: the per-layer tuples were bare expression statements — the
    `folding = [...]` assignment was missing, so the loops below raised
    NameError. The list assignment is restored here.
    """
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # each tuple is (PE, SIMD) for a layer
    folding = [
        (16, 3),
        (32, 32),
        (16, 32),
        (16, 32),
        (4, 32),
        (1, 32),
        (1, 4),
        (1, 8),
        (5, 1),
    ]
    for fcl, (pe, simd) in zip(fc_layers, folding):
        fcl_inst = getCustomOp(fcl)
        fcl_inst.set_nodeattr("PE", pe)
        fcl_inst.set_nodeattr("SIMD", simd)
    # each sliding-window generator gets the SIMD of its FC layer
    swg_layers = model.get_nodes_by_op_type("ConvolutionInputGenerator")
    for i in range(len(swg_layers)):
        swg_inst = getCustomOp(swg_layers[i])
        simd = folding[i][1]
        swg_inst.set_nodeattr("SIMD", simd)
    return model
def fold_cnv_small(model):
    """Apply the predefined folding config to the small CNV models.

    Bug fix: the per-layer tuples were bare expression statements — the
    `folding = [...]` assignment was missing, so the loops below raised
    NameError. The list assignment is restored here.
    """
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # each tuple is (PE, SIMD, ramstyle) for a layer
    folding = [
        (8, 3, "auto"),
        (16, 16, "auto"),
        (8, 16, "auto"),
        (8, 16, "block"),
        (4, 8, "auto"),
        (1, 8, "auto"),
        (1, 2, "distributed"),
        (2, 2, "block"),
        (5, 1, "distributed"),
    ]
    for fcl, (pe, simd, ramstyle) in zip(fc_layers, folding):
        fcl_inst = getCustomOp(fcl)
        fcl_inst.set_nodeattr("PE", pe)
        fcl_inst.set_nodeattr("SIMD", simd)
        fcl_inst.set_nodeattr("ram_style", ramstyle)
    # each sliding-window generator gets the SIMD of its FC layer
    swg_layers = model.get_nodes_by_op_type("ConvolutionInputGenerator")
    for i in range(len(swg_layers)):
        swg_inst = getCustomOp(swg_layers[i])
        simd = folding[i][1]
        swg_inst.set_nodeattr("SIMD", simd)
    return model
def get_folding_function(topology, wbits, abits):
    """Select the folding function matching a topology/quantization combo."""
    if "tfc" in topology:
        return fold_tfc
    if "lfc" in topology:
        return fold_lfc
    if "cnv" in topology:
        # only the binarized (w1a1) CNV uses the large folding config
        return fold_cnv_large if (wbits == 1 and abits == 1) else fold_cnv_small
    raise Exception("Unknown topology/quantization combo for predefined folding")
def get_golden_io_pair(topology, wbits, abits, preproc=ToTensor(), return_topk=None):
    """Run the trained Brevitas network on its canonical example input.

    Returns (input_npy, golden_output_npy); the output is reduced to its
    top-k entries when return_topk is given. NOTE: the default preproc is
    evaluated once at definition time — acceptable here since the same
    ToTensor instance is reused across calls.
    """
    model, _ = get_trained_network_and_ishape(topology, wbits, abits)
    input_tensor_npy = get_example_input(topology)
    inp = torch.from_numpy(input_tensor_npy).float()
    if preproc is not None:
        inp = preproc.forward(inp).detach()
    golden = model.forward(inp).detach().numpy()
    if return_topk is not None:
        golden = get_topk(golden, k=return_topk)
    return (input_tensor_npy, golden)
def measure_top1_accuracy(model_chkpt, dataset, parent_chkpt=None):
    """Measure top-1 accuracy (%) of an ONNX checkpoint on a test set.

    dataset is "cifar10" or "mnist". When parent_chkpt is given, execution
    goes through the parent graph (with this checkpoint linked in) and the
    input shape is taken from the parent model instead.

    Fix: removed a run of garbled stray line-number text that had been
    embedded mid-function between the ishape computation and the
    accuracy-counting loop.
    """
    if dataset == "cifar10":
        trainx, trainy, testx, testy, valx, valy = cifar.load_cifar_data(
            "/workspace/finn/dataset", download=True, one_hot=False
        )
    elif dataset == "mnist":
        trainx, trainy, testx, testy, valx, valy = mnist.load_mnist_data(
            "/workspace/finn/dataset", download=True, one_hot=False
        )
    else:
        raise Exception("Unrecognized dataset")
    # move from dataset_loader layout to ONNX layout: NHWC -> NCHW
    testx = testx.transpose(0, 3, 1, 2)
    model = ModelWrapper(model_chkpt)
    iname = model.graph.input[0].name
    oname = model.graph.output[0].name
    if parent_chkpt is None:
        ishape = model.get_tensor_shape(iname)
    else:
        parent_model = ModelWrapper(parent_chkpt)
        parent_iname = parent_model.graph.input[0].name
        ishape = parent_model.get_tensor_shape(parent_iname)
    ok = 0
    nok = 0
    n_batches = testx.shape[0]
    for i in range(n_batches):
        tdata = testx[i].reshape(ishape).astype(np.float32)
        exp = testy[i].item()
        if parent_chkpt is not None:
            y = execute_parent(parent_chkpt, model_chkpt, tdata)
        else:
            y = execute_onnx(model, {iname: tdata}, False)[oname]
        ret = y.item()
        if ret == exp:
            ok += 1
        else:
            nok += 1
        if i % 10 == 0:
            print("%d : OK %d NOK %d " % (i, ok, nok))
    acc_top1 = ok * 100.0 / (ok + nok)
    # use a warning so the result shows up even with captured stdout
    warnings.warn("Final OK %d NOK %d top-1 %f" % (ok, nok, acc_top1))
    return acc_top1
def topology2dataset(topology):
    """Map a network topology name to the dataset it was trained on."""
    # order matters only in that each key must be a unique substring match
    for key, dataset in (("fc", "mnist"), ("cnv", "cifar10")):
        if key in topology:
            return dataset
    raise Exception("Unrecognized topology")
@pytest.mark.parametrize("wbits", [1, 2])
@pytest.mark.parametrize("abits", [1, 2])
@pytest.mark.parametrize("topology", ["lfc", "tfc", "cnv"])
class TestEnd2End:
    """Ordered end-to-end FINN flow for the BNN-PYNQ example networks.

    Each test_* step loads the .onnx checkpoint saved by the previous step
    (skipping itself when that checkpoint is missing) and saves its own,
    so the class forms a pipeline: export -> tidy -> pre/post-processing ->
    streamline -> HLS conversion -> partition -> fold -> cppsim/ipgen ->
    FIFO sizing -> stitched-IP rtlsim -> build -> deploy -> hardware runs.

    Fixes applied: removed two runs of garbled stray line-number text
    (inside test_convert_to_hls_layers and between test_run_on_hw and
    test_throughput_hw); restored the missing `if cfg["ip"] == "":` guard
    in test_deploy and test_run_on_hw, which previously skipped
    unconditionally and made the rest of their bodies dead code (the guard
    matches the one already present in test_throughput_hw).
    """

    def test_export(self, topology, wbits, abits):
        """Export the trained Brevitas network to FINN-ONNX, record metadata."""
        if wbits > abits:
            pytest.skip("No wbits > abits end2end network configs for now")
        if topology == "lfc" and not (wbits == 1 and abits == 1):
            pytest.skip("Skipping certain lfc configs")
        (model, ishape) = get_trained_network_and_ishape(topology, wbits, abits)
        chkpt_name = get_checkpoint_name(topology, wbits, abits, "export")
        bo.export_finn_onnx(model, ishape, chkpt_name)
        nname = "%s_w%da%d" % (topology, wbits, abits)
        update_dashboard_data(topology, wbits, abits, "network", nname)
        dtstr = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        update_dashboard_data(topology, wbits, abits, "datetime", dtstr)
        finn_commit = subprocess.check_output(
            ["git", "rev-parse", "HEAD"], cwd="/workspace/finn"
        )
        finn_commit = finn_commit.decode("utf-8").strip()
        update_dashboard_data(topology, wbits, abits, "finn-commit", finn_commit)
        assert os.path.isfile(chkpt_name)

    def test_import_and_tidy(self, topology, wbits, abits):
        """Run shape/datatype inference and graph tidy-up transforms."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "export")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        model = model.transform(InferShapes())
        model = model.transform(FoldConstants())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(GiveReadableTensorNames())
        model = model.transform(InferDataTypes())
        model = model.transform(RemoveStaticGraphInputs())
        chkpt = get_checkpoint_name(topology, wbits, abits, "import_and_tidy")
        model.save(chkpt)

    def test_add_pre_and_postproc(self, topology, wbits, abits):
        """Merge a ToTensor preprocessing graph and append a Top-1 node."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "import_and_tidy")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        global_inp_name = model.graph.input[0].name
        ishape = model.get_tensor_shape(global_inp_name)
        # preprocessing: torchvision's ToTensor divides uint8 inputs by 255
        totensor_pyt = ToTensor()
        chkpt_preproc_name = get_checkpoint_name(topology, wbits, abits, "preproc")
        bo.export_finn_onnx(totensor_pyt, ishape, chkpt_preproc_name)
        assert os.path.isfile(chkpt_preproc_name)
        # join preprocessing and core model
        pre_model = ModelWrapper(chkpt_preproc_name)
        pre_model = pre_model.transform(InferShapes())
        pre_model = pre_model.transform(FoldConstants())
        model = model.transform(MergeONNXModels(pre_model))
        # add input quantization annotation: UINT8 for all BNN-PYNQ models
        global_inp_name = model.graph.input[0].name
        model.set_tensor_datatype(global_inp_name, DataType.UINT8)
        # postprocessing: insert Top-1 node at the end
        model = model.transform(InsertTopK(k=1))
        chkpt_name = get_checkpoint_name(topology, wbits, abits, "pre_post")
        # tidy-up again
        model = model.transform(InferShapes())
        model = model.transform(FoldConstants())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(GiveReadableTensorNames())
        model = model.transform(InferDataTypes())
        model = model.transform(RemoveStaticGraphInputs())
        model.save(chkpt_name)
        assert os.path.isfile(chkpt_name)

    def test_streamline(self, topology, wbits, abits):
        """Collapse scaling/bias chains into thresholds (streamlining)."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "pre_post")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        model = model.transform(absorb.AbsorbSignBiasIntoMultiThreshold())
        # move past any reshapes to be able to streamline input scaling
        model = model.transform(MoveScalarLinearPastInvariants())
        model = model.transform(Streamline())
        if "fc" not in topology:
            model = model.transform(LowerConvsToMatMul())
            model = model.transform(MakeMaxPoolNHWC())
            model = model.transform(absorb.AbsorbTransposeIntoMultiThreshold())
        model = model.transform(ConvertBipolarMatMulToXnorPopcount())
        model = model.transform(Streamline())
        # absorb final add-mul nodes into TopK
        model = model.transform(absorb.AbsorbScalarMulAddIntoTopK())
        model = model.transform(InferDataLayouts())
        model = model.transform(RemoveUnusedTensors())
        model.save(get_checkpoint_name(topology, wbits, abits, "streamline"))

    def test_convert_to_hls_layers(self, topology, wbits, abits):
        """Convert streamlined standard ops to HLS-backend custom ops."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "streamline")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        if topology == "tfc" and wbits == 1 and abits == 1:
            # use standalone thresholds for tfc-w1a1 to also exercise that option
            model = model.transform(to_hls.InferThresholdingLayer())
        # needed for bipolar MatMul layers
        model = model.transform(to_hls.InferBinaryStreamingFCLayer(mem_mode))
        # needed for non-bipolar MatMul layers
        model = model.transform(to_hls.InferQuantizedStreamingFCLayer(mem_mode))
        # TopK to LabelSelect
        model = model.transform(to_hls.InferLabelSelectLayer())
        # input quantization (if any) to standalone thresholding
        model = model.transform(to_hls.InferThresholdingLayer())
        # needed for convolutions
        if "fc" not in topology:
            model = model.transform(to_hls.InferConvInpGen())
            model = model.transform(to_hls.InferStreamingMaxPool())
            model = model.transform(RemoveCNVtoFCFlatten())
        # get rid of Tranpose -> Tranpose identity seq
        model = model.transform(absorb.AbsorbConsecutiveTransposes())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(InferDataLayouts())
        model.save(get_checkpoint_name(topology, wbits, abits, "convert_to_hls_layers"))

    def test_create_dataflow_partition(self, topology, wbits, abits):
        """Split the graph into a parent and an HLS-only dataflow partition."""
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "convert_to_hls_layers"
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        parent_model = model.transform(CreateDataflowPartition())
        parent_model_chkpt = get_checkpoint_name(
            topology, wbits, abits, "dataflow_parent"
        )
        parent_model.save(parent_model_chkpt)
        sdp_node = parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
        sdp_node = getCustomOp(sdp_node)
        dataflow_model_filename = sdp_node.get_nodeattr("model")
        dataflow_model = load_test_checkpoint_or_skip(dataflow_model_filename)
        dataflow_model_chkpt = get_checkpoint_name(
            topology, wbits, abits, "dataflow_model"
        )
        dataflow_model.save(dataflow_model_chkpt)

    def test_fold(self, topology, wbits, abits):
        """Apply the predefined per-topology folding (parallelism) config."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "dataflow_model")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        folding_fxn = get_folding_function(topology, wbits, abits)
        model = folding_fxn(model)
        model.save(get_checkpoint_name(topology, wbits, abits, "fold"))

    @pytest.mark.slow
    @pytest.mark.vivado
    def test_cppsim(self, topology, wbits, abits):
        """Verify the folded model against the golden output via C++ sim."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "fold")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        model = model.transform(PrepareCppSim())
        model = model.transform(CompileCppSim())
        model = model.transform(SetExecMode("cppsim"))
        cppsim_chkpt = get_checkpoint_name(topology, wbits, abits, "cppsim")
        model.save(cppsim_chkpt)
        parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        (input_tensor_npy, output_tensor_npy) = get_golden_io_pair(
            topology, wbits, abits, return_topk=1
        )
        y = execute_parent(parent_chkpt, cppsim_chkpt, input_tensor_npy)
        assert np.isclose(y, output_tensor_npy).all()

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_ipgen(self, topology, wbits, abits, kind):
        """Generate per-layer HLS IP for the target platform."""
        if kind == "alveo" and ("VITIS_PATH" not in os.environ):
            pytest.skip("VITIS_PATH not set")
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "fold")
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        test_fpga_part = get_build_env(kind, target_clk_ns)["part"]
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
        model = model.transform(HLSSynthIP())
        model.save(get_checkpoint_name(topology, wbits, abits, "ipgen_" + kind))

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_set_fifo_depths(self, topology, wbits, abits, kind):
        """Insert/size FIFOs; per-node FIFO attrs must end up zeroed."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "ipgen_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        test_fpga_part = get_build_env(kind, target_clk_ns)["part"]
        model = model.transform(InsertAndSetFIFODepths(test_fpga_part, target_clk_ns))
        fifo_layers = model.get_nodes_by_op_type("StreamingFIFO")
        assert len(fifo_layers) > 0
        hls_layers = model.get_finn_nodes()
        for node in hls_layers:
            if node.op_type != "StreamingFIFO":
                op_inst = getCustomOp(node)
                assert op_inst.get_nodeattr("inFIFODepth") == 0
                assert op_inst.get_nodeattr("outFIFODepth") == 0
        model.save(get_checkpoint_name(topology, wbits, abits, "fifodepth_" + kind))

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq"])
    def test_ipstitch_rtlsim(self, topology, wbits, abits, kind):
        """Stitch the IP, run RTL simulation, verify golden output."""
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "fifodepth_" + kind
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        test_fpga_part = get_build_env(kind, target_clk_ns)["part"]
        model = model.transform(InsertDWC())
        model = model.transform(GiveUniqueNodeNames())
        model = model.transform(AnnotateCycles())
        perf = model.analysis(dataflow_performance)
        latency = perf["critical_path_cycles"]
        # rtlsim only supports impl_style=rtl for StreamingFIFO, ensure that
        for fifo_layer in model.get_nodes_by_op_type("StreamingFIFO"):
            getCustomOp(fifo_layer).set_nodeattr("impl_style", "rtl")
        model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
        model = model.transform(HLSSynthIP())
        model = model.transform(CreateStitchedIP(test_fpga_part, target_clk_ns))
        model = model.transform(PrepareRTLSim())
        model.set_metadata_prop("exec_mode", "rtlsim")
        # give the sim some slack over the estimated critical path
        os.environ["LIVENESS_THRESHOLD"] = str(int(latency * 1.1))
        if rtlsim_trace:
            model.set_metadata_prop(
                "rtlsim_trace", "%s_w%da%d.vcd" % (topology, wbits, abits)
            )
            os.environ["RTLSIM_TRACE_DEPTH"] = "3"
        rtlsim_chkpt = get_checkpoint_name(
            topology, wbits, abits, "ipstitch_rtlsim_" + kind
        )
        model.save(rtlsim_chkpt)
        parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        (input_tensor_npy, output_tensor_npy) = get_golden_io_pair(
            topology, wbits, abits, return_topk=1
        )
        y = execute_parent(parent_chkpt, rtlsim_chkpt, input_tensor_npy)
        model = ModelWrapper(rtlsim_chkpt)
        perf["cycles_rtlsim"] = model.get_metadata_prop("cycles_rtlsim")
        # warnings.warn("Estimated & rtlsim performance: " + str(perf))
        # for (k, v) in perf.items():
        #     update_dashboard_data(topology, wbits, abits, k, v)
        update_dashboard_data(
            topology, wbits, abits, "cycles_rtlsim", perf["cycles_rtlsim"]
        )
        assert np.isclose(y, output_tensor_npy).all()

    @pytest.mark.parametrize("kind", ["zynq"])
    def test_throughput_rtlsim(self, topology, wbits, abits, kind):
        """Check rtlsim throughput is within 15% of the analytical estimate."""
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "ipstitch_rtlsim_" + kind
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        n_nodes = len(model.graph.node)
        perf_est = model.analysis(dataflow_performance)
        latency = int(model.get_metadata_prop("cycles_rtlsim"))
        cycles_per_sample_est = perf_est["max_cycles"]
        batchsize = 2 * n_nodes
        ret = throughput_test_rtlsim(model, batchsize=batchsize)
        res_cycles = ret["cycles"]
        est_cycles = latency + cycles_per_sample_est * batchsize
        assert (abs(res_cycles - est_cycles) / res_cycles) < 0.15

    @pytest.mark.slow
    @pytest.mark.vivado
    @pytest.mark.parametrize("kind", ["zynq"])
    def test_validate_top1(self, topology, wbits, abits, kind):
        """Opt-in full-dataset top-1 accuracy validation of all checkpoints."""
        if "TEST_END2END_VALIDATE_TOP1" not in os.environ:
            pytest.skip("TEST_END2END_VALIDATE_TOP1 not set")
        prepostproc_chkpt = get_checkpoint_name(topology, wbits, abits, "pre_post")
        streamline_chkpt = get_checkpoint_name(topology, wbits, abits, "streamline")
        parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        cppsim_chkpt = get_checkpoint_name(topology, wbits, abits, "cppsim")
        rtlsim_chkpt = get_checkpoint_name(
            topology, wbits, abits, "ipstitch_rtlsim_" + kind
        )
        dataset = topology2dataset(topology)
        assert measure_top1_accuracy(prepostproc_chkpt, dataset) > 80
        assert measure_top1_accuracy(streamline_chkpt, dataset) > 80
        assert measure_top1_accuracy(cppsim_chkpt, dataset, parent_chkpt) > 80
        assert measure_top1_accuracy(rtlsim_chkpt, dataset, parent_chkpt) > 80

    @pytest.mark.vivado
    @pytest.mark.vitis
    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_build(self, topology, wbits, abits, kind):
        """Run the full platform build and record synthesis resource stats."""
        if kind == "alveo" and ("VITIS_PATH" not in os.environ):
            pytest.skip("VITIS_PATH not set")
        prev_chkpt_name = get_checkpoint_name(
            topology, wbits, abits, "fifodepth_" + kind
        )
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        cfg = get_build_env(kind, target_clk_ns)
        model = model.transform(cfg["build_fxn"])
        model = model.transform(AnnotateResources("synth"))
        synth_dct = eval(model.get_metadata_prop("res_total_top_synth"))
        for (k, v) in synth_dct.items():
            update_dashboard_data(topology, wbits, abits, k, v)
        update_dashboard_data(topology, wbits, abits, "board", cfg["board"])
        model.save(get_checkpoint_name(topology, wbits, abits, "build_" + kind))

    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_deploy(self, topology, wbits, abits, kind):
        """Copy the built accelerator to the remote PYNQ/Alveo board."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "build_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)
        cfg = get_build_env(kind, target_clk_ns)
        # fix: skip only when no board IP is configured (guard was missing,
        # making the deployment below unreachable)
        if cfg["ip"] == "":
            pytest.skip("PYNQ board IP address not specified")
        model = model.transform(
            DeployToPYNQ(
                cfg["ip"],
                cfg["port"],
                cfg["username"],
                cfg["password"],
                cfg["target_dir"],
            )
        )
        # save the model to be able to link it to the parent
        model.save(get_checkpoint_name(topology, wbits, abits, "deploy_" + kind))

    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_run_on_hw(self, topology, wbits, abits, kind):
        """Execute the deployed accelerator and verify the golden output."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "deploy_" + kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)  # NOQA
        cfg = get_build_env(kind, target_clk_ns)
        # fix: skip only when no board IP is configured (guard was missing,
        # making the hardware execution below unreachable)
        if cfg["ip"] == "":
            pytest.skip("PYNQ board IP address not specified")
        (input_tensor_npy, output_tensor_npy) = get_golden_io_pair(
            topology, wbits, abits, return_topk=1
        )
        parent_model = load_test_checkpoint_or_skip(
            get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
        )
        iname = parent_model.graph.input[0].name
        oname = parent_model.graph.output[0].name
        sdp_node = parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
        sdp_node = getCustomOp(sdp_node)
        sdp_node.set_nodeattr("model", prev_chkpt_name)
        ret = execute_onnx(parent_model, {iname: input_tensor_npy}, True)
        y = ret[oname]
        assert np.isclose(y, output_tensor_npy).all()

    @pytest.mark.parametrize("kind", ["zynq", "alveo"])
    def test_throughput_hw(self, topology, wbits, abits, kind):
        """Sweep batch sizes on hardware and record throughput stats."""
        prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "deploy_" + kind)
        end2end_example = "%s_w%da%d_%s" % (topology, wbits, abits, kind)
        model = load_test_checkpoint_or_skip(prev_chkpt_name)  # NOQA
        cfg = get_build_env(kind, target_clk_ns)
        if cfg["ip"] == "":
            pytest.skip("PYNQ board IP address not specified")
        ret = dict()
        # try a range of batch sizes, some may fail due to insufficient DMA
        # buffers
        bsize_range_in = [8 ** i for i in range(5)]
        bsize_range = []
        for bsize in bsize_range_in:
            res = throughput_test_remote(model, bsize)
            if res is not None:
                ret[bsize] = res
                bsize_range.append(bsize)
            else:
                # assume we reached largest possible N
                break
        y = [ret[key]["runtime[ms]"] for key in bsize_range]
        lrret = linregress(bsize_range, y)
        ret_str = ""
        ret_str += "\n" + "%s Throughput Test Results" % end2end_example
        ret_str += "\n" + "-----------------------------"
        ret_str += "\n" + "From linear regression:"
        ret_str += "\n" + "Invocation overhead: %f ms" % lrret.intercept
        ret_str += "\n" + "Time per sample: %f ms" % lrret.slope
        ret_str += "\n" + "Raw data:"
        ret_str += "\n" + "{:<8} {:<16} {:<16} {:<16} {:<16} {:<16}".format(
            "N", "runtime[ms]", "fclk[mhz]", "fps", "DRAM rd[Mb/s]", "DRAM wr[Mb/s]"
        )
        for k in bsize_range:
            v = ret[k]
            ret_str += "\n" + "{:<8} {:<16} {:<16} {:<16} {:<16} {:<16}".format(
                k,
                np.round(v["runtime[ms]"], 4),
                v["fclk[mhz]"],
                np.round(v["throughput[images/s]"], 2),
                np.round(v["DRAM_in_bandwidth[Mb/s]"], 2),
                np.round(v["DRAM_out_bandwidth[Mb/s]"], 2),
            )
        ret_str += "\n" + "-----------------------------"
        warnings.warn(ret_str)
        largest_bsize = bsize_range[-1]
        update_dashboard_data(
            topology, wbits, abits, "fclk[mhz]", ret[largest_bsize]["fclk[mhz]"]
        )
        update_dashboard_data(
            topology,
            wbits,
            abits,
            "throughput[images/s]",
            ret[largest_bsize]["throughput[images/s]"],
        )

    def test_upload_results_to_dashboard(self, topology, wbits, abits):
        """Upload accumulated stats for this config, if any were recorded."""
        dashboard_data = get_dashboard_data(topology, wbits, abits)
        if len(dashboard_data.keys()) > 0:
            upload_to_end2end_dashboard(dashboard_data)
        else:
            pytest.skip("No data to upload to dashboard")