Commit d2396a17 authored by Reto Da Forno

testmanagement server code updated

parent 019a0760
#! /usr/bin/env python3
import sys, os, getopt, errno, traceback, logging, time, __main__, shutil, glob, datetime
import sys, os, getopt, errno, traceback, logging, time, __main__, shutil, glob, datetime, subprocess, signal
import lib.flocklab as flocklab
@@ -90,7 +90,6 @@ def main(argv):
FROM `tbl_serv_tests`
WHERE (`test_status` = 'todelete')
"""
#logger.info("Looking in DB for tests which are marked to be deleted...")
if ( cur.execute(sql) <= 0 ):
logger.info("No tests found which are marked to be deleted.")
else:
@@ -133,9 +132,9 @@ def main(argv):
# Delete test itself ---
if delete_all:
# Delete test itself:
sql = """ DELETE FROM `tbl_serv_tests`
WHERE (`serv_tests_key` = %s)
"""
sql = """DELETE FROM `tbl_serv_tests`
WHERE (`serv_tests_key` = %s)
"""
starttime = time.time()
num_deleted_rows = cur.execute(sql%(testid))
cn.commit()
@@ -157,12 +156,61 @@ def main(argv):
logger.debug("Removing viz cache %s..."%path)
shutil.rmtree(path)
# Check for tests that are stuck for 60 minutes ---
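    # (i.e. tests whose desired end time passed more than 60 minutes ago but whose status is still 'preparing', 'aborting', 'syncing' or 'synced')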
sql = """SELECT `serv_tests_key` FROM `tbl_serv_tests`
WHERE `test_status` IN ('preparing', 'aborting', 'syncing', 'synced')
AND TIMESTAMPDIFF(MINUTE, `time_end_wish`, NOW()) > 60
"""
if cur.execute(sql) <= 0:
logger.info("No tests found which are marked to be deleted.")
else:
rs = cur.fetchall()
testids = []
for testid in rs:
testids.append(str(testid[0]))
# set test status to failed
sql = "UPDATE `tbl_serv_tests` SET `test_status`='failed' WHERE `serv_tests_key` IN (%s)" % (", ".join(testids))
logger.debug("SQL query: %s" % sql)
cur.execute(sql)
cn.commit()
msg = "Found %d stuck tests in the database (IDs: %s). Test status set to 'failed'." % (len(rs), ", ".join(testids))
logger.debug(msg)
emails = flocklab.get_admin_emails(cur)
if emails != flocklab.FAILED:
flocklab.send_mail(subject="[FlockLab Cleaner]", message=msg, recipients=emails)
    # Check for stuck processes that have been running for more than 1 day
cmd = ["ps", "-U", "flocklab", "-o", "pid:5=,cmd:50=,etime="]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
out, err = p.communicate()
lines = out.strip().split("\n")
pids = []
for line in lines:
try:
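            # ps prints fixed-width fields: PID in the first 6 characters (5-char field plus separator), command in the next 50, elapsed time in the rest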
pid = int(line[0:6].strip())
command = line[6:56].strip()
runtime = line[56:].strip()
if ("flocklab_fetcher" in command) and ("-" in runtime):
pids.append(pid)
        except ValueError:
logger.debug("Failed to parse output from 'ps'. Line was: %s" % line)
break
if len(pids) > 0:
        # kill the stuck processes
for pid in pids:
os.kill(pid, signal.SIGKILL)
msg = "%d stuck threads terminated (PIDs: %s" % (len(pids), ", ".join(pids))
logger.debug(msg)
emails = flocklab.get_admin_emails(cur)
if emails != flocklab.FAILED:
flocklab.send_mail(subject="[FlockLab Cleaner]", message=msg, recipients=emails)
except:
msg = "Encountered error: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
logger.error(msg)
emails = flocklab.get_admin_emails(cur, config)
emails = flocklab.get_admin_emails(cur)
msg = "%s on server %s encountered error:\n\n%s" % (__file__, os.uname()[1], msg)
flocklab.send_mail(subject="[FlockLab %s]"%name, message=msg, recipients=emails)
flocklab.send_mail(subject="[FlockLab Cleaner]", message=msg, recipients=emails)
finally:
cur.close()
cn.close()
@@ -463,7 +463,7 @@ def start_test(testid, cur, cn, obsdict_key, obsdict_id):
else:
xmldict_key[obskey][1].write("\t<firmware>%s</firmware>\n" % (imagedict_key[obskey][0][4]))
for coreimage in imagedict_key[obskey]:
xmldict_key[obskey][1].write("\t<image core=\"%d\">%s%d/%s</image>\n" % (coreimage[5], flocklab.config.get("observer", "testconfigfolder"),testid, os.path.basename(coreimage[0])))
xmldict_key[obskey][1].write("\t<image core=\"%d\">%s/%d/%s</image>\n" % (coreimage[5], flocklab.config.get("observer", "testconfigfolder"),testid, os.path.basename(coreimage[0])))
xmldict_key[obskey][1].write("\t<slotnr>%s</slotnr>\n" % (imagedict_key[obskey][0][1]))
xmldict_key[obskey][1].write("\t<platform>%s</platform>\n" % (imagedict_key[obskey][0][2]))
xmldict_key[obskey][1].write("\t<os>%s</os>\n" % (imagedict_key[obskey][0][3]))
@@ -473,7 +473,6 @@ def start_test(testid, cur, cn, obsdict_key, obsdict_id):
# update test_image mapping with slot information
cur.execute("UPDATE `tbl_serv_map_test_observer_targetimages` SET `slot` = %s WHERE `observer_fk` = %d AND `test_fk`=%d" % (slot, obskey, testid))
cn.commit()
# serialConf ---
srconfs = tree.xpath('//d:serialConf', namespaces=ns)
@@ -504,40 +503,42 @@ def start_test(testid, cur, cn, obsdict_key, obsdict_id):
xmldict_key[obskey][1].write(xmlblock)
#logger.debug("Wrote obsSerialConf XML for observer ID %s" %obsid)
else:
logger.debug("No <serialConf> found, not using serial service")
logger.debug("No <serialConf> found, not using serial service.")
# gpioTracingConf ---
gmconfs = tree.xpath('//d:gpioTracingConf', namespaces=ns)
for gmconf in gmconfs:
obsids = gmconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
pinconfs = gmconf.xpath('d:pinConf', namespaces=ns)
xmlblock = "<obsGpioMonitorConf>\n"
for pinconf in pinconfs:
pin = pinconf.xpath('d:pin', namespaces=ns)[0].text.strip()
edge = pinconf.xpath('d:edge', namespaces=ns)[0].text.strip()
mode = pinconf.xpath('d:mode', namespaces=ns)[0].text.strip()
xmlblock += "\t<pinConf>\n\t\t<pin>%s</pin>\n\t\t<edge>%s</edge>\n\t\t<mode>%s</mode>\n" %(pin, edge, mode)
cb_gs_add = pinconf.xpath('d:callbackGpioActAdd', namespaces=ns)
if cb_gs_add:
pin = cb_gs_add[0].xpath('d:pin', namespaces=ns)[0].text.strip()
level = cb_gs_add[0].xpath('d:level', namespaces=ns)[0].text.strip()
offsets = cb_gs_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
offsetms = cb_gs_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
xmlblock += "\t\t<callbackGpioSetAdd>\n\t\t\t<pin>%s</pin>\n\t\t\t<level>%s</level>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackGpioSetAdd>\n" %(pin, level, offsets, offsetms)
cb_pp_add = pinconf.xpath('d:callbackPowerProfAdd', namespaces=ns)
if cb_pp_add:
duration = cb_pp_add[0].xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()
offsets = cb_pp_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
offsetms = cb_pp_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
xmlblock += "\t\t<callbackPowerprofAdd>\n\t\t\t<duration>%s</duration>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackPowerprofAdd>\n" %(duration, offsets, offsetms)
xmlblock += "\t</pinConf>\n"
xmlblock += "</obsGpioMonitorConf>\n\n"
for obsid in obsids:
obsid = int(obsid)
obskey = obsdict_id[obsid][0]
xmldict_key[obskey][1].write(xmlblock)
#logger.debug("Wrote obsGpioMonitorConf XML for observer ID %s" %obsid)
if gmconfs:
for gmconf in gmconfs:
obsids = gmconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
pinconfs = gmconf.xpath('d:pinConf', namespaces=ns)
xmlblock = "<obsGpioMonitorConf>\n"
for pinconf in pinconfs:
pin = pinconf.xpath('d:pin', namespaces=ns)[0].text.strip()
edge = pinconf.xpath('d:edge', namespaces=ns)[0].text.strip()
mode = pinconf.xpath('d:mode', namespaces=ns)[0].text.strip()
xmlblock += "\t<pinConf>\n\t\t<pin>%s</pin>\n\t\t<edge>%s</edge>\n\t\t<mode>%s</mode>\n" %(pin, edge, mode)
cb_gs_add = pinconf.xpath('d:callbackGpioActAdd', namespaces=ns)
if cb_gs_add:
pin = cb_gs_add[0].xpath('d:pin', namespaces=ns)[0].text.strip()
level = cb_gs_add[0].xpath('d:level', namespaces=ns)[0].text.strip()
offsets = cb_gs_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
offsetms = cb_gs_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
xmlblock += "\t\t<callbackGpioSetAdd>\n\t\t\t<pin>%s</pin>\n\t\t\t<level>%s</level>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackGpioSetAdd>\n" %(pin, level, offsets, offsetms)
cb_pp_add = pinconf.xpath('d:callbackPowerProfAdd', namespaces=ns)
if cb_pp_add:
duration = cb_pp_add[0].xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()
offsets = cb_pp_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
offsetms = cb_pp_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
xmlblock += "\t\t<callbackPowerprofAdd>\n\t\t\t<duration>%s</duration>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackPowerprofAdd>\n" %(duration, offsets, offsetms)
xmlblock += "\t</pinConf>\n"
xmlblock += "</obsGpioMonitorConf>\n\n"
for obsid in obsids:
obsid = int(obsid)
obskey = obsdict_id[obsid][0]
xmldict_key[obskey][1].write(xmlblock)
#logger.debug("Wrote obsGpioMonitorConf XML for observer ID %s" %obsid)
else:
logger.debug("No <gpioTracingConf> found, not using GPIO tracing service.")
# gpioActuationConf ---
# Create 2 pin settings for every observer used in the test:
@@ -595,50 +596,54 @@ def start_test(testid, cur, cn, obsdict_key, obsdict_id):
xmlblock = "</obsGpioSettingConf>\n\n"
for obskey in obsdict_key.keys():
xmldict_key[obskey][1].write(xmlblock)
# powerProfilingConf ---
ppconfs = tree.xpath('//d:powerProfilingConf', namespaces=ns)
for ppconf in ppconfs:
obsids = ppconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
profconfs = ppconf.xpath('d:profConf', namespaces=ns)
xmlblock = "<obsPowerprofConf>\n"
for profconf in profconfs:
duration = profconf.xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()
xmlblock += "\t<profConf>\n\t\t<duration>%s</duration>" %duration
abs_tim = profconf.xpath('d:absoluteTime', namespaces=ns)
if abs_tim:
absdatetime = absolute2absoluteUTC_time(abs_tim[0].xpath('d:absoluteDateTime', namespaces=ns)[0].text.strip()) # parse xml date
ret = abs_tim[0].xpath('d:absoluteMicrosecs', namespaces=ns)
if ret:
absmicrosec = ret[0].text.strip()
else:
absmicrosec = 0
rel_tim = profconf.xpath('d:relativeTime', namespaces=ns)
if rel_tim:
relsec = int(rel_tim[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip())
ret = rel_tim[0].xpath('d:offsetMicrosecs', namespaces=ns)
if ret:
relmicrosec = int(ret[0].text.strip())
if ppconfs:
for ppconf in ppconfs:
obsids = ppconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
profconfs = ppconf.xpath('d:profConf', namespaces=ns)
xmlblock = "<obsPowerprofConf>\n"
for profconf in profconfs:
duration = profconf.xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()
xmlblock += "\t<profConf>\n\t\t<duration>%s</duration>" %duration
abs_tim = profconf.xpath('d:absoluteTime', namespaces=ns)
if abs_tim:
absdatetime = absolute2absoluteUTC_time(abs_tim[0].xpath('d:absoluteDateTime', namespaces=ns)[0].text.strip()) # parse xml date
ret = abs_tim[0].xpath('d:absoluteMicrosecs', namespaces=ns)
if ret:
absmicrosec = ret[0].text.strip()
else:
absmicrosec = 0
rel_tim = profconf.xpath('d:relativeTime', namespaces=ns)
if rel_tim:
relsec = int(rel_tim[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip())
ret = rel_tim[0].xpath('d:offsetMicrosecs', namespaces=ns)
if ret:
relmicrosec = int(ret[0].text.strip())
else:
relmicrosec = 0
# Relative times need to be converted into absolute times:
absmicrosec, absdatetime = relative2absolute_time(starttime, relsec, relmicrosec)
xmlblock += "\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%s</absoluteMicrosecs>\n\t\t</absoluteTime>" %(absdatetime, absmicrosec)
samplingdivider = profconf.xpath('d:samplingDivider', namespaces=ns)
if samplingdivider:
samplingdivider = samplingdivider[0].text.strip()
else:
relmicrosec = 0
# Relative times need to be converted into absolute times:
absmicrosec, absdatetime = relative2absolute_time(starttime, relsec, relmicrosec)
xmlblock += "\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%s</absoluteMicrosecs>\n\t\t</absoluteTime>" %(absdatetime, absmicrosec)
samplingdivider = profconf.xpath('d:samplingDivider', namespaces=ns)
if samplingdivider:
samplingdivider = samplingdivider[0].text.strip()
else:
samplingdivider = flocklab.config.get('dispatcher', 'default_sampling_divider')
xmlblock += "\n\t\t<samplingDivider>%s</samplingDivider>"%samplingdivider
xmlblock += "\n\t</profConf>\n"
xmlblock += "</obsPowerprofConf>\n\n"
for obsid in obsids:
obsid = int(obsid)
obskey = obsdict_id[obsid][0]
xmldict_key[obskey][1].write(xmlblock)
#logger.debug("Wrote obsPowerprofConf XML for observer ID %s" %obsid)
samplingdivider = flocklab.config.get('dispatcher', 'default_sampling_divider')
xmlblock += "\n\t\t<samplingDivider>%s</samplingDivider>"%samplingdivider
xmlblock += "\n\t</profConf>\n"
xmlblock += "</obsPowerprofConf>\n\n"
for obsid in obsids:
obsid = int(obsid)
obskey = obsdict_id[obsid][0]
xmldict_key[obskey][1].write(xmlblock)
#logger.debug("Wrote obsPowerprofConf XML for observer ID %s" %obsid)
else:
logger.debug("No <powerProfilingConf> found, not using power profiling service.")
logger.debug("Wrote all observer XML configs.")
# Close XML files ---
for xmlpath, xmlfhand in xmldict_key.values():
xmlfhand.write("</obsConf>\n")
@@ -52,7 +52,7 @@ class ServiceInfo():
def __init__(self, servicename):
self.servicename = servicename
self.files = []
self.pattern = "^%s_[0-9]*\.db$" % servicename
self.pattern = "^%s_[0-9]+\.[a-z]+$" % servicename
def matchFileName(self, filename):
return re.search(self.pattern, os.path.basename(filename)) is not None
@@ -158,7 +158,7 @@ def convert_error_log(obsdata, observer_id, node_id):
#
# read_from_db_file: Read from an open DB file from an observer
#
##############################################################################
##############################################################################
def read_from_db_file(dbfile):
_buf = dbfile.read(4)
if len(_buf) < 4:
@@ -269,18 +269,22 @@ def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=No
def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, slotcalib_factor=1, slotcalib_offset=0, vizimgdir=None, viz_f=None, logqueue=None):
try:
_errors = []
#cur_p = multiprocessing.current_process()
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
obsdbfile_path = "%s/%s" % (fdir,f)
#loggername = "(%s).(Observer %d)" % (cur_p.name, obsid)
obsdbfile_path = "%s/%s" % (fdir, f)
loggername = "(%s).(Observer %d)" % (cur_p.name, obsid)
with open(resultfile_path, "a") as outfile:
infile = open(obsdbfile_path, "r")
for line in infile:
outfile.write("%s,%s,%s" % (obsid, nodeid, str(line)))
infile.close()
#os.remove(obsdbfile_path)
infile = open(obsdbfile_path, "r")
for line in infile:
try:
(timestamp, pin, level) = line.split(',')
outfile.write("%s,%s,%s,%s,%s" % (timestamp, obsid, nodeid, pin, level))
except ValueError:
logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
break
infile.close()
os.remove(obsdbfile_path)
processeditem = list(queueitem)
processeditem[0] = ITEM_PROCESSED
@@ -309,9 +313,13 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, slotcali
_errors = []
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
obsdbfile_path = "%s/%s" % (fdir,f)
loggername = "(%s).(Observer %d)" % (cur_p.name, obsid)
obsdbfile_path = "%s/%s" % (fdir, f)
loggername = "(%s.Obs%d) " % (cur_p.name, obsid)
resultsfile = "%s_%s.%s" % (os.path.splitext(resultfile_path)[0], obsid, os.path.splitext(resultfile_path)[1])
# TODO for now, just move the file without modifying it
os.rename(obsdbfile_path, resultsfile)
logger.debug("File '%s' moved to '%s'." % (obsdbfile_path, resultsfile))
'''
with open(resultfile_path, "a") as outfile:
infile = open(obsdbfile_path, "r")
for line in infile:
@@ -319,7 +327,7 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, slotcali
infile.close()
os.remove(obsdbfile_path)
logger.debug("Finished with powerprof collection.")
'''
processeditem = list(queueitem)
processeditem[0] = ITEM_PROCESSED
@@ -383,7 +391,7 @@ class LogQueueThread(threading.Thread):
self._logger.log(loglevel, loggername + msg)
except queue.Empty:
pass
# Stop the process:
self._logger.info("LogQueueThread stopped")
### END LogQueueThread
@@ -413,7 +421,7 @@ class FetchObsThread(threading.Thread):
def run(self):
try:
self._loggerprefix = "(FetchObsThread).(Observer %d): "%self._obsid
self._loggerprefix = "(FetchObsThread.Obs%d) "%self._obsid
self._logger.info(self._loggerprefix + "FetchObsThread starting...")
removelast = True
@@ -437,7 +445,7 @@ class FetchObsThread(threading.Thread):
rs = p.returncode
if (rs == flocklab.SUCCESS):
services = {}
for servicename in [ "gpio_setting","gpio_monitor","powerprofiling","serial" ]:
for servicename in [ "gpio_setting", "gpio_monitor", "powerprofiling", "serial" ]:
services[servicename] = ServiceInfo(servicename)
services["error_%s"%servicename] = ServiceInfo("error_%s"%servicename)
# Read filenames
@@ -454,14 +462,14 @@ class FetchObsThread(threading.Thread):
for dbfile in service.files:
copyfilelist.append(dbfile)
#if (len(service.files) > 0):
# self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (service.files, service.servicename))
# self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (" ".join(service.files), service.servicename))
if len(copyfilelist) > 0:
# Download the database files:
self._logger.debug(self._loggerprefix + "Downloading database files...")
self._logger.debug(self._loggerprefix + "Downloading database files %s" % (" ".join(copyfilelist)))
cmd = ['scp', '-q' ]
cmd.extend(["%s:%s" % (self._obsethernet, x) for x in copyfilelist])
cmd.append("%s/"%self._obsfiledir)
cmd.append("%s/" % self._obsfiledir)
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
out, err = p.communicate(None)
rs = p.wait()
@@ -469,7 +477,7 @@ class FetchObsThread(threading.Thread):
self._logger.debug(self._loggerprefix + "Could not download all DB files from observer. Dataloss occurred for this observer.")
self._logger.debug(self._loggerprefix + "Tried to execute %s, result was %d, stdout: %s, error: %s" % (str(cmd), rs, out, err))
else:
#self._logger.debug("Downloaded all observer DB files from observer.")
self._logger.debug("Downloaded result DB files from observer.")
# put a copy to the debug directory
for f in copyfilelist:
fname = os.path.basename(f)
@@ -505,7 +513,8 @@ class FetchObsThread(threading.Thread):
out, err = p.communicate(None)
rs = p.wait()
if (rs != flocklab.SUCCESS):
self._logger.error(self._loggerprefix + "Could not remove db directory from observer, result was %d, stdout: %s, error: %s" % (rs, out, err))
self._logger.error(self._loggerprefix + "Could not remove results directory from observer, result was %d, stdout: %s, error: %s" % (rs, out, err))
else:
self._logger.error(self._loggerprefix + "SSH to observer did not succeed, result was %d, error: %s" % (rs, err))
break # abort
@@ -541,11 +550,11 @@ def start_fetcher():
except:
flocklab.error_logandexit("Could not connect to database", errno.EAGAIN)
try:
cur.execute(""" SELECT `a`.observer_id, `a`.ethernet_address
FROM `tbl_serv_observer` AS `a`
LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk
WHERE `b`.test_fk = %d GROUP BY `a`.observer_id;
""" %testid)
cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address
FROM `tbl_serv_observer` AS `a`
LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk
WHERE `b`.test_fk = %d GROUP BY `a`.observer_id;
""" % testid)
except MySQLdb.Error as err:
msg = str(err)
flocklab.error_logandexit(msg, errno.EIO)
@@ -561,10 +570,10 @@ def start_fetcher():
# Start fetcher threads ---
# Create a directory structure to store the downloaded files from the DB:
obsfiledir = "%s/%d" %(flocklab.config.get('fetcher', 'obsfile_dir'), testid)
obsfiledir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_dir'), testid)
if not os.path.exists(obsfiledir):
os.makedirs(obsfiledir)
obsfiledebugdir = "%s/%d" %(flocklab.config.get('fetcher', 'obsfile_debug_dir'), testid)
obsfiledebugdir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_debug_dir'), testid)
if not os.path.exists(obsfiledebugdir):
os.makedirs(obsfiledebugdir)
#DEBUG logger.debug("Created %s"%obsfiledir)
@@ -618,13 +627,15 @@ def stop_fetcher():
time.sleep(1)
shutdown_timeout = shutdown_timeout - 1
if os.path.exists(pidpath):
logger.warn("Fetcher is still running.")
logger.error("Fetcher is still running, killing process...")
# send kill signal
os.kill(pid, signal.SIGKILL)
raise ValueError
except:
pass
else:
raise ValueError
except (ValueError):
logger.debug("Fetcher daemon was not running, thus it cannot be stopped.")
except ValueError:
        # Set DB status in order to allow dispatcher and scheduler to go on:
logger.debug("Setting test status in DB to 'synced'...")
try:
@@ -878,9 +889,9 @@ def main(argv):
tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
ns = {'d': flocklab.config.get('xml', 'namespace')}
for service, xmlname in servicesUsed_dict.items():
if tree.xpath('//d:%s'%xmlname, namespaces=ns):
if tree.xpath('//d:%s' % xmlname, namespaces=ns):
servicesUsed_dict[service] = True
logger.debug("Usage of %s detected."%service)
logger.debug("Usage of %s detected." % service)
else:
servicesUsed_dict[service] = False
except:
@@ -912,10 +923,10 @@ def main(argv):
testresultsdir = "%s/%d" %(flocklab.config.get('fetcher', 'testresults_dir'), testid)
if not os.path.exists(testresultsdir):
os.makedirs(testresultsdir)
logger.debug("Created %s"%testresultsdir)
logger.debug("Created %s" % testresultsdir)
manager = multiprocessing.Manager()
for service in ('errorlog', 'gpiotracing', 'gpioactuation', 'powerprofiling', 'serial', 'powerprofilingstats'):
path = "%s/%s.csv" %(testresultsdir, service)
path = "%s/%s.csv" % (testresultsdir, service)
lock = manager.Lock()
testresultsfile_dict[service] = (path, lock)
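        # each service gets its own results CSV plus a manager lock so the worker processes don't interleave their writes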
# Create file and write header:
@@ -1015,14 +1026,21 @@ def main(argv):
vizimgdir = flocklab.config.get('viz','imgdir')
commitsize = flocklab.config.getint('fetcher', 'commitsize')
enableviz = flocklab.config.getint('viz','enablepreview')
loggerprefix = "(Mainloop): "
loggerprefix = "(Mainloop) "
workmanager = WorkManager()
# Main loop ---
while 1:
if mainloop_stop and workmanager.finished() and FetchObsThread_queue.empty():
# exit main loop
logger.debug("work manager has nothing more to do, finishing up..")
break
if mainloop_stop:
if workmanager.finished() and FetchObsThread_queue.empty():
# exit main loop
logger.debug("Work manager has nothing more to do, finishing up..")
break
else:
                if FetchObsThread_queue.empty():
                    logger.debug("Received stop signal, but the workmanager is still busy...")
                else:
                    logger.debug("Received stop signal, but the fetcher queue is not yet empty...")
time.sleep(5)
# Wait for FetchObsThreads to put items on queue:
try:
item = FetchObsThread_queue.get(block=True, timeout=5)
@@ -1044,48 +1062,38 @@ def main(argv):
nodeid = obsdict_byid[obsid][0]
callback_f = worker_callback
worker_f = worker_convert_and_aggregate
quick_test = False
# Match the filename against the patterns and schedule an appropriate worker function:
if (re.search("^gpio_setting_[0-9]{14}\.db$", f) != None):
#logger.debug(loggerprefix + "File %s contains GPIO setting results"%f)
pool = service_pools_dict['gpioactuation']
worker_args = [nextitem, nodeid, testresultsfile_dict['gpioactuation'][0], testresultsfile_dict['gpioactuation'][1], commitsize, vizimgdir, parse_gpio_setting, convert_gpio_setting, None, logqueue]
elif (re.search("^gpio_monitor_[0-9]{14}\.db$", f) != None):
#logger.debug(loggerprefix + "File %s contains GPIO monitoring results"%f)
worker_args = [nextitem, nodeid, testresultsfile_dict['gpioactuation'][0], testresultsfile_dict['gpioactuation'][1], commitsize, vizimgdir, parse_gpio_setting, convert_gpio_setting, None, logqueue]
elif (re.search("^gpio_monitor_[0-9]{14}\.csv$", f) != None):
pool = service_pools_dict['gpiotracing']
worker_args = [nextitem, nodeid, testresultsfile_dict['gpiotracing'][0], obsdict_byid[obsid][1][1], obsdict_byid[obsid][1][0], vizimgdir, None, logqueue]
worker_f = worker_gpiotracing
logger.debug(loggerprefix + "resultfile_path: %s" % str(testresultsfile_dict['gpiotracing'][0]))
logger.debug(loggerprefix + "queue item: %s" % str(nextitem))
logger.debug(loggerprefix + "node id: %s" % str(nodeid))
worker_args = [nextitem, nodeid, testresultsfile_dict['gpiotracing'][0], obsdict_byid[obsid][1][1], obsdict_byid[obsid][1][0], vizimgdir, None, logqueue]
worker_f = worker_gpiotracing
quick_test = True
if (enableviz == 1):
worker_args[6] = flocklab.viz_gpio_monitor
elif (re.search("^powerprofiling_[0-9]{14}\.db$", f) != None):
#logger.debug(loggerprefix + "File %s contains power profiling results"%f)
#if (enableviz == 1):
# worker_args[6] = flocklab.viz_gpio_monitor
elif (re.search("^powerprofiling_[0-9]{14}\.rld$", f) != None):
                # Power profiling has a dedicated worker function (which currently just moves the .rld result file, see worker_powerprof):
pool = service_pools_dict['powerprofiling']
worker_args = [nextitem, nodeid, testresultsfile_dict['powerprofiling'][0], obsdict_byid[obsid][1][1], obsdict_byid[obsid][1][0], vizimgdir, None, logqueue, PpStatsQueue]
worker_f = worker_powerprof
if (enableviz == 1):
worker_args[6] = flocklab.viz_powerprofiling
#if (enableviz == 1):
# worker_args[6] = flocklab.viz_powerprofiling
elif (re.search("^serial_[0-9]{14}\.db$", f) != None):
#logger.debug(loggerprefix + "File %s contains serial service results"%f)
pool = service_pools_dict['serial']
worker_args = [nextitem, nodeid, testresultsfile_dict['serial'][0], testresultsfile_dict['serial'][1], commitsize, vizimgdir, parse_serial, convert_serial, None, logqueue]
worker_args = [nextitem, nodeid, testresultsfile_dict['serial'][0], testresultsfile_dict['serial'][1], commitsize, vizimgdir, parse_serial, convert_serial, None, logqueue]
elif (re.search("^error_.*_[0-9]{14}\.db$", f) != None):
logger.debug(loggerprefix + "File %s contains error logs"%f)
pool = service_pools_dict['errorlog']
worker_args = [nextitem, nodeid, testresultsfile_dict['errorlog'][0], testresultsfile_dict['errorlog'][1], commitsize, vizimgdir, parse_error_log, convert_error_log, None, logqueue]
worker_args = [nextitem, nodeid, testresultsfile_dict['errorlog'][0], testresultsfile_dict['errorlog'][1], commitsize, vizimgdir, parse_error_log, convert_error_log, None, logqueue]
else:
logger.warn(loggerprefix + "DB file %s/%s from observer %s did not match any of the known patterns" %(fdir, f, obsid))
continue
# Schedule worker function from the service's pool. The result will be reported to the callback function.
if quick_test:
logger.debug("GPIO MONITOR GETS STARTED")
else:
logger.debug("OTHER SERVICE DATA GETS COLLECTED")
pool.apply_async(func=worker_f, args=tuple(worker_args), callback=callback_f)