# Copyright (c) 2020, Xilinx
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
#   list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
#   this list of conditions and the following disclaimer in the documentation
#   and/or other materials provided with the distribution.
#
# * Neither the name of FINN nor the names of its
#   contributors may be used to endorse or promote products derived from
#   this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import os

import numpy as np

# as of Feb'20 there is a bug that segfaults ONNX shape inference if we
# import pytorch before onnx, so we make sure to import onnx first
import onnx  # NOQA

import pytest
import pkg_resources as pk
from finn.core.modelwrapper import ModelWrapper
from finn.custom_op.registry import getCustomOp
from finn.core.onnx_exec import execute_onnx
from finn.transformation.double_to_single_float import DoubleToSingleFloat
from finn.transformation.infer_shapes import InferShapes
from finn.transformation.move_reshape import MoveReshape
from finn.transformation.fold_constants import FoldConstants
from finn.transformation.general import GiveReadableTensorNames, GiveUniqueNodeNames
from finn.transformation.streamline import Streamline
from finn.transformation.lower_convs_to_matmul import LowerConvsToMatMul
from finn.transformation.bipolar_to_xnor import ConvertBipolarMatMulToXnorPopcount
import finn.transformation.streamline.absorb as absorb
from finn.transformation.streamline.reorder import MakeMaxPoolNHWC
import finn.transformation.fpgadataflow.convert_to_hls_layers as to_hls
from finn.transformation.fpgadataflow.create_dataflow_partition import (
    CreateDataflowPartition,
)
from finn.transformation.fpgadataflow.insert_dwc import InsertDWC
from finn.transformation.fpgadataflow.insert_tlastmarker import InsertTLastMarker
from finn.transformation.fpgadataflow.codegen_ipgen import CodeGen_ipgen
from finn.transformation.fpgadataflow.hlssynth_ipgen import HLSSynth_IPGen
from finn.transformation.fpgadataflow.replace_verilog_relpaths import (
    ReplaceVerilogRelPaths,
)
from finn.transformation.fpgadataflow.codegen_ipstitch import CodeGen_ipstitch
from finn.transformation.fpgadataflow.set_exec_mode import SetExecMode
from finn.transformation.fpgadataflow.codegen_npysim import CodeGen_npysim
from finn.transformation.fpgadataflow.compile import Compile
from finn.transformation.fpgadataflow.make_pynq_driver import MakePYNQDriver
from finn.transformation.fpgadataflow.make_pynq_proj import MakePYNQProject
from finn.transformation.fpgadataflow.synth_pynq_proj import SynthPYNQProject
from finn.transformation.fpgadataflow.make_deployment import DeployToPYNQ
from finn.util.basic import pynq_part_map
from finn.util.test import get_test_model_trained
from finn.transformation.fpgadataflow.annotate_resources import AnnotateResources


# Directory holding the intermediate .onnx files handed from one test step to
# the next. FINN_INST_NAME has no fallback: if it is unset, importing this
# module raises KeyError.
build_dir = "/tmp/" + os.environ["FINN_INST_NAME"]
# Board name: taken from PYNQ_BOARD when set, otherwise "Pynq-Z1".
test_pynq_board = os.environ.get("PYNQ_BOARD", "Pynq-Z1")
# FPGA part looked up in pynq_part_map for the selected board.
test_fpga_part = pynq_part_map[test_pynq_board]
# Value handed to CodeGen_ipgen (the name suggests a clock period in ns).
target_clk_ns = 5
# Argument handed to the StreamingFCLayer inference transformations.
mem_mode = "const"


def test_end2end_cnv_w1a1_export():
    """Export the CNV test model to a FINN-ONNX file in ``build_dir``.

    The model comes from ``get_test_model_trained("CNV", 1, 1)`` and is written
    with Brevitas' ``export_finn_onnx`` for a (1, 3, 32, 32) input shape.
    """
    # NOTE(review): presumably kept function-local so brevitas is only loaded
    # when this step actually runs -- confirm before hoisting to module level
    import brevitas.onnx as bo

    trained_cnv = get_test_model_trained("CNV", 1, 1)
    input_shape = (1, 3, 32, 32)
    export_path = build_dir + "/end2end_cnv_w1a1_export.onnx"
    bo.export_finn_onnx(trained_cnv, input_shape, export_path)


def test_end2end_cnv_w1a1_import_and_tidy():
    """Load the exported model, apply the tidy-up passes and save the result."""
    model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_export.onnx")
    # applied strictly in this order; each pass is instantiated right before use
    tidy_up_passes = (
        DoubleToSingleFloat,
        InferShapes,
        FoldConstants,
        GiveUniqueNodeNames,
        GiveReadableTensorNames,
    )
    for make_pass in tidy_up_passes:
        model = model.transform(make_pass())
    model.save(build_dir + "/end2end_cnv_w1a1_tidy.onnx")


def test_end2end_cnv_w1a1_streamline():
    """Streamline the tidied model, lower its convolutions and save the result.

    ``Streamline`` is applied twice: once at the start and once more after the
    lowering / reordering / XNOR-popcount conversion passes.
    """
    model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_tidy.onnx")
    # applied strictly in this order; each pass is instantiated right before use
    streamlining_passes = (
        Streamline,
        LowerConvsToMatMul,
        MakeMaxPoolNHWC,
        absorb.AbsorbTransposeIntoMultiThreshold,
        ConvertBipolarMatMulToXnorPopcount,
        Streamline,
    )
    for make_pass in streamlining_passes:
        model = model.transform(make_pass())
    model.save(build_dir + "/end2end_cnv_w1a1_streamlined.onnx")


def test_end2end_cnv_w1a1_convert_to_hls_layers():
    """Apply the HLS-layer inference passes to the streamlined model and save it."""
    model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_streamlined.onnx")
    # the two FC-layer inference passes are constructed with the module's mem_mode
    fc_layer_passes = (
        to_hls.InferBinaryStreamingFCLayer,
        to_hls.InferQuantizedStreamingFCLayer,
    )
    for make_fc_pass in fc_layer_passes:
        model = model.transform(make_fc_pass(mem_mode))
    # the remaining passes are constructed without arguments
    remaining_passes = (
        to_hls.InferConvInpGen,
        to_hls.InferStreamingMaxPool,
        MoveReshape,
    )
    for make_pass in remaining_passes:
        model = model.transform(make_pass())
    model.save(build_dir + "/end2end_cnv_w1a1_hls_layers.onnx")


def test_end2end_cnv_w1a1_create_dataflow_partition():
    """Split off the dataflow partition and save parent and child models.

    The parent model is what ``CreateDataflowPartition`` returns. The child
    model is loaded from the file named by the "model" attribute of the
    parent's first StreamingDataflowPartition node, then re-saved in
    ``build_dir``.
    """
    hls_model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_hls_layers.onnx")
    parent = hls_model.transform(CreateDataflowPartition())
    parent.save(build_dir + "/end2end_cnv_w1a1_dataflow_parent.onnx")
    # only the first partition node is considered
    partition_nodes = parent.get_nodes_by_op_type("StreamingDataflowPartition")
    partition_op = getCustomOp(partition_nodes[0])
    child_path = partition_op.get_nodeattr("model")
    child = ModelWrapper(child_path)
    child.save(build_dir + "/end2end_cnv_w1a1_dataflow_model.onnx")


def test_end2end_cnv_w1a1_fold_and_tlastmarker():
    """Set SIMD/PE folding on the FC layers, insert DWC and TLastMarker, save.

    After folding, the model goes through ``InsertDWC``, ``InsertTLastMarker``,
    ``GiveUniqueNodeNames`` and ``AnnotateResources("estimate")`` before being
    written to ``build_dir``.
    """
    model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_dataflow_model.onnx")
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    # (SIMD, PE) for the first nine StreamingFCLayer_Batch nodes, in the order
    # returned by get_nodes_by_op_type
    folding = [
        (27, 8),
        (32, 8),
        (32, 16),
        (32, 16),
        (32, 32),
        (64, 16),
        (32, 16),
        (64, 8),
        (16, 10),
    ]
    # wrap all nine layers before touching any attribute; indexing (rather than
    # zip) keeps the IndexError when fewer than nine layers are present
    fc_ops = [getCustomOp(fc_layers[idx]) for idx in range(len(folding))]
    for fc_op, (simd, pe) in zip(fc_ops, folding):
        fc_op.set_nodeattr("SIMD", simd)
        fc_op.set_nodeattr("PE", pe)

    post_folding_passes = (
        InsertDWC(),
        InsertTLastMarker(),
        GiveUniqueNodeNames(),
        AnnotateResources("estimate"),
    )
    for folding_pass in post_folding_passes:
        model = model.transform(folding_pass)
    model.save(build_dir + "/end2end_cnv_w1a1_folded.onnx")


def test_end2end_cnv_w1a1_gen_hls_ip():
    """Run IP code generation and HLS synthesis on the folded model, then save.

    ``CodeGen_ipgen`` receives the module-level FPGA part and ``target_clk_ns``;
    the result is annotated with ``AnnotateResources("hls")`` before saving.
    """
    folded_path = build_dir + "/end2end_cnv_w1a1_folded.onnx"
    ipgen_path = build_dir + "/end2end_cnv_w1a1_ipgen.onnx"
    model = ModelWrapper(folded_path)
    codegen = CodeGen_ipgen(test_fpga_part, target_clk_ns)
    model = model.transform(codegen)
    model = model.transform(HLSSynth_IPGen())
    model = model.transform(AnnotateResources("hls"))
    model.save(ipgen_path)


def test_end2end_cnv_w1a1_ip_stitch():
    """Apply ReplaceVerilogRelPaths and CodeGen_ipstitch to the ipgen model."""
    ipgen_path = build_dir + "/end2end_cnv_w1a1_ipgen.onnx"
    stitched_path = build_dir + "/end2end_cnv_w1a1_ipstitch.onnx"
    model = ModelWrapper(ipgen_path)
    model = model.transform(ReplaceVerilogRelPaths())
    stitcher = CodeGen_ipstitch(test_fpga_part)
    model = model.transform(stitcher)
    model.save(stitched_path)


def test_end2end_cnv_w1a1_verify_dataflow_part():
    """Check that npysim, node-by-node rtlsim and whole-network rtlsim agree.

    The IP-stitched dataflow model is executed on an all-zeros
    (1, 32, 32, 3) float32 input in each of the three modes. The model as
    configured for each mode is saved to ``build_dir`` (a later verification
    step loads these files as child models). Both rtlsim outputs must be
    ``np.isclose`` to the npysim output.

    The LIVENESS_THRESHOLD environment variable is overridden only while the
    whole-network simulation runs and is restored afterwards.
    """
    model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_ipstitch.onnx")
    x = np.zeros((1, 32, 32, 3), dtype=np.float32)
    inp_name = model.graph.input[0].name
    out_name = model.graph.output[0].name
    inp_dict = {inp_name: x}
    # npysim
    model = model.transform(CodeGen_npysim())
    model = model.transform(Compile())
    model = model.transform(SetExecMode("npysim"))
    model.save(build_dir + "/end2end_cnv_w1a1_ipgen_npysim.onnx")
    ret_npysim = execute_onnx(model, inp_dict, True)
    res_npysim = ret_npysim[out_name]
    # node-by-node rtlsim
    model = model.transform(SetExecMode("rtlsim"))
    fc_layers = model.get_nodes_by_op_type("StreamingFCLayer_Batch")
    for fcl in fc_layers:
        getCustomOp(fcl).set_nodeattr("rtlsim_trace", "default")
    model.save(build_dir + "/end2end_cnv_w1a1_ipgen_nodebynode_rtlsim.onnx")
    ret_rtlsim_nodebynode = execute_onnx(model, inp_dict, True)
    res_rtlsim_nodebynode = ret_rtlsim_nodebynode[out_name]
    # whole-network (ip-stitched) rtlsim
    model.set_metadata_prop("exec_mode", "rtlsim")
    model.set_metadata_prop("rtlsim_trace", "whole_trace.vcd")
    model.save(build_dir + "/end2end_cnv_w1a1_ipstitch_whole_rtlsim.onnx")
    # this is a particularly long-running test, set liveness thr. to unlimited;
    # the previous value is put back afterwards so the override does not leak
    # into tests that run later in the same process
    prev_liveness = os.environ.get("LIVENESS_THRESHOLD")
    os.environ["LIVENESS_THRESHOLD"] = "-1"
    try:
        ret_rtlsim_whole = execute_onnx(model, inp_dict, True)
    finally:
        if prev_liveness is None:
            os.environ.pop("LIVENESS_THRESHOLD", None)
        else:
            os.environ["LIVENESS_THRESHOLD"] = prev_liveness
    res_rtlsim_whole = ret_rtlsim_whole[out_name]
    assert np.isclose(res_npysim, res_rtlsim_nodebynode).all()
    assert np.isclose(res_npysim, res_rtlsim_whole).all()


def test_end2end_cnv_w1a1_verify_all():
    """Compare every simulated execution mode against the streamlined model.

    The streamlined model provides the golden output for one CIFAR-10 test
    vector. The dataflow parent graph is then executed three times, with its
    first StreamingDataflowPartition node pointed in turn at the npysim,
    node-by-node rtlsim and whole-network rtlsim child models found in
    ``build_dir``. Every result must be ``np.isclose`` to the golden output.

    The LIVENESS_THRESHOLD environment variable is overridden only while the
    whole-network simulation runs and is restored afterwards.
    """
    # use the streamlined model as the "golden" model for right answers
    golden = ModelWrapper(build_dir + "/end2end_cnv_w1a1_streamlined.onnx")
    iname = golden.graph.input[0].name
    oname = golden.graph.output[0].name
    # load one of the test vectors
    fn = pk.resource_filename("finn", "data/cifar10/cifar10-test-data-class3.npz")
    input_tensor = np.load(fn)["arr_0"].astype(np.float32)
    assert input_tensor.shape == (1, 3, 32, 32)
    x = input_tensor
    ret_golden = execute_onnx(golden, {iname: x}, True)
    y_golden = ret_golden[oname]
    # set up parent+child graph to test
    # we'll use models from the previous step as the child model
    parent_model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_dataflow_parent.onnx")
    iname = parent_model.graph.input[0].name
    oname = parent_model.graph.output[0].name
    # produce results with npysim
    sdp_node = parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
    sdp_node = getCustomOp(sdp_node)
    sdp_node.set_nodeattr("model", build_dir + "/end2end_cnv_w1a1_ipgen_npysim.onnx")
    ret_npysim = execute_onnx(parent_model, {iname: x}, True)
    y_npysim = ret_npysim[oname]
    # produce results with node-by-node rtlsim
    sdp_node.set_nodeattr(
        "model", build_dir + "/end2end_cnv_w1a1_ipgen_nodebynode_rtlsim.onnx"
    )
    ret_nodebynode_rtlsim = execute_onnx(parent_model, {iname: x}, True)
    y_nodebynode_rtlsim = ret_nodebynode_rtlsim[oname]
    # produce results with whole-network (stitched ip) rtlsim
    sdp_node.set_nodeattr(
        "model", build_dir + "/end2end_cnv_w1a1_ipstitch_whole_rtlsim.onnx"
    )
    # this is a particularly long-running test, set liveness thr. to unlimited;
    # the previous value is put back afterwards so the override does not leak
    # into tests that run later in the same process
    prev_liveness = os.environ.get("LIVENESS_THRESHOLD")
    os.environ["LIVENESS_THRESHOLD"] = "-1"
    try:
        ret_whole_rtlsim = execute_onnx(parent_model, {iname: x}, True)
    finally:
        if prev_liveness is None:
            os.environ.pop("LIVENESS_THRESHOLD", None)
        else:
            os.environ["LIVENESS_THRESHOLD"] = prev_liveness
    y_whole_rtlsim = ret_whole_rtlsim[oname]
    assert np.isclose(y_golden, y_npysim).all()
    assert np.isclose(y_golden, y_nodebynode_rtlsim).all()
    assert np.isclose(y_golden, y_whole_rtlsim).all()


def test_end2end_cnv_w1a1_make_pynq_proj():
    """Apply MakePYNQProject for the selected board to the IP-stitched model."""
    stitched_path = build_dir + "/end2end_cnv_w1a1_ipstitch.onnx"
    project_path = build_dir + "/end2end_cnv_w1a1_pynq_project.onnx"
    project_pass = MakePYNQProject(test_pynq_board)
    ModelWrapper(stitched_path).transform(project_pass).save(project_path)


def test_end2end_cnv_w1a1_synth_pynq_project():
    """Apply SynthPYNQProject and AnnotateResources("synth"), then save."""
    model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_pynq_project.onnx")
    # each pass is instantiated only once the previous one has finished
    for make_pass in (SynthPYNQProject, lambda: AnnotateResources("synth")):
        model = model.transform(make_pass())
    model.save(build_dir + "/end2end_cnv_w1a1_synth.onnx")


def test_end2end_cnv_w1a1_make_driver():
    """Apply MakePYNQDriver to the synthesized model and save the result."""
    synth_path = build_dir + "/end2end_cnv_w1a1_synth.onnx"
    driver_path = build_dir + "/end2end_cnv_w1a1_pynq_driver.onnx"
    synthesized = ModelWrapper(synth_path)
    with_driver = synthesized.transform(MakePYNQDriver())
    with_driver.save(driver_path)


def test_end2end_cnv_w1a1_deploy_on_pynq():
    """Deploy the driver model with DeployToPYNQ, or skip without a board.

    The test is skipped when the PYNQ_IP environment variable is unset or
    empty. Username, password and target directory are read from
    PYNQ_USERNAME, PYNQ_PASSWORD and PYNQ_TARGET_DIR, with the defaults
    "xilinx", "xilinx" and "/home/xilinx/finn". The deployed model is saved
    to ``build_dir``.
    """
    model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_pynq_driver.onnx")
    # A missing board address is not a fault: skip. Only the environment lookup
    # decides this, so a KeyError raised during deployment or saving surfaces
    # as a real failure instead of being reported as a skip.
    ip = os.getenv("PYNQ_IP", "")
    if ip == "":
        pytest.skip("PYNQ board IP address not specified")
    username = os.getenv("PYNQ_USERNAME", "xilinx")
    password = os.getenv("PYNQ_PASSWORD", "xilinx")
    target_dir = os.getenv("PYNQ_TARGET_DIR", "/home/xilinx/finn")
    model = model.transform(DeployToPYNQ(ip, username, password, target_dir))
    # save the model to be able to link it to the parent
    model.save(build_dir + "/end2end_cnv_w1a1_pynq_deploy.onnx")


def test_end2end_cnv_w1a1_run_on_pynq():
    """Compare the deployed model's output with the streamlined golden output.

    The golden output is computed from the streamlined model for one CIFAR-10
    test vector. If the PYNQ_IP environment variable is unset or empty the
    test is then skipped. Otherwise the dataflow parent graph is executed with
    its first StreamingDataflowPartition node pointed at the deployed model
    file in ``build_dir`` (presumably this runs on the board -- the execution
    path is not visible from this file) and the result must be ``np.isclose``
    to the golden output.
    """
    # use the streamlined model as the "golden" model for right answers
    golden = ModelWrapper(build_dir + "/end2end_cnv_w1a1_streamlined.onnx")
    iname = golden.graph.input[0].name
    oname = golden.graph.output[0].name
    # load one of the test vectors
    fn = pk.resource_filename("finn", "data/cifar10/cifar10-test-data-class3.npz")
    input_tensor = np.load(fn)["arr_0"].astype(np.float32)
    assert input_tensor.shape == (1, 3, 32, 32)
    x = input_tensor
    # run using FINN-based execution
    ret_golden = execute_onnx(golden, {iname: x}, True)
    y_golden = ret_golden[oname]
    # set up parent+child graph to test
    # we'll use models from the previous step as the child model
    parent_model = ModelWrapper(build_dir + "/end2end_cnv_w1a1_dataflow_parent.onnx")
    iname = parent_model.graph.input[0].name
    oname = parent_model.graph.output[0].name
    # A missing board address is not a fault: skip. Only the environment lookup
    # decides this, so a KeyError from execution or from the result lookup
    # below fails the test instead of being reported as a skip.
    ip = os.getenv("PYNQ_IP", "")
    if ip == "":
        pytest.skip("PYNQ board IP address not specified")
    # produce results with the deployed model as the child
    sdp_node = parent_model.get_nodes_by_op_type("StreamingDataflowPartition")[0]
    sdp_node = getCustomOp(sdp_node)
    sdp_node.set_nodeattr("model", build_dir + "/end2end_cnv_w1a1_pynq_deploy.onnx")
    ret = execute_onnx(parent_model, {iname: x}, True)
    y = ret[oname]
    assert np.isclose(y, y_golden).all()