diff --git a/tests/fpgadataflow/test_fpgadataflow_fclayer.py b/tests/fpgadataflow/test_fpgadataflow_fclayer.py index 325536e5a25c27b94fe345de3c6307be90524c42..b0bda47795a8ccd73caee34da06332f64cdf48b7 100644 --- a/tests/fpgadataflow/test_fpgadataflow_fclayer.py +++ b/tests/fpgadataflow/test_fpgadataflow_fclayer.py @@ -4,20 +4,42 @@ from onnx import TensorProto, helper import finn.core.onnx_exec as oxe from finn.core.datatype import DataType from finn.core.modelwrapper import ModelWrapper +from finn.core.utils import interleave_matrix_outer_dim_from_partitions from finn.custom_op.multithreshold import MultiThreshold -def test_fpgadataflow_fclayer_all_bipolar(): - mh = 4 - mw = 4 - pe = 4 - simd = 4 +def make_single_fclayer_modelwrapper(W, T, pe, simd, wdt, idt, tdt, odt): + mh = W.shape[0] + mw = W.shape[1] + assert mh % pe == 0 + assert mw % simd == 0 wmem = mw * mh // (pe * simd) - n_thres_steps = 1 assert mw * mh == wmem * pe * simd nf = mh // pe tmem = nf sf = mw // simd + # distribute rows between PEs + W_reshaped = interleave_matrix_outer_dim_from_partitions(W, pe) + # create SIMD as innermost dimension + W_reshaped = W_reshaped.reshape(pe, wmem, simd) + if T is not None: + assert T.shape[0] == 1 or T.shape[0] == mh + n_thres_steps = T.shape[1] + # if single global threshold specified, repeat along channels + if T.shape[0] == 1: + T = np.tile(T, (mh, 1)) + # distribute T rows between PEs + T_reshaped = interleave_matrix_outer_dim_from_partitions(T, pe) + assert T_reshaped.shape[0] == pe + assert T_reshaped.shape[1] == tmem + assert T_reshaped.shape[2] == n_thres_steps + else: + n_thres_steps = 0 + if wdt == DataType.BIPOLAR and idt == DataType.BIPOLAR: + rdt = "Recast<XnorMul>" + else: + assert "Weight & input datatype combo not yet supported" + inp = helper.make_tensor_value_info("inp", TensorProto.FLOAT, [1, sf, simd]) outp = helper.make_tensor_value_info("outp", TensorProto.FLOAT, [1, nf, pe]) FCLayer_node = helper.make_node( @@ -31,9 +53,8 @@ def test_fpgadataflow_fclayer_all_bipolar(): MH=mh, SIMD=simd, PE=pe, - resDataType="Recast<XnorMul>", + resDataType=rdt, ) - graph = helper.make_graph( nodes=[FCLayer_node], name="fclayer_graph", @@ -52,33 +73,38 @@ def test_fpgadataflow_fclayer_all_bipolar(): model = helper.make_model(graph, producer_name="fclayer-model") model = ModelWrapper(model) - # set the tensor datatypes (in this case: all to bipolar) - for tensor in graph.input: - model.set_tensor_datatype(tensor.name, DataType["BIPOLAR"]) - for tensor in graph.output: - model.set_tensor_datatype(tensor.name, DataType["BIPOLAR"]) + model.set_tensor_datatype("inp", idt) + model.set_tensor_datatype("outp", odt) + model.set_tensor_datatype("weights", wdt) + model.set_initializer("weights", W_reshaped) + if T is not None: + model.set_tensor_datatype("thresh", tdt) + model.set_initializer("thresh", T_reshaped) + return model - # generate input data - x = np.random.randint(2, size=mw) - input_tensor = (np.asarray(x, dtype=np.float32)).reshape(1, sf, simd) - input_dict = {"inp": input_tensor} +def test_fpgadataflow_fclayer_all_bipolar(): + mh = 4 + mw = 4 + pe = 4 + simd = 4 + wdt = idt = odt = DataType.BIPOLAR + tdt = DataType.UINT32 # generate weights W = np.random.randint(2, size=(mh, mw)) - weights_tensor = (np.asarray(W, dtype=np.float32)).reshape(pe, wmem, simd) - input_dict["weights"] = weights_tensor - - # generate threshold activation - T = np.zeros(mh) - thresh_tensor = (np.asarray(T, dtype=np.float32)).reshape(pe, tmem, n_thres_steps) - input_dict["thresh"] = thresh_tensor - - output_dict = oxe.execute_onnx(model, input_dict) - + # single global threshold at zero + T = np.zeros((1, 1)) + model = make_single_fclayer_modelwrapper(W, T, pe, simd, wdt, idt, tdt, odt) + # generate input data + x = np.random.randint(2, size=mw) + ishape = model.get_tensor_shape("inp") + input_tensor = (np.asarray(x, dtype=np.float32)).reshape(*ishape) + input_dict = {"inp": input_tensor} + produced = oxe.execute_onnx(model, input_dict)["outp"] # convert to bipolar values Wb = 2 * W - 1 xb = 2 * x - 1 yb = np.dot(Wb, xb) thres = MultiThreshold() - expected = thres._execute(yb.reshape(1, mh), T.reshape((mh, 1))) - assert (output_dict["outp"] == expected).all() + expected = thres._execute(yb.reshape(1, mh), T) + assert (produced == expected).all()