Skip to content
Snippets Groups Projects
Commit a87c734c authored by Yaman Umuroglu's avatar Yaman Umuroglu
Browse files

[Test] generalize top-1 measurement, add own optional test

parent 9a8ba746
No related branches found
No related tags found
No related merge requests found
...@@ -92,14 +92,12 @@ from finn.util.pytorch import ToTensor ...@@ -92,14 +92,12 @@ from finn.util.pytorch import ToTensor
from finn.transformation.merge_onnx_models import MergeONNXModels from finn.transformation.merge_onnx_models import MergeONNXModels
from finn.transformation.insert_topk import InsertTopK from finn.transformation.insert_topk import InsertTopK
from finn.core.datatype import DataType from finn.core.datatype import DataType
import mnist from dataset_loading import mnist, cifar
build_dir = "/tmp/" + os.environ["FINN_INST_NAME"] build_dir = "/tmp/" + os.environ["FINN_INST_NAME"]
target_clk_ns = 10 target_clk_ns = 10
mem_mode = "decoupled" mem_mode = "decoupled"
rtlsim_trace = False rtlsim_trace = False
mnist_test_imgs = mnist.test_images()
mnist_test_labels = mnist.test_labels()
def get_checkpoint_name(topology, wbits, abits, step): def get_checkpoint_name(topology, wbits, abits, step):
...@@ -214,6 +212,59 @@ def get_golden_io_pair(topology, wbits, abits, preproc=ToTensor(), return_topk=N ...@@ -214,6 +212,59 @@ def get_golden_io_pair(topology, wbits, abits, preproc=ToTensor(), return_topk=N
return (input_tensor_npy, output_tensor_npy) return (input_tensor_npy, output_tensor_npy)
def measure_top1_accuracy(model_chkpt, dataset, parent_chkpt=None):
if dataset == "cifar10":
trainx, trainy, testx, testy, valx, valy = cifar.load_cifar_data(
"/workspace/finn/dataset", download=True, one_hot=False
)
elif dataset == "mnist":
trainx, trainy, testx, testy, valx, valy = mnist.load_mnist_data(
"/workspace/finn/dataset", download=True, one_hot=False
)
else:
raise Exception("Unrecognized dataset")
# move from dataset_loader layout to ONNX layout: NHWC -> NCHW
testx = testx.transpose(0, 3, 1, 2)
model = ModelWrapper(model_chkpt)
iname = model.graph.input[0].name
oname = model.graph.output[0].name
if parent_chkpt is None:
ishape = model.get_tensor_shape(iname)
else:
parent_model = ModelWrapper(parent_chkpt)
parent_iname = parent_model.graph.input[0].name
ishape = model.get_tensor_shape(parent_iname)
ok = 0
nok = 0
n_batches = testx.shape[0]
for i in range(n_batches):
tdata = testx[i].reshape(ishape).astype(np.float32)
exp = testy[i].item()
if parent_chkpt is not None:
y = execute_parent(parent_chkpt, model_chkpt, tdata)
else:
y = execute_onnx(model, {iname: tdata}, False)[oname]
ret = y.item()
if ret == exp:
ok += 1
else:
nok += 1
if i % 10 == 0:
print("%d : OK %d NOK %d " % (i, ok, nok))
acc_top1 = ok * 100.0 / (ok + nok)
warnings.warn("Final OK %d NOK %d top-1 %f" % (ok, nok, acc_top1))
return acc_top1
def topology2dataset(topology):
if "fc" in topology:
return "mnist"
elif "cnv" in topology:
return "cifar10"
else:
raise Exception("Unrecognized topology")
@pytest.mark.parametrize("wbits", [1, 2]) @pytest.mark.parametrize("wbits", [1, 2])
@pytest.mark.parametrize("abits", [1, 2]) @pytest.mark.parametrize("abits", [1, 2])
@pytest.mark.parametrize("topology", ["tfc", "cnv"]) @pytest.mark.parametrize("topology", ["tfc", "cnv"])
...@@ -235,7 +286,8 @@ class TestEnd2End: ...@@ -235,7 +286,8 @@ class TestEnd2End:
model = model.transform(GiveReadableTensorNames()) model = model.transform(GiveReadableTensorNames())
model = model.transform(InferDataTypes()) model = model.transform(InferDataTypes())
model = model.transform(RemoveStaticGraphInputs()) model = model.transform(RemoveStaticGraphInputs())
model.save(get_checkpoint_name(topology, wbits, abits, "import_and_tidy")) chkpt = get_checkpoint_name(topology, wbits, abits, "import_and_tidy")
model.save(chkpt)
def test_add_pre_and_postproc(self, topology, wbits, abits): def test_add_pre_and_postproc(self, topology, wbits, abits):
prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "import_and_tidy") prev_chkpt_name = get_checkpoint_name(topology, wbits, abits, "import_and_tidy")
...@@ -256,6 +308,13 @@ class TestEnd2End: ...@@ -256,6 +308,13 @@ class TestEnd2End:
# postprocessing: insert Top-1 node at the end # postprocessing: insert Top-1 node at the end
model = model.transform(InsertTopK(k=1)) model = model.transform(InsertTopK(k=1))
chkpt_name = get_checkpoint_name(topology, wbits, abits, "pre_post") chkpt_name = get_checkpoint_name(topology, wbits, abits, "pre_post")
# tidy-up again
model = model.transform(InferShapes())
model = model.transform(FoldConstants())
model = model.transform(GiveUniqueNodeNames())
model = model.transform(GiveReadableTensorNames())
model = model.transform(InferDataTypes())
model = model.transform(RemoveStaticGraphInputs())
model.save(chkpt_name) model.save(chkpt_name)
assert os.path.isfile(chkpt_name) assert os.path.isfile(chkpt_name)
...@@ -394,33 +453,6 @@ class TestEnd2End: ...@@ -394,33 +453,6 @@ class TestEnd2End:
warnings.warn("Estimated & rtlsim performance: " + str(perf)) warnings.warn("Estimated & rtlsim performance: " + str(perf))
assert np.isclose(y, output_tensor_npy).all() assert np.isclose(y, output_tensor_npy).all()
@pytest.mark.slow
@pytest.mark.parametrize("kind", ["zynq"])
def test_rtlsim_top1(self, topology, wbits, abits, kind):
if "TEST_RTLSIM_TOP1" not in os.environ:
pytest.skip("TEST_RTLSIM_TOP1 not set")
if "fc" not in topology:
pytest.skip("Top-1 rtlsim test currently for MNIST only")
rtlsim_chkpt = get_checkpoint_name(
topology, wbits, abits, "ipstitch_rtlsim_" + kind
)
parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
load_test_checkpoint_or_skip(rtlsim_chkpt)
ok = 0
nok = 0
for i in range(10000):
tdata = mnist_test_imgs[i].reshape(1, 1, 28, 28).astype(np.float32)
exp = mnist_test_labels[i].item()
y = execute_parent(parent_chkpt, rtlsim_chkpt, tdata)
ret = y.item()
if ret == exp:
ok += 1
else:
nok += 1
acc_top1 = ok * 100.0 / (ok + nok)
warnings.warn("Final OK %d NOK %d top-1 %f" % (ok, nok, acc_top1))
assert acc_top1 > 90.0
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.vivado @pytest.mark.vivado
@pytest.mark.parametrize("kind", ["zynq"]) @pytest.mark.parametrize("kind", ["zynq"])
...@@ -439,6 +471,25 @@ class TestEnd2End: ...@@ -439,6 +471,25 @@ class TestEnd2End:
est_cycles = latency + cycles_per_sample_est * batchsize est_cycles = latency + cycles_per_sample_est * batchsize
assert (abs(res_cycles - est_cycles) / res_cycles) < 0.15 assert (abs(res_cycles - est_cycles) / res_cycles) < 0.15
@pytest.mark.slow
@pytest.mark.vivado
@pytest.mark.parametrize("kind", ["zynq"])
def test_validate_top1(self, topology, wbits, abits, kind):
if "TEST_END2END_VALIDATE_TOP1" not in os.environ:
pytest.skip("TEST_END2END_VALIDATE_TOP1 not set")
prepostproc_chkpt = get_checkpoint_name(topology, wbits, abits, "pre_post")
streamline_chkpt = get_checkpoint_name(topology, wbits, abits, "streamline")
parent_chkpt = get_checkpoint_name(topology, wbits, abits, "dataflow_parent")
cppsim_chkpt = get_checkpoint_name(topology, wbits, abits, "cppsim")
rtlsim_chkpt = get_checkpoint_name(
topology, wbits, abits, "ipstitch_rtlsim_" + kind
)
dataset = topology2dataset(topology)
assert measure_top1_accuracy(prepostproc_chkpt, dataset) > 80
assert measure_top1_accuracy(streamline_chkpt, dataset) > 80
assert measure_top1_accuracy(cppsim_chkpt, dataset, parent_chkpt) > 80
assert measure_top1_accuracy(rtlsim_chkpt, dataset, parent_chkpt) > 80
@pytest.mark.slow @pytest.mark.slow
@pytest.mark.vivado @pytest.mark.vivado
@pytest.mark.vitis @pytest.mark.vitis
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment