Commit 64352161 authored by Reto Da Forno's avatar Reto Da Forno
Browse files

new config option <debugConf> added and some bugfixes in dispatcher and cleaner

parent ff603d27
......@@ -3,8 +3,8 @@
#CRONLOG=/dev/null
CRONLOG=/home/flocklab/logs/cron.log
* * * * * /home/flocklab/testmanagementserver/flocklab_scheduler.py --debug >> $CRONLOG 2>&1
0 * * * * /home/flocklab/testmanagementserver/flocklab_cleaner.py --debug >> $CRONLOG 2>&1
0 5 * * * /home/flocklab/testmanagementserver/flocklab_retention_cleaner.py --debug >> $CRONLOG 2>&1
0 0 * * * /usr/sbin/logrotate --state /home/flocklab/logrotate.state /home/flocklab/logrotate >> $CRONLOG 2>&1
0 2 * * 1 php /home/flocklab/webserver/update_stats.php >> $CRONLOG 2>&1
* * * * * /home/flocklab/testmanagementserver/flocklab_scheduler.py --debug >> $CRONLOG 2>&1
*/10 * * * * /home/flocklab/testmanagementserver/flocklab_cleaner.py --debug >> $CRONLOG 2>&1
0 5 * * * /home/flocklab/testmanagementserver/flocklab_retention_cleaner.py --debug >> $CRONLOG 2>&1
0 0 * * * /usr/sbin/logrotate --state /home/flocklab/logrotate.state /home/flocklab/logrotate >> $CRONLOG 2>&1
0 2 * * 1 php /home/flocklab/webserver/update_stats.php >> $CRONLOG 2>&1
......@@ -70,7 +70,6 @@ def main(argv):
except:
msg = "Could not connect to database"
flocklab.error_logandexit(msg, errno.EAGAIN)
#logger.debug("Connected to database")
# Check for running tests ---
testisrunning = flocklab.is_test_running(cur)
......@@ -86,7 +85,7 @@ def main(argv):
WHERE (`test_status` = 'todelete')
"""
if ( cur.execute(sql) <= 0 ):
logger.info("No tests found which are marked to be deleted.")
logger.debug("No tests found which are marked to be deleted.")
else:
rs = cur.fetchall()
for (testid, starttime) in rs:
......@@ -143,14 +142,15 @@ def main(argv):
keeptime = flocklab.config.getint('cleaner', 'keeptime_viz')
earliest_keeptime = time.time() - (keeptime*86400)
imgdir_path = flocklab.config.get('viz','imgdir')
if not os.path.isdir(imgdir_path):
os.mkdir(imgdir_path)
for f in os.listdir(imgdir_path):
path = os.path.join(imgdir_path, f)
if os.stat(path).st_mtime < earliest_keeptime:
logger.debug("Removing viz cache %s..."%path)
shutil.rmtree(path)
if os.path.isdir(imgdir_path):
for f in os.listdir(imgdir_path):
path = os.path.join(imgdir_path, f)
if os.stat(path).st_mtime < earliest_keeptime:
logger.debug("Removing viz cache %s..."%path)
shutil.rmtree(path)
else:
logger.debug("Directory '%s' does not exist." % imgdir_path)
# Get parameters ---
now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
maxtestcleanuptime = flocklab.config.getint('cleaner', 'max_test_cleanuptime')
......@@ -168,14 +168,16 @@ def main(argv):
logger.debug("Call process to stop test %d (status: %s)." % (testid, test[1]))
p = multiprocessing.Process(target=scheduler.test_startstopabort, args=(testid, True))
p.start()
else:
logger.debug("No tests found that need to be aborted.")
# Check for tests that are stuck ---
sql = """SELECT `serv_tests_key` FROM `tbl_serv_tests`
WHERE `test_status` IN ('preparing', 'aborting', 'syncing', 'synced')
AND TIMESTAMPDIFF(MINUTE, `time_end_wish`, NOW()) > %d
AND TIMESTAMPDIFF(MINUTE, `time_end_wish`, '%s') > %d
"""
if cur.execute(sql % (maxtestcleanuptime)) <= 0:
logger.info("No stuck tests found.")
if cur.execute(sql % (now, maxtestcleanuptime)) <= 0:
logger.debug("No stuck tests found.")
else:
rs = cur.fetchall()
testids = []
......@@ -186,7 +188,7 @@ def main(argv):
cur.execute(sql)
cn.commit()
msg = "Found %d stuck tests in the database (IDs: %s). Test status set to 'failed'." % (len(rs), ", ".join(testids))
logger.debug(msg)
logger.info(msg)
emails = flocklab.get_admin_emails(cur)
if emails != flocklab.FAILED:
flocklab.send_mail(subject="[FlockLab Cleaner]", message=msg, recipients=emails)
......@@ -203,29 +205,29 @@ def main(argv):
pid = int(line[0:6].strip())
command = line[6:106].strip()
runtime = line[106:].strip()
if "testid=" in command:
testid = int(command.split('testid=', 1)[1])
# check stop time of this test
sql = """SELECT `serv_tests_key` FROM `tbl_serv_tests`
WHERE `serv_tests_key`=%d AND TIMESTAMPDIFF(MINUTE, `time_end_wish`, NOW()) > %d
"""
if cur.execute(sql % (testid, maxtestcleanuptime)) > 0:
# thread is stuck -> add to kill list
pids.append(pid)
except:
logger.debug("Failed to parse output of 'ps'. Line was: '%s'" % line)
logger.warn("Failed to parse output of 'ps'. Line was: '%s'" % line)
break
if "testid=" in command:
testid = int(command.split('testid=', 1)[1].split()[0])
# check stop time of this test
sql = """SELECT `serv_tests_key` FROM `tbl_serv_tests`
WHERE `serv_tests_key`=%d AND TIMESTAMPDIFF(MINUTE, `time_end_wish`, '%s') > %d
"""
if cur.execute(sql % (testid, now, maxtestcleanuptime)) > 0:
# thread is stuck -> add to kill list
pids.append(pid)
if len(pids) > 0:
# kill the stuck threads
for pid in pids:
os.kill(pid, signal.SIGKILL)
msg = "%d stuck threads terminated (PIDs: %s)" % (len(pids), ", ".join(pids))
logger.debug(msg)
logger.info(msg)
emails = flocklab.get_admin_emails(cur)
if emails != flocklab.FAILED:
flocklab.send_mail(subject="[FlockLab Cleaner]", message=msg, recipients=emails)
else:
logger.info("No stuck threads found.")
logger.debug("No stuck threads found.")
except:
msg = "Encountered error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
......
This diff is collapsed.
......@@ -1012,7 +1012,7 @@ def main(argv):
# Start a worker process pool for every service:
for service, cpus in service_pools_dict.items():
if cpus != 1:
if cpus > 1:
# currently only 1 CPU / process can be used per task since processing functions are NOT thread safe!
logger.warning("%d is an invalid number of CPUs for service %s, using default value of 1." % (cpus, service))
cpus = 1
......
#! /usr/bin/env python3
import sys, os, getopt, errno, time, datetime, subprocess, MySQLdb, logging, __main__, traceback, types, calendar, multiprocessing
import sys, os, getopt, errno, time, datetime, subprocess, MySQLdb, logging, __main__, traceback, types, calendar, multiprocessing, signal
import lib.flocklab as flocklab
......@@ -24,8 +24,8 @@ class Error(Exception):
# Start/stop/abort a test
#
##############################################################################
def test_startstopabort(testid=None, mode='stop',delay=0):
if ((type(testid) != int) or (testid <= 0) or (mode not in ('start', 'stop', 'abort'))):
def test_startstopabort(testid=None, abort=False, delay=0):
if ((type(testid) != int) or (testid <= 0)):
return -1
# change status of test that the next scheduler will skip this test
......@@ -39,11 +39,12 @@ def test_startstopabort(testid=None, mode='stop',delay=0):
# wait for the actual start time of the test
time.sleep(delay)
logger.info("Found test ID %d which should be %sed." % (testid, mode))
# Add testid to logger name
logger.name += " (Test %d)"%testid
logger.name += " (Test %d)" % testid
# Call the dispatcher:
cmd = [flocklab.config.get("dispatcher", "dispatcherscript"), '--testid=%d' % testid, '--%s' % mode]
cmd = [flocklab.config.get("dispatcher", "dispatcherscript"), '--testid=%d' % testid]
if abort:
cmd.append("--abort")
# Make sure no other instance of the scheduler is running for the same task:
cmd2 = ['pgrep', '-o', '-f', ' '.join(cmd)]
p = subprocess.Popen(cmd2, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
......@@ -59,14 +60,13 @@ def test_startstopabort(testid=None, mode='stop',delay=0):
p.wait()
rs = p.returncode
if (rs != flocklab.SUCCESS):
logger.error("Dispatcher to %s test returned with error %d" % (mode, rs))
logger.debug("Command executed was: %s"%(str(cmd)))
logger.error("Dispatcher returned with error %d." % (rs))
logger.debug("Command executed was: %s" % (str(cmd)))
conn.close()
return errno.EFAULT
else:
logger.info("Test %d %s done." % (testid, mode))
conn.close()
return flocklab.SUCCESS
conn.close()
return flocklab.SUCCESS
### END test_startstopabort()
......@@ -163,8 +163,8 @@ def main(argv):
delay = int(calendar.timegm(time.strptime(str(test[1]), '%Y-%m-%d %H:%M:%S'))) - flocklab.config.getint("tests", "setuptime") - int(time.time())
if delay < 0:
delay = 0
logger.info("Call process to start test %s with delay %s"%(testid,delay))
p = multiprocessing.Process(target=test_startstopabort,args=(testid, 'start', delay))
logger.info("Call process to start test %s with delay %s" % (testid,delay))
p = multiprocessing.Process(target=test_startstopabort, args=(testid, False, delay))
p.start()
else:
logger.debug("No test is to be started within the next %s seconds" % (flocklab.config.get("tests", "setuptime")))
......@@ -195,39 +195,38 @@ def main(argv):
owner_email = rs[4]
msg = "The test with ID %d could not be started as planned because of the following errors:\n\n" % testid
msg += "\t * Scheduler missed start time of test (probably because the previous test took too long to stop). Try re-scheduling your test.\n"
flocklab.send_mail(subject="[FlockLab Scheduler] Missed test %d"%(testid), message=msg, recipients=owner_email)
flocklab.send_mail(subject="[FlockLab Scheduler] Missed test %d" % (testid), message=msg, recipients=owner_email)
else:
logger.error("Error %s returned when trying to get test owner information"%str(rs))
logger.debug("Updated test status of %d missed tests to 'failed' and informed users."%nmissed)
logger.error("Error %s returned when trying to get test owner information" % str(rs))
logger.debug("Updated test status of %d missed tests to 'failed' and informed users." % nmissed)
else:
logger.debug("No missed tests found.")
rs = errno.ENODATA
# Check if a test has to be stopped ---
# Check if there is a running test which is to be stopped:
# Check if a test needs to be aborted ---
sql = """SELECT `serv_tests_key`, `test_status`
FROM `tbl_serv_tests`
WHERE ((`test_status` = 'aborting')
OR ((`test_status` = 'running') AND (`time_end_wish` <= '%s')))
WHERE (`test_status` = 'aborting')
AND (`dispatched` = 0)
"""
status2mode = {'running':'stop', 'aborting':'abort'}
cur.execute(sql % (now))
# start process for each test which has to be stopped
cur.execute(sql)
rs = cur.fetchall()
if rs:
for test in rs:
testid = int(test[0])
logger.debug("Call process to stop test %d, status %s" % (testid, test[1]))
p = multiprocessing.Process(target=test_startstopabort,args=(testid, status2mode[test[1]]))
p.start()
dispatcher_pid = flocklab.get_dispatcher_pid(testid)
if dispatcher_pid != flocklab.FAILED:
logger.debug("Telling dispatcher with pid %d to abort test %d (status: %s)." % (dispatcher_pid, testid, test[1]))
os.kill(dispatcher_pid, signal.SIGTERM)
# Release Lock ---
flocklab.release_db_lock(cur, cn, 'scheduler')
cur.close()
cn.close()
sys.exit(flocklab.SUCCESS)
### END main()
if __name__ == "__main__":
try:
main(sys.argv[1:])
......
......@@ -947,37 +947,45 @@ def write_errorlog(cursor=None, conn=None, testid=0, obsid=0, message="", timest
### END write_errorlog()
##############################################################################
#
# send_mail_to_admin - Sends an email with the given message to the admins
#
##############################################################################
def send_mail_to_admin(message):
    """Send `message` by email to the FlockLab admins.

    The recipients are obtained from get_admin_emails(). The subject is
    "[FlockLab <Name>]", where <Name> is the capitalized second
    '_'-separated part of the global `scriptname` (after replacing '.'
    with '_').

    Args:
        message: Text of the email body. Nothing is sent if it is None.

    Returns:
        SUCCESS if send_mail() was called without raising an exception,
        FAILED if `message` is None, the admin addresses could not be
        obtained, or an exception occurred while sending.
    """
    if message is None:
        return FAILED
    logger = get_logger()

    def log_error(text):
        # get_logger() may not provide a usable logger; fall back to
        # log_fallback() in that case, as error_logandexit() does.
        if logger:
            logger.error(text)
        else:
            log_fallback(text)

    try:
        admin_emails = get_admin_emails()
        if admin_emails == FAILED:
            log_error("Error when getting admin emails from database")
            # No recipients means no mail was sent, so report the failure.
            return FAILED
        subject = "[FlockLab %s]" % (scriptname.replace('.', '_').split('_')[1].capitalize())
        send_mail(subject=subject, message=message, recipients=admin_emails)
    except Exception:
        # Catch Exception only, so that SystemExit and KeyboardInterrupt
        # are not swallowed here.
        log_error("send_mail_to_admin(): Failed to send email to admin.")
        return FAILED
    return SUCCESS
### END send_mail_to_admin()
##############################################################################
#
# error_logandexit - Logs an error (to log and email to admins) and exits the script
#
##############################################################################
def error_logandexit(message=None, exitcode=FAILED):
global logger, config
# Check the arguments:
if (type(message) != str) or (message == "") or (type(exitcode) != int):
return FAILED
# Log error - if available, use logger, otherwise get it first:
logger = get_logger()
if logger:
logger.error(message)
else:
log_fallback(message)
# Send email to admin:
try:
admin_emails = get_admin_emails()
if admin_emails == FAILED:
msg = "Error when getting admin emails from database"
if logger:
logger.error(msg)
else:
logger.error(msg)
raise Exception
send_mail(subject="[FlockLab %s]" % (scriptname.replace('.', '_').split('_')[1].capitalize()), message=message, recipients=admin_emails)
except:
if logger:
logger.error("error_logandexit(): Failed to send email to admin.")
else:
log_fallback("error_logandexit(): Failed to send email to admin.")
send_mail_to_admin(message)
# Exit program
if logger:
logger.debug("Exiting with error code %u." % exitcode)
......@@ -1048,15 +1056,16 @@ def get_admin_emails(cursor=None):
#
##############################################################################
def is_test_running(cursor=None):
if not cursor:
if not cursor or not config:
return None
try:
maxcleanuptime = config.getint('cleaner', 'max_test_cleanuptime')
now = time.strftime(config.get("database", "timeformat"), time.gmtime())
cursor.execute("""
SELECT COUNT(serv_tests_key) FROM tbl_serv_tests
WHERE test_status IN('preparing', 'running', 'aborting', 'cleaning up')
AND TIMESTAMPDIFF(MINUTE, time_end_wish, NOW()) <= %d
""" % (maxcleanuptime))
AND TIMESTAMPDIFF(MINUTE, time_end_wish, '%s') <= %d
""" % (now, maxcleanuptime))
rs = cursor.fetchone()
if rs[0] != 0:
return True
......
......@@ -1064,16 +1064,22 @@ function update_add_test($xml_config, &$errors, $existing_test_id = NULL, $abort
}
if ($valid) {
$testconfig = new SimpleXMLElement($xml_config);
// If no IP address is given for serial service, use the one from which the test was uploaded:
foreach($testconfig->serialConf as $sc) {
if (!isset($sc->remoteIp)) {
if (preg_match ('/((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])/' , $_SERVER['REMOTE_ADDR'])) // we do not support ipv6 on our backend server
// check if client IP is IPv4
if (preg_match('/((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])/', $_SERVER['REMOTE_ADDR'])) {
// If no IP address is given for serial service, use the one from which the test was uploaded:
foreach ($testconfig->serialConf as $sc) {
if (isset($sc->remoteIp) && trim($sc->remoteIp) == "") {
$sc->addChild('remoteIp', $_SERVER['REMOTE_ADDR']);
else {
array_push($errors, 'remoteIp: FlockLab does not support IPv6 addresses ('.$_SERVER['REMOTE_ADDR'].'). To use the <a href="https://www.flocklab.ethz.ch/wiki/wiki/Public/Man/Tutorials/Tutorial2#notes">Serial socket feature</a>, please provide an IPv4 address.');
}
}
}
// If no IP address is given for debug service, use the one from which the test was uploaded:
foreach ($testconfig->debugConf as $sc) {
if (isset($sc->remoteIp) && trim($sc->remoteIp) == "") {
$sc->addChild('remoteIp', $_SERVER['REMOTE_ADDR']);
}
}
} //else: array_push($errors, 'remoteIp: FlockLab does not support IPv6 addresses ('.$_SERVER['REMOTE_ADDR'].'). To use the <a href="https://www.flocklab.ethz.ch/wiki/wiki/Public/Man/Tutorials/Tutorial2#notes">Serial socket feature</a>, please provide an IPv4 address.');
// extract embedded images
$used_embeddedImages = Array();
$used_dbImages = Array();
......
......@@ -51,6 +51,8 @@
<xs:element name="powerProfilingConf" type="powerProfilingConfType"/>
<!-- Target image configuration. If only images from database are to be used, do not specify this element. -->
<xs:element name="imageConf" type="imageConfType"/>
<!-- Debug service configuration. If service is not to be used, do not specify this element. -->
<xs:element name="debugConf" type="debugConfType"/>
</xs:choice>
</xs:group>
......@@ -203,6 +205,18 @@
</xs:complexType>
<!-- Configuration of the debug service -->
<xs:complexType name="debugConfType">
<!-- Content model of a debugConf element: a mandatory obsIds element first, then the optional settings below. -->
<xs:sequence>
<xs:element name="obsIds" type="obsIdListRestType"/>
<!-- The choice may be omitted or taken up to two times, so gdbPort and remoteIp can appear in either order. -->
<!-- NOTE(review): a choice repeated twice also admits the same element twice (e.g. two gdbPort entries); confirm this is intended. -->
<xs:choice minOccurs="0" maxOccurs="2">
<xs:element name="gdbPort" type="portType" minOccurs="0" maxOccurs="1"/>
<xs:element name="remoteIp" type="ipType" minOccurs="0" maxOccurs="1"/>
</xs:choice>
</xs:sequence>
</xs:complexType>
<!-- Configuration of the GPIO tracing service -->
<xs:complexType name="gpioTracingConfType">
<xs:sequence>
......@@ -536,9 +550,16 @@
<!-- Type definition for IP addresses -->
<xs:simpleType name="ipType">
<xs:restriction base="xs:string">
<xs:pattern value="((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])"/>
<xs:pattern value="((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])\.){3}(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9][0-9]|[0-9])|(\s*)"/>
</xs:restriction>
</xs:simpleType>
<!-- Type definition for TCP port -->
<xs:simpleType name="portType">
<!-- Accepts integer values from 1025 to 65535; both bounds are inclusive. -->
<xs:restriction base="xs:integer">
<xs:minInclusive value="1025"/>
<xs:maxInclusive value="65535"/>
</xs:restriction>
</xs:simpleType>
</xs:schema>
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment