Commit ff603d27 authored by Reto Da Forno's avatar Reto Da Forno
Browse files

cleaner updated to better handle stuck tests

parent 4341a243
......@@ -59,6 +59,7 @@ include_xmlconfig = 1
[cleaner]
max_instances = 1 ;Maximum of concurrently running instances of the cleaner
keeptime_viz = 10 ;Time in days to keep viz data
max_test_cleanuptime = 20 ;Maximum time for a test cleanup in minutes
; Config for the dispatcher
[dispatcher]
......
#! /usr/bin/env python3
import sys, os, getopt, errno, traceback, logging, time, __main__, shutil, glob, datetime, subprocess
import sys, os, getopt, errno, traceback, logging, time, __main__, shutil, glob, datetime, subprocess, signal
import lib.flocklab as flocklab
logger = None
import flocklab_scheduler as scheduler
##############################################################################
......@@ -27,9 +25,6 @@ def usage():
##############################################################################
def main(argv):
### Global Variables ###
global logger
# Get logger:
logger = flocklab.get_logger()
......@@ -96,11 +91,11 @@ def main(argv):
rs = cur.fetchall()
for (testid, starttime) in rs:
testid = str(testid)
logger.debug("Found test ID %s to delete."%testid)
logger.debug("Found test ID %s to delete." % testid)
# If a test is to be deleted which has not run yet, delete it completely. Otherwise, keep the metadata of the test for statistics:
if (starttime > datetime.datetime.today()):
delete_all = True
logger.debug("Test ID %s did not run yet, thus all data (including the test metadata) will be deleted."%testid)
logger.debug("Test ID %s did not run yet, thus all data (including the test metadata) will be deleted." % testid)
else:
delete_all = False
# Clean through all relevant tables ---
......@@ -108,17 +103,17 @@ def main(argv):
if delete_all:
relevant_tables.append('tbl_serv_map_test_observer_targetimages')
for table in relevant_tables:
sql = """ DELETE FROM %s
WHERE (`test_fk` = %s)
"""
sql = """DELETE FROM %s
WHERE (`test_fk` = %s)
"""
starttime = time.time()
num_deleted_rows = cur.execute(sql%(table, testid))
cn.commit()
logger.debug("Deleted %i rows of data in table %s for test ID %s in %f seconds" %(num_deleted_rows, table, testid, (time.time()-starttime)))
logger.debug("Deleted %i rows of data in table %s for test ID %s in %f seconds" % (num_deleted_rows, table, testid, (time.time()-starttime)))
# Delete cached test results ---
archive_path = "%s/%s%s"%(flocklab.config.get('archiver','archive_dir'), testid, flocklab.config.get('archiver','archive_ext'))
viz_pathes = glob.glob("%s/%s_*"%(flocklab.config.get('viz','imgdir'), testid))
archive_path = "%s/%s%s" % (flocklab.config.get('archiver','archive_dir'), testid, flocklab.config.get('archiver','archive_ext'))
viz_pathes = glob.glob("%s/%s_*" % (flocklab.config.get('viz','imgdir'), testid))
pathes = [archive_path]
pathes.extend(viz_pathes)
for path in pathes:
......@@ -156,12 +151,30 @@ def main(argv):
logger.debug("Removing viz cache %s..."%path)
shutil.rmtree(path)
# Check for tests that are stuck for 60 minutes ---
# Get parameters ---
now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
maxtestcleanuptime = flocklab.config.getint('cleaner', 'max_test_cleanuptime')
# Check for tests that are still running, but should have been stopped ---
sql = """SELECT `serv_tests_key`, `test_status` FROM `tbl_serv_tests`
WHERE (`test_status` = 'running') AND (`time_end_wish` <= '%s') AND (`dispatched` = 0)
"""
cur.execute(sql % (now))
rs = cur.fetchall()
if rs:
# start process for each test which has to be stopped
for test in rs:
testid = int(test[0])
logger.debug("Call process to stop test %d (status: %s)." % (testid, test[1]))
p = multiprocessing.Process(target=scheduler.test_startstopabort, args=(testid, True))
p.start()
# Check for tests that are stuck ---
sql = """SELECT `serv_tests_key` FROM `tbl_serv_tests`
WHERE `test_status` IN ('preparing', 'aborting', 'syncing', 'synced')
AND TIMESTAMPDIFF(MINUTE, `time_end_wish`, NOW()) > 60
AND TIMESTAMPDIFF(MINUTE, `time_end_wish`, NOW()) > %d
"""
if cur.execute(sql) <= 0:
if cur.execute(sql % (maxtestcleanuptime)) <= 0:
logger.info("No stuck tests found.")
else:
rs = cur.fetchall()
......@@ -170,7 +183,6 @@ def main(argv):
testids.append(str(testid[0]))
# set test status to failed
sql = "UPDATE `tbl_serv_tests` SET `test_status`='failed' WHERE `serv_tests_key` IN (%s)" % (", ".join(testids))
logger.debug("SQL query: %s" % sql)
cur.execute(sql)
cn.commit()
msg = "Found %d stuck tests in the database (IDs: %s). Test status set to 'failed'." % (len(rs), ", ".join(testids))
......@@ -179,7 +191,7 @@ def main(argv):
if emails != flocklab.FAILED:
flocklab.send_mail(subject="[FlockLab Cleaner]", message=msg, recipients=emails)
# Check for stuck threads that have been running for more than 1 day
# Check for stuck threads
cmd = ["ps", "-U", "flocklab", "-o", "pid:5=,cmd:100=,etime="]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
out, err = p.communicate()
......@@ -191,8 +203,15 @@ def main(argv):
pid = int(line[0:6].strip())
command = line[6:106].strip()
runtime = line[106:].strip()
if ("flocklab_fetcher" in command) and ("-" in runtime):
pids.append(pid)
if "testid=" in command:
testid = int(command.split('testid=', 1)[1])
# check stop time of this test
sql = """SELECT `serv_tests_key` FROM `tbl_serv_tests`
WHERE `serv_tests_key`=%d AND TIMESTAMPDIFF(MINUTE, `time_end_wish`, NOW()) > %d
"""
if cur.execute(sql % (testid, maxtestcleanuptime)) > 0:
# thread is stuck -> add to kill list
pids.append(pid)
except:
logger.debug("Failed to parse output of 'ps'. Line was: '%s'" % line)
break
......@@ -200,7 +219,7 @@ def main(argv):
# kill the stuck threads
for pid in pids:
os.kill(pid, signal.SIGKILL)
msg = "%d stuck threads terminated (PIDs: %s" % (len(pids), ", ".join(pids))
msg = "%d stuck threads terminated (PIDs: %s)" % (len(pids), ", ".join(pids))
logger.debug(msg)
emails = flocklab.get_admin_emails(cur)
if emails != flocklab.FAILED:
......@@ -209,7 +228,7 @@ def main(argv):
logger.info("No stuck threads found.")
except:
msg = "Encountered error: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
msg = "Encountered error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
logger.error(msg)
emails = flocklab.get_admin_emails(cur)
msg = "%s on server %s encountered error:\n\n%s" % (__file__, os.uname()[1], msg)
......@@ -218,7 +237,6 @@ def main(argv):
cur.close()
cn.close()
#logger.debug("Finished. Exit program.")
sys.exit(flocklab.SUCCESS)
### END main()
......
......@@ -440,11 +440,33 @@ def get_fetcher_pid(testid):
return FAILED
except:
logger = get_logger()
logger.error("%s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
logger.error("%s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
return FAILED
### END get_fetcher_pid()
##############################################################################
#
# get_dispatcher_pid - Returns the process ID of the dispatcher for a test.
#
##############################################################################
def get_dispatcher_pid(testid):
try:
searchterm = "flocklab_dispatcher.py (.)*-(-)?t(estid=)?%d" % (testid)
cmd = ['pgrep', '-o', '-f', searchterm]
p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
out, err = p.communicate()
if (p.returncode == 0):
return int(out)
else:
return FAILED
except:
logger = get_logger()
logger.error("%s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
return FAILED
### END get_dispatcher_pid()
##############################################################################
#
# get_test_owner - Get information about the owner of a test
......@@ -1026,18 +1048,15 @@ def get_admin_emails(cursor=None):
#
##############################################################################
def is_test_running(cursor=None):
"""Arguments:
cursor: cursor of the database connection to be used for the query
Return value:
True if a test is running
False if no test is running
None otherwise
"""
if not cursor:
return None
try:
cursor.execute("SELECT COUNT(serv_tests_key) FROM tbl_serv_tests WHERE test_status IN('preparing', 'running', 'aborting', 'cleaning up');")
maxcleanuptime = config.getint('cleaner', 'max_test_cleanuptime')
cursor.execute("""
SELECT COUNT(serv_tests_key) FROM tbl_serv_tests
WHERE test_status IN('preparing', 'running', 'aborting', 'cleaning up')
AND TIMESTAMPDIFF(MINUTE, time_end_wish, NOW()) <= %d
""" % (maxcleanuptime))
rs = cursor.fetchone()
if rs[0] != 0:
return True
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment