"""Build a similarity graph and connected-component clusters from a MOSS report.

The module reads pairwise results from a CSV file, keeps the pairs that pass
the percentage/lines thresholds, writes the resulting graph (and one file per
connected component) in GraphViz DOT format, optionally renders SVG files via
the external ``dot`` program, and finally writes a CSV summary of all
clustered pairs.
"""

import re
# import argparse
import logging
import csv
import subprocess
import pydot
import dataclasses
import itertools
import networkx as nx
from dataclass_csv import DataclassReader
from .utils import logging as logutils

DEFAULT_RESULTS_CSV_FILE = "moss-report.csv"
DEFAULT_TOTAL_GRAPH_DOT_FILE = "moss-report.dot"
DEFAULT_CLUSTERS_DOT_FILE = "clusters.dot"
DEFAULT_CLUSTER_FILE_PATTERN = "cluster-{}.{}"
DEFAULT_THRESHOLD_PERCENTAGE = 90
DEFAULT_THRESHOLD_LINES = 50
DEFAULT_CREATE_SVG_FILES = True
DEFAULT_SUMMARY_CSV_FILE = "clusters.csv"


@dataclasses.dataclass
class MossResult:
    """One row of the results CSV: a pairwise comparison of two submissions.

    The field annotations are consumed by ``DataclassReader`` when the CSV
    file is parsed, so they must stay in sync with the CSV columns.
    """

    id1: str
    percentage1: int
    id2: str
    percentage2: int
    percentage_avg: float
    lines: int
    match_file: str

    def fields_flattened(self):
        """Return the field names of this dataclass as a list of strings."""
        return [f.name for f in dataclasses.fields(self)]

    def values_flattened(self):
        """Return the field values as a tuple, in field-declaration order."""
        return dataclasses.astuple(self)


@dataclasses.dataclass
class ClusterEntry:
    """A single result (graph edge) together with the cluster it belongs to."""

    cluster_id: int
    result: MossResult
    dot_file: str
    # ``create_clusters`` passes None here when SVG creation is disabled.
    svg_file: str

    def fields_flattened(self):
        """Return column names with ``result`` expanded into its own fields.

        The ``result`` column is replaced in place by the field names of
        ``MossResult``; the remaining columns keep their relative order.
        """
        field_names = [f.name for f in dataclasses.fields(self)]
        result_headers = self.result.fields_flattened()
        # Index 1 is the nested ``result`` field, which is spliced out.
        return field_names[:1] + result_headers + field_names[2:]

    def values_flattened(self):
        """Return row values with ``result`` expanded into its own values.

        The layout matches ``fields_flattened``.
        """
        field_data = dataclasses.astuple(self)
        result_data = self.result.values_flattened()
        # Index 1 is the nested ``result`` tuple, which is spliced out.
        return field_data[:1] + result_data + field_data[2:]


def read_results_from_csv_file(csv_file):
    """Read all ``MossResult`` rows from ``csv_file`` and return them as a list.

    Parsing and type conversion of each row is delegated to
    ``DataclassReader``.
    """
    logging.info("Reading results from %s", csv_file)

    with open(csv_file, newline="") as csv_fh:
        csv_reader = DataclassReader(
            csv_fh, MossResult, delimiter=",", quotechar='"')
        results = list(csv_reader)

    logging.debug("Read %s results", len(results))

    return results


def get_weight(result):
    """Return the larger of the two match percentages of ``result``."""
    return max(result.percentage1, result.percentage2)


def get_color(percentage):
    """Return the hex color string used to draw an edge of this percentage."""
    if percentage >= 90:
        return "#D83018"  # Red
    if percentage >= 80:
        return "#F07241"  # Orange
    if percentage >= 70:
        return "#601848"  # Purple
    return "#000000"  # Black


def include(result, percentage_threshold, lines_threshold):
    """Return True if ``result`` reaches both thresholds (inclusive)."""
    return (
        percentage_threshold <= get_weight(result)
        and lines_threshold <= result.lines)


def get_results_graph(results, percentage_threshold, lines_threshold):
    """Build an undirected graph whose edges are the results passing the filter.

    Nodes are the ``id1``/``id2`` values of the included results. Each edge is
    weighted with ``get_weight(result)`` and carries GraphViz attributes plus
    a ``_result`` attribute holding the originating ``MossResult``.
    """
    graph = nx.Graph()

    logging.debug("Creating graph from %s initial results", len(results))
    # The original message had no placeholders, so the thresholds were never
    # actually logged.
    logging.debug(
        "Thresholds percentages/lines: %s/%s",
        percentage_threshold, lines_threshold)

    for result in results:
        if not include(result, percentage_threshold, lines_threshold):
            continue

        weight = get_weight(result)
        edge = (result.id1, result.id2, weight)
        color = get_color(weight)

        attributes = {
            # Attributes for GraphViz
            "color": color,
            "penwidth": 2,
            "label": "{0}% ({1})".format(weight, result.lines),
            "labelURL": result.match_file,
            "URL": result.match_file,
            "target": "match",
            "fontcolor": color,
            # Attributes for internal bookkeeping
            "_result": result,
        }

        graph.add_weighted_edges_from([edge], **attributes)

    logging.debug(
        "Graph contains %s nodes and %s edges",
        graph.number_of_nodes(), graph.number_of_edges())

    return graph


def create_cluster_dot_and_svg_files(
        subgraph, index, cluster_dot_file, cluster_svg_file=None):
    """Write ``subgraph`` to a DOT file and optionally render it to SVG.

    ``index`` is only used for logging. If ``cluster_svg_file`` is truthy, the
    external ``dot`` program is invoked to render the DOT file. A non-zero
    exit status of ``dot`` is logged as a warning rather than raised, so one
    failed rendering does not abort the remaining clusters.
    """
    logging.debug(
        "Writing cluster %s with %s/%s nodes/edge to file %s",
        index,
        subgraph.number_of_nodes(),
        subgraph.number_of_edges(),
        cluster_dot_file)

    nx.drawing.nx_pydot.write_dot(subgraph, cluster_dot_file)

    if not cluster_svg_file:
        return

    dot_command = [
        "dot", "-Tsvg", "-o{}".format(cluster_svg_file), cluster_dot_file]

    logging.debug(
        "Calling dot to create SVG %s file from %s",
        cluster_svg_file, cluster_dot_file)
    logging.debug("Command: %s", " ".join(dot_command))

    completed = subprocess.run(dot_command, check=False)
    if completed.returncode != 0:
        # Previously the exit status was ignored, hiding failed renderings.
        logging.warning(
            "dot exited with status %s while creating %s",
            completed.returncode, cluster_svg_file)


def create_clusters(graph, cluster_file_pattern, create_svg_files):
    """Split ``graph`` into connected components and write one file set each.

    Components are processed from largest to smallest (by node count); the
    position in that order is the cluster index. ``cluster_file_pattern`` is
    formatted with the index and the file extension ("dot" or "svg").

    Returns a list with one ``ClusterEntry`` per edge of every cluster.
    """
    logging.info("Computing connected component (CC) clusters")
    clusters = sorted(nx.connected_components(graph), key=len, reverse=True)

    logging.info(
        "Found %s CC clusters, will write them to files %s",
        len(clusters), cluster_file_pattern.format("#", "dot"))

    cluster_entries = []

    for index, cluster in enumerate(clusters):
        subgraph = graph.subgraph(cluster).copy()

        dot_file = cluster_file_pattern.format(index, "dot")
        svg_file = None
        if create_svg_files:
            svg_file = cluster_file_pattern.format(index, "svg")

        create_cluster_dot_and_svg_files(subgraph, index, dot_file, svg_file)

        cluster_entries.extend(
            ClusterEntry(index, data["_result"], dot_file, svg_file)
            for (_, _, data) in subgraph.edges(data=True))

    return cluster_entries


def create_summary_csv_file(cluster_entries, summary_csv_file):
    """Write all ``cluster_entries`` to ``summary_csv_file`` as CSV.

    The header row is taken from the first entry. When there are no entries,
    no file is written.
    """
    logging.info("Writing summary file %s", summary_csv_file)

    if not cluster_entries:
        return

    with open(summary_csv_file, "w", newline="") as csv_fh:
        csv_writer = csv.writer(csv_fh)

        csv_writer.writerow(cluster_entries[0].fields_flattened())
        csv_writer.writerows(
            entry.values_flattened() for entry in cluster_entries)


def main(
        results_csv_file=DEFAULT_RESULTS_CSV_FILE,
        total_graph_dot_file=DEFAULT_TOTAL_GRAPH_DOT_FILE,
        cluster_file_pattern=DEFAULT_CLUSTER_FILE_PATTERN,
        percentage_threshold=DEFAULT_THRESHOLD_PERCENTAGE,
        lines_threshold=DEFAULT_THRESHOLD_LINES,
        create_svg_files=DEFAULT_CREATE_SVG_FILES,
        summary_csv_file=DEFAULT_SUMMARY_CSV_FILE):
    """Run the whole pipeline: read results, build graph, write all outputs."""
    logutils.configure_level_and_format()

    results = read_results_from_csv_file(results_csv_file)
    graph = get_results_graph(results, percentage_threshold, lines_threshold)

    logging.info("Writing total graph to %s", total_graph_dot_file)
    nx.drawing.nx_pydot.write_dot(graph, total_graph_dot_file)

    cluster_entries = create_clusters(
        graph, cluster_file_pattern, create_svg_files)
    create_summary_csv_file(cluster_entries, summary_csv_file)


if __name__ == "__main__":
    main()