"""Turn a MOSS results CSV file into similarity graphs.

The results are read into :class:`MossResult` records, filtered by a
percentage and a matched-lines threshold, and turned into an undirected
graph whose edges connect the two submissions of each remaining result.
The total graph and each of its connected components are written as DOT
files; optionally ``dot`` is invoked to render the components as SVG files.
"""

import csv
import logging
import re
import subprocess
# import argparse
from dataclasses import dataclass

import networkx as nx
import pydot

from .utils import logging as logutils

DEFAULT_RESULTS_CSV_FILE = "moss-report.csv"
DEFAULT_TOTAL_GRAPH_DOT_FILE = "moss-report.dot"
DEFAULT_CLUSTERS_DOT_FILE = "clusters.dot"
# First placeholder: cluster kind (main() fills in "cc"); second: cluster index.
DEFAULT_CLUSTER_DOT_FILE_PATTERN = "cluster-{}-{}.dot"
DEFAULT_THRESHOLD_PERCENTAGE = 90
DEFAULT_THRESHOLD_LINES = 50
DEFAULT_CREATE_SVG_FILES = True


@dataclass  # USE AS IF frozen=True
class MossResult:
    """One row of the results CSV file: a match between two submissions.

    The fields are declared in the order in which the CSV columns are
    unpacked into the constructor (see ``read_results_from_csv_file``).
    """

    id1: str
    percentage1: int
    id2: str
    percentage2: int
    avg_percentage: float  # converted with float() in __post_init__
    lines: int
    match_file: str

    def __post_init__(self):
        # Despite the (mandatory) type annotations above, there is no
        # guarantee that the field values have the expected type (rows read
        # via csv.reader consist of strings), hence the explicit conversions.
        self.percentage1 = int(self.percentage1)
        self.percentage2 = int(self.percentage2)
        self.avg_percentage = float(self.avg_percentage)
        self.lines = int(self.lines)


def read_results_from_csv_file(csv_file):
    """Read all results from ``csv_file`` and return them as a list.

    The first line of the file is treated as a header and skipped. Every
    following non-empty row is unpacked positionally into ``MossResult``.

    Raises:
        OSError: if the file cannot be opened.
        TypeError: if a row does not have exactly one value per field.
        ValueError: if a numeric column cannot be converted.
    """
    logging.info("Reading results from {}".format(csv_file))

    with open(csv_file, newline="") as csv_fh:
        csv_reader = csv.reader(csv_fh, delimiter=",", quotechar='"')
        next(csv_reader, None)  # Skip CSV header line
        # csv.reader yields an empty list for a blank line; unpacking that
        # into MossResult would raise a TypeError, so such rows are skipped.
        results = [MossResult(*row) for row in csv_reader if row]

    logging.debug("Read {} results".format(len(results)))

    return results


def get_weight(result):
    """Return the edge weight of ``result``: the larger of its two percentages."""
    return max(result.percentage1, result.percentage2)


def get_color(percentage):
    """Return the hex color used to draw an edge with the given percentage."""
    if percentage >= 90:
        return "#D83018"  # Red
    elif percentage >= 80:
        return "#F07241"  # Orange
    elif percentage >= 70:
        return "#601848"  # Purple
    else:
        return "#000000"  # Black


def include(result, percentage_threshold, lines_threshold):
    """Return True if ``result`` reaches both thresholds (inclusive)."""
    return (
        percentage_threshold <= get_weight(result)
        and lines_threshold <= result.lines)


def create_results_graph(results, percentage_threshold, lines_threshold):
    """Build an undirected graph from the results that pass the thresholds.

    Each included result becomes a weighted edge between ``id1`` and ``id2``.
    The edge additionally carries the attributes (color, label, URLs, ...)
    that end up in the DOT output.

    Returns:
        The resulting ``networkx.Graph``.
    """
    graph = nx.Graph()

    logging.debug("Creating graph from {} initial results".format(len(results)))
    # The placeholders are required for the two thresholds to show up in the
    # message at all; without them str.format silently drops its arguments.
    logging.debug(
        "Thresholds percentages/lines: {}/{}".format(
            percentage_threshold, lines_threshold))

    for result in results:
        if not include(result, percentage_threshold, lines_threshold):
            continue

        weight = get_weight(result)
        edge = (result.id1, result.id2, weight)
        color = get_color(weight)

        attributes = {
            "color": color,
            "penwidth": 2,
            "label": "{0}% ({1})".format(weight, result.lines),
            "labelURL": result.match_file,
            "URL": result.match_file,
            "target": "match",
            "fontcolor": color
        }

        graph.add_weighted_edges_from([edge], **attributes)

    logging.debug(
        "Graph contains {} nodes and {} edged".format(
            graph.number_of_nodes(),
            graph.number_of_edges()))

    return graph


def write_cluster_files(subgraph, index, cluster_dot_file, create_svg_files):
    """Write ``subgraph`` to ``cluster_dot_file`` and optionally render it.

    If ``create_svg_files`` is true, the external ``dot`` program is run with
    ``-Tsvg -O`` on the DOT file. A non-zero exit status of ``dot`` is logged
    as a warning but does not raise.

    ``index`` is only used in the log message.
    """
    logging.debug(
        "Writing cluster {} with {}/{} nodes/edge to file {}".format(
            index,
            subgraph.number_of_nodes(),
            subgraph.number_of_edges(),
            cluster_dot_file))

    nx.drawing.nx_pydot.write_dot(subgraph, cluster_dot_file)

    if create_svg_files:
        logging.debug(
            "Calling dot to create SVG file from {}".format(cluster_dot_file))
        completed = subprocess.run(["dot", "-Tsvg", "-O", cluster_dot_file])
        if completed.returncode != 0:
            # Best effort: report the failure instead of ignoring it, but
            # keep going so that the remaining clusters are still written.
            logging.warning(
                "dot exited with status {} for {}".format(
                    completed.returncode, cluster_dot_file))


def main(
        results_csv_file=DEFAULT_RESULTS_CSV_FILE,
        total_graph_dot_file=DEFAULT_TOTAL_GRAPH_DOT_FILE,
        cluster_dot_file_pattern=DEFAULT_CLUSTER_DOT_FILE_PATTERN,
        percentage_threshold=DEFAULT_THRESHOLD_PERCENTAGE,
        lines_threshold=DEFAULT_THRESHOLD_LINES,
        create_svg_files=DEFAULT_CREATE_SVG_FILES):
    """Read the results, build the graph and write all output files.

    Writes the total graph to ``total_graph_dot_file`` and one DOT file per
    connected component, largest component first, using
    ``cluster_dot_file_pattern`` (two ``{}`` placeholders: cluster kind and
    cluster index) for the file names.
    """
    logutils.configure_level_and_format()

    results = read_results_from_csv_file(results_csv_file)
    graph = create_results_graph(results, percentage_threshold, lines_threshold)

    logging.info("Writing total graph to {}".format(total_graph_dot_file))
    nx.drawing.nx_pydot.write_dot(graph, total_graph_dot_file)

    logging.info("Computing connected component (CC) clusters")
    clusters = sorted(nx.connected_components(graph), key=len, reverse=True)

    # Fill in the cluster kind now and keep a placeholder for the index.
    cluster_dot_file_pattern = cluster_dot_file_pattern.format("cc", "{}")

    logging.info(
        "Found {} CC clusters, will write them to files {}".format(
            len(clusters),
            cluster_dot_file_pattern.format("#")))

    for index, cluster in enumerate(clusters):
        # copy() yields an independent graph instead of a view on ``graph``.
        subgraph = graph.subgraph(cluster).copy()
        dot_file = cluster_dot_file_pattern.format(index)
        write_cluster_files(subgraph, index, dot_file, create_svg_files)


if __name__ == "__main__":
    main()