diff --git a/src/finn/transformation/fpgadataflow/templates.py b/src/finn/transformation/fpgadataflow/templates.py index 55a5af2ad887e4a8cfa5e3836bef00f2defe7284..8959a4dcd21dd1d07f31fa54438d3eb4bf21cf4f 100644 --- a/src/finn/transformation/fpgadataflow/templates.py +++ b/src/finn/transformation/fpgadataflow/templates.py @@ -103,21 +103,21 @@ from finn.util.data_packing import ( from finn.core.datatype import DataType class RemoteTest(): - def __init__( - self, - exec_mode, - N, - bitfile="resizer.bit", - inputfile="input.npy", - outputfile="output.npy"): - - self.exec_mode = exec_mode + def __init__(self, N, bitfile): + self.N = N - self.inputfile = inputfile - self.outputfile = outputfile self.ol = Overlay(bitfile) self.dma = self.ol.axi_dma_0 self.ctrl_regs = self.ol.resize_accel_0 + # input FINN DataType + self.idt = $INPUT_FINN_DATATYPE$ + # output FINN DataType + self.odt = $OUTPUT_FINN_DATATYPE$ + # input and output shapes + self.ishape_normal = $INPUT_SHAPE_NORMAL$ + self.oshape_normal = $OUTPUT_SHAPE_NORMAL$ + self.ishape_folded = $INPUT_SHAPE_FOLDED$ + self.oshape_folded = $OUTPUT_SHAPE_FOLDED$ self.ishape_packed = $INPUT_SHAPE_PACKED$ self.oshape_packed = $OUTPUT_SHAPE_PACKED$ # neuron folding factor of output = iterations per sample @@ -126,120 +126,113 @@ class RemoteTest(): # used by TLastMarker to signal end of transmission for AXI CDMA self.REG_OFFSET_NUM_ITERS = 0x10 - def load_input(self): + def load_input(self, inputfile): N = self.N - ishape_normal = $INPUT_SHAPE_NORMAL$ # load desired input .npy file - ibuf_normal = np.load(self.inputfile) + ibuf_normal = np.load(inputfile) # ensure that shape is as expected - assert ibuf_normal.shape == ishape_normal + assert ibuf_normal.shape == self.ishape_normal return ibuf_normal def pack_input(self, ibuf_normal): N = self.N - # input FINN DataType - idt = $INPUT_FINN_DATATYPE$ - ishape_folded = $INPUT_SHAPE_FOLDED$ # convert to folded form - ibuf_folded = ibuf_normal.reshape(ishape_folded) + ibuf_folded = ibuf_normal.reshape(self.ishape_folded) # pack the input buffer, reversing both SIMD dim and endianness ibuf_packed = finnpy_to_packed_bytearray( - ibuf_folded, idt, reverse_endian=True, reverse_inner=True + ibuf_folded, self.idt, reverse_endian=True, reverse_inner=True ) return ibuf_packed def unpack_output(self, obuf_packed): N = self.N - # output FINN DataType - odt = $OUTPUT_FINN_DATATYPE$ - oshape_folded = $OUTPUT_SHAPE_FOLDED$ # unpack the packed output buffer from accelerator obuf_folded = packed_bytearray_to_finnpy( - obuf_packed, odt, oshape_folded, reverse_endian=True, reverse_inner=True + obuf_packed, self.odt, self.oshape_folded, reverse_endian=True, reverse_inner=True ) return obuf_folded - def save_output(self, obuf_folded): + def save_output(self, obuf_folded, outputfile): N = self.N # convert to normal reshape and save - oshape_normal = $OUTPUT_SHAPE_NORMAL$ - obuf_normal = obuf_folded.reshape(oshape_normal) - np.save(self.outputfile, obuf_normal) - - def allocate_pynqbuffer(self, shape, data=None): - buf_device = allocate(shape=shape, dtype=np.uint8) + obuf_normal = obuf_folded.reshape(self.oshape_normal) + np.save(outputfile, obuf_normal) + def allocate_pynqbuffers(self, data=None): + ibuf_packed_device = allocate(shape=self.ishape_packed, dtype=np.uint8) # if necessary copy the packed data into the PYNQ buffer # TODO optimization: pack directly into the PYNQ buffer? if data is not None: - np.copyto(buf_device, data) - - return buf_device + np.copyto(ibuf_packed_device, data) + obuf_packed_device = allocate(shape=self.oshape_packed, dtype=np.uint8) + return [ibuf_packed_device, obuf_packed_device] - def run_nw(self): - exec_mode = self.exec_mode - if exec_mode == "remote_pynq": - ibuf_normal = self.load_input() - ibuf_packed = self.pack_input(ibuf_normal) - elif exec_mode != "throughput_test": - raise Exception("Exec mode has to be set to remote_pynq or throughput_test") - + def setup_TLastMarker(self): + N = self.N # set up TLastMarker with correct num. samples self.ctrl_regs.write(self.REG_OFFSET_NUM_ITERS, N*self.itersPerSample) - # allocate a PYNQ buffer for the packed input buffer - if exec_mode == "remote_pynq": - ibuf_packed_device = self.allocate_pynqbuffer(self.ishape_packed, ibuf_packed) - else: - ibuf_packed_device = self.allocate_pynqbuffer(self.ishape_packed) - - # allocate a PYNQ buffer for the returned packed output buffer - obuf_packed = self.allocate_pynqbuffer(self.oshape_packed) - - if exec_mode == "throughput_test": - # measure runtime of network - start = time.time() - res={} + def run_nw(self, ibuf_packed_device, obuf_packed_device): # set up the DMA and wait until all transfers complete dma = self.dma dma.sendchannel.transfer(ibuf_packed_device) - dma.recvchannel.transfer(obuf_packed) + dma.recvchannel.transfer(obuf_packed_device) dma.sendchannel.wait() dma.recvchannel.wait() - - if exec_mode == "throughput_test": - end = time.time() - runtime = end - start - res["runtime[ms]"] = runtime*1000 - res["throughput[images/s]"] = N / runtime - res["DRAM_in_bandwidth[Mb/s]"] = np.prod(self.ishape_packed)*0.000001 / runtime - res["DRAM_out_bandwidth[Mb/s]"] = np.prod(self.oshape_packed)*0.000001 / runtime - file = open("nw_metrics.txt", "w") - file.write(str(res)) - file.close() - else: - obuf_folded = self.unpack_output(obuf_packed) - self.save_output(obuf_folded) - + return obuf_packed_device if __name__ == "__main__": parser = argparse.ArgumentParser(description='Set exec mode, batchsize N, bitfile name, inputfile name and outputfile name') - parser.add_argument('exec_mode', help='Please select functional verification ("remote_pynq") or throughput test ("throughput_test")') - parser.add_argument('N', help='number of samples for inference', type=int) - parser.add_argument('bitfile', default="resizer.bit") - parser.add_argument('inputfile', default="input.npy") - parser.add_argument('outputfile', default="output.npy") + parser.add_argument('--exec_mode', help='Please select functional verification ("remote_pynq") or throughput test ("throughput_test")') + parser.add_argument('--batchsize', help='number of samples for inference', type=int) + parser.add_argument('--bitfile', default="resizer.bit") + parser.add_argument('--inputfile', default="input.npy") + parser.add_argument('--outputfile', default="output.npy") args = parser.parse_args() exec_mode = args.exec_mode - N = args.N + N = args.batchsize bitfile = args.bitfile inputfile = args.inputfile outputfile = args.outputfile - Test = RemoteTest(exec_mode, N, bitfile, inputfile, outputfile) - Test.run_nw() + NW_test = RemoteTest(N, bitfile) + + if exec_mode == "remote_pynq": + ibuf_normal = NW_test.load_input(inputfile) + ibuf_packed = NW_test.pack_input(ibuf_normal) + elif exec_mode != "throughput_test": + raise Exception("Exec mode has to be set to remote_pynq or throughput_test") + + NW_test.setup_TLastMarker() + + # allocate a PYNQ buffer for the packed input and buffer + if exec_mode == "remote_pynq": + [ibuf_packed_device, obuf_packed_device] = NW_test.allocate_pynqbuffers(ibuf_packed) + else: + [ibuf_packed_device, obuf_packed_device] = NW_test.allocate_pynqbuffers() + + if exec_mode == "throughput_test": + # measure runtime of network + start = time.time() + res={} + + obuf_packed_device = NW_test.run_nw(ibuf_packed_device, obuf_packed_device) + + if exec_mode == "throughput_test": + end = time.time() + runtime = end - start + res["runtime[ms]"] = runtime*1000 + res["throughput[images/s]"] = N / runtime + res["DRAM_in_bandwidth[Mb/s]"] = np.prod(NW_test.ishape_packed)*0.000001 / runtime + res["DRAM_out_bandwidth[Mb/s]"] = np.prod(NW_test.oshape_packed)*0.000001 / runtime + file = open("nw_metrics.txt", "w") + file.write(str(res)) + file.close() + else: + obuf_folded = NW_test.unpack_output(obuf_packed_device) + NW_test.save_output(obuf_folded, outputfile) """