Commit fe491303 authored by Reto Da Forno

changes by ldario added

parent 11dbf4fc
@@ -59,7 +59,7 @@ class StopTestThread(threading.Thread):
         logger.debug("Start StopTestThread for observer ID %d"%(self._obsdict_key[self._obskey][1]))
         errors = []
         # First test if the observer is online and if the SD card is mounted:
-        cmd = ['ssh', '%s'%(self._obsdict_key[self._obskey][2]), "ls mmc/"]
+        cmd = ['ssh', '%s'%(self._obsdict_key[self._obskey][2]), "mount | grep /dev/mmcblk0p1"]
         p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
         while p.returncode == None:
             self._abortEvent.wait(1.0)
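A note on the revised health check (editorial, not part of the commit): `ls mmc/` succeeds as long as the directory exists, even with no card mounted, whereas `mount | grep /dev/mmcblk0p1` exits non-zero unless the SD-card partition actually appears in the mount table. A minimal standalone sketch of the same probe, with a hypothetical host name:

```python
import subprocess

def sd_card_mounted(host, partition="/dev/mmcblk0p1"):
    # Ask the remote observer whether the SD-card partition is mounted.
    # grep exits 0 only if the partition shows up in the mount table,
    # whereas a bare "ls mmc/" succeeds even for an empty mount point.
    p = subprocess.Popen(["ssh", host, "mount | grep %s" % partition],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                         universal_newlines=True)
    p.communicate()
    return p.returncode == 0

# e.g. sd_card_mounted("observer007")  # host name is illustrative
```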
@@ -12,7 +12,7 @@
 import lib.daemon as daemon
 import lib.flocklab as flocklab
 from lib.flocklab import SUCCESS
-import ext_c_modules.lib.python.cResultfetcher as cResultfetcher
+from shutil import copyfile
 ### Global variables ###
 ###
 scriptname = os.path.basename(__main__.__file__)
@@ -141,8 +141,11 @@ def parse_gpio_setting(buf):
     return (_data[0], str(_data[1]), "%i.%06i"%(_data[2],_data[3]), "%i.%06i"%(_data[4],_data[5]))

 def parse_gpio_monitor(buf):
-    _data = struct.unpack("<Iiii",buf) #unsigned int gpio;enum en_edge edge;struct timeval timestamp;
-    return (_data[0], str(_data[1]), "%i.%06i"%(_data[2],_data[3]))
+    _data = str(buf).split(";")
+    logger.debug("BUFFER CONTENT: %s" %str(buf))
+    return (_data[0], _data[1], _data[2])
+    #_data = struct.unpack("<Iiii",buf) #unsigned int gpio;enum en_edge edge;struct timeval timestamp;
+    #return (_data[0], str(_data[1]), "%i.%06i"%(_data[2],_data[3]))

 def parse_serial(buf):
     _data = struct.unpack("iii%ds" % (len(buf) - 12),buf) #int service; struct timeval timestamp;char * data
@@ -163,10 +166,10 @@ def convert_gpio_setting(obsdata, observer_id, node_id):
     return "%s,%s,%s,%s,%s,%s\n" %(obsdata[2], obsdata[3], observer_id, node_id, pindict[obsdata[0]][0], obsdata[1])

 def convert_gpio_monitor(obsdata, observer_id, node_id):
-    return "%s,%s,%s,%s,%s\n" %(obsdata[2], observer_id, node_id, pindict[obsdata[0]][0], obsdata[1])
+    return "%s,%s,%s,%s,%s\n" %(obsdata[1], observer_id, node_id, obsdata[0], obsdata[2])

 def convert_serial(obsdata, observer_id, node_id):
-    return "%s,%s,%s,%s,%s\n" %(obsdata[2], observer_id, node_id, serialdict[obsdata[0]], obsdata[1].decode('utf-8'))
+    return "%s,%s,%s,%s,%s\n" %(obsdata[2], observer_id, node_id, serialdict[obsdata[0]], obsdata[1])

 def convert_error_log(obsdata, observer_id, node_id):
     return "%s,%s,%s,%s\n" %(obsdata[2], observer_id, node_id, obsdata[1])
@@ -285,6 +288,45 @@ def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=No
+##############################################################################
+#
+# worker_gpiotracing: Worker function for converting and aggregating gpio
+#     tracing data. Unlike for the other services, this function works on
+#     whole observer DB files.
+#
+##############################################################################
+def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, slotcalib_factor=1, slotcalib_offset=0, vizimgdir=None, viz_f=None, logqueue=None):
+    try:
+        _errors = []
+        #cur_p = multiprocessing.current_process()
+        (itemtype, obsid, fdir, f, workerstate) = queueitem
+        obsdbfile_path = "%s/%s"%(fdir,f)
+        #loggername = "(%s).(Observer %d)"%(cur_p.name, obsid)
+        with open(resultfile_path, "a") as outfile:
+            infile = open(obsdbfile_path, "r")
+            for line in infile:
+                outfile.write("%s,%s,%s" % (obsid, nodeid, str(line)))
+            infile.close()
+        #os.remove(obsdbfile_path)
+        processeditem = list(queueitem)
+        processeditem[0] = ITEM_PROCESSED
+        return (_errors, processeditem)
+    except:
+        msg = "Error in gpiotracing worker process: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
+        _errors.append((msg, errno.ECOMM, obsid))
+        logqueue.put_nowait((loggername, logging.ERROR, msg))
+    finally:
+        processeditem = list(queueitem)
+        processeditem[0] = ITEM_PROCESSED
+        return (_errors, tuple(processeditem))
+### END worker_gpiotracing
+
 ##############################################################################
 #
 # worker_powerprof: Worker function for converting and aggregating power
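Two caveats in the new worker, noted editorially rather than patched into the diff: the `except` branch logs via `loggername`, which is only assigned in a commented-out line (a failure there would therefore raise a `NameError`), and the `return` inside `try` is always superseded by the one in `finally`. A minimal corrected shape, under those assumptions:

```python
import errno
import logging
import sys

ITEM_PROCESSED = 2  # hypothetical value; the real constant is defined elsewhere in the script

def worker_gpiotracing_sketch(queueitem, nodeid, resultfile_path, logqueue):
    # Same copy-and-prefix logic as the committed worker, but with a
    # loggername that exists when the except branch needs it, and a
    # single return point in finally.
    _errors = []
    (itemtype, obsid, fdir, f, workerstate) = queueitem
    loggername = "(Observer %s)" % obsid
    try:
        with open(resultfile_path, "a") as outfile, open("%s/%s" % (fdir, f), "r") as infile:
            for line in infile:
                outfile.write("%s,%s,%s" % (obsid, nodeid, line))
    except:
        msg = "Error in gpiotracing worker process: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
```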
@@ -300,37 +342,18 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, slotcali
         obsdbfile_path = "%s/%s"%(fdir,f)
         loggername = "(%s).(Observer %d)"%(cur_p.name, obsid)
         # Rename file:
+        logqueue.put_nowait((loggername, logging.DEBUG, "Import file %s"%obsdbfile_path))
+        with open(resultfile_path, "a") as outfile:
+            infile = open(obsdbfile_path, "r")
+            for line in infile:
+                outfile.write("%s,%s,%s" % (obsid, nodeid, str(line)))
+            infile.close()
-        # Use fast C-implementation to fetch values:
-        try:
-            #logqueue.put_nowait((loggername, logging.DEBUG, "cResultfetcher started. obsid: %d, nodeid: %d, slotcalib_factor: %f, slotcalib_offset: %f"%(int(obsid), int(nodeid), slotcalib_factor, slotcalib_offset)))
-            ret = cResultfetcher.getppresults(obsid=int(obsid), nodeid=int(nodeid), obsdbfilepath=obsdbfile_path, resultfilepath=resultfile_path, slotcalib_factor=slotcalib_factor, slotcalib_offset=slotcalib_offset)
-            #logqueue.put_nowait((loggername, logging.DEBUG, "cResultfetcher done."))
-        except:
-            raise
-        # Do vizualisation:
-        if ((type(ret) == list) and (len(ret) > 0)):
-            if viz_f != None:
-                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz started..."))
-                viz_f(testid, owner_fk, ret, obsid, vizimgdir, logger)
-                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz done."))
-            ppstats = PpStatsQueue.get()
-            (totalavg, totalcount) = ppstats[obsid]
-            newcount = totalcount + ret[3]
-            if (newcount != 0):
-                avg = float(totalcount) / newcount * totalavg + float(ret[3]) / newcount * ret[2]
-                ppstats[obsid] = (avg,newcount)
-            else:
-                logqueue.put_nowait((loggername, logging.WARN, "Bad count"))
-            PpStatsQueue.put(ppstats)
-            #logqueue.put_nowait((loggername, logging.DEBUG, "AVG values: %f %d + (%f %d) -> %f %d" % (totalavg, totalcount, ret[2], ret[3], avg, newcount)))
         # Remove processed file:
-        os.unlink(obsdbfile_path)
+        os.remove(obsdbfile_path)
+        logger.debug("Finished with powerprof collection.")
         processeditem = list(queueitem)
         processeditem[0] = ITEM_PROCESSED
         return (_errors, processeditem)
     except:
         msg = "Error in powerprof worker process: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
         _errors.append((msg, errno.ECOMM, obsid))
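Editorial summary of the hunk above: powerprofiling results are no longer decoded on the server by the cResultfetcher C module (hence the removed import earlier); the RocketLogger file from the observer is appended verbatim, with each line prefixed by the observer and node IDs, which also matches the new free-text header further down. Illustrative output, with made-up IDs and placeholder rows:

```
This is a RocketLogger File with additional information about the observer id and the node id.
7,3,<first RocketLogger data row>
7,3,<second RocketLogger data row>
```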
@@ -395,7 +418,6 @@ def worker_flockdaq(queueitem=None, nodeid=None, tracingresults_path=None, actua
         logger.debug("Done converting FlockDAQ results...")
         # Do vizualisation:
-        workerstate = None
         if ((type(ret) == list) and (len(ret) > 0)):
@@ -1059,11 +1081,11 @@ def main(argv):
         if service == 'errorlog':
             header = '# timestamp,observer_id,node_id,errormessage\n'
         elif service == 'gpiotracing':
-            header = '# timestamp,observer_id,node_id,pin_name,value\n'
+            header = 'observer_id,node_id,pin_name,# timestamp,value\n'
         elif service == 'gpioactuation':
             header = '# timestamp_planned,timestamp_executed,observer_id,node_id,pin_name,value\n'
         elif service == 'powerprofiling':
-            header = '# timestamp,observer_id,node_id,value_mA\n'
+            header = 'This is a RocketLogger File with additional information about the observer id and the node id.\n'
         elif service == 'serial':
             header = '# timestamp,observer_id,node_id,direction,output\n'
         elif service == 'powerprofilingstats':
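Under the reordered gpiotracing header, the IDs prefixed by the new worker come first and the raw observer fields follow. An illustrative row, assuming the observer file already carries pin name, timestamp, and value per line (all values invented):

```
observer_id,node_id,pin_name,# timestamp,value
7,3,LED1,1520257551.123456,1
```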
@@ -1189,6 +1211,7 @@
             nodeid = obsdict_byid[obsid][0]
             callback_f = worker_callback
             worker_f = worker_convert_and_aggregate
+            quick_test = False
             # Match the filename against the patterns and schedule an appropriate worker function:
             if (re.search("^gpio_setting_[0-9]{14}\.db$", f) != None):
                 #logger.debug(loggerprefix + "File %s contains GPIO setting results"%f)
@@ -1197,9 +1220,14 @@
             elif (re.search("^gpio_monitor_[0-9]{14}\.db$", f) != None):
                 #logger.debug(loggerprefix + "File %s contains GPIO monitoring results"%f)
                 pool = service_pools_dict['gpiotracing']
-                worker_args = [nextitem, nodeid, testresultsfile_dict['gpiotracing'][0], testresultsfile_dict['gpiotracing'][1], commitsize, vizimgdir, parse_gpio_monitor, convert_gpio_monitor, None, logqueue]
+                logger.debug(loggerprefix + "resultfile_path: %s" % str(testresultsfile_dict['gpiotracing'][0]))
+                logger.debug(loggerprefix + "queue item: %s" % str(nextitem))
+                logger.debug(loggerprefix + "node id: %s" % str(nodeid))
+                worker_args = [nextitem, nodeid, testresultsfile_dict['gpiotracing'][0], obsdict_byid[obsid][1][1], obsdict_byid[obsid][1][0], vizimgdir, None, logqueue]
+                worker_f = worker_gpiotracing
+                quick_test = True
                 if (enableviz == 1):
-                    worker_args[8] = flocklab.viz_gpio_monitor
+                    worker_args[6] = flocklab.viz_gpio_monitor
             elif (re.search("^powerprofiling_[0-9]{14}\.db$", f) != None):
                 #logger.debug(loggerprefix + "File %s contains power profiling results"%f)
                 # Power profiling has a special worker function which parses the whole file in a C module:
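For cross-reference (editorial note): the reshaped `worker_args` line up positionally with the new `worker_gpiotracing` signature, which is why the visualization hook moved from index 8 to index 6:

```python
# worker_gpiotracing(
#     queueitem,         # worker_args[0] = nextitem
#     nodeid,            # worker_args[1] = nodeid
#     resultfile_path,   # worker_args[2] = testresultsfile_dict['gpiotracing'][0]
#     slotcalib_factor,  # worker_args[3] = obsdict_byid[obsid][1][1]
#     slotcalib_offset,  # worker_args[4] = obsdict_byid[obsid][1][0]
#     vizimgdir,         # worker_args[5] = vizimgdir
#     viz_f,             # worker_args[6] = None, or flocklab.viz_gpio_monitor if enableviz == 1
#     logqueue)          # worker_args[7] = logqueue
```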
@@ -1231,6 +1259,11 @@
                 logger.warn(loggerprefix + "DB file %s/%s from observer %s did not match any of the known patterns" %(fdir, f, obsid))
                 continue
             # Schedule worker function from the service's pool. The result will be reported to the callback function.
+            if quick_test:
+                logger.debug("GPIO MONITOR GETS STARTED")
+            else:
+                logger.debug("OTHER SERVICE DATA GETS COLLECTED")
             pool.apply_async(func=worker_f, args=tuple(worker_args), callback=callback_f)
     # Stop signal for main loop has been set ---
     # Stop worker pool:
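Background on the scheduling call (standard library behavior, not specific to this commit): `Pool.apply_async` runs the worker in a pool process and hands its return value to the callback in the parent process; this is how each worker's `(_errors, processeditem)` tuple reaches `worker_callback`. A self-contained sketch:

```python
from multiprocessing import Pool

def worker(queueitem):
    # Stand-in for worker_gpiotracing / worker_convert_and_aggregate:
    # returns the (errors, processeditem) tuple the callback expects.
    return ([], ("PROCESSED",) + queueitem[1:])

def callback(result):
    errors, processeditem = result
    print("finished:", processeditem, "errors:", errors)

if __name__ == "__main__":
    pool = Pool(processes=2)
    pool.apply_async(func=worker, args=(("NEW", 7, "/tmp", "gpio_monitor.db", None),), callback=callback)
    pool.close()
    pool.join()
```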