Commit 8f103a63 authored by Roman Trüb

cleanup of worker_datatrace (conversion of float to int)

parent a1612eef
......@@ -43,7 +43,7 @@ import numpy as np
logger = None
debug = False
testid = None
errors = []
FetchObsThread_list = []
FetchObsThread_stopEvent = None
......@@ -87,13 +87,13 @@ class ServiceInfo():
self.servicename = servicename
self.files = []
self.pattern = "^%s_[0-9]+\.[a-z]+$" % servicename
def matchFileName(self, filename):
return re.search(self.pattern, os.path.basename(filename)) is not None
def addFile(self, filename):
self.files.append(filename)
def stripFileList(self, removelast=True):
self.files.sort()
if ((len(self.files) > 0) and removelast):
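For reference, the pattern built in ServiceInfo.__init__ anchors on the service name followed by a numeric timestamp and a lowercase extension; a minimal sketch of what matchFileName accepts (the file names are made-up examples):

import os
import re

pattern = "^%s_[0-9]+\.[a-z]+$" % "serial"   # as built in ServiceInfo.__init__
print(re.search(pattern, os.path.basename("/tmp/serial_20200101123000.csv")) is not None)  # True
print(re.search(pattern, os.path.basename("/tmp/serial_final.csv")) is not None)           # False: no numeric timestamp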
......@@ -107,15 +107,15 @@ class ServiceInfo():
#
##############################################################################
def sigterm_handler(signum, frame):
"""If the program is terminated by sending it the signal SIGTERM
(e.g. by executing 'kill') or SIGINT (pressing ctrl-c),
"""If the program is terminated by sending it the signal SIGTERM
(e.g. by executing 'kill') or SIGINT (pressing ctrl-c),
this signal handler is invoked for cleanup."""
global mainloop_stop
global FetchObsThread_stopEvent
logger.info("Process received SIGTERM or SIGINT signal")
# Signal all observer fetcher threads to stop:
logger.debug("Stopping observer fetcher threads...")
shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
......@@ -137,7 +137,7 @@ def sigterm_handler(signum, frame):
cn.close()
except:
logger.warning("Could not connect to database.")
# Tell the main loop to stop:
mainloop_stop = True
logger.debug("Set stop signal for main loop.")
......@@ -445,25 +445,31 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue
logqueue.put_nowait((loggername, logging.WARNING, "Empty data trace results file."))
else:
with open(resultfile_path, "a") as outfile:
infile = open(tmpfile2, "r")
for line in infile:
# input format: global_ts, comparator, data, PC, operation, local_ts
(timestamp, var, val, pc, access, localts) = line.strip().split(',')
if access == 'operation' or access == '' or var == '':
continue
if flocklab.parse_int(var) < len(varnames):
var = varnames[flocklab.parse_int(var)]
# output format: timestamp,observer_id,node_id,variable,value,access,pc
outfile.write("%s,%d,%s,%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, var, val, access, pc))
infile.close()
df = pd.read_csv(tmpfile2)
# debug
df.to_csv('/home/flocklab/tmp/tmp_df.txt')
# nan values cannot be converted to int -> drop the corresponding rows
df.dropna(inplace=True)
# since there were nan values, the comparator column was stored as float, but we need int; round() is necessary, otherwise e.g. 0.999999 would be truncated to 0, which is wrong
df.comparator = df.comparator.round().astype(int)
df.data = df.data.round().astype(int)
df['obsid'] = obsid
df['nodeid'] = nodeid
df['varname'] = df.comparator.apply(lambda x: (varnames[x] if x < len(varnames) else str(x)))
df.to_csv(
outfile,
columns=['global_ts', 'obsid', 'nodeid', 'varname', 'data', 'operation', 'PC'],
index=False,
header=False
)
except:
msg = "Error in datatrace worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
_errors.append((msg, obsid))
logqueue.put_nowait((loggername, logging.ERROR, msg))
finally:
# debug
# shutil.copyfile(input_filename, "%s_raw" % resultfile_path)
# shutil.copyfile(tmpfile1, "%s_uncorrected.csv" % resultfile_path)
# delete files
os.remove(input_filename)
os.remove(tmpfile1)
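The pandas block above replaces the manual line parsing and is where the float-to-int conversion from the commit message happens: as soon as a CSV column contains NaN, pandas stores it as float64, and a bare astype(int) would truncate 0.999999 to 0. A self-contained sketch of the round-then-cast pattern (made-up data, column names as in the input format):

import io
import pandas as pd

csv = io.StringIO(
    "global_ts,comparator,data,PC,operation,local_ts\n"
    "1.0,0.999999,41.999999,0x2000,w,0.5\n"
    "2.0,,7,0x2004,r,0.6\n")            # second row has a NaN comparator
df = pd.read_csv(csv)                   # comparator/data become float64
df.dropna(inplace=True)                 # NaN rows cannot be cast to int
df.comparator = df.comparator.round().astype(int)   # 0.999999 -> 1 (astype alone would give 0)
df.data = df.data.round().astype(int)               # 41.999999 -> 42
print(df)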
......@@ -483,7 +489,7 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue
def worker_callback(result):
global errors
global FetchObsThread_queue
# the result contains two elements:
# 1st: a list of errors
if len(result[0]) > 0:
......@@ -493,7 +499,7 @@ def worker_callback(result):
errors.append(msg)
except:
errors.append("Failed to convert the error list in worker_callback (%s)." % str(result[0]))
# 2nd: a list of the processed elements
try:
FetchObsThread_queue.put(item=result[1], block=True, timeout=10)
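For context, worker_callback assumes every worker returns a 2-tuple (error list, processed queue item); a hedged sketch of how it is typically wired to a pool (the worker shown here is a hypothetical stand-in, nextitem is the main loop's queue item):

import multiprocessing

def worker(item):
    errors = []
    # ... process the downloaded file ...
    return (errors, item)      # 1st: list of errors, 2nd: the processed element

pool = multiprocessing.Pool(processes=1)
pool.apply_async(worker, args=(nextitem,), callback=worker_callback)
pool.close()
pool.join()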
......@@ -511,16 +517,16 @@ def worker_callback(result):
##############################################################################
class LogQueueThread(threading.Thread):
""" Thread which logs from queue to logfile.
"""
"""
def __init__(self, logqueue, logger, stopEvent):
threading.Thread.__init__(self)
self._logger = logger
self._stopEvent = stopEvent
self._logqueue = logqueue
def run(self):
self._logger.info("LogQueueThread started")
# Let thread run until someone calls terminate() on it:
while not self._stopEvent.is_set():
try:
......@@ -528,7 +534,7 @@ class LogQueueThread(threading.Thread):
self._logger.log(loglevel, loggername + msg)
except queue.Empty:
pass
# Stop the process:
self._logger.info("LogQueueThread stopped")
### END LogQueueThread
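Usage-wise, producers (the worker processes) put (loggername, loglevel, msg) tuples on the queue, matching the unpacking in run() above; a short sketch, assuming logqueue is a manager queue shared with the workers:

import logging
import threading

stopEvent = threading.Event()
LogQueueThread(logqueue, logger, stopEvent).start()
# workers log via the queue instead of touching the logger directly:
logqueue.put_nowait(("(Worker 7) ", logging.INFO, "results file aggregated"))
# on shutdown:
stopEvent.set()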
......@@ -541,9 +547,9 @@ class LogQueueThread(threading.Thread):
##############################################################################
class FetchObsThread(threading.Thread):
""" Thread which downloads database files from an observer to the server.
"""
"""
def __init__(self, obsid, obsethernet, dirname, debugdirname, workQueue, stopEvent):
threading.Thread.__init__(self)
self._obsid = obsid
self._obsethernet = obsethernet
self._obsfiledir = dirname
......@@ -551,21 +557,21 @@ class FetchObsThread(threading.Thread):
self._workQueue = workQueue
self._stopEvent = stopEvent
self._logger = logger
self._min_sleep = flocklab.config.getint("fetcher", "min_sleeptime")
self._max_randsleep = flocklab.config.getint("fetcher", "max_rand_sleeptime")
self._obstestresfolder = "%s/%d" % (flocklab.config.get("observer", "testresultfolder"), testid)
def run(self):
try:
self._loggerprefix = "(FetchObsThread.%d) "%self._obsid
self._logger.info(self._loggerprefix + "FetchObsThread starting...")
removelast = True
# Let thread run until someone calls terminate() on it:
while removelast == True:
""" Get data from the observer over SCP.
Then request data from the observer and store it in the server's filesystem.
""" Get data from the observer over SCP.
Then request data from the observer and store it in the server's filesystem.
Then sleep some random time before fetching data again.
"""
# Wait for some random time:
......@@ -599,7 +605,7 @@ class FetchObsThread(threading.Thread):
copyfilelist.append(resfile)
#if (len(service.files) > 0):
# self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (" ".join(service.files), service.servicename))
if len(copyfilelist) > 0:
# Download the database files:
self._logger.debug(self._loggerprefix + "Downloading results files %s" % (" ".join(copyfilelist)))
......@@ -654,10 +660,10 @@ class FetchObsThread(threading.Thread):
else:
self._logger.error(self._loggerprefix + "SSH to observer did not succeed, fetcher thread terminated with code %d. Error: %s" % (rs, err.strip()))
break # abort
except:
logger.error(self._loggerprefix + "FetchObsThread crashed: %s, %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
# Stop the process:
self._logger.info(self._loggerprefix + "FetchObsThread stopped")
### END FetchObsThread
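Between SCP rounds each fetcher thread pauses for min_sleeptime plus a random slice of max_rand_sleeptime, so the observers are not all polled in lockstep; a minimal sketch of that pacing (the example values and the uniform distribution are assumptions):

import random
import time

min_sleep = 20       # config 'fetcher'/'min_sleeptime', example value
max_randsleep = 10   # config 'fetcher'/'max_rand_sleeptime', example value

for _ in range(3):   # stands in for the loop that runs until the stop event is set
    delay = min_sleep + random.uniform(0, max_randsleep)
    print("sleeping %.1fs before the next SCP fetch" % delay)
    time.sleep(delay)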
......@@ -674,21 +680,21 @@ def start_fetcher():
global FetchObsThread_queue
global FetchObsThread_stopEvent
global obsfetcher_dict
# Daemonize the process ---
daemon.daemonize(None, closedesc=False)
logger.info("Daemon started")
logger.info("Going to fetch data for test ID %d" %testid)
# Get needed metadata from database ---
try:
(cn, cur) = flocklab.connect_to_db()
except:
flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
try:
cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address
FROM `tbl_serv_observer` AS `a`
LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk
cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address
FROM `tbl_serv_observer` AS `a`
LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk
WHERE `b`.test_fk = %d GROUP BY `a`.observer_id;
""" % testid)
except MySQLdb.Error as err:
......@@ -703,7 +709,7 @@ def start_fetcher():
if not rs:
logger.info("No observers found for this test. Nothing has to be done, thus exiting...")
return errno.ENODATA
# Start fetcher threads ---
# Create a directory structure to store the downloaded files from the DB:
obsfiledir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_dir'), testid)
......@@ -725,7 +731,7 @@ def start_fetcher():
debugdirname = "%s/%d" % (obsfiledebugdir, obsid)
if (not os.path.exists(debugdirname)):
os.makedirs(debugdirname)
# Start thread:
try:
thread = FetchObsThread(obsid, observer[1], dirname, debugdirname, FetchObsThread_queue, FetchObsThread_stopEvent)
FetchObsThread_list.append(thread)
......@@ -734,7 +740,7 @@ def start_fetcher():
except:
logger.warning("Error when starting fetcher thread for observer %d: %s, %s" % (obsid, str(sys.exc_info()[0]), str(sys.exc_info()[1])))
continue
return flocklab.SUCCESS
### END start_fetcher
......@@ -786,7 +792,7 @@ def stop_fetcher():
except:
logger.warning("Could not connect to database.")
return errno.ENOPKG
return flocklab.SUCCESS
### END stop_fetcher
......@@ -801,12 +807,12 @@ class WorkManager():
self.worklist = {}
self.pattern = re.compile("_[0-9].*")
self.workcount = 0
def _next_item_with_state(self, service, obsid):
stateitem = list(self.worklist[service][obsid][1][0])
stateitem[4] = self.worklist[service][obsid][0]
return tuple(stateitem)
def add(self, item):
try:
service = self.pattern.sub("",item[3])
......@@ -826,7 +832,7 @@ class WorkManager():
return None
except:
logger.error("Error in WorkManager.add(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
def done(self, item):
try:
service = self.pattern.sub("",item[3])
......@@ -844,10 +850,10 @@ class WorkManager():
return None
except:
logger.error("Error in WorkManager.done(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
def finished(self):
return self.workcount == 0
### END WorkManager
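As read from the fragments above, WorkManager keeps downloads ordered per service: add() returns an item that may be processed immediately (or None if it has to wait), done() reports a finished item and can release the next queued one, and finished() is the main loop's exit test. A hedged usage sketch (the tuple layout is simplified; only item[3], the filename, is known for sure):

workmanager = WorkManager()
item = (testid, obsid, fdir, "serial_20200101123000.db", None)
ready = workmanager.add(item)         # item to process now, or None
if ready is not None:
    # ... hand off to the service's worker pool ...
    workmanager.done(ready)           # may return the next queued item
if workmanager.finished():            # workcount back to 0
    pass                              # safe to shut the main loop down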
......@@ -872,7 +878,7 @@ def usage():
#
##############################################################################
def main(argv):
### Get global variables ###
global logger
global debug
......@@ -882,12 +888,12 @@ def main(argv):
global owner_fk
global obsdict_byid
global serialdict
stop = False
# Get logger:
logger = flocklab.get_logger()
# Get the config file ---
flocklab.load_config()
......@@ -924,13 +930,13 @@ def main(argv):
print("Wrong API usage")
logger.warning("Wrong API usage")
sys.exit(errno.EINVAL)
# Check if the necessary parameters are set ---
if not testid:
print("Wrong API usage")
logger.warning("Wrong API usage")
sys.exit(errno.EINVAL)
# Check if the Test ID exists in the database ---
try:
(cn, cur) = flocklab.connect_to_db()
......@@ -946,10 +952,10 @@ def main(argv):
else:
msg = "Error when trying to get test ID from database: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
flocklab.error_logandexit(msg, errno.EIO)
# Add Test ID to logger name ---
logger.name += " (Test %d)"%testid
# Start / stop the fetcher ---
ret = flocklab.SUCCESS
if stop:
......@@ -957,7 +963,7 @@ def main(argv):
logger.info("FlockLab fetcher stopped.")
sys.exit(ret)
# Start the fetcher processes which download data from the observers:
ret = start_fetcher()
if ret == flocklab.SUCCESS:
logger.info("FlockLab fetcher started.")
......@@ -965,7 +971,7 @@ def main(argv):
msg = "Start function returned error. Exiting..."
os.kill(os.getpid(), signal.SIGTERM)
rs = flocklab.error_logandexit(msg, ret)
# Get needed metadata ---
try:
(cn, cur) = flocklab.connect_to_db()
......@@ -983,15 +989,15 @@ def main(argv):
obsdict_byid = None
# Dict for serial service: 'r' means reader (data read from the target), 'w' means writer (data written to the target):
serialdict = {0: 'r', 1: 'w'}
# find out the start and stop time of the test
cur.execute("SELECT `time_start_wish`, `time_end_wish` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" %testid)
# Times are going to be of datetime type:
ret = cur.fetchone()
teststarttime = ret[0]
teststoptime = ret[1]
ppFileFormat = None
# Find out which services are used to allocate working threads later on ---
# Get the XML config from the database and check which services are used in the test.
servicesUsed_dict = {'gpiotracing': 'gpioTracingConf', 'powerprofiling': 'powerProfilingConf', 'serial': 'serialConf', 'datatrace': 'dataTraceConf'}
......@@ -1029,7 +1035,7 @@ def main(argv):
# Append log services (always used)
servicesUsed_dict['errorlog'] = True
servicesUsed_dict['timesynclog'] = True
cur.close()
cn.close()
if ((owner_fk==None) or (obsdict_byid==None)):
......@@ -1041,12 +1047,12 @@ def main(argv):
flocklab.error_logandexit(msg, errno.EAGAIN)
else:
logger.debug("Got all needed metadata.")
# Start aggregating processes ---
""" There is an infinite loop which gets files to process from the fetcher threads which download data from the observers.
Downloaded data is then assigned to a worker process for the corresponding service and in the worker process parsed,
converted (if needed) and aggregated into a single file per service.
The loop is stopped if the program receives a stop signal. In this case, the loop runs until no more database files
are there to be processed.
"""
if __name__ == '__main__':
......@@ -1088,7 +1094,7 @@ def main(argv):
thread.start()
except:
logger.warning("Error when starting log queue thread: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
# Determine the number of CPUs to be used for each aggregating process. If a service is not used, its CPUs are assigned to the other services.
cpus_free = 0
cpus_logs = flocklab.config.getint('fetcher', 'cpus_errorlog')
......@@ -1131,11 +1137,11 @@ def main(argv):
if cpus_serial > 0:
cpus_serial = cpus_serial + cpus_free
cpus_total = cpus_logs + cpus_serial + cpus_gpiomonitoring + cpus_powerprofiling
service_pools_dict = { 'logs': cpus_logs, 'serial': cpus_serial, 'gpiotracing': cpus_gpiomonitoring, 'powerprofiling': cpus_powerprofiling, 'datatrace': cpus_datatrace }
if (cpus_total > multiprocessing.cpu_count()):
logger.warning("Number of requested CPUs for all aggregating processes (%d) is higher than number of available CPUs (%d) on system." % (cpus_total, multiprocessing.cpu_count()))
# Start a worker process pool for every service:
for service, cpus in service_pools_dict.items():
if cpus > 1:
......@@ -1156,7 +1162,7 @@ def main(argv):
commitsize = flocklab.config.getint('fetcher', 'commitsize')
loggerprefix = "(Mainloop) "
workmanager = WorkManager()
# Main loop ---
while True:
if mainloop_stop:
......@@ -1190,7 +1196,7 @@ def main(argv):
worker_args = [nextitem, nodeid, testresultsfile_dict['gpiotracing'][0], logqueue, None]
worker_f = worker_gpiotracing
elif (re.search("^powerprofiling_[0-9]{14}\.rld$", f) != None):
pool = service_pools_dict['powerprofiling']
worker_args = [nextitem, nodeid, testresultsfile_dict['powerprofiling'][0], logqueue, ppFileFormat]
worker_f = worker_powerprof
elif (re.search("^serial_[0-9]{14}\.db$", f) != None):
......@@ -1230,11 +1236,11 @@ def main(argv):
logger.debug("Waiting for pool for %s to close..." % service)
pool.join()
logger.debug("Closed all pools.")
# Stop logging:
logger.debug("Stopping log queue thread...")
LogQueueThread_stopEvent.set()
# Set DB status:
logger.debug("Setting test status in DB to 'synced'...")
try:
......@@ -1244,7 +1250,7 @@ def main(argv):
cn.close()
except:
logger.warning("Could not connect to database.")
# Delete the obsfile directories as they are not needed anymore:
if ((obsfiledir != None) and (os.path.exists(obsfiledir))):
shutil.rmtree(obsfiledir)
......@@ -1259,7 +1265,7 @@ def main(argv):
flocklab.error_logandexit(msg, errno.EBADMSG)
else:
ret = flocklab.SUCCESS
sys.exit(ret)
### END main()
......