Commit 30fc22dc authored by Reto Da Forno

dwt parser updated, output in fetcher adjusted

parent 1ee30282
@@ -449,23 +449,24 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue
                     continue
                 if flocklab.parse_int(var) < len(varnames):
                     var = varnames[flocklab.parse_int(var)]
                 if pc:
                     pc = "0x%08x" % flocklab.parse_int(pc)
                 # output format: timestamp,observer_id,node_id,variable,value,access,pc
                 outfile.write("%s,%s,%s,%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, var, val, access, pc))
         infile.close()
-        # delete files
-        os.remove(input_filename)
-        os.remove(tmpfile1)
-        os.remove(tmpfile2)
         # debug
         #shutil.copyfile(input_filename, "%s_raw" % resultfile_path)
         #shutil.copyfile(tmpfile1, "%s_uncorrected.csv" % resultfile_path)
     except:
         msg = "Error in datatrace worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
         # for some reason, the logging below does not work properly -> print msg into the log directly
-        logger = flocklab.get_logger()
-        logger.error(msg)
+        #logger = flocklab.get_logger()
+        #logger.error(msg)
         _errors.append((msg, errno.ECOMM, obsid))
         logqueue.put_nowait((loggername, logging.ERROR, msg))
+    finally:
+        # delete files
+        os.remove(input_filename)
+        os.remove(tmpfile1)
+        os.remove(tmpfile2)
     processeditem = list(queueitem)
     processeditem[0] = ITEM_PROCESSED
     return (_errors, tuple(processeditem))
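The hunk above moves the temp-file cleanup out of the try body into a finally clause, so the intermediate files are deleted even when parsing raises. A minimal, runnable sketch of that pattern; parse() and the file handling here are hypothetical stand-ins, not the fetcher's actual logic:

    import os
    import tempfile

    def parse(path):
        raise ValueError("simulated parser failure")    # stand-in that always fails

    def worker(input_filename):
        errors = []
        try:
            parse(input_filename)
        except Exception as e:
            errors.append(str(e))           # collect the error instead of re-raising
        finally:
            # runs on success and on failure alike -> no stale temp files left behind
            if os.path.isfile(input_filename):
                os.remove(input_filename)
        return errors

    fd, name = tempfile.mkstemp()
    os.close(fd)
    print(worker(name), os.path.exists(name))   # ['simulated parser failure'] False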
@@ -800,36 +801,44 @@ class WorkManager():
         return tuple(stateitem)

     def add(self, item):
-        service = self.pattern.sub("",item[3])
-        obsid = item[1]
-        if service not in self.worklist:
-            self.worklist[service] = {}
-        if obsid not in self.worklist[service]:
-            self.worklist[service][obsid] = [None, []] # workerstate / worklist
-        # if list is empty, we're good to process, otherwise just append it and return None
-        if len(self.worklist[service][obsid][1]) == 0:
-            self.worklist[service][obsid][1].append(item)
-            self.workcount = self.workcount + 1
-            return self._next_item_with_state(service, obsid)
-        else:
-            self.worklist[service][obsid][1].append(item)
-            self.workcount = self.workcount + 1
-            return None
+        try:
+            service = self.pattern.sub("",item[3])
+            obsid = item[1]
+            if service not in self.worklist:
+                self.worklist[service] = {}
+            if obsid not in self.worklist[service]:
+                self.worklist[service][obsid] = [None, []] # workerstate / worklist
+            # if list is empty, we're good to process, otherwise just append it and return None
+            if len(self.worklist[service][obsid][1]) == 0:
+                self.worklist[service][obsid][1].append(item)
+                self.workcount = self.workcount + 1
+                return self._next_item_with_state(service, obsid)
+            else:
+                self.worklist[service][obsid][1].append(item)
+                self.workcount = self.workcount + 1
+                return None
+        except:
+            logger = flocklab.get_logger()
+            logger.error("Error in WorkManager.add(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))

     def done(self, item):
-        service = self.pattern.sub("",item[3])
-        obsid = item[1]
-        if item[1:-1] == self.worklist[service][obsid][1][0][1:-1]:
-            self.worklist[service][obsid][0] = item[4] # save state
-            self.worklist[service][obsid][1].pop(0)
-            self.workcount = self.workcount - 1
-        else:
-            logger.error("work done for item that was not enqueued: %s" % str(item))
-        # if there is more work to do, return next item
-        if len(self.worklist[service][obsid][1]) > 0:
-            return self._next_item_with_state(service, obsid)
-        else:
-            return None
+        try:
+            service = self.pattern.sub("",item[3])
+            obsid = item[1]
+            if item[1:-1] == self.worklist[service][obsid][1][0][1:-1]:
+                self.worklist[service][obsid][0] = item[4] # save state
+                self.worklist[service][obsid][1].pop(0)
+                self.workcount = self.workcount - 1
+            else:
+                logger.error("work done for item that was not enqueued: %s" % str(item))
+            # if there is more work to do, return next item
+            if len(self.worklist[service][obsid][1]) > 0:
+                return self._next_item_with_state(service, obsid)
+            else:
+                return None
+        except:
+            logger = flocklab.get_logger()
+            logger.error("Error in WorkManager.done(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))

     def finished(self):
         return self.workcount == 0
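For orientation, WorkManager serializes items per (service, observer) pair: add() returns an item ready for processing only when that pair's queue was empty, while done() pops the finished item and hands back the next queued one, preserving per-observer FIFO order. A stripped-down illustration of the same per-key FIFO idea; MiniWorkManager and the string items are invented for the example, not the fetcher's actual item tuples:

    from collections import defaultdict, deque

    class MiniWorkManager:
        def __init__(self):
            self.queues = defaultdict(deque)    # key -> pending items, head is in flight

        def add(self, key, item):
            q = self.queues[key]
            q.append(item)
            return item if len(q) == 1 else None    # ready only if nothing was in flight

        def done(self, key):
            q = self.queues[key]
            q.popleft()                             # drop the finished item
            return q[0] if q else None              # next item for this key, if any

    wm = MiniWorkManager()
    print(wm.add(('datatrace', 7), 'a'))    # 'a'  -> process immediately
    print(wm.add(('datatrace', 7), 'b'))    # None -> queued behind 'a'
    print(wm.done(('datatrace', 7)))        # 'b'  -> next in FIFO order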
@@ -1205,6 +1214,7 @@ def main(argv):
                     continue
                 # Schedule worker function from the service's pool. The result will be reported to the callback function.
                 pool.apply_async(func=worker_f, args=tuple(worker_args), callback=callback_f)

+    # Stop signal for main loop has been set ---
     # Stop worker pool:
     for service, pool in service_pools_dict.items():
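For reference, apply_async() hands the worker's arguments to a process from the pool and, once the worker returns, invokes the callback in the parent process with the result. A self-contained example; worker_f and callback_f here are placeholders, not the fetcher's functions:

    from multiprocessing import Pool

    def worker_f(x):
        return x * x                            # stand-in for a service worker

    def callback_f(result):
        print("worker returned:", result)       # runs in the parent process

    if __name__ == '__main__':
        pool = Pool(processes=2)
        pool.apply_async(func=worker_f, args=(7,), callback=callback_f)
        pool.close()
        pool.join()                             # by now: "worker returned: 49"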
@@ -64,13 +64,15 @@ new_row.index = index_
 # parse_fun will then directly create the csv file.
-def parse_dwt_output(input_file='swo_read_log', output_file='swo_read_log.csv'):
+def parse_dwt_output(input_file='swo_read_log', output_file='swo_read_log.csv', threads=False):
     """
     Starts the read and parse thread that will read from the given input_file and parse the content.
     It will save the parsed contents in the file specified as the second argument.

     Parameters:
         input_file (str): name of the file to parse
         output_file (str): name of the output csv file
+        threads (bool): if set to True, the program will run using 2 threads; otherwise it will first read, then parse

     Returns:
         int: True if the program was halted by a keyboard interrupt
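The new threads flag makes the concurrent read/parse mode optional: with threads=False the function first reads the whole input file and then parses it in a single pass, which is the simpler choice when the log is already complete on disk. A hypothetical call, with the module name dwt_parse assumed and the defaults taken from the signature above:

    from dwt_parse import parse_dwt_output    # module name is an assumption

    # offline mode: read the complete SWO log first, then parse it
    parse_dwt_output(input_file='swo_read_log',
                     output_file='swo_read_log.csv',
                     threads=False)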
@@ -85,27 +87,33 @@ def parse_dwt_output(input_file='swo_read_log', output_file='swo_read_log.csv'):
     sys.stdout.write('configure and read demo\n')
     sys.stdout.write('Press Ctrl-C to Exit\n')

-    # Create threads
-    read_thread = threading.Thread(target=read_fun, args=(swo_queue, global_ts_queue, input_file))
-    read_thread.setDaemon(True)
-    parse_thread = threading.Thread(target=parse_fun, args=(swo_queue, global_ts_queue))
-    parse_thread.setDaemon(True)
-
-    # Starts threads
-    read_thread.start()
-    parse_thread.start()
-
-    while True:
-        time.sleep(1)
-        if both_threads_done:
-            break
+    if threads:
+        # Create threads
+        read_thread = threading.Thread(target=read_fun, args=(swo_queue, global_ts_queue, input_file))
+        read_thread.setDaemon(True)
+        parse_thread = threading.Thread(target=parse_fun, args=(swo_queue, global_ts_queue))
+        parse_thread.setDaemon(True)
+
+        # Starts threads
+        read_thread.start()
+        parse_thread.start()
+
+        while True:
+            time.sleep(1)
+            if both_threads_done:
+                break
+    else:
+        read_fun(swo_queue, global_ts_queue, input_file)
+        parse_fun(swo_queue, global_ts_queue)

     # df = df_queue.get()
     # convert the pandas data frame to a csv file
     df.to_csv(output_file, index=False, header=True)

-    read_thread.join() # wait for the threads to end
-    parse_thread.join()
+    if threads:
+        read_thread.join() # wait for the threads to end
+        parse_thread.join()

     return 0 # exit the program execution
@@ -302,23 +310,20 @@ def pars_hard(header_swo_byte, swo_queue):
             df_append.at['comp3', 'operation'] = 'r'
     # A PC or address packet
-    elif not header_swo_byte & 0x80:
+    else:
         if comparator_id == 0:
             df_append.at['comp0', 'comparator'] = 0
-            df_append.at['comp0', 'PC'] = value
+            df_append.at['comp0', 'PC'] = hex(value)
         elif comparator_id == 1:
             df_append.at['comp1', 'comparator'] = 1
-            df_append.at['comp1', 'PC'] = value
+            df_append.at['comp1', 'PC'] = hex(value)
         elif comparator_id == 2:
             df_append.at['comp2', 'comparator'] = 2
-            df_append.at['comp2', 'PC'] = value
+            df_append.at['comp2', 'PC'] = hex(value)
         else:
             df_append.at['comp3', 'comparator'] = 3
-            df_append.at['comp3', 'PC'] = value
+            df_append.at['comp3', 'PC'] = hex(value)
-    else:
-        if logging_on:
-            print("unknown HW packet type")

 def timestamp_parse(swo_queue, global_ts_queue):
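Two hexadecimal notations are now in play: the parser stores hex(value), which carries no zero padding, while the fetcher hunk at the top of this commit re-formats the PC with "0x%08x", which pads to eight digits. A quick comparison of the two:

    value = 0x800F1A3
    print(hex(value))                       # '0x800f1a3'  -> what the parser now writes
    print("0x%08x" % int(hex(value), 0))    # '0x0800f1a3' -> what the fetcher emits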
@@ -331,7 +336,7 @@ def timestamp_parse(swo_queue, global_ts_queue):
     i = 0
     local_ts_delta = 0

-    while True:
+    while i < 4:
         if read_thread_ended and not swo_queue: # to not get blocked on queue.get() first check if should stop
             return
         buf[i] = swo_queue.pop()
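The new bound while i < 4 matches the ITM local-timestamp packet format, where at most four payload bytes follow the header, each carrying seven timestamp bits plus a continuation flag in bit 7. A standalone sketch of that decoding; the function name and byte values are made up for illustration:

    def decode_local_ts(payload):
        # Assemble a local timestamp from up to 4 continuation bytes.
        ts = 0
        for i, b in enumerate(payload[:4]):
            ts |= (b & 0x7F) << (7 * i)     # 7 data bits per byte
            if not (b & 0x80):              # continuation bit clear -> last byte
                break
        return ts

    print(decode_local_ts([0xC8, 0x25]))    # (0x25 << 7) | 0x48 = 4808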