From 88e4d89f7ddc80dd9b261048e05ed835a60a09d3 Mon Sep 17 00:00:00 2001 From: auphelia <jakobapk@web.de> Date: Mon, 27 Apr 2020 17:53:29 +0100 Subject: [PATCH] [Trafo-templates] Restructure driver.py in order to make it more generic --- .../transformation/fpgadataflow/templates.py | 231 ++++++++++-------- 1 file changed, 132 insertions(+), 99 deletions(-) diff --git a/src/finn/transformation/fpgadataflow/templates.py b/src/finn/transformation/fpgadataflow/templates.py index 20b1bd58e..6ff4a4f0b 100644 --- a/src/finn/transformation/fpgadataflow/templates.py +++ b/src/finn/transformation/fpgadataflow/templates.py @@ -97,109 +97,142 @@ from finn.util.data_packing import ( ) from finn.core.datatype import DataType -def load_input(N): - ishape_normal = $INPUT_SHAPE_NORMAL$ - # load desired input .npy file - ibuf_normal = np.load("input.npy") - # ensure that shape is as expected - assert ibuf_normal.shape == ishape_normal - return ibuf_normal - -def pack_input(ibuf_normal, N): - # input FINN DataType - idt = $INPUT_FINN_DATATYPE$ - ishape_folded = $INPUT_SHAPE_FOLDED$ - # convert to folded form - ibuf_folded = ibuf_normal.reshape(ishape_folded) - # pack the input buffer, reversing both SIMD dim and endianness - ibuf_packed = finnpy_to_packed_bytearray( - ibuf_folded, idt, reverse_endian=True, reverse_inner=True - ) - return ibuf_packed - -def unpack_output(obuf_packed, N): - # output FINN DataType - odt = $OUTPUT_FINN_DATATYPE$ - oshape_folded = $OUTPUT_SHAPE_FOLDED$ - # unpack the packed output buffer from accelerator - obuf_folded = packed_bytearray_to_finnpy( - obuf_packed, odt, oshape_folded, reverse_endian=True, reverse_inner=True - ) - return obuf_folded - -def save_output(obuf_folded, N): - # convert to normal reshape and save - oshape_normal = $OUTPUT_SHAPE_NORMAL$ - obuf_normal = obuf_folded.reshape(oshape_normal) - np.save("output.npy", obuf_normal) +class RemoteTest(): + def __init__( + self, + exec_mode, + N, + bitfile="resizer.bit", + inputfile="input.npy", + outputfile="output.npy"): + + self.exec_mode = exec_mode + self.N = N + self.inputfile = inputfile + self.outputfile = outputfile + self.ol = Overlay(bitfile) + self.dma = self.ol.axi_dma_0 + self.ctrl_regs = self.ol.resize_accel_0 + self.ishape_packed = $INPUT_SHAPE_PACKED$ + self.oshape_packed = $OUTPUT_SHAPE_PACKED$ + # AXI lite register offset for number of iterations + # used by TLastMarker to signal end of transmission for AXI CDMA + self.REG_OFFSET_NUM_ITERS = 0x10 + + def load_input(self): + N = self.N + ishape_normal = $INPUT_SHAPE_NORMAL$ + # load desired input .npy file + ibuf_normal = np.load(self.inputfile) + # ensure that shape is as expected + assert ibuf_normal.shape == ishape_normal + return ibuf_normal + + def pack_input(self, ibuf_normal): + N = self.N + # input FINN DataType + idt = $INPUT_FINN_DATATYPE$ + ishape_folded = $INPUT_SHAPE_FOLDED$ + # convert to folded form + ibuf_folded = ibuf_normal.reshape(ishape_folded) + # pack the input buffer, reversing both SIMD dim and endianness + ibuf_packed = finnpy_to_packed_bytearray( + ibuf_folded, idt, reverse_endian=True, reverse_inner=True + ) + return ibuf_packed + + def unpack_output(self, obuf_packed): + N = self.N + # output FINN DataType + odt = $OUTPUT_FINN_DATATYPE$ + oshape_folded = $OUTPUT_SHAPE_FOLDED$ + # unpack the packed output buffer from accelerator + obuf_folded = packed_bytearray_to_finnpy( + obuf_packed, odt, oshape_folded, reverse_endian=True, reverse_inner=True + ) + return obuf_folded + + def save_output(self, obuf_folded): + N = self.N + # convert to normal reshape and save + oshape_normal = $OUTPUT_SHAPE_NORMAL$ + obuf_normal = obuf_folded.reshape(oshape_normal) + np.save(self.outputfile, obuf_normal) + + def allocate_pynqbuffer(self, shape, data=None): + buf_device = allocate(shape=shape, dtype=np.uint8) + + # if necessary copy the packed data into the PYNQ buffer + # TODO optimization: pack directly into the PYNQ buffer? + if data is not None: + np.copyto(buf_device, data) + + return buf_device + + + def run_nw(self): + exec_mode = self.exec_mode + if exec_mode == "remote_pynq": + ibuf_normal = self.load_input() + ibuf_packed = self.pack_input(ibuf_normal) + elif exec_mode != "throughput_test": + raise Exception("Exec mode has to be set to remote_pynq or throughput_test") + + # set up TLastMarker with correct num. samples + self.ctrl_regs.write(self.REG_OFFSET_NUM_ITERS, N) + + # allocate a PYNQ buffer for the packed input buffer + if exec_mode == "remote_pynq": + ibuf_packed_device = self.allocate_pynqbuffer(self.ishape_packed, ibuf_packed) + else: + ibuf_packed_device = self.allocate_pynqbuffer(self.ishape_packed) + + # allocate a PYNQ buffer for the returned packed output buffer + obuf_packed = self.allocate_pynqbuffer(self.oshape_packed) + + if exec_mode == "throughput_test": + # measure runtime of network + start = time.time() + res={} + + # set up the DMA and wait until all transfers complete + dma = self.dma + dma.sendchannel.transfer(ibuf_packed_device) + dma.recvchannel.transfer(obuf_packed) + dma.sendchannel.wait() + dma.recvchannel.wait() + + + if exec_mode == "throughput_test": + end = time.time() + runtime = end - start + res["runtime[ms]"] = runtime*1000 + res["throughput[images/s]"] = N / runtime + res["DRAM_in_bandwidth[Mb/s]"] = np.prod(self.ishape_packed)*0.000001 / runtime + res["DRAM_out_bandwidth[Mb/s]"] = np.prod(self.oshape_packed)*0.000001 / runtime + file = open("nw_metrics.txt", "w") + file.write(str(res)) + file.close() + else: + obuf_folded = self.unpack_output(obuf_packed) + self.save_output(obuf_folded) + if __name__ == "__main__": - parser = argparse.ArgumentParser(description='Please select functional verification ("remote_pynq") or throughput test ("throughput_test")') - parser.add_argument('exec_mode', help='metadata prop exec_mode as string') + parser = argparse.ArgumentParser(description='Set exec mode, batchsize N, bitfile name, inputfile name and outputfile name') + parser.add_argument('exec_mode', help='Please select functional verification ("remote_pynq") or throughput test ("throughput_test")') + parser.add_argument('N', help='number of samples for inference', type=int) + parser.add_argument('bitfile', default="resizer.bit") + parser.add_argument('inputfile', default="input.npy") + parser.add_argument('outputfile', default="output.npy") args = parser.parse_args() exec_mode = args.exec_mode + N = args.N + bitfile = args.bitfile + inputfile = args.inputfile + outputfile = args.outputfile - bitfile_path = "resizer.bit" - ol = Overlay(bitfile_path) - dma=ol.axi_dma_0 - ctrl_regs=ol.resize_accel_0 - # AXI lite register offset for number of iterations - # used by TLastMarker to signal end of transmission for AXI CDMA - REG_OFFSET_NUM_ITERS = 0x10 - - # number of samples for inference - if exec_mode == "remote_pynq": - N = 1 - elif exec_mode == "throughput_test": - res={} - N = 1000 - else: - raise Exception("Exec mode has to be set to remote_pynq or throughput_test") - - # declare input/output types and shapes for the accelerator - ishape_packed = $INPUT_SHAPE_PACKED$ - oshape_packed = $OUTPUT_SHAPE_PACKED$ - - if exec_mode == "remote_pynq": - ibuf_normal = load_input(N) - ibuf_packed = pack_input(ibuf_normal, N) - elif exec_mode == "throughput_test": - ibuf_packed = np.asarray(np.random.uniform(low=0, high=1, size=tuple(ishape_packed)), dtype=np.uint8) - - # set up TLastMarker with correct num. samples - ctrl_regs.write(REG_OFFSET_NUM_ITERS, N) - - # allocate a PYNQ buffer for the packed input buffer - ibuf_packed_device = allocate(shape=ishape_packed, dtype=np.uint8) - # copy the packed data into the PYNQ buffer - # TODO optimization: pack directly into the PYNQ buffer? - np.copyto(ibuf_packed_device, ibuf_packed) - - # allocate a PYNQ buffer for the returned packed output buffer - obuf_packed = allocate(shape=oshape_packed, dtype=np.uint8) - - if exec_mode == "throughput_test": - # measure runtime of network - start = time.time() - - # set up the DMA and wait until all transfers complete - dma.sendchannel.transfer(ibuf_packed_device) - dma.recvchannel.transfer(obuf_packed) - dma.sendchannel.wait() - dma.recvchannel.wait() - - - if exec_mode == "throughput_test": - end = time.time() - runtime = end - start - res["runtime[ms]"] = runtime*1000 - res["throughput[images/s]"] = N / runtime - file = open("nw_metrics.txt", "w") - file.write(str(res)) - file.close() - - else: - obuf_folded = unpack_output(obuf_packed, N) - save_output(obuf_folded, N) + Test = RemoteTest(exec_mode, N, bitfile, inputfile, outputfile) + Test.run_nw() """ -- GitLab