Skip to content
Snippets Groups Projects
Commit 41e89ca0 authored by Tobi-Alonso's avatar Tobi-Alonso
Browse files

[HLSCustomOp] Remove redundant SameResize_Batch HLSCustomOp as FMPadding_Batch is preferred over it

parent bf7254c4
No related branches found
No related tags found
No related merge requests found
import os
import numpy as np
from onnx import TensorProto, helper
from finn.core.datatype import DataType
from finn.custom_op.fpgadataflow import HLSCustomOp
from finn.util.data_packing import npy_to_rtlsim_input, rtlsim_output_to_npy
class SameResize_Batch(HLSCustomOp):
"""Class that corresponds to finn-hlslib SameResize function.
Implements 'same' padding on a given input image."""
def __init__(self, onnx_node):
def get_nodeattr_types(self):
my_attrs = {
"ImgDim": ("i", True, 0),
"KernelDim": ("i", True, 0),
"Stride": ("i", True, 0),
"NumChannels": ("i", True, 0),
# FINN input datatype
"inputDataType": ("s", True, ""),
# distribution of added values to achieve "same" padding
"PaddingStyle": ("i", True, 2),
return my_attrs
def get_normal_input_shape(self):
idim = self.get_nodeattr("ImgDim")
num_ch = self.get_nodeattr("NumChannels")
ishape = (1, idim, idim, num_ch)
return ishape
def get_normal_output_shape(self):
idim = self.get_nodeattr("ImgDim")
num_ch = self.get_nodeattr("NumChannels")
kdim = self.get_nodeattr("KernelDim")
stride = self.get_nodeattr("Stride")
assert idim % stride == 0, "Stride must divide input dimension."
# number of "same" windows over the input data
same_windows = idim // stride
odim = kdim + stride * (same_windows - 1)
oshape = (1, odim, odim, num_ch)
return oshape
def get_folded_input_shape(self):
# even though there is no folding in the current hlslib op,
# insert a time multiplexing axis to remain compatible with the
# shapes produced by the rest of the dataflow pipeline
ret = list(self.get_normal_input_shape())
ret.insert(-1, 1)
return tuple(ret)
def get_folded_output_shape(self):
# even though there is no folding in the current hlslib op,
# insert a time multiplexing axis to remain compatible with the
# shapes produced by the rest of the dataflow pipeline
ret = list(self.get_normal_output_shape())
ret.insert(-1, 1)
return tuple(ret)
def make_shape_compatible_op(self, model):
exp_ishape = self.get_normal_input_shape()
oshape = self.get_normal_output_shape()
ishape = tuple(model.get_tensor_shape(self.onnx_node.input[0]))
assert ishape == exp_ishape, "Unexpect input shape for SameResize."
# implement tensor with correct shape
values = np.random.randn(*oshape).astype(np.float32)
return helper.make_node(
def infer_node_datatype(self, model):
node = self.onnx_node
# data type stays the same
dtype = model.get_tensor_datatype(node.input[0])
exp_idtype = self.get_input_datatype()
assert dtype == exp_idtype, "Unexpected datatype for SameResize_Batch"
model.set_tensor_datatype(node.output[0], dtype)
def verify_node(self):
def get_input_datatype(self):
"""Returns FINN DataType of input."""
ret = DataType[self.get_nodeattr("inputDataType")]
# the hlslib op always pads with zeroes, so ensure that the DataType
# is able to represent zeroes
assert ret.allowed(0), "SameResize_Batch DataType must support zero"
return ret
def get_output_datatype(self):
"""Returns FINN DataType of output. (Same as input datatype)"""
return self.get_input_datatype()
def get_instream_width(self):
ibits = self.get_input_datatype().bitwidth()
num_ch = self.get_nodeattr("NumChannels")
return ibits * num_ch
def get_outstream_width(self):
obits = self.get_output_datatype().bitwidth()
num_ch = self.get_nodeattr("NumChannels")
return obits * num_ch
def get_number_output_values(self):
folded_oshape = self.get_folded_output_shape()
def global_includes(self):
self.code_gen_dict["$GLOBALS$"] = ['#include "streamtools.h"']
def defines(self, var):
numReps = 1
assert self.get_nodeattr("PaddingStyle") == 2, "Only PaddingStyle=2 supported"
self.code_gen_dict["$DEFINES$"] = [
"""#define ImgDim1 {}\n #define KernelDim1 {}\n
#define Stride1 {}\n #define NumChannels1 {}\n
#define PaddingStyle1 {}\n #define numReps {}""".format(
def read_npy_data(self):
code_gen_dir = self.get_nodeattr("code_gen_dir_cppsim")
dtype = self.get_input_datatype()
if dtype == DataType.BIPOLAR:
# use binary for bipolar storage
dtype = DataType.BINARY
elem_bits = dtype.bitwidth()
packed_bits = self.get_instream_width()
packed_hls_type = "ap_uint<%d>" % packed_bits
elem_hls_type = dtype.get_hls_datatype_str()
npy_type = "float"
npy_in = "%s/input_0.npy" % code_gen_dir
self.code_gen_dict["$READNPYDATA$"] = []
'npy2apintstream<%s, %s, %d, %s>("%s", in0);'
% (packed_hls_type, elem_hls_type, elem_bits, npy_type, npy_in)
def strm_decl(self):
self.code_gen_dict["$STREAMDECLARATIONS$"] = []
'hls::stream<ap_uint<{}>> in0 ("in0");'.format(self.get_instream_width())
'hls::stream<ap_uint<{}>> out ("out");'.format(self.get_outstream_width())
def docompute(self):
in_t = self.get_input_datatype().get_hls_datatype_str()
node = self.onnx_node
self.code_gen_dict["$DOCOMPUTE$"] = [
"""{}<ImgDim1, KernelDim1, Stride1, NumChannels1,
{}, PaddingStyle1> (in0, out, numReps);""".format(
node.op_type, in_t
def dataoutstrm(self):
code_gen_dir = self.get_nodeattr("code_gen_dir_cppsim")
dtype = self.get_output_datatype()
if dtype == DataType.BIPOLAR:
# use binary for bipolar storage
dtype = DataType.BINARY
elem_bits = dtype.bitwidth()
packed_bits = self.get_outstream_width()
packed_hls_type = "ap_uint<%d>" % packed_bits
elem_hls_type = dtype.get_hls_datatype_str()
npy_type = "float"
npy_out = "%s/output.npy" % code_gen_dir
oshape = self.get_folded_output_shape()
oshape_cpp_str = str(oshape).replace("(", "{").replace(")", "}")
self.code_gen_dict["$DATAOUTSTREAM$"] = [
'apintstream2npy<%s, %s, %d, %s>(out, %s, "%s");'
% (
def save_as_npy(self):
self.code_gen_dict["$SAVEASCNPY$"] = []
def blackboxfunction(self):
packed_bits = self.get_instream_width()
packed_hls_type = "ap_uint<%d>" % packed_bits
self.code_gen_dict["$BLACKBOXFUNCTION$"] = [
"void %s(hls::stream<%s > &in0, hls::stream<%s > &out)"
% (, packed_hls_type, packed_hls_type)
def pragmas(self):
self.code_gen_dict["$PRAGMAS$"] = ["#pragma HLS INTERFACE axis port=in0"]
self.code_gen_dict["$PRAGMAS$"].append("#pragma HLS INTERFACE axis port=out")
"#pragma HLS INTERFACE ap_ctrl_none port=return"
def execute_node(self, context, graph):
mode = self.get_nodeattr("exec_mode")
node = self.onnx_node
exp_ishape = self.get_normal_input_shape()
exp_oshape = self.get_normal_output_shape()
folded_oshape = self.get_folded_output_shape()
if mode == "cppsim":
code_gen_dir = self.get_nodeattr("code_gen_dir_cppsim")
elif mode == "rtlsim":
code_gen_dir = self.get_nodeattr("code_gen_dir_ipgen")
raise Exception(
"""Invalid value for attribute exec_mode! Is currently set to: {}
has to be set to one of the following value ("cppsim", "rtlsim")""".format(
inp = context[node.input[0]]
assert str(inp.dtype) == "float32", "Input datatype is not float32"
assert (
inp.shape == exp_ishape
), """Input shape doesn't
match expected shape (1, ImgDim, ImgDim, NumChannels)."""
export_idt = self.get_input_datatype()
# no reshaping for input since assuming no folding on input
# make copy before saving array
inp = inp.copy(), "input_0.npy"), inp)
if mode == "cppsim":
# execute the precompiled model
# load output npy file
assert (
context[node.output[0]].shape == folded_oshape
), "cppsim \
did not produce expected ofolded utput shape"
context[node.output[0]] = context[node.output[0]].reshape(*exp_oshape)
elif mode == "rtlsim":
sim = self.get_rtlsim()
nbits = self.get_instream_width()
rtlsim_inp = npy_to_rtlsim_input(
"{}/input_0.npy".format(code_gen_dir), export_idt, nbits
rtlsim_output = self.rtlsim(sim, rtlsim_inp)
odt = export_idt
target_bits = odt.bitwidth()
packed_bits = self.get_outstream_width()
out_npy_path = "{}/output.npy".format(code_gen_dir)
out_shape = self.get_folded_output_shape()
rtlsim_output, out_npy_path, odt, out_shape, packed_bits, target_bits
# load and reshape output
output = np.load(out_npy_path)
output = np.asarray([output], dtype=np.float32).reshape(*exp_oshape)
context[node.output[0]] = output
raise Exception(
"""Invalid value for attribute exec_mode! Is currently set to: {}
has to be set to one of the following value ("cppsim", "rtlsim")""".format(
assert (
context[node.output[0]].shape == exp_oshape
), """Output shape doesn't match expected shape
(1, OutputDim, OutputDim, NumChannels)."""
......@@ -44,7 +44,6 @@ from finn.custom_op.fpgadataflow.streamingdatawidthconverter_batch import (
from finn.custom_op.fpgadataflow.globalaccpool_batch import GlobalAccPool_Batch
from finn.custom_op.fpgadataflow.sameresize_batch import SameResize_Batch
from finn.custom_op.fpgadataflow.fmpadding import FMPadding_Batch
from finn.custom_op.fpgadataflow.thresholding_batch import Thresholding_Batch
from finn.custom_op.fpgadataflow.addstreams_batch import AddStreams_Batch
......@@ -66,7 +65,6 @@ custom_op["MaxPoolNHWC"] = MaxPoolNHWC
custom_op["StreamingDataWidthConverter_Batch"] = StreamingDataWidthConverter_Batch
custom_op["StreamingFIFO"] = StreamingFIFO
custom_op["GlobalAccPool_Batch"] = GlobalAccPool_Batch
custom_op["SameResize_Batch"] = SameResize_Batch
custom_op["FMPadding_Batch"] = FMPadding_Batch
custom_op["Thresholding_Batch"] = Thresholding_Batch
custom_op["AddStreams_Batch"] = AddStreams_Batch
import pytest
import os
import numpy as np
from onnx import TensorProto, helper
from finn.core.datatype import DataType
from finn.core.modelwrapper import ModelWrapper
from finn.util.basic import gen_finn_dt_tensor
import finn.core.onnx_exec as oxe
from finn.transformation.infer_shapes import InferShapes
from finn.transformation.fpgadataflow.set_exec_mode import SetExecMode
from finn.transformation.general import GiveUniqueNodeNames
from finn.transformation.fpgadataflow.prepare_cppsim import PrepareCppSim
from finn.transformation.fpgadataflow.compile_cppsim import CompileCppSim
from finn.transformation.fpgadataflow.prepare_ip import PrepareIP
from finn.transformation.fpgadataflow.hlssynth_ip import HLSSynthIP
from finn.transformation.fpgadataflow.prepare_rtlsim import PrepareRTLSim
from finn.util.basic import pynq_part_map
test_pynq_board = os.getenv("PYNQ_BOARD", default="Pynq-Z1")
test_fpga_part = pynq_part_map[test_pynq_board]
target_clk_ns = 10
def make_single_sameresize_modelwrapper(
idim, odim, kdim, stride, num_ch, idt, pad_style
inp = helper.make_tensor_value_info(
"inp", TensorProto.FLOAT, [1, idim, idim, num_ch]
outp = helper.make_tensor_value_info(
"outp", TensorProto.FLOAT, [1, odim, odim, num_ch]
SameResize_node = helper.make_node(
graph = helper.make_graph(
nodes=[SameResize_node], name="sameresize_graph", inputs=[inp], outputs=[outp]
model = helper.make_model(graph, producer_name="sameresize-model")
model = ModelWrapper(model)
model.set_tensor_datatype("inp", idt)
model.set_tensor_datatype("outp", idt)
return model
# image dimension
@pytest.mark.parametrize("idim", [8, 16])
# kernel dimension
@pytest.mark.parametrize("kdim", [2, 3])
# stride
@pytest.mark.parametrize("stride", [1, 2])
# number of channels
@pytest.mark.parametrize("num_ch", [1, 2])
# FINN input datatype
@pytest.mark.parametrize("idt", [DataType.INT2, DataType.INT4])
# execution mode
@pytest.mark.parametrize("mode", ["cppsim", "rtlsim"])
def test_fpgadataflow_sameresize(idim, kdim, stride, num_ch, idt, mode):
pad_style = 2
assert idim % stride == 0, "Stride must divide input dimension."
# number of "same" windows over the input data
same_windows = idim // stride
odim = kdim + stride * (same_windows - 1)
# generate input data
x = gen_finn_dt_tensor(idt, [1, idim, idim, num_ch])
input_dict = {"inp": x}
model = make_single_sameresize_modelwrapper(
idim, odim, kdim, stride, num_ch, idt, pad_style
model = model.transform(InferShapes())
model = model.transform(SetExecMode(mode))
model = model.transform(GiveUniqueNodeNames())
if mode == "cppsim":
model = model.transform(PrepareCppSim())
model = model.transform(CompileCppSim())
elif mode == "rtlsim":
model = model.transform(PrepareIP(test_fpga_part, target_clk_ns))
model = model.transform(HLSSynthIP())
model = model.transform(PrepareRTLSim())
y_produced = oxe.execute_onnx(model, input_dict)["outp"]
expected_oshape = (1, odim, odim, num_ch)
assert y_produced.shape == expected_oshape
# calculate reference
# calculate correct padding according to parameters
pad = odim - idim
if pad_style == 2:
if pad % 2 == 0:
pad_up = pad // 2
pad_left = pad // 2
pad_up = pad // 2 + 1
pad_left = pad // 2 + 1
pad_up = pad // 2
pad_left = pad // 2
pad_down = pad - pad_up
pad_right = pad - pad_left
y_expected = np.pad(
x, ((0, 0), (pad_up, pad_down), (pad_left, pad_right), (0, 0)), "constant"
assert (y_produced == y_expected).all()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment