Skip to content
Snippets Groups Projects
Commit 4f8d8108 authored by auphelia's avatar auphelia
Browse files

[Trafo - templates] Restructure driver.py and add more comments

parent 65e1e8b0
No related branches found
No related tags found
No related merge requests found
......@@ -102,13 +102,10 @@ from finn.util.data_packing import (
)
from finn.core.datatype import DataType
class RemoteTest():
class FINNAccelDriver():
def __init__(self, N, bitfile):
# batchsize
self.N = N
self.ol = Overlay(bitfile)
self.dma = self.ol.axi_dma_0
self.ctrl_regs = self.ol.resize_accel_0
# input FINN DataType
self.idt = $INPUT_FINN_DATATYPE$
# output FINN DataType
......@@ -120,77 +117,72 @@ class RemoteTest():
self.oshape_folded = $OUTPUT_SHAPE_FOLDED$
self.ishape_packed = $INPUT_SHAPE_PACKED$
self.oshape_packed = $OUTPUT_SHAPE_PACKED$
# load bitfile and set up accelerator
self.ol = Overlay(bitfile)
self.dma = self.ol.axi_dma_0
self.ctrl_regs = self.ol.resize_accel_0
# neuron folding factor of output = iterations per sample
self.itersPerSample = self.oshape_packed[-2]
# AXI lite register offset for number of iterations
# used by TLastMarker to signal end of transmission for AXI CDMA
self.REG_OFFSET_NUM_ITERS = 0x10
# set up TLastMarker with correct num. samples
self.ctrl_regs.write(self.REG_OFFSET_NUM_ITERS, self.N*self.itersPerSample)
def load_input(self, inputfile):
N = self.N
# load desired input .npy file
ibuf_normal = np.load(inputfile)
# ensure that shape is as expected
assert ibuf_normal.shape == self.ishape_normal
return ibuf_normal
# allocate a PYNQ buffer for the packed input and buffer
self.ibuf_packed_device = allocate(shape=self.ishape_packed, dtype=np.uint8)
self.obuf_packed_device = allocate(shape=self.oshape_packed, dtype=np.uint8)
def pack_input(self, ibuf_normal):
def fold_input(self, ibuf_normal):
# reshape input in desired shape
N = self.N
# convert to folded form
ibuf_folded = ibuf_normal.reshape(self.ishape_folded)
return ibuf_folded
def pack_input(self, ibuf_folded):
# pack the input buffer, reversing both SIMD dim and endianness
N = self.N
ibuf_packed = finnpy_to_packed_bytearray(
ibuf_folded, self.idt, reverse_endian=True, reverse_inner=True
)
return ibuf_packed
def unpack_output(self, obuf_packed):
N = self.N
# unpack the packed output buffer from accelerator
N = self.N
obuf_folded = packed_bytearray_to_finnpy(
obuf_packed, self.odt, self.oshape_folded, reverse_endian=True, reverse_inner=True
)
return obuf_folded
def save_output(self, obuf_folded, outputfile):
def unfold_output(self, obuf_folded):
# unfold to normal shape
N = self.N
# convert to normal reshape and save
obuf_normal = obuf_folded.reshape(self.oshape_normal)
np.save(outputfile, obuf_normal)
def allocate_pynqbuffers(self, data=None):
ibuf_packed_device = allocate(shape=self.ishape_packed, dtype=np.uint8)
# if necessary copy the packed data into the PYNQ buffer
# TODO optimization: pack directly into the PYNQ buffer?
if data is not None:
np.copyto(ibuf_packed_device, data)
obuf_packed_device = allocate(shape=self.oshape_packed, dtype=np.uint8)
return [ibuf_packed_device, obuf_packed_device]
def setup_TLastMarker(self):
N = self.N
# set up TLastMarker with correct num. samples
self.ctrl_regs.write(self.REG_OFFSET_NUM_ITERS, N*self.itersPerSample)
return obuf_normal
def copy_data_to_pynqbuffer(self, data):
# copy data to PYNQ buffer
np.copyto(self.ibuf_packed_device, data)
def run_nw(self, ibuf_packed_device, obuf_packed_device):
def execute_accel(self):
# set up the DMA and wait until all transfers complete
dma = self.dma
dma.sendchannel.transfer(ibuf_packed_device)
dma.recvchannel.transfer(obuf_packed_device)
dma.sendchannel.transfer(self.ibuf_packed_device)
dma.recvchannel.transfer(self.obuf_packed_device)
dma.sendchannel.wait()
dma.recvchannel.wait()
return obuf_packed_device
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Set exec mode, batchsize N, bitfile name, inputfile name and outputfile name')
parser.add_argument('--exec_mode', help='Please select functional verification ("remote_pynq") or throughput test ("throughput_test")')
parser.add_argument('--batchsize', help='number of samples for inference', type=int)
parser.add_argument('--bitfile', default="resizer.bit")
parser.add_argument('--inputfile', default="input.npy")
parser.add_argument('--outputfile', default="output.npy")
parser.add_argument('--bitfile', help='name of bitfile (i.e. "resizer.bit")', default="resizer.bit")
parser.add_argument('--inputfile', help='name of input npy file (i.e. "input.npy")', default="input.npy")
parser.add_argument('--outputfile', help='name of output npy file (i.e. "output.npy")', default="output.npy")
# parse arguments
args = parser.parse_args()
exec_mode = args.exec_mode
N = args.batchsize
......@@ -198,41 +190,49 @@ if __name__ == "__main__":
inputfile = args.inputfile
outputfile = args.outputfile
NW_test = RemoteTest(N, bitfile)
# instantiate FINN accelerator driver and pass batchsize and bitfile
finnDriver = FINNAccelDriver(N, bitfile)
# for the remote execution the data from the input npy file has to be loaded,
# packed and copied to the PYNQ buffer
if exec_mode == "remote_pynq":
ibuf_normal = NW_test.load_input(inputfile)
ibuf_packed = NW_test.pack_input(ibuf_normal)
# load desired input .npy file
ibuf_normal = np.load(inputfile)
# ensure that shape is as expected
assert ibuf_normal.shape == finnDriver.ishape_normal
ibuf_folded = finnDriver.fold_input(ibuf_normal)
ibuf_packed = finnDriver.pack_input(ibuf_folded)
finnDriver.copy_data_to_pynqbuffer(ibuf_packed)
elif exec_mode != "throughput_test":
raise Exception("Exec mode has to be set to remote_pynq or throughput_test")
NW_test.setup_TLastMarker()
# allocate a PYNQ buffer for the packed input and buffer
if exec_mode == "remote_pynq":
[ibuf_packed_device, obuf_packed_device] = NW_test.allocate_pynqbuffers(ibuf_packed)
else:
[ibuf_packed_device, obuf_packed_device] = NW_test.allocate_pynqbuffers()
# for the throughput test the runtime of the network has to be measured
if exec_mode == "throughput_test":
# measure runtime of network
start = time.time()
# dictionary for results of throughput test
res={}
obuf_packed_device = NW_test.run_nw(ibuf_packed_device, obuf_packed_device)
# execute accelerator
finnDriver.execute_accel()
# measure run time and fill dictionary with results of the throughput test
if exec_mode == "throughput_test":
end = time.time()
runtime = end - start
res["runtime[ms]"] = runtime*1000
res["throughput[images/s]"] = N / runtime
res["DRAM_in_bandwidth[Mb/s]"] = np.prod(NW_test.ishape_packed)*0.000001 / runtime
res["DRAM_out_bandwidth[Mb/s]"] = np.prod(NW_test.oshape_packed)*0.000001 / runtime
res["DRAM_in_bandwidth[Mb/s]"] = np.prod(finnDriver.ishape_packed)*0.000001 / runtime
res["DRAM_out_bandwidth[Mb/s]"] = np.prod(finnDriver.oshape_packed)*0.000001 / runtime
file = open("nw_metrics.txt", "w")
file.write(str(res))
file.close()
# if remote execution is selected unpack, unfold and save output to output npy file
else:
obuf_folded = NW_test.unpack_output(obuf_packed_device)
NW_test.save_output(obuf_folded, outputfile)
obuf_folded = finnDriver.unpack_output(finnDriver.obuf_packed_device)
obuf_normal = finnDriver.unfold_output(obuf_folded)
np.save(outputfile, obuf_normal)
"""
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment