Commit 86c6aa3d authored by Bowen Wu's avatar Bowen Wu
Browse files

Build fuzzing framework

parents
__pycache__
*.pyc
ckpt/
diff/
mutated/
*.log
*.sol
.DS_Store
\ No newline at end of file
[submodule "pysmps"]
path = pysmps
url = https://github.com/jmaerte/pysmps.git
import random
import numpy as np
import sys
import subprocess as sp
class MPSProgram:
    """
    In-memory representation of an MPS program that can be rendered as text.

    The constructor takes keyword arguments only and requires all of:
    ``name``, ``objsense``, ``obj_name``, ``row_names``, ``col_names``,
    ``col_types``, ``types``, ``obj_coeff``, ``con_mat``, ``rhs_names``,
    ``rhs``, ``bnd_names`` and ``bnd``. A missing key raises ``KeyError``.
    """

    def __init__(self, **kwargs):
        # Keyword names differ from attribute names for three fields:
        # obj_name -> objective_name, obj_coeff -> c, con_mat -> A.
        self.name = kwargs["name"]
        self.objsense = kwargs["objsense"]
        self.objective_name = kwargs["obj_name"]
        self.row_names = kwargs["row_names"]
        self.col_names = kwargs["col_names"]
        self.col_types = kwargs["col_types"]
        self.types = kwargs["types"]
        self.c = kwargs["obj_coeff"]
        self.A = kwargs["con_mat"]
        self.rhs_names = kwargs["rhs_names"]
        self.rhs = kwargs["rhs"]
        self.bnd_names = kwargs["bnd_names"]
        self.bnd = kwargs["bnd"]

    def shift(self, prefix):
        """Prepend ``prefix + "_"`` to column, row and RHS names (and RHS keys)."""
        tag = prefix + "_"
        self.col_names = [tag + col for col in self.col_names]
        self.row_names = [tag + row for row in self.row_names]
        self.rhs_names = [tag + rhs for rhs in self.rhs_names]
        self.rhs = {tag + key: values for key, values in self.rhs.items()}

    def to_string(self):
        """Render the program as MPS text and return it as a single string."""
        parts = ["NAME " + self.name + "\n"]
        if self.objsense != "":
            parts.append("OBJSENSE \n " + self.objsense + "\n")

        # ROWS: the objective row first (type N), then one line per constraint.
        parts.append("ROWS\n")
        parts.append(f" N {self.objective_name}\n")
        for idx, row in enumerate(self.row_names):
            parts.append(f" {self.types[idx]} {row} \n")

        # COLUMNS: stack the objective on top of the constraint matrix so that
        # a single column-major walk emits both; zero entries are skipped.
        parts.append("COLUMNS\n")
        stacked = np.concatenate(([self.c], self.A), axis=0)
        stacked_row_names = [self.objective_name] + self.row_names
        n_rows, n_cols = stacked.shape[0], stacked.shape[1]
        for col_idx in range(n_cols):
            for row_idx in range(n_rows):
                coeff = stacked[row_idx][col_idx]
                if coeff != 0.0:
                    parts.append(
                        f" {self.col_names[col_idx]}\t\t{stacked_row_names[row_idx]}\t\t{coeff}\n"
                    )

        # RHS: one line per (rhs vector, constraint row) pair.
        parts.append("RHS\n")
        for rhs_name, rhs_values in self.rhs.items():
            for idx, value in enumerate(rhs_values):
                parts.append(f" {rhs_name}\t\t{self.row_names[idx]}\t\t{value}\n")

        parts.append("BOUNDS\n")
        # TODO : output bounds
        parts.append("ENDATA")
        return "".join(parts)

    def __str__(self):
        return self.to_string()
def generate_prog():
    """
    Naive random generation of MPS programs. At the moment only generating LPs.

    Returns:
        MPSProgram: a program with 1-100 "L" rows, 1-100 columns, integer
        objective/constraint/RHS coefficients drawn from [-1000, 1000), a
        single RHS vector and no bounds.
    """
    name = "prog"
    objsense = random.choice(["MAX", "MIN"])
    objective_name = "OBJ"

    num_rows = random.randint(1, 100)
    row_names = ["R" + str(i) for i in range(num_rows)]

    num_cols = random.randint(1, 100)
    col_names = ["C" + str(i) for i in range(num_cols)]

    c = np.random.randint(-1000, 1000, size=num_cols)
    col_types = ["continuous"] * num_cols
    types = ["L"] * num_rows

    # Modify the logic to generate A!!!
    # How many of them are 0?? - test to decide
    A = np.random.randint(-1000, 1000, size=(num_rows, num_cols))

    # Keep the declared RHS name and the key of the RHS dict in agreement.
    rhs_name = "RHS"
    rhs_names = [rhs_name]
    rhs = {rhs_name: np.random.randint(-1000, 1000, size=(num_rows,))}

    bnd_names = []
    # A dict (not a list) so code that iterates bnd.values() also works here.
    bnd = {}

    # MPSProgram.__init__ accepts keyword arguments only, under these exact keys.
    return MPSProgram(
        name=name,
        objsense=objsense,
        obj_name=objective_name,
        row_names=row_names,
        col_names=col_names,
        col_types=col_types,
        types=types,
        obj_coeff=c,
        con_mat=A,
        rhs_names=rhs_names,
        rhs=rhs,
        bnd_names=bnd_names,
        bnd=bnd,
    )
# Dominik: in this code snippet, I threw the generated programs at CPLEX
# and observed its output. For a fuzzing approach, we would need to parse CPLEX's
# and Gurobi's outputs (i.e. the optimum value) and cross-check them.
# As mentioned in the meeting, the crux will be to generate interesting MPS programs.
#
if __name__ == "__main__":
    # Generate one random program, write it to disk, and let CPLEX solve it.
    prog = generate_prog()
    # The context manager closes (and therefore flushes) the file before
    # CPLEX is asked to read it.
    with open("mutant.mps", "w") as mps_file:
        mps_file.write(str(prog))
    out = sp.getoutput('cplex -c "read mutant.mps" "optimize"')
    print(out)
This diff is collapsed.
This diff is collapsed.
NAME TESTPROB
ROWS
N COST
L LIM1
G LIM2
E MYEQN
COLUMNS
XONE COST 1 LIM1 1
XONE LIM2 1
YTWO COST 4 LIM1 1
YTWO MYEQN -1
ZTHREE COST 9 LIM2 1
ZTHREE MYEQN 1
RHS
RHS1 LIM1 5 LIM2 10
RHS1 MYEQN 7
BOUNDS
UP BND1 XONE 4
LO BND1 YTWO -1
UP BND1 YTWO 1
ENDATA
NAME TESTPROB
ROWS
N COST
G LIM1
G LIM2
COLUMNS
XONE COST -1 LIM1 1
XONE LIM2 1
YTWO COST -4 LIM1 1
ZTHREE COST -9 LIM2 1
RHS
RHS1 LIM1 5 LIM2 10
BOUNDS
UP BND1 XONE 4
LO BND1 YTWO -1
UP BND1 YTWO 1
FR BND1 ZTHREE
ENDATA
NAME TESTPROB
ROWS
N COST
L LIM1
G LIM2
E MYEQN
COLUMNS
XONE COST 1 LIM1 1
XONE LIM2 1
YTWO COST 4 LIM1 1
YTWO MYEQN -1
INT1 'MARKER' 'INTORG'
ZTHREE COST 9 LIM2 1
ZTHREE MYEQN 1
INT1END 'MARKER' 'INTEND'
RHS
RHS1 LIM1 5 LIM2 10
RHS1 MYEQN 7
BOUNDS
UP BND1 XONE 4
LO BND1 YTWO -1
UP BND1 YTWO 1
ENDATA
from mps_util import load_mps_dict, print_mps_dict
import pysmps.pysmps.smps_loader
from generator import MPSProgram
import numpy as np
class MutantMethod(object):
    """Base class for mutation operators; keeps per-operator bookkeeping."""

    def __init__(self) -> None:
        # Mutants recorded as causing differences (starts empty).
        self.mut_diff = []
        # Number of mutations produced so far.
        self.mut_cnt = 0

    def inc_cnt(self):
        """Record that one more mutation has been produced."""
        self.mut_cnt = self.mut_cnt + 1
class ScaleMut(MutantMethod):
    """Mutation that multiplies the numeric parts of an MPS dict by a factor."""

    def __init__(self, scale) -> None:
        super().__init__()
        self.scale = scale

    def __call__(self, mps):
        """
        Scale ``obj_coeff``, ``con_mat``, every ``rhs`` value and every value
        nested one level inside ``bnd`` by ``self.scale``, then return ``mps``.

        The rhs/bnd loops rely on in-place ``*=``, so they only take effect
        for values that support in-place multiplication
        (presumably numpy arrays; verify against the loader).
        """
        factor = self.scale
        for key in ("obj_coeff", "con_mat"):
            mps[key] *= factor
        for rhs_vec in mps["rhs"].values():
            rhs_vec *= factor
        for per_name in mps["bnd"].values():
            for bound_vec in per_name.values():
                bound_vec *= factor
        self.inc_cnt()
        return mps
class SparseMatMut(MutantMethod):
    """Mutation that zeroes a random fraction of the constraint matrix."""

    def __init__(self, prob) -> None:
        super().__init__()
        self.prob = prob

    def __call__(self, mps):
        """Set ``int(size * prob)`` distinct entries of ``con_mat`` to 0 in place."""
        matrix = mps["con_mat"]
        dims = matrix.shape
        n_entries = dims[0] * dims[1]
        n_picked = int(n_entries * self.prob)
        # https://stackoverflow.com/questions/48536969/how-to-randomly-set-elements-in-numpy-array-to-0
        flat_idx = np.random.choice(n_entries, size=n_picked, replace=False)
        matrix[np.unravel_index(flat_idx, dims)] = 0
        self.inc_cnt()
        return mps
class DenseMatMut(MutantMethod):
    """Mutation that overwrites a random fraction of the constraint matrix."""

    def __init__(self, prob) -> None:
        super().__init__()
        self.prob = prob

    def __call__(self, mps):
        """
        Replace ``int(size * prob)`` distinct entries of ``con_mat`` with
        fresh ``np.random.random()`` draws, in place, and return ``mps``.
        """
        matrix = mps["con_mat"]
        dims = matrix.shape
        n_entries = dims[0] * dims[1]
        n_picked = int(n_entries * self.prob)
        flat_idx = np.random.choice(n_entries, size=n_picked, replace=False)
        row_idx, col_idx = np.unravel_index(flat_idx, dims)
        # One draw per selected cell, in selection order.
        for r, c in zip(row_idx, col_idx):
            matrix[r, c] = np.random.random()
        self.inc_cnt()
        return mps
class FlipSignMut(MutantMethod):
    """Mutation that negates a random fraction of the constraint matrix."""

    def __init__(self, prob) -> None:
        """
        Args:
            prob: fraction of ``con_mat`` entries whose sign is flipped.
        """
        super().__init__()
        # Must be stored: __call__ reads self.prob to size the selection.
        self.prob = prob

    def __call__(self, mps):
        """Multiply ``int(size * prob)`` distinct entries of ``con_mat`` by -1 in place."""
        shape = mps["con_mat"].shape
        size = shape[0] * shape[1]
        indices = np.random.choice(size, replace=False, size=int(size * self.prob))
        mps["con_mat"][np.unravel_index(indices, shape)] *= -1
        self.inc_cnt()
        return mps
if __name__ == "__main__":
    # Smoke test: apply each mutation to a freshly loaded copy of the sample
    # program and print the resulting dict.
    test_mps = "mps/testprob.mps"
    sm = ScaleMut(scale = 0.5)
    smm = SparseMatMut(prob = 0.5)
    dmm = DenseMatMut(prob = 0.9)
    fsm = FlipSignMut(prob = 0.9)
    for mutator in (sm, smm, dmm, fsm):
        print_mps_dict(mutator(load_mps_dict(test_mps)))
\ No newline at end of file
import os
import sys
import pysmps.pysmps.smps_loader
from generator import MPSProgram
# A simple wrapper to suppress talkative APIs from printing
# Inspired from https://stackoverflow.com/a/45669280
class VerbosePrint:
    """
    Context manager that silences ``print`` output unless ``verbose`` is truthy.

    When not verbose, ``sys.stdout`` is swapped for a handle on ``os.devnull``
    on entry, and that handle is closed and the original stream restored on exit.
    """

    def __init__(self, verbose) -> None:
        self.verbose = verbose

    def __enter__(self):
        if self.verbose:
            return
        self._original_stdout = sys.stdout
        sys.stdout = open(os.devnull, 'w')

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.verbose:
            return
        sys.stdout.close()
        sys.stdout = self._original_stdout
class bcolors:
    """Named escape-sequence strings used to decorate terminal output."""

    # Colours
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKCYAN = '\033[96m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'

    # Reset: emitted after a decorated span to end the styling
    ENDC = '\033[0m'

    # Text attributes
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
def load_mps_dict(path):
    """
    Load the MPS file at ``path`` with pysmps and return its parts as a dict.

    The keys match the keyword arguments ``MPSProgram`` expects; ``objsense``
    is not read from the file and is always set to ``"MAX"``.
    """
    (name, objective_name, row_names, col_names, col_types, types,
     obj_coeff, con_mat, rhs_names, rhs, bnd_names, bnd) = pysmps.pysmps.smps_loader.load_mps(path)
    return {
        "name": name,
        "obj_name": objective_name,
        "row_names": row_names,
        "col_names": col_names,
        "col_types": col_types,
        "types": types,
        "obj_coeff": obj_coeff,
        "con_mat": con_mat,
        "rhs_names": rhs_names,
        "rhs": rhs,
        "bnd_names": bnd_names,
        "bnd": bnd,
        "objsense": "MAX",  # TODO: don't hardcode it
    }
def print_mps_dict(mps: dict):
    """Print every entry of ``mps``: the key highlighted on its own line, then the value."""
    for key, value in mps.items():
        print(f"{bcolors.OKBLUE}{key}\n{bcolors.ENDC}", value)
def print_load_mps_ret(path):
    """Load the MPS file at ``path`` with pysmps and print each returned part with a label."""
    (name, objective_name, row_names, col_names, col_types, types,
     c, A, rhs_names, rhs, bnd_names, bnd) = pysmps.pysmps.smps_loader.load_mps(path)
    # (label, value) pairs in print order; a label ending in "\n" puts the
    # value on the following line.
    labelled = [
        ("Program Name: ", name),
        ("Objective Name: ", objective_name),
        ("Row Names: ", row_names),
        ("Column Names: ", col_names),
        ("Column Types: ", col_types),
        ("Types: ", types),
        ("Objective Coeff: ", c),
        ("Constraint Matrix\n", A),
        ("RHS Names: ", rhs_names),
        ("RHS\n", rhs),
        ("Bound Names: ", bnd_names),
        ("Bound\n", bnd),
    ]
    for label, value in labelled:
        print(f"{bcolors.OKBLUE}{label}{bcolors.ENDC}", value)
def main():
    """
    Command-line entry point: dump the MPS file named by the first argument,
    first as the raw loader output and then re-rendered through MPSProgram.

    Exits with an error message when no path is given.
    """
    # Explicit check instead of assert: asserts are stripped under `python -O`,
    # which would turn a missing argument into a bare IndexError.
    if len(sys.argv) < 2:
        sys.exit("Expected at least one argument: the path to an MPS file")
    print_load_mps_ret(sys.argv[1])
    print(MPSProgram(**load_mps_dict(sys.argv[1])))


if __name__ == "__main__":
    main()
\ No newline at end of file
import time
import random
import os
import math
import pickle
from typing import List
from queue import Queue
from numpy import diff
from generator import MPSProgram
from mps_mut import DenseMatMut
from mps_util import load_mps_dict
from optimizer import OPTIMAL, CplexSolver, GurobiSolver
# TODO
# 1. Make this class checkpoint-able
# 2. Use two threads for cplex and gurobi each
# 3. Collect more stats
class OptFuzzer(object):
    """
    Differential fuzzer: mutates seed MPS programs, solves each mutant with
    two solvers and records the mutants on which the solvers disagree.

    Each solver is called with the path of an ``.mps`` file and is expected to
    return ``None`` on failure or a dict with a ``'status'`` entry and, for
    optimal solutions, ``'obj_val'`` and ``'var_val'`` (name -> value) entries.
    """

    def __init__(self, seed : List, mut_method : List, cplex : CplexSolver, gurobi : GurobiSolver,
                 save_path : str, checkpoint_freq : int, checkpoint_path : str, diff_path: str,
                 rel_tol = 1e-6, abs_tol = 1e-9) -> None:
        """
        Args:
            seed: paths of the seed MPS files.
            mut_method: mutation callables, each mapping an MPS dict to an MPS dict.
            cplex: first solver.
            gurobi: second solver.
            save_path: directory receiving every mutant as an ``.mps`` file.
            checkpoint_freq: checkpoint every this many rounds (<= 0 disables
                periodic checkpoints).
            checkpoint_path: directory receiving checkpoint pickles.
            diff_path: directory receiving the mutants that cause differences.
            rel_tol: relative tolerance used when comparing solver values.
            abs_tol: absolute tolerance used when comparing solver values;
                needed because a relative tolerance alone can never match a
                tiny nonzero value against 0.
        """
        self.seed = seed # a list of seed mps in dict format
        self.mut_method = mut_method # a list of mutation methods
        self.next_mut = 0
        self.n_mut = len(self.mut_method)
        self.cplex = cplex # cplex solver
        self.gurobi = gurobi # gurobi solver
        self.save_path = save_path # path to save the mutated mps files
        os.makedirs(self.save_path, exist_ok=True)
        self.checkpoint_freq = checkpoint_freq
        self.checkpoint_path = checkpoint_path # path to store the checkpoint files
        os.makedirs(self.checkpoint_path, exist_ok=True)
        self.diff_path = diff_path # path to store the mutants that cause differences
        os.makedirs(self.diff_path, exist_ok=True)
        self.rel_tol = rel_tol
        self.abs_tol = abs_tol
        self.diff = [] # mutants that cause differences
        self.all_mutant = [] # mutants produced since the last checkpoint
        self.queue = Queue(maxsize=0) # unbounded queue for seeds
        self.next_id = 0 # id assigned to the next mutant
        for s in self.seed:
            mutant = {
                "id" : self.next_id,
                "parent" : [-1], # root seed, no parent
                "mut_method" : "",
                "mps" : load_mps_dict(s)
            }
            self.next_id += 1
            self.queue.put_nowait(mutant)

    def get_mutant(self, mut_choice : str) -> dict:
        """
        Take the next seed from the queue, mutate a copy of it and re-queue the seed.

        Args:
            mut_choice: ``"fixed"`` (always the first mutation method),
                ``"round_robin"`` or ``"random"``.

        Returns:
            A mutant record with ``id``, ``parent``, ``mut_method`` and ``mps``.

        Raises:
            ValueError: if ``mut_choice`` is not one of the supported values.
        """
        # Local import keeps this class self-contained without touching the
        # module's import block.
        import copy

        # Resolve the mutation method before dequeuing, so an invalid choice
        # cannot drop a seed from the queue.
        if mut_choice == "fixed":
            idx = 0
        elif mut_choice == "round_robin":
            idx = self.next_mut
            self.next_mut = (self.next_mut + 1) % self.n_mut
        elif mut_choice == "random":
            idx = random.randint(0, self.n_mut - 1)
        else:
            raise ValueError(f"Unsupported mutant choice {mut_choice}")

        parent = self.queue.get()
        # Mutation methods modify the dict they are given. Work on a deep copy
        # so the seed stays pristine and every recorded mutant owns its data
        # (otherwise all mutants alias one object and checkpoints are useless).
        mps_prime = self.mut_method[idx](copy.deepcopy(parent["mps"]))
        ret = {
            "id" : self.next_id,
            "parent" : [parent["id"]], # the seed this mutant was derived from
            "mut_method" : self.mut_method[idx].__str__(),
            "mps" : mps_prime
        }
        self.next_id += 1
        self.queue.put(parent)
        return ret

    def _close(self, a, b) -> bool:
        """Compare two solver values using the configured tolerances."""
        return math.isclose(a, b, rel_tol=self.rel_tol, abs_tol=self.abs_tol)

    def sol_equal(self, sol1 : dict, sol2 : dict) -> bool:
        """
        Return True when both solutions agree.

        Solutions agree when their statuses match and, for optimal solutions,
        the objective values and every variable value are close. A variable
        missing from either solution is treated as 0.
        """
        if sol1['status'] != sol2['status']:
            return False
        if sol1['status'] != OPTIMAL:
            return True
        # check if the obj value is the same
        if not self._close(sol1['obj_val'], sol2['obj_val']):
            return False
        # check if the variable values are the same; Gurobi omits variables
        # if 0, so compare over the union of names and default to 0
        vals1, vals2 = sol1['var_val'], sol2['var_val']
        for var in vals1.keys() | vals2.keys():
            if not self._close(vals1.get(var, 0), vals2.get(var, 0)):
                return False
        return True

    def fuzz_once(self, mut_choice):
        """
        Produce one mutant, write it to ``save_path``, solve it with both
        solvers and record it (plus a copy under ``diff_path``) if they disagree.

        If either solver returns ``None`` the state is checkpointed and the
        process is aborted.
        """
        mut = self.get_mutant(mut_choice)
        mps_text = str(MPSProgram(**mut["mps"]))
        fname = os.path.join(self.save_path, "mutant_" + str(mut["id"])+".mps")
        with open(fname, "w") as f:
            f.write(mps_text)
        cplex_sol = self.cplex(fname)
        gurobi_sol = self.gurobi(fname)
        if cplex_sol is None or gurobi_sol is None:
            print("[Fatal] Cannot load the solution")
            self.checkpoint()
            os.abort()
        if not self.sol_equal(cplex_sol, gurobi_sol):
            dfname = os.path.join(self.diff_path, f"diff_{len(self.diff)}.mps")
            with open(dfname, "w") as f:
                f.write(mps_text)
            self.diff.append(mut)
        self.all_mutant.append(mut)

    def print_stats(self):
        """Print the seeds, the mutation methods and the mutant counters."""
        print(f"Seeds: {self.seed}")
        print("Mutation methods used: ", self.mut_method)
        print("# Mutations: ", self.next_id)
        print("# Mutants that caused a difference: ", len(self.diff))

    def checkpoint(self):
        """Pickle the mutants produced since the last checkpoint, then forget them."""
        # store all the mutants that have been generated to disks
        fname = os.path.join(self.checkpoint_path, f"ckpt_{self.next_id - 1}")
        with open(fname, "wb") as f:
            pickle.dump(self.all_mutant, f)
        self.all_mutant = []

    def save_states(self):
        """Write a final checkpoint and pickle the difference-causing mutants."""
        self.checkpoint()
        fname = os.path.join(self.diff_path, "diff.pickle")
        with open(fname, "wb") as f:
            pickle.dump(self.diff, f)

    def fuzz(self, max_time, mut_choice = "fixed"):
        """
        Run fuzzing rounds until their accumulated duration reaches
        ``max_time`` seconds, then print statistics and save the state.

        Only time spent inside ``fuzz_once`` counts towards ``max_time``.
        """
        elapse = 0 # in seconds
        nround = 0
        while elapse < max_time:
            # monotonic clock: immune to wall-clock adjustments
            start = time.monotonic()
            self.fuzz_once(mut_choice)
            elapse += time.monotonic() - start
            nround += 1
            # Count completed rounds so a checkpoint lands after exactly
            # checkpoint_freq rounds; a non-positive frequency disables it
            # instead of dividing by zero.
            if self.checkpoint_freq > 0 and nround % self.checkpoint_freq == 0:
                self.checkpoint()
        self.print_stats()
        self.save_states()
def main():
    """Run a short fuzzing session on the bundled sample program and print solver statistics."""
    dmm = DenseMatMut(0.5)
    seed = ["mps/testprob.mps"]
    mut_method = [dmm]
    cpx = CplexSolver()
    grb = GurobiSolver()
    save_path = "mutated/"
    checkpoint_freq = 100
    checkpoint_path = "ckpt/"
    diff_path = "diff/"
    max_time = 1
    mut_choice = "fixed"
    fuzzer = OptFuzzer(seed, mut_method, cpx, grb, save_path, checkpoint_freq, checkpoint_path, diff_path)
    # fuzz() prints the fuzzer statistics itself when it finishes, so they are
    # not printed again here; only the per-solver statistics remain.
    fuzzer.fuzz(max_time, mut_choice)
    cpx.print_solver_stats()
    grb.print_solver_stats()


if __name__ == "__main__":
    main()
import xml.etree.ElementTree as ET
from cplex import Cplex
import gurobipy as gp
from mps_util import VerbosePrint
# Solver-independent status codes, numbered 0..4 in this order.
OPTIMAL, INFEASIBLE, INF_OR_UNBD, UNBOUNDED, UNKNOWN = range(5)

# Human-readable label for each status code.
status_text = dict([
    (OPTIMAL, "optimal"),
    (INFEASIBLE, "infeasible"),
    (INF_OR_UNBD, "infeasible or unbounded"),
    (UNBOUNDED, "unbounded"),
    (UNKNOWN, "unknown"),
])
class Solver(object):
    """Common base for solver wrappers: verbosity flag plus per-status counters."""

    def __init__(self, verbose = False) -> None:
        # by default, the solvers are very talky but we can make it less verbose
        self.verbose = verbose
        # One counter per status code, all starting at zero.
        self.stats = dict.fromkeys(
            (OPTIMAL, INFEASIBLE, INF_OR_UNBD, UNBOUNDED, UNKNOWN), 0
        )

    def print_solver_stats(self) -> None:
        """Print how many times each status has been counted."""
        print("\n", self.__class__.__name__, " Stats:")
        for status, count in self.stats.items():
            print(status_text[status], " = ", count, " times")
class CplexSolver(Solver):
# https://www.tu-chemnitz.de/mathematik/discrete/manuals/cplex/doc/refman/html/appendixB.html
status_code = {
# LP
1 : OPTIMAL,
2 : UNBOUNDED,
3 : INFEASIBLE,
4 : INF_OR_UNBD,
# MIP
101 : OPTIMAL,
102 : OPTIMAL,
103 : INFEASIBLE
}
def __init__(self, verbose = False) -> None:
super().__init__(verbose)