Commit cbdd6e76 authored by Reto Da Forno

file locks added

parent 32ddaada
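
Two changes are bundled in this commit: a reconnect guard around stop_test in the first hunk, and per-result-file locks threaded through all worker functions in the remaining hunks. The workers index each testresultsfile_dict entry as [0] (results-file path) and [1] (lock); the construction of that dict is outside the hunks shown here. A minimal sketch of the assumed layout, using a multiprocessing.Manager so the lock proxies can be handed to pool workers (service names and paths are illustrative only):

    import multiprocessing

    # Sketch only: assumed shape of testresultsfile_dict, not taken from this commit.
    manager = multiprocessing.Manager()
    testresultsfile_dict = {}
    for service in ('gpiotracing', 'powerprofiling', 'serial', 'datatrace', 'errorlog', 'timesynclog'):
        resultfile = "/tmp/flocklab/%s.csv" % service    # hypothetical path
        testresultsfile_dict[service] = (resultfile, manager.Lock())

A plain multiprocessing.Lock cannot be passed as a pool-task argument (it must be inherited), which is why the sketch uses manager proxies.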
@@ -1333,7 +1333,12 @@ def main(argv):
     else:
         action = "stop"
     starttime = time.time()
-    errors, warnings = stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort)
+    try:
+        errors, warnings = stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort)
+    except MySQLdb._exceptions.OperationalError:
+        # if stop test takes longer than usual, the DB connection might get dropped by the server -> reconnect
+        (cn, cur) = flocklab.connect_to_db()
     # Record time needed to set up test for statistics in DB:
     time_needed = time.time() - starttime
     sql = """ UPDATE `tbl_serv_tests`
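
Note that if the OperationalError fires, errors and warnings are left unassigned, so any later use of them would raise a NameError, and the interrupted stop_test call is not retried. A hedged sketch of a wrapper that closes both gaps, reusing the names from the hunk above (stop_test, flocklab.connect_to_db):

    import MySQLdb

    def stop_test_with_retry(testid, cur, cn, obsdict_key, obsdict_id, abort):
        # Hypothetical helper, not part of this commit: reconnect once and retry,
        # so the (errors, warnings) result is always produced.
        try:
            return stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort), cn, cur
        except MySQLdb._exceptions.OperationalError:
            (cn, cur) = flocklab.connect_to_db()
            return stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort), cn, cur

The caller would unpack ((errors, warnings), cn, cur) so the possibly reconnected handles stay in scope.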
@@ -195,12 +195,38 @@ def read_from_db_file(dbfile):
 #
 ##############################################################################
 def write_to_error_log(timestamp, obsid, nodeid, message):
-    # TODO make this more efficient and thread-safe
-    with open(testresultsfile_dict['errorlog'][0], "a") as errorlog:
+    try:
+        testresultsfile_dict['errorlog'][1].acquire()
+        errorlog = open(testresultsfile_dict['errorlog'][0], "a")
         errorlog.write("%s,%d,%d,%s\n" % (str(timestamp), obsid, nodeid, message))
+        errorlog.close()
+        testresultsfile_dict['errorlog'][1].release()
+    except Exception:
+        testresultsfile_dict['errorlog'][1].release()
+        raise
 ### END write_to_error_log
 
+
+##############################################################################
+#
+# append_lines_to_file: append lines to results file
+#
+##############################################################################
+def append_lines_to_file(filename=None, filelock=None, lines=None):
+    if not filename or not filelock or not lines:
+        return
+    try:
+        filelock.acquire()
+        f = open(filename, "a")
+        f.writelines(lines)
+        f.close()
+        filelock.release()
+    except Exception:
+        filelock.release()
+        raise
+### END append_lines_to_file()
+
+
 ##############################################################################
 #
 # worker_dbfiles: Parses observer DB files.
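
Both acquire/release pairs above release the lock and re-raise on failure, but they would attempt to release an unheld lock if acquire() itself threw, and the file handle stays open if the write raises. Context managers cover both exit paths; a sketch equivalent to append_lines_to_file above:

    def append_lines_to_file(filename=None, filelock=None, lines=None):
        # Equivalent sketch: the lock is released and the file closed on every
        # exit path, including exceptions raised by writelines().
        if not filename or not filelock or not lines:
            return
        with filelock:
            with open(filename, "a") as f:
                f.writelines(lines)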
@@ -276,28 +302,28 @@ def worker_dbfiles(queueitem=None, nodeid=None, resultfile_path=None, resultfile
 # whole observer DB files.
 #
 ##############################################################################
-def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
+def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, logqueue=None, arg=None):
     try:
         _errors = []
         cur_p = multiprocessing.current_process()
         (itemtype, obsid, fdir, f, workerstate) = queueitem
         inputfilename = "%s/%s" % (fdir, f)
         loggername = "(%s.%d) " % (cur_p.name, obsid)
-        with open(resultfile_path, "a") as outfile:
-            infile = open(inputfilename, "r")
-            for line in infile:
-                try:
-                    values = line.strip().split(',')
-                    if len(values) > 3:
-                        # monotonic time is included -> append at the end
-                        outfile.write("%s,%d,%s,%s,%s,%s\n" % (values[0], obsid, nodeid, values[2], values[3], values[1]))
-                    else:
-                        outfile.write("%s,%d,%s,%s,%s\n" % (values[0], obsid, nodeid, values[1], values[2]))
-                except ValueError:
-                    logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
-                    break
-            infile.close()
+        result = []
+        infile = open(inputfilename, "r")
+        for line in infile:
+            try:
+                values = line.strip().split(',')
+                if len(values) > 3:
+                    # monotonic time is included -> append at the end
+                    result.append("%s,%d,%s,%s,%s,%s\n" % (values[0], obsid, nodeid, values[2], values[3], values[1]))
+                else:
+                    result.append("%s,%d,%s,%s,%s\n" % (values[0], obsid, nodeid, values[1], values[2]))
+            except ValueError:
+                logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
+                break
+        infile.close()
+        append_lines_to_file(resultfile_path, resultfile_lock, result)
         os.remove(inputfilename)
     except:
         msg = "Error in gpiotracing worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
@@ -317,7 +343,7 @@ def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, logque
 # whole observer DB files.
 #
 ##############################################################################
-def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
+def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, logqueue=None, arg=None):
     try:
         _errors = []
         cur_p = multiprocessing.current_process()
@@ -347,8 +373,9 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue
         rld_dataframe = pd.DataFrame(np.hstack((current_ch, voltage_ch)), index=timeidxunix, columns=['I', 'V'])
         rld_dataframe.insert(0, 'observer_id', obsid)
         rld_dataframe.insert(1, 'node_id', nodeid)
+        resultfile_lock.acquire()
         rld_dataframe.to_csv(resultfile_path, sep=',', index_label='time', header=False, mode='a')
+        resultfile_lock.release()
         os.remove(inputfilename)
     except:
         msg = "Error in powerprof worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
@@ -366,20 +393,20 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue
 # worker_logs
 #
 ##############################################################################
-def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
+def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, logqueue=None, arg=None):
     try:
         _errors = []
         cur_p = multiprocessing.current_process()
         (itemtype, obsid, fdir, f, workerstate) = queueitem
         inputfilename = "%s/%s" % (fdir, f)
         loggername = "(%s.%d) " % (cur_p.name, obsid)
-        with open(resultfile_path, "a") as outfile:
-            infile = open(inputfilename, "r")
-            for line in infile:
-                (timestamp, msg) = line.strip().split(',', 1)
-                outfile.write("%s,%d,%s,%s\n" % (timestamp, obsid, nodeid, msg))
-            infile.close()
+        result = []
+        infile = open(inputfilename, "r")
+        for line in infile:
+            (timestamp, msg) = line.strip().split(',', 1)
+            result.append("%s,%d,%s,%s\n" % (timestamp, obsid, nodeid, msg))
+        infile.close()
+        append_lines_to_file(resultfile_path, resultfile_lock, result)
         os.remove(inputfilename)
     except:
         msg = "Error in logs worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
@@ -397,31 +424,30 @@ def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None
 # worker_serial
 #
 ##############################################################################
-def worker_serial(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
+def worker_serial(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, logqueue=None, arg=None):
     try:
         _errors = []
         cur_p = multiprocessing.current_process()
         (itemtype, obsid, fdir, f, workerstate) = queueitem
         inputfilename = "%s/%s" % (fdir, f)
         loggername = "(%s.%d) " % (cur_p.name, obsid)
-        with open(resultfile_path, "a") as outfile:
-            infile = open(inputfilename, "r")
-            # read line by line and check for decode errors
-            while True:
-                try:
-                    line = infile.readline()
-                except UnicodeDecodeError:
-                    continue    # ignore invalid lines
-                if not line:
-                    break
-                try:
-                    (timestamp, msg) = line.strip().split(',', 1)
-                except:
-                    continue
-                result = "%s,%d,%s,r,%s\n" % (timestamp, obsid, nodeid, msg.rstrip())
-                outfile.write(result)
-            infile.close()
+        result = []
+        infile = open(inputfilename, "r")
+        # read line by line and check for decode errors
+        while True:
+            try:
+                line = infile.readline()
+            except UnicodeDecodeError:
+                continue    # ignore invalid lines
+            if not line:
+                break
+            try:
+                (timestamp, msg) = line.strip().split(',', 1)
+            except:
+                continue
+            result.append("%s,%d,%s,r,%s\n" % (timestamp, obsid, nodeid, msg.rstrip()))
+        infile.close()
+        append_lines_to_file(resultfile_path, resultfile_lock, result)
        os.remove(inputfilename)
     except:
         msg = "Error in serial worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
@@ -439,7 +465,7 @@ def worker_serial(queueitem=None, nodeid=None, resultfile_path=None, logqueue=No
 # worker_datatrace
 #
 ##############################################################################
-def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
+def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, logqueue=None, arg=None):
     try:
         _errors = []
         cur_p = multiprocessing.current_process()
@@ -465,6 +491,7 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue
         # convert comparator ID to variable name
         dfData['varname'] = dfData.comparator.apply(lambda x: (varnames[x] if x < len(varnames) else str(x)))
         # append datatrace elements from observer to datatrace log file
+        resultfile_lock.acquire()
         with open(resultfile_path, "a") as outfile:
             dfData.to_csv(
                 outfile,
@@ -472,6 +499,7 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue
                 index=False,
                 header=False
             )
+        resultfile_lock.release()
         # append overflow events to errorlog
         for idx, row in dfOverflow.iterrows():
             write_to_error_log(row['global_ts_uncorrected'], obsid, nodeid, 'Datatrace: event rate too high (overflow occurred)!')
@@ -1209,11 +1237,11 @@ def main(argv):
             # Match the filename against the patterns and schedule an appropriate worker function:
             if (re.search("^gpio_monitor_[0-9]{14}\.csv$", f) != None):
                 pool = service_pools_dict['gpiotracing']
-                worker_args = [nextitem, nodeid, testresultsfile_dict['gpiotracing'][0], logqueue, None]
+                worker_args = [nextitem, nodeid, testresultsfile_dict['gpiotracing'][0], testresultsfile_dict['gpiotracing'][1], logqueue, None]
                 worker_f = worker_gpiotracing
             elif (re.search("^powerprofiling_[0-9]{14}\.rld$", f) != None):
                 pool = service_pools_dict['powerprofiling']
-                worker_args = [nextitem, nodeid, testresultsfile_dict['powerprofiling'][0], logqueue, ppFileFormat]
+                worker_args = [nextitem, nodeid, testresultsfile_dict['powerprofiling'][0], testresultsfile_dict['powerprofiling'][1], logqueue, ppFileFormat]
                 worker_f = worker_powerprof
             elif (re.search("^serial_[0-9]{14}\.db$", f) != None):
                 pool = service_pools_dict['serial']
@@ -1221,19 +1249,19 @@ def main(argv):
                 worker_f = worker_dbfiles
             elif (re.search("^serial_[0-9]{14}\.csv$", f) != None):
                 pool = service_pools_dict['serial']
-                worker_args = [nextitem, nodeid, testresultsfile_dict['serial'][0], logqueue, None]
+                worker_args = [nextitem, nodeid, testresultsfile_dict['serial'][0], testresultsfile_dict['serial'][1], logqueue, None]
                 worker_f = worker_serial
             elif (re.search("^datatrace_[0-9]{14}\.log$", f) != None):
                 pool = service_pools_dict['datatrace']
-                worker_args = [nextitem, nodeid, testresultsfile_dict['datatrace'][0], logqueue, None]
+                worker_args = [nextitem, nodeid, testresultsfile_dict['datatrace'][0], testresultsfile_dict['datatrace'][1], logqueue, None]
                 worker_f = worker_datatrace
             elif (re.search("^error_[0-9]{14}\.log$", f) != None):
                 pool = service_pools_dict['logs']
-                worker_args = [nextitem, nodeid, testresultsfile_dict['errorlog'][0], logqueue, None]
+                worker_args = [nextitem, nodeid, testresultsfile_dict['errorlog'][0], testresultsfile_dict['errorlog'][1], logqueue, None]
                 worker_f = worker_logs
             elif (re.search("^timesync_[0-9]{14}\.log$", f) != None):
                 pool = service_pools_dict['logs']
-                worker_args = [nextitem, nodeid, testresultsfile_dict['timesynclog'][0], logqueue, None]
+                worker_args = [nextitem, nodeid, testresultsfile_dict['timesynclog'][0], testresultsfile_dict['timesynclog'][1], logqueue, None]
                 worker_f = worker_logs
             else:
                 logger.warning(loggerprefix + "Results file %s/%s from observer %s did not match any of the known patterns" % (fdir, f, obsid))
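
Every branch above now spells out the testresultsfile_dict[...][0] and [...][1] pair. A table-driven dispatch would keep that in one place; a hypothetical refactoring sketch, not part of this commit (the serial .db branch for worker_dbfiles is omitted because its argument list differs):

    # (pattern, pool key, results key, worker, extra arg) -- hypothetical table
    dispatch = [
        (r"^gpio_monitor_[0-9]{14}\.csv$",   'gpiotracing',    'gpiotracing',    worker_gpiotracing, None),
        (r"^powerprofiling_[0-9]{14}\.rld$", 'powerprofiling', 'powerprofiling', worker_powerprof,   ppFileFormat),
        (r"^serial_[0-9]{14}\.csv$",         'serial',         'serial',         worker_serial,      None),
        (r"^datatrace_[0-9]{14}\.log$",      'datatrace',      'datatrace',      worker_datatrace,   None),
        (r"^error_[0-9]{14}\.log$",          'logs',           'errorlog',       worker_logs,        None),
        (r"^timesync_[0-9]{14}\.log$",       'logs',           'timesynclog',    worker_logs,        None),
    ]
    for pattern, poolkey, reskey, worker_f, extra_arg in dispatch:
        if re.search(pattern, f):
            pool = service_pools_dict[poolkey]
            respath, reslock = testresultsfile_dict[reskey]
            worker_args = [nextitem, nodeid, respath, reslock, logqueue, extra_arg]
            break
    else:
        logger.warning(loggerprefix + "Results file %s/%s from observer %s did not match any of the known patterns" % (fdir, f, obsid))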