cluster.py 5.95 KB
Newer Older
1 2 3 4
import re
# import argparse
import logging
import csv
scmalte's avatar
scmalte committed
5 6
import subprocess
import pydot
7 8
import dataclasses
import itertools
scmalte's avatar
scmalte committed
9
import networkx as nx
10
from dataclass_csv import DataclassReader
scmalte's avatar
scmalte committed
11
from .utils import logging as logutils
12 13

# Default input file: the CSV that read_results_from_csv_file() parses
DEFAULT_RESULTS_CSV_FILE = "moss-report.csv"

# Default output files for the graph of all included results
DEFAULT_TOTAL_GRAPH_DOT_FILE = "moss-report.dot"
DEFAULT_CLUSTERS_DOT_FILE = "clusters.dot"

# Per-cluster file name pattern; filled via .format(cluster_index, extension)
DEFAULT_CLUSTER_FILE_PATTERN = "cluster-{}.{}"

# A result is only included if it reaches both thresholds (see include())
DEFAULT_THRESHOLD_PERCENTAGE = 90
DEFAULT_THRESHOLD_LINES = 50

# SVG files are rendered by calling the external "dot" executable
DEFAULT_CREATE_SVG_FILES = True

# Default output file for the per-cluster summary
DEFAULT_SUMMARY_CSV_FILE = "clusters.csv"
21

22
@dataclasses.dataclass
class MossResult:
  """One row of the results CSV file.

  The field order matters: it determines the order of the values returned
  by flat_headers() and flat_data().
  """

  # Identifier of the first side of the match; used as a graph node
  id1: str
  # Percentage reported for id1 (presumably its share of matched code - TODO confirm)
  percentage1: int
  # Identifier of the second side of the match; used as a graph node
  id2: str
  # Percentage reported for id2 (presumably its share of matched code - TODO confirm)
  percentage2: int
  # Presumably the mean of percentage1 and percentage2 - TODO confirm
  percentage_avg: float
  # Line count of the match; compared against the lines threshold
  lines: int
  # Location of the match report; used as the URL of the graph edge
  match_file: str

  def flat_headers(self):
    """Return the field names, in declaration order, as a list."""
    return [f.name for f in dataclasses.fields(self)]

  def flat_data(self):
    """Return the field values as a tuple, ordered like flat_headers()."""
    return dataclasses.astuple(self)

@dataclasses.dataclass
class ClusterEntry:
  """A single result together with the cluster it was assigned to.

  flat_headers() and flat_data() produce one flat row in which the nested
  result is expanded into its own columns, in place of the "result" field.
  """

  # Index of the cluster this entry belongs to
  cluster_id: int
  # The result that forms one edge of the cluster
  result: MossResult
  # Dot file the cluster was written to
  dot_file: str
  # SVG file of the cluster (callers pass None when no SVG is created)
  svg_file: str

  def flat_headers(self):
    """Return the column names of the flattened row as a list."""
    own_names = [field.name for field in dataclasses.fields(self)]
    expanded_names = self.result.flat_headers()

    # Position 1 is the "result" field; its columns are spliced in there
    return own_names[:1] + expanded_names + own_names[2:]

  def flat_data(self):
    """Return the values of the flattened row as a tuple."""
    own_values = dataclasses.astuple(self)
    expanded_values = self.result.flat_data()

    # Position 1 is the "result" field; its values are spliced in there
    return own_values[:1] + expanded_values + own_values[2:]
56

scmalte's avatar
scmalte committed
57 58
def read_results_from_csv_file(csv_file):
  """Read all rows of a results CSV file.

  Args:
    csv_file: Path of the CSV file to read. The file is parsed by
      DataclassReader with "," as delimiter and '"' as quote character.

  Returns:
    A list with one MossResult per row yielded by the reader.
  """
  logging.info("Reading results from {}".format(csv_file))

  # newline="" leaves newline handling to the CSV parser
  with open(csv_file, newline="") as csv_fh:
    csv_reader = DataclassReader(csv_fh, MossResult, delimiter=",", quotechar='"')

    # Consume the reader while the file is still open
    results = list(csv_reader)

  logging.debug("Read {} results".format(len(results)))

  return results

def get_weight(result):
  """Return the larger of the two percentages of a result.

  Args:
    result: Object with percentage1 and percentage2 attributes.

  Returns:
    max(result.percentage1, result.percentage2).
  """
  return max(result.percentage1, result.percentage2)
scmalte's avatar
scmalte committed
77 78 79 80 81 82 83

def get_color(percentage):
  """Map a percentage to a hex color string.

  Args:
    percentage: Value to classify.

  Returns:
    The color of the highest band whose lower bound the percentage reaches,
    or black if it is below every band.
  """
  # (lower bound, color), checked from the highest band downwards
  color_bands = (
    (90, "#D83018"), # Red
    (80, "#F07241"), # Orange
    (70, "#601848"), # Purple
  )

  for lower_bound, band_color in color_bands:
    if percentage >= lower_bound:
      return band_color

  return "#000000" # Black

84 85 86 87 88
def include(result, percentage_threshold, lines_threshold):
  """Decide whether a result reaches both thresholds.

  Args:
    result: Result to check; its weight is taken from get_weight().
    percentage_threshold: Minimum weight (inclusive).
    lines_threshold: Minimum value of result.lines (inclusive).

  Returns:
    True if both thresholds are reached, False otherwise.
  """
  # The weight is checked first; lines are only looked at if it passes
  if not (percentage_threshold <= get_weight(result)):
    return False

  return lines_threshold <= result.lines

89
def get_results_graph(results, percentage_threshold, lines_threshold):
  """Build an undirected graph from the results that reach both thresholds.

  Every included result becomes one edge between result.id1 and result.id2.
  The edge weight is get_weight(result), and the edge carries GraphViz
  attributes (color, label, URL, ...) as well as the result itself under
  the key "_result".

  Args:
    results: Sequence of results to consider.
    percentage_threshold: Minimum weight for a result to be included.
    lines_threshold: Minimum line count for a result to be included.

  Returns:
    The populated nx.Graph.
  """
  graph = nx.Graph()

  logging.debug("Creating graph from {} initial results".format(len(results)))
  logging.debug(
    "Thresholds percentages/lines: {}/{}".format(
      percentage_threshold,
      lines_threshold))

  for result in results:
    if not include(result, percentage_threshold, lines_threshold):
      continue

    weight = get_weight(result)
    edge = (result.id1, result.id2, weight)
    color = get_color(weight)

    attributes = {
      # Attributes for GraphViz
      "color": color,
      "penwidth": 2,
      "label": "{0}% ({1})".format(weight, result.lines),
      "labelURL": result.match_file,
      "URL": result.match_file,
      "target": "match",
      "fontcolor": color,
      # Attributes for internal bookkeeping
      "_result": result
    }

    graph.add_weighted_edges_from([edge], **attributes)

  logging.debug(
    "Graph contains {} nodes and {} edges".format(
      graph.number_of_nodes(),
      graph.number_of_edges()))

  return graph

125
def create_cluster_dot_and_svg_files(subgraph, index, cluster_dot_file, cluster_svg_file=None):
  """Write a cluster to a dot file and optionally render it as SVG.

  Args:
    subgraph: Graph of the cluster to write.
    index: Index of the cluster; only used for logging.
    cluster_dot_file: Path of the dot file to write.
    cluster_svg_file: Path of the SVG file to create from the dot file.
      If it is None (or empty), no SVG file is created.

  The SVG file is created by running the external "dot" executable. A
  non-zero exit status of dot is logged as a warning; it does not raise.
  """
  logging.debug(
    "Writing cluster {} with {}/{} nodes/edges to file {}".format(
      index,
      subgraph.number_of_nodes(),
      subgraph.number_of_edges(),
      cluster_dot_file))

  nx.drawing.nx_pydot.write_dot(subgraph, cluster_dot_file)

  if cluster_svg_file:
    logging.debug("Calling dot to create SVG {} file from {}".format(cluster_svg_file, cluster_dot_file))
    completed = subprocess.run(["dot", "-Tsvg", "-o{}".format(cluster_svg_file), cluster_dot_file])

    # Report a failed rendering instead of dropping the exit status silently
    if completed.returncode != 0:
      logging.warning(
        "dot exited with status {} while creating {}".format(
          completed.returncode,
          cluster_svg_file))

def create_clusters(graph, cluster_file_pattern, create_svg_files):
  """Split the graph into connected components and write each one to disk.

  Args:
    graph: Graph whose edges carry their result under the key "_result".
    cluster_file_pattern: Pattern for the cluster file names; it is filled
      via .format(cluster_index, extension).
    create_svg_files: If true, an SVG file is created next to each dot file.

  Returns:
    A list with one ClusterEntry per edge of every cluster.
  """
  logging.info("Computing connected component (CC) clusters")

  # Largest component first, so the biggest cluster gets index 0
  components = list(nx.connected_components(graph))
  components.sort(key=len, reverse=True)

  logging.info(
    "Found {} CC clusters, will write them to files {}".format(
      len(components),
      cluster_file_pattern.format("#", "dot")))

  entries = []

  for cluster_id, members in enumerate(components):
    cluster_graph = graph.subgraph(members).copy()
    dot_file = cluster_file_pattern.format(cluster_id, "dot")
    svg_file = cluster_file_pattern.format(cluster_id, "svg") if create_svg_files else None

    create_cluster_dot_and_svg_files(cluster_graph, cluster_id, dot_file, svg_file)

    # One entry per edge, i.e. per result that is part of this cluster
    entries.extend(
      ClusterEntry(cluster_id, edge_data["_result"], dot_file, svg_file)
      for _, _, edge_data in cluster_graph.edges(data=True))

  return entries

def create_summary_csv_file(cluster_entries, summary_csv_file):
  """Write all cluster entries to a CSV file, preceded by a header row.

  Args:
    cluster_entries: Entries to write; each one provides flat_headers()
      and flat_data(). The header row is taken from the first entry.
    summary_csv_file: Path of the CSV file to write.

  If cluster_entries is empty, the file is not created at all.
  """
  logging.info("Writing summary file {}".format(summary_csv_file))

  # Without entries there is no header to take, so nothing is written
  if not cluster_entries:
    return

  with open(summary_csv_file, "w", newline="") as csv_fh:
    summary_writer = csv.writer(csv_fh)

    first_entry = cluster_entries[0]
    summary_writer.writerow(first_entry.flat_headers())
    summary_writer.writerows(entry.flat_data() for entry in cluster_entries)
181

scmalte's avatar
scmalte committed
182 183 184
def main(
    results_csv_file=DEFAULT_RESULTS_CSV_FILE,
    total_graph_dot_file=DEFAULT_TOTAL_GRAPH_DOT_FILE,
    cluster_file_pattern=DEFAULT_CLUSTER_FILE_PATTERN,
    percentage_threshold=DEFAULT_THRESHOLD_PERCENTAGE,
    lines_threshold=DEFAULT_THRESHOLD_LINES,
    create_svg_files=DEFAULT_CREATE_SVG_FILES,
    summary_csv_file=DEFAULT_SUMMARY_CSV_FILE):
  """Read the results, build the graph, and write all cluster files.

  Args:
    results_csv_file: Path of the results CSV file to read.
    total_graph_dot_file: Path of the dot file for the complete graph.
    cluster_file_pattern: Pattern for the per-cluster file names; it is
      filled via .format(cluster_index, extension).
    percentage_threshold: Minimum weight for a result to be included.
    lines_threshold: Minimum line count for a result to be included.
    create_svg_files: If true, an SVG file is created for each cluster.
    summary_csv_file: Path of the summary CSV file to write.
  """
  logutils.configure_level_and_format()

  results = read_results_from_csv_file(results_csv_file)
  graph = get_results_graph(results, percentage_threshold, lines_threshold)

  logging.info("Writing total graph to {}".format(total_graph_dot_file))
  nx.drawing.nx_pydot.write_dot(graph, total_graph_dot_file)

  cluster_entries = create_clusters(graph, cluster_file_pattern, create_svg_files)

  create_summary_csv_file(cluster_entries, summary_csv_file)
202 203 204 205


# Script entry point: run the whole pipeline with all default arguments.
# NOTE(review): this module uses a relative import (from .utils ...), so it
# presumably has to be started as part of its package (python -m ...) - confirm.
if __name__ == "__main__":
  main()