Commit 9095291b authored by Reto Da Forno

support for new serial logger added

parent 2cbafaee
@@ -158,12 +158,10 @@ def read_from_db_file(dbfile):
##############################################################################
#
# worker_convert_and_aggregate: Worker function for multiprocessing pools.
# Parses observer DB files for all services, converts values (if needed)
# and aggregates them into single test result files.
# worker_dbfiles: Parses observer DB files.
#
##############################################################################
def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, commitsize=1, parse_f=None, convert_f=None, logqueue=None):
def worker_dbfiles(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, commitsize=1, parse_f=None, convert_f=None, logqueue=None):
try:
_errors = []
cur_p = multiprocessing.current_process()
@@ -207,7 +205,6 @@ def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=No
if (len(conv_values) > 0):
# There is still data left. Do a last commit
# Write data to file:
#logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for final writing..." % (resultfile_path)))
resultfile_lock.acquire()
f = open(resultfile_path, 'a')
f.writelines(conv_values)
@@ -215,7 +212,6 @@ def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=No
resultfile_lock.release()
logqueue.put_nowait((loggername, logging.DEBUG, "Committed final results to %s after %d rows" % (resultfile_path, rows)))
# Remove processed file:
#logqueue.put_nowait((loggername, logging.DEBUG, "Remove %s" % (obsdbfile_path)))
os.unlink(obsdbfile_path)
except:
msg = "Error in worker process: %s: %s\n%s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
@@ -225,7 +221,7 @@ def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=No
processeditem = list(queueitem)
processeditem[0] = ITEM_PROCESSED
return (_errors, tuple(processeditem))
### END worker_convert_and_aggregate
### END worker_dbfiles
##############################################################################
@@ -240,11 +236,11 @@ def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, logque
_errors = []
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
obsdbfile_path = "%s/%s" % (fdir, f)
inputfilename = "%s/%s" % (fdir, f)
loggername = "(%s.%d) " % (cur_p.name, obsid)
with open(resultfile_path, "a") as outfile:
infile = open(obsdbfile_path, "r")
infile = open(inputfilename, "r")
for line in infile:
try:
(timestamp, ticks, pin, level) = line.strip().split(',', 3)
@@ -253,7 +249,7 @@ def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, logque
logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
break
infile.close()
os.remove(obsdbfile_path)
os.remove(inputfilename)
except:
msg = "Error in gpiotracing worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
_errors.append((msg, errno.ECOMM, obsid))
@@ -277,7 +273,7 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue
_errors = []
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
obsdbfile_path = "%s/%s" % (fdir, f)
inputfilename = "%s/%s" % (fdir, f)
loggername = "(%s.%d) " % (cur_p.name, obsid)
if arg and arg == 'rld':
@@ -285,7 +281,7 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue
# simply move the file into the results directory
try:
resfilename = "%s.%s.%s.rld" % (os.path.splitext(resultfile_path)[0], obsid, nodeid)
os.rename(obsdbfile_path, resfilename)
os.rename(inputfilename, resfilename)
except FileExistsError:
# TODO: properly handle case where file already exists (several rld files per observer)
msg = "File '%s' already exists, dropping test results." % (resfilename)
@@ -293,7 +289,7 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue
logqueue.put_nowait((loggername, logging.ERROR, msg))
else:
# CSV file format
rld_data = RocketLoggerData(obsdbfile_path).merge_channels()
rld_data = RocketLoggerData(inputfilename).merge_channels()
# get network time and convert to UNIX timestamp (UTC)
timeidx = rld_data.get_time(absolute_time=True, time_reference='network') # TODO adjust parameters for RL 1.99+
timeidxunix = timeidx.astype('uint64') / 1e9 # convert to s
@@ -304,7 +300,7 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue
rld_dataframe.insert(1, 'node_id', nodeid)
rld_dataframe.to_csv(resultfile_path, sep=',', index_label='time', header=False, mode='a')
os.remove(obsdbfile_path)
os.remove(inputfilename)
except:
msg = "Error in powerprof worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
_errors.append((msg, errno.ECOMM, obsid))
@@ -326,16 +322,16 @@ def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None
_errors = []
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
obsdbfile_path = "%s/%s" % (fdir, f)
inputfilename = "%s/%s" % (fdir, f)
loggername = "(%s.%d) " % (cur_p.name, obsid)
with open(resultfile_path, "a") as outfile:
infile = open(obsdbfile_path, "r")
infile = open(inputfilename, "r")
for line in infile:
(timestamp, msg) = line.strip().split(',', 1)
outfile.write("%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, msg))
infile.close()
os.remove(obsdbfile_path)
os.remove(inputfilename)
except:
msg = "Error in logs worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
_errors.append((msg, errno.ECOMM, obsid))
@@ -347,6 +343,41 @@ def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None
### END worker_logs()
##############################################################################
#
# worker_serial
#
##############################################################################
def worker_serial(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
try:
_errors = []
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
inputfilename = "%s/%s" % (fdir, f)
loggername = "(%s.%d) " % (cur_p.name, obsid)
with open(resultfile_path, "a") as outfile:
infile = open(inputfilename, "r")
for line in infile:
try:
(timestamp, msg) = line.strip().split(',', 1)
except:
continue
result = "%s,%s,%s,r,%s\n" % (timestamp, obsid, nodeid, msg.rstrip())
outfile.write(result)
infile.close()
os.remove(inputfilename)
except:
msg = "Error in serial worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
_errors.append((msg, errno.ECOMM, obsid))
logqueue.put_nowait((loggername, logging.ERROR, msg))
finally:
processeditem = list(queueitem)
processeditem[0] = ITEM_PROCESSED
return (_errors, tuple(processeditem))
### END worker_serial()
##############################################################################
#
# worker_datatrace
@@ -1076,13 +1107,11 @@ def main(argv):
#logger.debug(loggerprefix + "Next item is %s/%s (Obs%s)." % (fdir, f, str(obsid)))
nodeid = obsdict_byid[obsid][1]
callback_f = worker_callback
worker_f = worker_convert_and_aggregate
# Match the filename against the patterns and schedule an appropriate worker function:
if (re.search("^gpio_monitor_[0-9]{14}\.csv$", f) != None):
pool = service_pools_dict['gpiotracing']
worker_args = [nextitem, nodeid, testresultsfile_dict['gpiotracing'][0], logqueue, None]
worker_f = worker_gpiotracing
logger.debug(loggerprefix + "resultfile_path: %s" % str(testresultsfile_dict['gpiotracing'][0]))
elif (re.search("^powerprofiling_[0-9]{14}\.rld$", f) != None):
pool = service_pools_dict['powerprofiling']
worker_args = [nextitem, nodeid, testresultsfile_dict['powerprofiling'][0], logqueue, ppFileFormat]
@@ -1090,6 +1119,11 @@ def main(argv):
elif (re.search("^serial_[0-9]{14}\.db$", f) != None):
pool = service_pools_dict['serial']
worker_args = [nextitem, nodeid, testresultsfile_dict['serial'][0], testresultsfile_dict['serial'][1], commitsize, parse_serial, convert_serial, logqueue]
worker_f = worker_dbfiles
elif (re.search("^serial_[0-9]{14}\.csv$", f) != None):
pool = service_pools_dict['serial']
worker_args = [nextitem, nodeid, testresultsfile_dict['serial'][0], logqueue, None]
worker_f = worker_serial
elif (re.search("^datatrace_[0-9]{14}\.csv$", f) != None):
pool = service_pools_dict['datatrace']
worker_args = [nextitem, nodeid, testresultsfile_dict['datatrace'][0], logqueue, None]
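For reference, a minimal standalone sketch of what the new serial path in this commit does: the dispatcher in main() matches `serial_<14-digit timestamp>.csv` filenames and hands them to worker_serial(), which rewrites each `timestamp,message` line as `timestamp,obsid,nodeid,r,message`. The helper function and the sample filename/values below are illustrative only and are not part of the commit.

```python
import re

# Filenames of this form are routed to worker_serial() by the new dispatcher
# branch; serial_*.db files still go through worker_dbfiles().
SERIAL_CSV_PATTERN = re.compile(r"^serial_[0-9]{14}\.csv$")

def convert_serial_line(line, obsid, nodeid):
    """Mimic the per-line conversion in worker_serial():
    'timestamp,message' -> 'timestamp,obsid,nodeid,r,message'."""
    try:
        timestamp, msg = line.strip().split(',', 1)
    except ValueError:
        return None  # malformed lines are skipped, as in the worker
    return "%s,%s,%s,r,%s\n" % (timestamp, obsid, nodeid, msg.rstrip())

if __name__ == "__main__":
    # hypothetical example values
    print(bool(SERIAL_CSV_PATTERN.search("serial_20210408123456.csv")))  # True
    print(convert_serial_line("1617890000.123456,hello world", 3, 7), end="")
    # -> 1617890000.123456,3,7,r,hello world
```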