Commit 23184bdd authored by Reto Da Forno

error handling in dispatcher and fetcher improved

parent 30fc22dc
@@ -119,7 +119,7 @@ class StopTestThread(threading.Thread):
                logger.error(msg)
            else:
                errors.append(("Test stop script on observer ID %s failed with error code %d." % (str(self._obsdict_key[self._obskey][1]), rs), rs, self._obsdict_key[self._obskey][1]))
-               logger.error("Test stop script on observer ID %s failed with error code %d:\n%s" % (str(self._obsdict_key[self._obskey][1]), rs, str(out).strip()))
+               logger.error("Test stop script on observer ID %s failed with error code %d:\n%s" % (str(self._obsdict_key[self._obskey][1]), rs, str(err).strip()))
                logger.error("Tried to execute: %s" % (" ".join(cmd)))
        except:
            logger.debug("Exception: %s, %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
@@ -221,6 +221,7 @@ class StartTestThread(threading.Thread):
            while p.returncode == None:
                self._abortEvent.wait(1.0)
                p.poll()
+           out = ""
            if self._abortEvent.is_set():
                p.kill()
                logger.debug("Abort is set, start test process for observer %s killed." % (self._obsdict_key[self._obskey][1]))
@@ -229,7 +230,7 @@ class StartTestThread(threading.Thread):
            rs = p.wait()
            if rs != flocklab.SUCCESS:
                errors.append(("Test start script on observer ID %s failed with error code %d." % (self._obsdict_key[self._obskey][1], rs), rs, self._obsdict_key[self._obskey][1]))
-               logger.error("Test start script on observer ID %s failed with error code %d and message:\n%s" % (str(self._obsdict_key[self._obskey][1]), rs, str(out)))
+               logger.error("Test start script on observer ID %s failed with error code %d:\n%s" % (str(self._obsdict_key[self._obskey][1]), rs, str(err)))
            else:
                logger.debug("Test start script on observer ID %s succeeded (took %us)." % (self._obsdict_key[self._obskey][1], int(time.time() - starttime)))
        # Remove image file and xml on server:
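
The added `out = ""` matters on the abort path: if the process is killed before its output is read, the later logging code would otherwise hit a NameError on `out`. A minimal sketch of pre-binding before a branchy polling loop (names are illustrative):

    import subprocess, threading

    abortEvent = threading.Event()
    p = subprocess.Popen(["sleep", "2"], stdout=subprocess.PIPE,
                         stderr=subprocess.PIPE, universal_newlines=True)
    out = err = ""                  # pre-bind so both names exist on every path
    while p.returncode is None:
        abortEvent.wait(1.0)
        p.poll()
        if abortEvent.is_set():
            p.kill()                # aborted: out/err keep their empty defaults
            break
    else:
        out, err = p.communicate()  # normal exit: read the real output
    rs = p.wait()
    if rs != 0:
        print("start script failed with code %s:\n%s" % (rs, err))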
@@ -727,7 +728,7 @@ def start_test(testid, cur, cn, obsdict_key, obsdict_id):
        # check if we're still in time ---
        if len(errors) == 0:
-           now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
+           now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime(time.time() - 10)) # allow 10s tolerance
            cur.execute("SELECT `serv_tests_key` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d AND `time_start_wish` <= '%s'" % (testid, now))
            if cur.fetchone() is not None:
                msg = "Setup for test ID %d took too much time." % (testid)
@@ -766,14 +767,14 @@ def stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort=False):
    warnings = []
    try:
-       logger.info("Stopping test %d..."%testid)
+       logger.info("Stopping test %d..." % testid)
        # Update DB status ---
        if abort:
            status = 'aborting'
        else:
            status = 'cleaning up'
-       logger.debug("Setting test status in DB to %s..." %status)
+       logger.debug("Setting test status in DB to %s..." % status)
        if flocklab.set_test_status(cur, cn, testid, status) != flocklab.SUCCESS:
            msg = "Failed to set test status in DB."
            errors.append(msg)
@@ -781,10 +782,10 @@ def stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort=False):
        # Stop serial proxy ---
        # Get the XML config from the database and check if the serial service was used in the test:
-       cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
+       cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" % testid)
        ret = cur.fetchone()
        if not ret:
-           msg = "No XML found in database for testid %d." %testid
+           msg = "No XML found in database for testid %d." % testid
            errors.append(msg)
            logger.error(msg)
        else:
......
@@ -233,7 +233,7 @@ def worker_dbfiles(queueitem=None, nodeid=None, resultfile_path=None, resultfile
                break # dbfile has been closed in parser (most likely because EOF was reached)
            except DbFileReadError as err:
                msg = "%s: Packet size (%i) did not match payload size (%i) @ %d." %(input_filename, err.expectedSize, err.actualSize, err.fpos)
-               _errors.append((msg, errno.EIO, obsid))
+               _errors.append((msg, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        if (len(conv_values) > 0):
            # There is still data left. Do a last commit
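
This and the following hunks change every worker's error tuple from `(msg, errno, obsid)` to `(msg, obsid)`: the errno constants added nothing the message text did not already say, and the shape must match what `worker_callback` unpacks. A minimal sketch of the shared convention (names are illustrative):

    _errors = []  # each entry is a (message, observer_id) pair

    def report(msg, obsid):
        # every worker appends the same 2-tuple shape...
        _errors.append((msg, obsid))

    report("Packet size (128) did not match payload size (96) @ 4096.", 12)
    # ...so the consumer can unpack uniformly:
    for (err, obsid) in _errors:
        print("Error when processing results for Observer ID %s: %s" % (obsid, err))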
@@ -248,7 +248,7 @@ def worker_dbfiles(queueitem=None, nodeid=None, resultfile_path=None, resultfile
        os.unlink(input_filename)
    except:
        msg = "Error in worker process: %s: %s\n%s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
-       _errors.append((msg, errno.ECOMM, obsid))
+       _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
@@ -285,7 +285,7 @@ def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, logque
        os.remove(inputfilename)
    except:
        msg = "Error in gpiotracing worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
-       _errors.append((msg, errno.ECOMM, obsid))
+       _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
@@ -318,7 +318,7 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue
            except FileExistsError:
                # TODO: properly handle case where file already exists (several rld files per observer)
                msg = "File '%s' already exists, dropping test results." % (resfilename)
-               _errors.append((msg, errno.EEXIST, obsid))
+               _errors.append((msg, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        else:
            # CSV file format
@@ -336,7 +336,7 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue
        os.remove(inputfilename)
    except:
        msg = "Error in powerprof worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
-       _errors.append((msg, errno.ECOMM, obsid))
+       _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
@@ -367,7 +367,7 @@ def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None
        os.remove(inputfilename)
    except:
        msg = "Error in logs worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
-       _errors.append((msg, errno.ECOMM, obsid))
+       _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
@@ -409,7 +409,7 @@ def worker_serial(queueitem=None, nodeid=None, resultfile_path=None, logqueue=No
        os.remove(inputfilename)
    except:
        msg = "Error in serial worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
-       _errors.append((msg, errno.ECOMM, obsid))
+       _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
@@ -439,28 +439,29 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue
        (fd2, tmpfile2) = tempfile.mkstemp()
        dwt.parse_dwt_output(input_filename, tmpfile1)
        # apply linear regression to correct the timestamps
-       dwt.correct_ts_with_regression(tmpfile1, tmpfile2)
-       with open(resultfile_path, "a") as outfile:
-           infile = open(tmpfile2, "r")
-           for line in infile:
-               # input format: global_ts, comparator, data, PC, operation, local_ts
-               (timestamp, var, val, pc, access, localts) = line.strip().split(',')
-               if access == 'operation' or access == '' or var == '':
-                   continue
-               if flocklab.parse_int(var) < len(varnames):
-                   var = varnames[flocklab.parse_int(var)]
-               # output format: timestamp,observer_id,node_id,variable,value,access,pc
-               outfile.write("%s,%s,%s,%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, var, val, access, pc))
-           infile.close()
+       try:
+           dwt.correct_ts_with_regression(tmpfile1, tmpfile2)
+       except ValueError:
+           logqueue.put_nowait((loggername, logging.WARNING, "Empty data trace results file."))
+       else:
+           with open(resultfile_path, "a") as outfile:
+               infile = open(tmpfile2, "r")
+               for line in infile:
+                   # input format: global_ts, comparator, data, PC, operation, local_ts
+                   (timestamp, var, val, pc, access, localts) = line.strip().split(',')
+                   if access == 'operation' or access == '' or var == '':
+                       continue
+                   if flocklab.parse_int(var) < len(varnames):
+                       var = varnames[flocklab.parse_int(var)]
+                   # output format: timestamp,observer_id,node_id,variable,value,access,pc
+                   outfile.write("%s,%s,%s,%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, var, val, access, pc))
+               infile.close()
        # debug
        #shutil.copyfile(input_filename, "%s_raw" % resultfile_path)
        #shutil.copyfile(tmpfile1, "%s_uncorrected.csv" % resultfile_path)
    except:
        msg = "Error in datatrace worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        # for some reason, the logging below does not work properly -> print msg into the log directly
        #logger = flocklab.get_logger()
        #logger.error(msg)
-       _errors.append((msg, errno.ECOMM, obsid))
+       _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        # delete files
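
Putting only the regression call inside `try` and moving the conversion loop to the `else` branch means an empty trace file (which makes the regression raise `ValueError`) is downgraded to a warning, while genuine failures during conversion still reach the outer `except`. A small sketch of the `try/except/else` split, with a hypothetical stand-in for `dwt.correct_ts_with_regression`:

    def correct_ts(infile, outfile):
        # stand-in: a linear regression over zero samples raises ValueError
        raise ValueError("need at least two data points")

    try:
        correct_ts("dt_raw.csv", "dt_corrected.csv")
    except ValueError:
        print("WARNING: Empty data trace results file.")  # tolerated, not an error
    else:
        # reached only on success; exceptions raised here are real errors
        # and propagate instead of being mistaken for an empty file
        print("converting corrected timestamps...")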
@@ -483,11 +484,17 @@ def worker_callback(result):
    global errors
    global FetchObsThread_queue
-   if len(result[0]) > 0:
-       for (err, eno, obsid) in result:
-           msg = "Error %d when processing results for Observer ID %s: %s" % (eno, obsid, err)
-           errors.append(msg)
+   # the result contains two elements:
+   # 1st: a list of errors
+   try:
+       for (err, obsid) in result[0]:
+           msg = "Error when processing results for Observer ID %s: %s" % (obsid, err)
+           errors.append(msg)
+   except:
+       errors.append("Failed to convert the error list in worker_callback.")
+   # 2nd: a list of the processed elements
    try:
        FetchObsThread_queue.put(item=result[1], block=True, timeout=10)
    except queue.Full:
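
After this change, `worker_callback` relies on a simple contract: `result[0]` is the list of `(msg, obsid)` tuples and `result[1]` is the processed queue item, which is handed back through a bounded queue. A runnable sketch of that contract (queue size and sample data are made up):

    import queue

    errors = []
    FetchObsThread_queue = queue.Queue(maxsize=50)

    def worker_callback(result):
        try:
            for (err, obsid) in result[0]:  # 1st element: list of errors
                errors.append("Error when processing results for Observer ID %s: %s" % (obsid, err))
        except Exception:
            errors.append("Failed to convert the error list in worker_callback.")
        try:
            FetchObsThread_queue.put(item=result[1], block=True, timeout=10)  # 2nd element: processed item
        except queue.Full:
            errors.append("Queue full, processed item dropped.")

    worker_callback(([("disk full", 3)], ("serial", 3, "/results", "serial.csv", None)))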
@@ -636,7 +643,7 @@ class FetchObsThread(threading.Thread):
                else:
                    self._logger.debug(self._loggerprefix + "No files to download from observer.")
-               if removelast == False: # this is the last execution of the while loop
+               if False and removelast == False: # this is the last execution of the while loop
                    cmd = ['ssh' ,'%s'%(self._obsethernet), "rm -rf %s" % self._obstestresfolder]
                    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                    out, err = p.communicate(None)
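
Prepending `False and` short-circuits the condition, so the remote `rm -rf` cleanup stays in the source but can no longer execute; the result folder is now left on the observer. The short-circuit in isolation:

    removelast = False
    if False and removelast == False:
        print("unreachable: 'False and ...' is decided before the comparison runs")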
@@ -818,7 +825,6 @@ class WorkManager():
            self.workcount = self.workcount + 1
            return None
        except:
-           logger = flocklab.get_logger()
            logger.error("Error in WorkManager.add(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
    def done(self, item):
@@ -837,7 +843,6 @@ class WorkManager():
            else:
                return None
        except:
-           logger = flocklab.get_logger()
            logger.error("Error in WorkManager.done(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
    def finished(self):
@@ -1179,7 +1184,6 @@ def main(argv):
            (itemtype, obsid, fdir, f, workerstate) = nextitem
            #logger.debug(loggerprefix + "Next item is %s/%s (Obs%s)." % (fdir, f, str(obsid)))
            nodeid = obsdict_byid[obsid][1]
-           callback_f = worker_callback
            # Match the filename against the patterns and schedule an appropriate worker function:
            if (re.search("^gpio_monitor_[0-9]{14}\.csv$", f) != None):
                pool = service_pools_dict['gpiotracing']
@@ -1213,7 +1217,7 @@ def main(argv):
                logger.warning(loggerprefix + "Results file %s/%s from observer %s did not match any of the known patterns" % (fdir, f, obsid))
                continue
            # Schedule worker function from the service's pool. The result will be reported to the callback function.
-           pool.apply_async(func=worker_f, args=tuple(worker_args), callback=callback_f)
+           pool.apply_async(func=worker_f, args=tuple(worker_args), callback=worker_callback)
        # Stop signal for main loop has been set ---
        # Stop worker pool:
......
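
With the `callback_f` indirection gone, tasks are submitted straight to the pool with `worker_callback` as the completion hook; the callback runs in the parent process each time a worker returns. A self-contained sketch of the mechanism (worker and arguments are illustrative):

    import multiprocessing

    def worker_f(fname, obsid):
        # runs in a pool process; the return value is pickled back to the parent
        return ([], (fname, obsid))  # (error list, processed item)

    def worker_callback(result):
        # runs in the parent process once the task finishes
        print("done:", result[1], "errors:", result[0])

    if __name__ == "__main__":
        pool = multiprocessing.Pool(processes=2)
        pool.apply_async(func=worker_f, args=("gpio_monitor_20200101000000.csv", 4),
                         callback=worker_callback)
        pool.close()
        pool.join()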