Skip to content
Snippets Groups Projects
Commit 88e4d89f authored by auphelia's avatar auphelia
Browse files

[Trafo-templates] Restructure driver.py in order to make it more generic

parent 6dcfa872
No related branches found
No related tags found
No related merge requests found
......@@ -97,109 +97,142 @@ from finn.util.data_packing import (
)
from finn.core.datatype import DataType
def load_input(N):
ishape_normal = $INPUT_SHAPE_NORMAL$
# load desired input .npy file
ibuf_normal = np.load("input.npy")
# ensure that shape is as expected
assert ibuf_normal.shape == ishape_normal
return ibuf_normal
def pack_input(ibuf_normal, N):
# input FINN DataType
idt = $INPUT_FINN_DATATYPE$
ishape_folded = $INPUT_SHAPE_FOLDED$
# convert to folded form
ibuf_folded = ibuf_normal.reshape(ishape_folded)
# pack the input buffer, reversing both SIMD dim and endianness
ibuf_packed = finnpy_to_packed_bytearray(
ibuf_folded, idt, reverse_endian=True, reverse_inner=True
)
return ibuf_packed
def unpack_output(obuf_packed, N):
# output FINN DataType
odt = $OUTPUT_FINN_DATATYPE$
oshape_folded = $OUTPUT_SHAPE_FOLDED$
# unpack the packed output buffer from accelerator
obuf_folded = packed_bytearray_to_finnpy(
obuf_packed, odt, oshape_folded, reverse_endian=True, reverse_inner=True
)
return obuf_folded
def save_output(obuf_folded, N):
# convert to normal reshape and save
oshape_normal = $OUTPUT_SHAPE_NORMAL$
obuf_normal = obuf_folded.reshape(oshape_normal)
np.save("output.npy", obuf_normal)
class RemoteTest():
def __init__(
self,
exec_mode,
N,
bitfile="resizer.bit",
inputfile="input.npy",
outputfile="output.npy"):
self.exec_mode = exec_mode
self.N = N
self.inputfile = inputfile
self.outputfile = outputfile
self.ol = Overlay(bitfile)
self.dma = self.ol.axi_dma_0
self.ctrl_regs = self.ol.resize_accel_0
self.ishape_packed = $INPUT_SHAPE_PACKED$
self.oshape_packed = $OUTPUT_SHAPE_PACKED$
# AXI lite register offset for number of iterations
# used by TLastMarker to signal end of transmission for AXI CDMA
self.REG_OFFSET_NUM_ITERS = 0x10
def load_input(self):
N = self.N
ishape_normal = $INPUT_SHAPE_NORMAL$
# load desired input .npy file
ibuf_normal = np.load(self.inputfile)
# ensure that shape is as expected
assert ibuf_normal.shape == ishape_normal
return ibuf_normal
def pack_input(self, ibuf_normal):
N = self.N
# input FINN DataType
idt = $INPUT_FINN_DATATYPE$
ishape_folded = $INPUT_SHAPE_FOLDED$
# convert to folded form
ibuf_folded = ibuf_normal.reshape(ishape_folded)
# pack the input buffer, reversing both SIMD dim and endianness
ibuf_packed = finnpy_to_packed_bytearray(
ibuf_folded, idt, reverse_endian=True, reverse_inner=True
)
return ibuf_packed
def unpack_output(self, obuf_packed):
N = self.N
# output FINN DataType
odt = $OUTPUT_FINN_DATATYPE$
oshape_folded = $OUTPUT_SHAPE_FOLDED$
# unpack the packed output buffer from accelerator
obuf_folded = packed_bytearray_to_finnpy(
obuf_packed, odt, oshape_folded, reverse_endian=True, reverse_inner=True
)
return obuf_folded
def save_output(self, obuf_folded):
N = self.N
# convert to normal reshape and save
oshape_normal = $OUTPUT_SHAPE_NORMAL$
obuf_normal = obuf_folded.reshape(oshape_normal)
np.save(self.outputfile, obuf_normal)
def allocate_pynqbuffer(self, shape, data=None):
buf_device = allocate(shape=shape, dtype=np.uint8)
# if necessary copy the packed data into the PYNQ buffer
# TODO optimization: pack directly into the PYNQ buffer?
if data is not None:
np.copyto(buf_device, data)
return buf_device
def run_nw(self):
exec_mode = self.exec_mode
if exec_mode == "remote_pynq":
ibuf_normal = self.load_input()
ibuf_packed = self.pack_input(ibuf_normal)
elif exec_mode != "throughput_test":
raise Exception("Exec mode has to be set to remote_pynq or throughput_test")
# set up TLastMarker with correct num. samples
self.ctrl_regs.write(self.REG_OFFSET_NUM_ITERS, N)
# allocate a PYNQ buffer for the packed input buffer
if exec_mode == "remote_pynq":
ibuf_packed_device = self.allocate_pynqbuffer(self.ishape_packed, ibuf_packed)
else:
ibuf_packed_device = self.allocate_pynqbuffer(self.ishape_packed)
# allocate a PYNQ buffer for the returned packed output buffer
obuf_packed = self.allocate_pynqbuffer(self.oshape_packed)
if exec_mode == "throughput_test":
# measure runtime of network
start = time.time()
res={}
# set up the DMA and wait until all transfers complete
dma = self.dma
dma.sendchannel.transfer(ibuf_packed_device)
dma.recvchannel.transfer(obuf_packed)
dma.sendchannel.wait()
dma.recvchannel.wait()
if exec_mode == "throughput_test":
end = time.time()
runtime = end - start
res["runtime[ms]"] = runtime*1000
res["throughput[images/s]"] = N / runtime
res["DRAM_in_bandwidth[Mb/s]"] = np.prod(self.ishape_packed)*0.000001 / runtime
res["DRAM_out_bandwidth[Mb/s]"] = np.prod(self.oshape_packed)*0.000001 / runtime
file = open("nw_metrics.txt", "w")
file.write(str(res))
file.close()
else:
obuf_folded = self.unpack_output(obuf_packed)
self.save_output(obuf_folded)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Please select functional verification ("remote_pynq") or throughput test ("throughput_test")')
parser.add_argument('exec_mode', help='metadata prop exec_mode as string')
parser = argparse.ArgumentParser(description='Set exec mode, batchsize N, bitfile name, inputfile name and outputfile name')
parser.add_argument('exec_mode', help='Please select functional verification ("remote_pynq") or throughput test ("throughput_test")')
parser.add_argument('N', help='number of samples for inference', type=int)
parser.add_argument('bitfile', default="resizer.bit")
parser.add_argument('inputfile', default="input.npy")
parser.add_argument('outputfile', default="output.npy")
args = parser.parse_args()
exec_mode = args.exec_mode
N = args.N
bitfile = args.bitfile
inputfile = args.inputfile
outputfile = args.outputfile
bitfile_path = "resizer.bit"
ol = Overlay(bitfile_path)
dma=ol.axi_dma_0
ctrl_regs=ol.resize_accel_0
# AXI lite register offset for number of iterations
# used by TLastMarker to signal end of transmission for AXI CDMA
REG_OFFSET_NUM_ITERS = 0x10
# number of samples for inference
if exec_mode == "remote_pynq":
N = 1
elif exec_mode == "throughput_test":
res={}
N = 1000
else:
raise Exception("Exec mode has to be set to remote_pynq or throughput_test")
# declare input/output types and shapes for the accelerator
ishape_packed = $INPUT_SHAPE_PACKED$
oshape_packed = $OUTPUT_SHAPE_PACKED$
if exec_mode == "remote_pynq":
ibuf_normal = load_input(N)
ibuf_packed = pack_input(ibuf_normal, N)
elif exec_mode == "throughput_test":
ibuf_packed = np.asarray(np.random.uniform(low=0, high=1, size=tuple(ishape_packed)), dtype=np.uint8)
# set up TLastMarker with correct num. samples
ctrl_regs.write(REG_OFFSET_NUM_ITERS, N)
# allocate a PYNQ buffer for the packed input buffer
ibuf_packed_device = allocate(shape=ishape_packed, dtype=np.uint8)
# copy the packed data into the PYNQ buffer
# TODO optimization: pack directly into the PYNQ buffer?
np.copyto(ibuf_packed_device, ibuf_packed)
# allocate a PYNQ buffer for the returned packed output buffer
obuf_packed = allocate(shape=oshape_packed, dtype=np.uint8)
if exec_mode == "throughput_test":
# measure runtime of network
start = time.time()
# set up the DMA and wait until all transfers complete
dma.sendchannel.transfer(ibuf_packed_device)
dma.recvchannel.transfer(obuf_packed)
dma.sendchannel.wait()
dma.recvchannel.wait()
if exec_mode == "throughput_test":
end = time.time()
runtime = end - start
res["runtime[ms]"] = runtime*1000
res["throughput[images/s]"] = N / runtime
file = open("nw_metrics.txt", "w")
file.write(str(res))
file.close()
else:
obuf_folded = unpack_output(obuf_packed, N)
save_output(obuf_folded, N)
Test = RemoteTest(exec_mode, N, bitfile, inputfile, outputfile)
Test.run_nw()
"""
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment