Commit 245272b8 authored by Reto Da Forno

parsing of RL data added

parent 50d63b1f
@@ -4,7 +4,7 @@
CRONLOG=/home/flocklab/logs/cron.log
* * * * * /home/flocklab/testmanagementserver/flocklab_scheduler.py --debug >> $CRONLOG 2>&1
-*/10 * * * * /home/flocklab/testmanagementserver/flocklab_cleaner.py --debug >> $CRONLOG 2>&1
+0 * * * * /home/flocklab/testmanagementserver/flocklab_cleaner.py --debug >> $CRONLOG 2>&1
0 5 * * * /home/flocklab/testmanagementserver/flocklab_retention_cleaner.py --debug >> $CRONLOG 2>&1
0 0 * * * /usr/sbin/logrotate --state /home/flocklab/logrotate.state /home/flocklab/logrotate >> $CRONLOG 2>&1
0 2 * * 1 php /home/flocklab/webserver/update_stats.php >> $CRONLOG 2>&1
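
For reference, a short annotated sketch of the standard crontab field order used by the entries above; these comment lines are illustrative and not part of the deployed crontab.

# minute  hour  day-of-month  month  day-of-week  command
# "0 * * * *"  -> hourly, on the hour (flocklab_cleaner.py, previously every 10 minutes)
# "0 5 * * *"  -> daily at 05:00 (flocklab_retention_cleaner.py)
# "0 2 * * 1"  -> Mondays at 02:00 (update_stats.php)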
@@ -673,13 +673,13 @@ def start_test(testid, cur, cn, obsdict_key, obsdict_id):
thread.join(timeout=(flocklab.config.getint('tests','setuptime')*0.75*60))
if thread.isAlive():
# Timeout occurred. Signal the thread to abort:
logger.error("Telling thread for test start on observer ID %s to abort..." %(str(obsdict_key[obskey][1])))
logger.error("Telling thread for test start on observer ID %s to abort..." % (str(obsdict_key[obskey][1])))
thread.abort()
# Wait again for the aborted threads:
for (thread, obskey) in thread_list:
thread.join(timeout=10)
if thread.isAlive():
msg = "Thread for test start on observer ID %s is still alive but should be aborted now." %(str(obsdict_key[obskey][1]))
msg = "Thread for test start on observer ID %s timed out and will be aborted now." % (str(obsdict_key[obskey][1]))
errors.append(msg)
logger.error(msg)
# -- END OF CRITICAL SECTION where dispatcher accesses used observers
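
For illustration, a minimal self-contained sketch of the join-with-timeout/abort pattern used in this critical section; AbortableThread and its abort() are hypothetical stand-ins for the dispatcher's test-start threads.

import threading

class AbortableThread(threading.Thread):
    """Hypothetical stand-in for a test-start thread that polls an abort flag."""
    def __init__(self):
        threading.Thread.__init__(self)
        self._abort = threading.Event()
    def abort(self):
        self._abort.set()
    def run(self):
        # Do one unit of work per iteration until told to abort:
        while not self._abort.wait(timeout=0.1):
            pass

t = AbortableThread()
t.start()
t.join(timeout=1.0)        # analogous to 0.75 * setuptime above
if t.is_alive():
    t.abort()              # signal the thread to stop
    t.join(timeout=10)     # wait again for the aborted thread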
@@ -3,6 +3,8 @@
import os, sys, getopt, traceback, MySQLdb, signal, random, time, errno, multiprocessing, subprocess, re, logging, __main__, threading, struct, types, queue, math, shutil, lxml.etree
import lib.daemon as daemon
import lib.flocklab as flocklab
+from rocketlogger.data import RocketLoggerData
+import pandas as pd
logger = None
@@ -102,7 +104,7 @@ def sigterm_handler(signum, frame):
cur.close()
cn.close()
except:
logger.warn("Could not connect to database")
logger.warn("Could not connect to database.")
# Tell the main loop to stop:
mainloop_stop = True
@@ -119,17 +121,10 @@ def parse_gpio_setting(buf):
_data = struct.unpack("<Iiiiii",buf) #unsigned int gpio;int value;struct timeval time_planned;struct timeval time_executed;
return (_data[0], str(_data[1]), "%i.%06i" % (_data[2],_data[3]), "%i.%06i" % (_data[4],_data[5]))
-def parse_gpio_monitor(buf):
-_data = str(buf).split(";")
-logger.debug("BUFFER CONTENT: %s" %str(buf))
-return (_data[0], _data[1], _data[2])
-#_data = struct.unpack("<Iiii",buf) #unsigned int gpio;enum en_edge edge;struct timeval timestamp;
-#return (_data[0], str(_data[1]), "%i.%06i" % (_data[2],_data[3]))
def parse_serial(buf):
_data = struct.unpack("iii%ds" % (len(buf) - 12),buf) #int service; struct timeval timestamp;char * data
return (_data[0], _data[3], "%i.%06i" % (_data[1],_data[2]))
def parse_error_log(buf):
_data = struct.unpack("<iii%ds" % (len(buf) - 12),buf) #struct timeval timestamp; int service_fk; char errormessage[1024];
return (str(_data[2]), _data[3], "%i.%06i" % (_data[0],_data[1]))
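
For illustration, a minimal round-trip sketch of the binary record layout parse_serial() above expects: a native-order int service, a struct timeval (seconds, microseconds), then the payload. All sample values are made up.

import struct

payload = b"hello\n"
# Pack a record the way an observer would: int service; struct timeval; char * data
buf = struct.pack("iii%ds" % len(payload), 1, 1599041530, 250000, payload)

service, sec, usec, data = struct.unpack("iii%ds" % (len(buf) - 12), buf)
print(service, data, "%i.%06i" % (sec, usec))   # 1 b'hello\n' 1599041530.250000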
@@ -188,7 +183,7 @@ def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=None
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
obsdbfile_path = "%s/%s" % (fdir,f)
loggername = "(%s).(Observer %d)" % (cur_p.name, obsid)
loggername = "(%s.Obs%d)" % (cur_p.name, obsid)
#logqueue.put_nowait((loggername, logging.DEBUG, "Import file %s"%obsdbfile_path))
# Open file:
dbfile = open(obsdbfile_path, 'rb')
@@ -272,7 +267,7 @@ def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, slotca
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
obsdbfile_path = "%s/%s" % (fdir, f)
loggername = "(%s).(Observer %d)" % (cur_p.name, obsid)
loggername = "(%s.Obs%d)" % (cur_p.name, obsid)
with open(resultfile_path, "a") as outfile:
infile = open(obsdbfile_path, "r")
@@ -310,24 +305,21 @@
##############################################################################
def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, slotcalib_factor=1, slotcalib_offset=0, vizimgdir=None, viz_f=None, logqueue=None, PpStatsQueue=None):
try:
+channel_names = ['I1','V1','V2']
_errors = []
cur_p = multiprocessing.current_process()
(itemtype, obsid, fdir, f, workerstate) = queueitem
obsdbfile_path = "%s/%s" % (fdir, f)
loggername = "(%s.Obs%d) " % (cur_p.name, obsid)
resultsfile = "%s_%s.%s" % (os.path.splitext(resultfile_path)[0], obsid, os.path.splitext(resultfile_path)[1])
-# TODO for now, just move the file without modifying it
-os.rename(obsdbfile_path, resultsfile)
-logger.debug("File '%s' moved to '%s'." % (obsdbfile_path, resultsfile))
-'''
-with open(resultfile_path, "a") as outfile:
-infile = open(obsdbfile_path, "r")
-for line in infile:
-outfile.write("%s,%s,%s" % (obsid, nodeid, str(line)))
-infile.close()
-'''
+rld_data = RocketLoggerData(obsdbfile_path).merge_channels()
+rld_dataframe = pd.DataFrame(rld_data.get_data(channel_names), index=rld_data.get_time(), columns=channel_names)
+rld_dataframe.insert(0, 'observer_id', obsid)
+rld_dataframe.insert(1, 'node_id', nodeid)
+rld_dataframe.to_csv(resultfile_path, sep=',', index_label='time', header=False, mode='a')
+os.remove(obsdbfile_path)
processeditem = list(queueitem)
processeditem[0] = ITEM_PROCESSED
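
For reference, a hedged sketch of consuming the aggregated power-profiling CSV the new code produces; the file name is a placeholder, and the column names mirror the header written in main() below.

import pandas as pd

# "powerprofiling.csv" is a placeholder for the aggregated results file.
df = pd.read_csv("powerprofiling.csv")   # columns: timestamp,observer_id,node_id,I1,V1,V2
df["P1"] = df["I1"] * df["V1"]           # instantaneous power per sample (assuming SI units)
print(df.groupby("node_id")["P1"].mean())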
@@ -466,7 +458,7 @@ class FetchObsThread(threading.Thread):
if len(copyfilelist) > 0:
# Download the database files:
self._logger.debug(self._loggerprefix + "Downloading database files %s" % (" ".join(copyfilelist)))
self._logger.debug(self._loggerprefix + "Downloading results files %s" % (" ".join(copyfilelist)))
cmd = ['scp', '-q' ]
cmd.extend(["%s:%s" % (self._obsethernet, x) for x in copyfilelist])
cmd.append("%s/" % self._obsfiledir)
@@ -474,10 +466,10 @@
out, err = p.communicate(None)
rs = p.wait()
if rs != 0:
self._logger.debug(self._loggerprefix + "Could not download all DB files from observer. Dataloss occurred for this observer.")
self._logger.debug(self._loggerprefix + "Could not download all results files from observer. Dataloss occurred for this observer.")
self._logger.debug(self._loggerprefix + "Tried to execute %s, result was %d, stdout: %s, error: %s" % (str(cmd), rs, out, err))
else:
self._logger.debug("Downloaded result DB files from observer.")
self._logger.debug("Downloaded results files from observer.")
# put a copy to the debug directory
for f in copyfilelist:
fname = os.path.basename(f)
@@ -506,7 +498,7 @@
self._logger.debug(self._loggerprefix + "No files left to delete on observer.")
else:
self._logger.debug(self._loggerprefix + "No files to download from observer.")
if removelast == False: # this is the last execution of the while loop
cmd = ['ssh' ,'%s'%(self._obsethernet), "rm -rf %s" % self._obstestresfolder]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
@@ -514,7 +506,7 @@
rs = p.wait()
if (rs != flocklab.SUCCESS):
self._logger.error(self._loggerprefix + "Could not remove results directory from observer, result was %d, stdout: %s, error: %s" % (rs, out, err))
else:
self._logger.error(self._loggerprefix + "SSH to observer did not succeed, result was %d, error: %s" % (rs, err))
break # abort
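
For illustration, a minimal sketch of the subprocess pattern the fetcher uses for the remote cleanup above; the host name and path are placeholders.

import subprocess

cmd = ["ssh", "observer01", "rm -rf /home/flocklab/data/curtest"]  # placeholder host and path
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
out, err = p.communicate(None)
rs = p.wait()
if rs != 0:
    print("Could not remove results directory, result was %d, error: %s" % (rs, err))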
@@ -548,7 +540,7 @@ def start_fetcher():
try:
(cn, cur) = flocklab.connect_to_db()
except:
flocklab.error_logandexit("Could not connect to database", errno.EAGAIN)
flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
try:
cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address
FROM `tbl_serv_observer` AS `a`
@@ -644,7 +636,7 @@ def stop_fetcher():
cur.close()
cn.close()
except:
logger.warn("Could not connect to database")
logger.warn("Could not connect to database.")
return errno.ENOPKG
@@ -792,8 +784,7 @@ def main(argv):
try:
(cn, cur) = flocklab.connect_to_db()
except:
msg = "Could not connect to database"
flocklab.error_logandexit(msg, errno.EAGAIN)
flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
rs = flocklab.check_test_id(cur, testid)
cur.close()
cn.close()
@@ -828,8 +819,7 @@ def main(argv):
try:
(cn, cur) = flocklab.connect_to_db()
except:
msg = "Could not connect to database"
flocklab.error_logandexit(msg, errno.EAGAIN)
flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
rs = flocklab.get_test_owner(cur, testid)
if isinstance(rs, tuple):
owner_fk = rs[0]
@@ -909,7 +899,7 @@ def main(argv):
os.kill(os.getpid(), signal.SIGTERM)
flocklab.error_logandexit(msg, errno.EAGAIN)
else:
logger.debug("Got all needed metadata.")
logger.debug("Got all needed metadata.")
# Start aggregating processes ---
""" There is an infinite loop which gets files to process from the fetcher threads which download data from the observers.
@@ -937,7 +927,7 @@ def main(argv):
elif service == 'gpioactuation':
header = '# timestamp_planned,timestamp_executed,observer_id,node_id,pin_name,value\n'
elif service == 'powerprofiling':
-header = 'This is a RocketLogger File with additional information about the observer id and the node id.\n'
+header = 'timestamp,observer_id,node_id,I1,V1,V2\n'
elif service == 'serial':
header = '# timestamp,observer_id,node_id,direction,output\n'
elif service == 'powerprofilingstats':
@@ -1058,7 +1048,7 @@ def main(argv):
logger.debug(loggerprefix + "Next item is None.")
continue
(itemtype, obsid, fdir, f, workerstate) = nextitem
logger.debug(loggerprefix + "Next item is %s, %s/%s" % (str(obsid), fdir, f))
logger.debug(loggerprefix + "Next item is %s/%s (Obs%s)." % (fdir, f, str(obsid)))
nodeid = obsdict_byid[obsid][0]
callback_f = worker_callback
worker_f = worker_convert_and_aggregate
@@ -1091,7 +1081,7 @@ def main(argv):
pool = service_pools_dict['errorlog']
worker_args = [nextitem, nodeid, testresultsfile_dict['errorlog'][0], testresultsfile_dict['errorlog'][1], commitsize, vizimgdir, parse_error_log, convert_error_log, None, logqueue]
else:
logger.warn(loggerprefix + "DB file %s/%s from observer %s did not match any of the known patterns" %(fdir, f, obsid))
logger.warn(loggerprefix + "Results file %s/%s from observer %s did not match any of the known patterns" %(fdir, f, obsid))
continue
# Schedule worker function from the service's pool. The result will be reported to the callback function.
pool.apply_async(func=worker_f, args=tuple(worker_args), callback=callback_f)
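
For illustration, a minimal self-contained sketch of the pool/callback dispatch pattern used above; the worker and callback are trivial stand-ins for the aggregation workers and worker_callback.

import multiprocessing

def worker_f(item):
    # Stand-in for worker_convert_and_aggregate / worker_gpiotracing / worker_powerprof.
    return ("ITEM_PROCESSED", item)

def callback_f(result):
    # Stand-in for worker_callback; runs in the parent process when the worker finishes.
    print("done:", result)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    pool.apply_async(func=worker_f, args=("some_results_file",), callback=callback_f)
    pool.close()
    pool.join()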
@@ -1126,7 +1116,7 @@ def main(argv):
cur.close()
cn.close()
except:
logger.warn("Could not connect to database")
logger.warn("Could not connect to database.")
# Delete the obsfile directories as they are not needed anymore: