Commit 00d04653 authored by Reto Da Forno

datatrace parsing adjusted

parent d18855ec
@@ -513,15 +513,15 @@ def start_test(testid, cur, cn, obsdict_key, obsdict_id):
obskey = int(float(obsids[0]))
if obskey in symtable:
if var in symtable[obskey]:
-logger.debug("Variable %s replaced with address 0x%x." % (var, symtable[obskey][var][0]))
-var = "0x%x" % symtable[obskey][var][0]
+logger.debug("Variable %s replaced by address 0x%x." % (var, symtable[obskey][var][0]))
+varaddr = "0x%x" % symtable[obskey][var][0]
else:
logger.warning("Variable %s not found in symbol table." % var)
continue
-else:
+else:
logger.debug("Key %u not found in symbol table." % (obskey))
mode = dwtconf.xpath('d:mode', namespaces=ns)[0].text.strip()
-xmlblock += "\t<dataTraceConf>\n\t\t<variable>%s</variable>\n\t\t<mode>%s</mode>\n\t</dataTraceConf>\n" % (var, mode)
+xmlblock += "\t<dataTraceConf>\n\t\t<variable>%s</variable>\n\t\t<varName>%s</varName>\n\t\t<mode>%s</mode>\n\t</dataTraceConf>\n" % (varaddr, var, mode)
xmlblock += "</obsDebugConf>\n\n"
for obsid in obsids:
obsid = int(obsid)
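
The hunk above keeps the variable name for later use: the resolved address now fills the <variable> element while a new <varName> element carries the original name. A minimal stand-alone sketch of that lookup-and-emit step, assuming a symtable of the form obskey -> {name: (address, size)} and made-up values:

    # Hypothetical illustration only; names mirror the diff above.
    symtable = {1: {"counter": (0x20000400, 4)}}   # obskey -> {variable: (address, size)}

    def datatrace_conf(obskey, var, mode="W"):
        varaddr = var
        if obskey in symtable and var in symtable[obskey]:
            varaddr = "0x%x" % symtable[obskey][var][0]   # resolve name to address
        # new format: address in <variable>, original name in <varName>
        return ("\t<dataTraceConf>\n\t\t<variable>%s</variable>\n\t\t<varName>%s</varName>"
                "\n\t\t<mode>%s</mode>\n\t</dataTraceConf>\n" % (varaddr, var, mode))

    print(datatrace_conf(1, "counter", "W"))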
@@ -199,10 +199,10 @@ def worker_dbfiles(queueitem=None, nodeid=None, resultfile_path=None, resultfile
_errors = []
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
-obsdbfile_path = "%s/%s" % (fdir,f)
+input_filename = "%s/%s" % (fdir,f)
loggername = "(%s.%d) " % (cur_p.name, obsid)
# Open file:
-dbfile = open(obsdbfile_path, 'rb')
+dbfile = open(input_filename, 'rb')
rows = 0
conv_values = []
while not dbfile.closed:
@@ -232,7 +232,7 @@ def worker_dbfiles(queueitem=None, nodeid=None, resultfile_path=None, resultfile
# logqueue.put_nowait((loggername, logging.DEBUG, "DbFileEof has occurred."))
break # dbfile has been closed in parser (most likely because EOF was reached)
except DbFileReadError as err:
-msg = "%s: Packet size (%i) did not match payload size (%i) @ %d." %(obsdbfile_path, err.expectedSize, err.actualSize, err.fpos)
+msg = "%s: Packet size (%i) did not match payload size (%i) @ %d." %(input_filename, err.expectedSize, err.actualSize, err.fpos)
_errors.append((msg, errno.EIO, obsid))
logqueue.put_nowait((loggername, logging.ERROR, msg))
if (len(conv_values) > 0):
@@ -245,7 +245,7 @@ def worker_dbfiles(queueitem=None, nodeid=None, resultfile_path=None, resultfile
resultfile_lock.release()
logqueue.put_nowait((loggername, logging.DEBUG, "Committed final results to %s after %d rows" % (resultfile_path, rows)))
# Remove processed file:
-os.unlink(obsdbfile_path)
+os.unlink(input_filename)
except:
msg = "Error in worker process: %s: %s\n%s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
_errors.append((msg, errno.ECOMM, obsid))
@@ -428,30 +428,41 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue
_errors = []
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
-obsdbfile_path = "%s/%s" % (fdir, f)
+input_filename = "%s/%s" % (fdir, f)
loggername = "(%s.%d) " % (cur_p.name, obsid)
# parse the file
-parsed_filename = "%s/datatrace_uncorrected.csv" % (os.path.dirname(resultfile_path))
-#(fd, tmpfile1) = tempfile.mkstemp()
-(fd, tmpfile2) = tempfile.mkstemp()
-#parser_output = os.fdopen(fd, 'w')
-dwt.parse_dwt_output(obsdbfile_path, parsed_filename) # tmpfile1)
+# first line of the log file contains the variable names
+varnames = ""
+with open(input_filename, "r") as f:
+varnames = f.readline().strip().split()
+(fd1, tmpfile1) = tempfile.mkstemp()
+(fd2, tmpfile2) = tempfile.mkstemp()
+dwt.parse_dwt_output(input_filename, tmpfile1)
# apply linear regression to correct the timestamps
-dwt.correct_ts_with_regression(parsed_filename, tmpfile2) # (tmpfile1
+dwt.correct_ts_with_regression(tmpfile1, tmpfile2)
with open(resultfile_path, "a") as outfile:
infile = open(tmpfile2, "r")
for line in infile:
-#(timestamp, msg) = line.strip().split(',', 1)
-#outfile.write("%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, msg))
-outfile.write(line)
+# input format: global_ts, comparator, data, PC, operation, local_ts
+(timestamp, var, val, pc, access, localts) = line.strip().split(',')
+if access == 'operation' or access == '' or var == '':
+continue
+if flocklab.parse_int(var) < len(varnames):
+var = varnames[flocklab.parse_int(var)]
+if pc:
+pc = "0x%08x" % flocklab.parse_int(pc)
+# output format: timestamp,observer_id,node_id,variable,value,access,pc
+outfile.write("%s,%s,%s,%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, var, val, access, pc))
infile.close()
# delete files
-os.remove(obsdbfile_path)
-#os.remove(tmpfile1)
+os.remove(input_filename)
+os.remove(tmpfile1)
os.remove(tmpfile2)
except:
msg = "Error in datatrace worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
# for some reason, the logging below does not work properly -> print msg into the log directly
logger = flocklab.get_logger()
logger.error(msg)
_errors.append((msg, errno.ECOMM, obsid))
logqueue.put_nowait((loggername, logging.ERROR, msg))
finally:
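
In short, worker_datatrace now reads the variable names from the first line of the raw log file, runs the DWT parser and the timestamp regression into temp files, and rewrites each row into the result CSV with the comparator index mapped back to a variable name and the PC rendered in hex. A minimal sketch of that per-row conversion; flocklab.parse_int is replaced by a simple stand-in and all values are invented:

    varnames = ["counter", "state"]            # read from the first line of the .log file

    def parse_int(s):
        return int(s, 0) if s else 0           # stand-in for flocklab.parse_int

    def convert(line, obsid=7, nodeid=3):
        # input format: global_ts, comparator, data, PC, operation, local_ts
        timestamp, var, val, pc, access, localts = line.strip().split(',')
        if access in ('operation', '') or var == '':
            return None                        # skip header / incomplete lines
        if parse_int(var) < len(varnames):
            var = varnames[parse_int(var)]     # map comparator index to variable name
        if pc:
            pc = "0x%08x" % parse_int(pc)
        # output format: timestamp,observer_id,node_id,variable,value,access,pc
        return "%s,%s,%s,%s,%s,%s,%s" % (timestamp, obsid, nodeid, var, val, access, pc)

    print(convert("1600000000.123,0,42,134217984,w,123456"))
    # -> 1600000000.123,7,3,counter,42,w,0x08000100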
@@ -566,18 +577,18 @@ class FetchObsThread(threading.Thread):
for servicename in [ "gpio_monitor", "powerprofiling", "serial", "error", "timesync", "datatrace" ]:
services[servicename] = ServiceInfo(servicename)
# Read filenames
-for dbfile in out.split():
+for resfile in out.split():
# Check name and append to corresponding list
for service in services.values():
-if service.matchFileName(dbfile):
-service.addFile("%s/%s" % (self._obstestresfolder, dbfile))
+if service.matchFileName(resfile):
+service.addFile("%s/%s" % (self._obstestresfolder, resfile))
break
copyfilelist = []
# Remove latest from each list as the observer might still be writing into it (unless stop event has been set).
for service in services.values():
service.stripFileList(removelast)
-for dbfile in service.files:
-copyfilelist.append(dbfile)
+for resfile in service.files:
+copyfilelist.append(resfile)
#if (len(service.files) > 0):
# self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (" ".join(service.files), service.servicename))
@@ -1049,7 +1060,7 @@ def main(argv):
elif service == 'serial':
header = 'timestamp,observer_id,node_id,direction,output\n'
elif service == 'datatrace':
-header = "" # TODO 'timestamp,observer_id,node_id,variable,value,access,pc\n'
+header = 'timestamp,observer_id,node_id,variable,value,access,pc\n'
lock.acquire()
f = open(path, 'w')
f.write(header)
@@ -1177,7 +1188,7 @@ def main(argv):
pool = service_pools_dict['serial']
worker_args = [nextitem, nodeid, testresultsfile_dict['serial'][0], logqueue, None]
worker_f = worker_serial
-elif (re.search("^datatrace_[0-9]{14}\.csv$", f) != None):
+elif (re.search("^datatrace_[0-9]{14}\.log$", f) != None):
pool = service_pools_dict['datatrace']
worker_args = [nextitem, nodeid, testresultsfile_dict['datatrace'][0], logqueue, None]
worker_f = worker_datatrace
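
Raw datatrace results are now matched as .log files instead of .csv. A quick check of the expected filename shape (the timestamp is an arbitrary example):

    import re
    print(re.search(r"^datatrace_[0-9]{14}\.log$", "datatrace_20200914120000.log") is not None)  # True
    print(re.search(r"^datatrace_[0-9]{14}\.log$", "datatrace_20200914120000.csv") is not None)  # False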
@@ -208,7 +208,7 @@ def parser(swo_byte, swo_queue, global_ts_queue):
# sync packet, problem: in begin we have don't have 5 zero bytes as specified for a sync pack but there are 9
# Just read all zeros until get an 0x80, then we are in sync.
if swo_byte == 0: # only happens at beginning so no check for stop_threads
-while swo_byte == 0:
+while swo_byte == 0 and swo_queue:
swo_byte = swo_queue.pop()
# according to documentation it should be 0x80 but I observe the stream to start with 0x08 in certain cases
if swo_byte == 0x08:
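
The added check on swo_queue keeps the zero-skipping sync loop from popping an empty queue. A tiny stand-alone illustration, using a plain list as a stand-in for the actual queue object and made-up byte values:

    swo_queue = [0x08, 0x00, 0x00, 0x00]   # popped from the back, as in the parser
    swo_byte = swo_queue.pop()
    while swo_byte == 0 and swo_queue:      # stop once the queue runs empty
        swo_byte = swo_queue.pop()
    print(hex(swo_byte))                    # 0x8 -> stream is considered in sync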
@@ -273,33 +273,33 @@ def pars_hard(header_swo_byte, swo_queue):
df_append.at['comp0', 'comparator'] = 0
df_append.at['comp0', 'data'] = value
if header_swo_byte & 0x04:
-df_append.at['comp0', 'operation'] = 1 # "1" stands for write
+df_append.at['comp0', 'operation'] = 'w'
else:
-df_append.at['comp0', 'operation'] = 0 # "0" stands for read
+df_append.at['comp0', 'operation'] = 'r'
elif comparator_id == 1:
df_append.at['comp1', 'comparator'] = 1
df_append.at['comp1', 'data'] = value
if header_swo_byte & 0x04:
-df_append.at['comp1', 'operation'] = 1 # "1" stands for write
+df_append.at['comp1', 'operation'] = 'w'
else:
-df_append.at['comp1', 'operation'] = 0 # "0" stands for read
+df_append.at['comp1', 'operation'] = 'r'
elif comparator_id == 2:
df_append.at['comp2', 'comparator'] = 2
df_append.at['comp2', 'data'] = value
if header_swo_byte & 0x04:
-df_append.at['comp2', 'operation'] = 1 # "1" stands for write
+df_append.at['comp2', 'operation'] = 'w'
else:
-df_append.at['comp2', 'operation'] = 0 # "0" stands for read
+df_append.at['comp2', 'operation'] = 'r'
else:
df_append.at['comp3', 'comparator'] = 3
df_append.at['comp3', 'data'] = value
if header_swo_byte & 0x04:
-df_append.at['comp3', 'operation'] = 1 # "1" stands for write
+df_append.at['comp3', 'operation'] = 'w'
else:
-df_append.at['comp3', 'operation'] = 0 # "0" stands for read
+df_append.at['comp3', 'operation'] = 'r'
# A PC or address packet
elif not header_swo_byte & 0x80:
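
The operation column now stores 'r'/'w' instead of 0/1; the decision is still taken from bit 0x04 of the hardware-source packet header, identically in all four comparator branches. A one-line sketch of the decoding, with arbitrary example header bytes:

    def operation(header_swo_byte):
        return 'w' if header_swo_byte & 0x04 else 'r'

    print(operation(0x47), operation(0x43))   # w r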