Commit 739b210b authored by Reto Da Forno

Collection of timesync logs added to fetcher

parent 6b38e9cd
@@ -130,7 +130,7 @@ def parse_serial(buf):
 ##############################################################################
 def convert_serial(obsdata, observer_id, node_id):
     try:
-        result = "%s,%s,%s,%s,%s\n" % (obsdata[2], observer_id, node_id, serialdict[obsdata[0]], obsdata[1].decode('utf8').strip())
+        result = "%s,%s,%s,%s,%s\n" % (obsdata[2], observer_id, node_id, serialdict[obsdata[0]], obsdata[1].decode('utf8').rstrip())
     except UnicodeDecodeError:
         result = "%s,%s,%s,%s,%s\n" % (obsdata[2], observer_id, node_id, serialdict[obsdata[0]], str(obsdata[1]))
     return result
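The `strip()` → `rstrip()` change is subtle but matters for serial payloads: `strip()` would also discard leading whitespace that is part of the node's output, while `rstrip()` only removes the trailing line ending. A minimal sketch with a made-up payload:

```python
# Hypothetical serial payload; only the byte string is invented here.
payload = b'   indented debug output\r\n'

print(repr(payload.decode('utf8').strip()))   # 'indented debug output'    -- leading spaces lost
print(repr(payload.decode('utf8').rstrip()))  # '   indented debug output' -- leading spaces kept
```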
@@ -260,8 +260,8 @@ def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, vizimg
         infile = open(obsdbfile_path, "r")
         for line in infile:
             try:
-                (timestamp, pin, level) = line.split(',')
-                outfile.write("%s,%s,%s,%s,%s" % (timestamp, obsid, nodeid, pin, level))
+                (timestamp, pin, level) = line.strip().split(',', 2)
+                outfile.write("%s,%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, pin, level))
             except ValueError:
                 logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
                 break
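Two things change in the GPIO tracing worker: the input line is stripped before splitting, so the newline no longer travels into the last field, and the output record gets its terminator written explicitly; `maxsplit=2` additionally tolerates commas in the value field. A sketch with an invented trace line and IDs:

```python
# Invented example line and IDs, following the timestamp,pin,level layout
# the worker expects.
line = "1560254400.123456,LED1,1\n"
obsid, nodeid = "007", "3"

(timestamp, pin, level) = line.strip().split(',', 2)
record = "%s,%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, pin, level)
print(repr(record))  # '1560254400.123456,007,3,LED1,1\n'
```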
@@ -317,10 +317,10 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, vizimgdi
 ##############################################################################
 #
-# worker_errorlog
+# worker_logs
 #
 ##############################################################################
-def worker_errorlog(queueitem=None, nodeid=None, resultfile_path=None, vizimgdir=None, viz_f=None, logqueue=None):
+def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, vizimgdir=None, viz_f=None, logqueue=None):
     try:
         _errors = []
         cur_p = multiprocessing.current_process()
@@ -331,19 +331,19 @@ def worker_errorlog(queueitem=None, nodeid=None, resultfile_path=None, vizimgdir
         with open(resultfile_path, "a") as outfile:
             infile = open(obsdbfile_path, "r")
             for line in infile:
-                (timestamp, msg) = line.split(',')
-                outfile.write("%s,%s,%s,%s" % (timestamp, obsid, nodeid, msg))
+                (timestamp, msg) = line.strip().split(',', 1)
+                outfile.write("%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, msg))
             infile.close()
             os.remove(obsdbfile_path)
     except:
-        msg = "Error in errorlog worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
+        msg = "Error in logs worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
         _errors.append((msg, errno.ECOMM, obsid))
         logqueue.put_nowait((loggername, logging.ERROR, msg))
     finally:
         processeditem = list(queueitem)
         processeditem[0] = ITEM_PROCESSED
         return (_errors, tuple(processeditem))
-### END worker_errorlog()
+### END worker_logs()
 ##############################################################################
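Here `maxsplit=1` is essential rather than just defensive: an error or timesync message may itself contain commas, and splitting only at the first comma keeps the whole message in one field. A sketch with a hypothetical log line:

```python
# Hypothetical error-log line; the message part contains commas on purpose.
line = "1560254400.123456,GPIO actuation failed, retrying, attempt 2\n"

(timestamp, msg) = line.strip().split(',', 1)
print(timestamp)  # 1560254400.123456
print(msg)        # GPIO actuation failed, retrying, attempt 2
```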
@@ -448,7 +448,7 @@ class FetchObsThread(threading.Thread):
                 rs = p.returncode
                 if (rs == flocklab.SUCCESS):
                     services = {}
-                    for servicename in [ "gpio_monitor", "powerprofiling", "serial", "error" ]:
+                    for servicename in [ "gpio_monitor", "powerprofiling", "serial", "error", "timesync" ]:
                         services[servicename] = ServiceInfo(servicename)
                     # Read filenames
                     for dbfile in out.split():
@@ -917,13 +917,13 @@ def main(argv):
             os.makedirs(testresultsdir)
             logger.debug("Created %s" % testresultsdir)
         manager = multiprocessing.Manager()
-        for service in ('errorlog', 'gpiotracing', 'powerprofiling', 'serial'):
+        for service in ('errorlog', 'gpiotracing', 'powerprofiling', 'serial', 'timesynclog'):
            path = "%s/%s.csv" % (testresultsdir, service)
            lock = manager.Lock()
            testresultsfile_dict[service] = (path, lock)
            # Create file and write header:
-            if service == 'errorlog':
-                header = 'timestamp,observer_id,node_id,errormessage\n'
+            if service in ('errorlog', 'timesynclog'):
+                header = 'timestamp,observer_id,node_id,message\n'
            elif service == 'gpiotracing':
                header = 'timestamp,observer_id,node_id,pin_name,value\n'
            elif service == 'powerprofiling':
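Since errorlog and timesync records are now written with the same four columns, both result files share one generic header, which is what allows a single `worker_logs()` to serve both. An illustrative sketch of the resulting file/header mapping:

```python
# Illustrative mapping of result files to their CSV headers after this change.
headers = {
    'errorlog.csv':    'timestamp,observer_id,node_id,message',
    'timesynclog.csv': 'timestamp,observer_id,node_id,message',
    'gpiotracing.csv': 'timestamp,observer_id,node_id,pin_name,value',
}
for filename, header in headers.items():
    print("%-16s %s" % (filename, header))
```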
@@ -946,12 +946,12 @@ def main(argv):
     # Determine the number of CPUs to be used for each aggregating process. If a service is not used, its CPUs are assigned to other services.
     cpus_free = 0
-    cpus_errorlog = flocklab.config.getint('fetcher', 'cpus_errorlog')
+    cpus_logs = flocklab.config.getint('fetcher', 'cpus_errorlog')
     # CPUs for serial service:
     if servicesUsed_dict['serial'] == True:
-        cpus_serial = flocklab.config.getint('fetcher', 'cpus_serial')
+        cpus_serial = flocklab.config.getint('fetcher', 'cpus_serial')
     else:
-        cpus_serial = 0
+        cpus_serial = 0
     cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_serial')
     # CPUs for GPIO tracing:
     if servicesUsed_dict['gpiotracing'] == True:
@@ -981,9 +981,9 @@ def main(argv):
         # Neither of the services is used, so give it to one of the other services:
         if cpus_serial > 0:
             cpus_serial = cpus_serial + cpus_free
-    cpus_total = cpus_errorlog + cpus_serial + cpus_gpiomonitoring + cpus_powerprofiling
+    cpus_total = cpus_logs + cpus_serial + cpus_gpiomonitoring + cpus_powerprofiling
-    service_pools_dict = { 'errorlog': cpus_errorlog, 'serial': cpus_serial, 'gpiotracing': cpus_gpiomonitoring, 'powerprofiling': cpus_powerprofiling }
+    service_pools_dict = { 'logs': cpus_logs, 'serial': cpus_serial, 'gpiotracing': cpus_gpiomonitoring, 'powerprofiling': cpus_powerprofiling }
     if (cpus_total > multiprocessing.cpu_count()):
         logger.warn("Number of requested CPUs for all aggregating processes (%d) is higher than number of available CPUs (%d) on system." % (cpus_total, multiprocessing.cpu_count()))
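The rename from `cpus_errorlog` to `cpus_logs` reflects that this pool now also feeds timesync-log aggregation; its budget still comes from the existing `cpus_errorlog` config key. The surrounding logic redistributes the CPU budget of unused services to active ones. A compact sketch of that idea (the budget numbers are invented, and unlike the script, which hands freed CPUs down a chain of services, this sketch gives them all to serial):

```python
# Invented CPU budgets; the redistribution mirrors the script's intent.
def allocate(configured, used):
    pools = {}
    cpus_free = 0
    for service, cpus in configured.items():
        if used.get(service):
            pools[service] = cpus
        else:
            pools[service] = 0    # service not used in this test...
            cpus_free += cpus     # ...so its CPUs are donated
    if cpus_free and pools.get('serial'):
        pools['serial'] += cpus_free  # an active service absorbs the surplus
    return pools

print(allocate({'logs': 1, 'serial': 2, 'gpiotracing': 2, 'powerprofiling': 2},
               {'logs': True, 'serial': True, 'gpiotracing': False, 'powerprofiling': True}))
# {'logs': 1, 'serial': 4, 'gpiotracing': 0, 'powerprofiling': 2}
```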
@@ -1057,10 +1057,13 @@ def main(argv):
                     pool = service_pools_dict['serial']
                     worker_args = [nextitem, nodeid, testresultsfile_dict['serial'][0], testresultsfile_dict['serial'][1], commitsize, vizimgdir, parse_serial, convert_serial, None, logqueue]
                 elif (re.search("^error_[0-9]{14}\.log$", f) != None):
-                    logger.debug(loggerprefix + "File %s contains error logs" % f)
-                    pool = service_pools_dict['errorlog']
-                    worker_args = [nextitem, nodeid, testresultsfile_dict['errorlog'][0], vizimgdir, None, logqueue]
-                    worker_f = worker_errorlog
+                    pool = service_pools_dict['logs']
+                    worker_args = [nextitem, nodeid, testresultsfile_dict['errorlog'][0], vizimgdir, None, logqueue]
+                    worker_f = worker_logs
+                elif (re.search("^timesync_[0-9]{14}\.log$", f) != None):
+                    pool = service_pools_dict['logs']
+                    worker_args = [nextitem, nodeid, testresultsfile_dict['timesynclog'][0], vizimgdir, None, logqueue]
+                    worker_f = worker_logs
                 else:
                     logger.warn(loggerprefix + "Results file %s/%s from observer %s did not match any of the known patterns" % (fdir, f, obsid))
                     continue
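Dispatch is purely filename-based: each observer file carries a service prefix followed by a 14-digit timestamp (presumably YYYYmmddHHMMSS). Both `error_*` and `timesync_*` files now go through the same `worker_logs`, differing only in the result file they append to. A sketch of the routing with made-up filenames:

```python
import re

# Made-up filenames; the two patterns are the ones used in the fetcher.
for f in ["error_20190611142302.log", "timesync_20190611142302.log", "foo.log"]:
    if re.search(r"^error_[0-9]{14}\.log$", f):
        print(f, "-> worker_logs, appends to errorlog.csv")
    elif re.search(r"^timesync_[0-9]{14}\.log$", f):
        print(f, "-> worker_logs, appends to timesynclog.csv")
    else:
        print(f, "-> warning: no known pattern")
```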