Skip to content
Snippets Groups Projects
Commit 8e07cf5e authored by Yaman Umuroglu's avatar Yaman Umuroglu
Browse files

[Test] add a make_single_fclayer_modelwrapper helper fxn

there's lots more testing needed for StreamingFC, so this will
come in handy when writing those new tests
parent 5c668735
No related branches found
No related tags found
No related merge requests found
......@@ -4,20 +4,42 @@ from onnx import TensorProto, helper
import finn.core.onnx_exec as oxe
from finn.core.datatype import DataType
from finn.core.modelwrapper import ModelWrapper
from finn.core.utils import interleave_matrix_outer_dim_from_partitions
from finn.custom_op.multithreshold import MultiThreshold
def test_fpgadataflow_fclayer_all_bipolar():
mh = 4
mw = 4
pe = 4
simd = 4
def make_single_fclayer_modelwrapper(W, T, pe, simd, wdt, idt, tdt, odt):
mh = W.shape[0]
mw = W.shape[1]
assert mh % pe == 0
assert mw % simd == 0
wmem = mw * mh // (pe * simd)
n_thres_steps = 1
assert mw * mh == wmem * pe * simd
nf = mh // pe
tmem = nf
sf = mw // simd
# distribute rows between PEs
W_reshaped = interleave_matrix_outer_dim_from_partitions(W, pe)
# create SIMD as innermost dimension
W_reshaped = W_reshaped.reshape(pe, wmem, simd)
if T is not None:
assert T.shape[0] == 1 or T.shape[0] == mh
n_thres_steps = T.shape[1]
# if single global threshold specified, repeat along channels
if T.shape[0] == 1:
T = np.tile(T, (mh, 1))
# distribute T rows between PEs
T_reshaped = interleave_matrix_outer_dim_from_partitions(T, pe)
assert T_reshaped.shape[0] == pe
assert T_reshaped.shape[1] == tmem
assert T_reshaped.shape[2] == n_thres_steps
else:
n_thres_steps = 0
if wdt == DataType.BIPOLAR and idt == DataType.BIPOLAR:
rdt = "Recast<XnorMul>"
else:
assert "Weight & input datatype combo not yet supported"
inp = helper.make_tensor_value_info("inp", TensorProto.FLOAT, [1, sf, simd])
outp = helper.make_tensor_value_info("outp", TensorProto.FLOAT, [1, nf, pe])
FCLayer_node = helper.make_node(
......@@ -31,9 +53,8 @@ def test_fpgadataflow_fclayer_all_bipolar():
MH=mh,
SIMD=simd,
PE=pe,
resDataType="Recast<XnorMul>",
resDataType=rdt,
)
graph = helper.make_graph(
nodes=[FCLayer_node],
name="fclayer_graph",
......@@ -52,33 +73,38 @@ def test_fpgadataflow_fclayer_all_bipolar():
model = helper.make_model(graph, producer_name="fclayer-model")
model = ModelWrapper(model)
# set the tensor datatypes (in this case: all to bipolar)
for tensor in graph.input:
model.set_tensor_datatype(tensor.name, DataType["BIPOLAR"])
for tensor in graph.output:
model.set_tensor_datatype(tensor.name, DataType["BIPOLAR"])
model.set_tensor_datatype("inp", idt)
model.set_tensor_datatype("outp", odt)
model.set_tensor_datatype("weights", wdt)
model.set_initializer("weights", W_reshaped)
if T is not None:
model.set_tensor_datatype("thresh", tdt)
model.set_initializer("thresh", T_reshaped)
return model
# generate input data
x = np.random.randint(2, size=mw)
input_tensor = (np.asarray(x, dtype=np.float32)).reshape(1, sf, simd)
input_dict = {"inp": input_tensor}
def test_fpgadataflow_fclayer_all_bipolar():
mh = 4
mw = 4
pe = 4
simd = 4
wdt = idt = odt = DataType.BIPOLAR
tdt = DataType.UINT32
# generate weights
W = np.random.randint(2, size=(mh, mw))
weights_tensor = (np.asarray(W, dtype=np.float32)).reshape(pe, wmem, simd)
input_dict["weights"] = weights_tensor
# generate threshold activation
T = np.zeros(mh)
thresh_tensor = (np.asarray(T, dtype=np.float32)).reshape(pe, tmem, n_thres_steps)
input_dict["thresh"] = thresh_tensor
output_dict = oxe.execute_onnx(model, input_dict)
# single global threshold at zero
T = np.zeros((1, 1))
model = make_single_fclayer_modelwrapper(W, T, pe, simd, wdt, idt, tdt, odt)
# generate input data
x = np.random.randint(2, size=mw)
ishape = model.get_tensor_shape("inp")
input_tensor = (np.asarray(x, dtype=np.float32)).reshape(*ishape)
input_dict = {"inp": input_tensor}
produced = oxe.execute_onnx(model, input_dict)["outp"]
# convert to bipolar values
Wb = 2 * W - 1
xb = 2 * x - 1
yb = np.dot(Wb, xb)
thres = MultiThreshold()
expected = thres._execute(yb.reshape(1, mh), T.reshape((mh, 1)))
assert (output_dict["outp"] == expected).all()
expected = thres._execute(yb.reshape(1, mh), T)
assert (produced == expected).all()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment