#! /usr/bin/env python3

import os, sys, getopt, traceback, MySQLdb, signal, random, time, errno, multiprocessing, subprocess, re, logging, __main__, threading, struct, types, queue, math, shutil, lxml.etree
import lib.daemon as daemon
import lib.flocklab as flocklab
from rocketlogger.data import RocketLoggerData
import pandas as pd


logger                   = None
debug                    = False
testid                   = None 
errors                   = []
FetchObsThread_list      = []
FetchObsThread_stopEvent = None
FetchObsThread_queue     = None
obsfiledir               = None
testresultsdir           = None
testresultsfile_dict     = {}
mainloop_stop            = False
owner_fk                 = None
pindict                  = None
obsdict_byid             = None
servicedict              = None
serialdict               = None

ITEM_TO_PROCESS = 0
ITEM_PROCESSED  = 1
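# Work queue items are 5-tuples of the form
# (state, observer_id, file_dir, file_name, worker_state): FetchObsThread
# enqueues them with state ITEM_TO_PROCESS, and worker_callback puts them
# back with state ITEM_PROCESSED once a worker is done with the file.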


##############################################################################
#
# Error classes
#
##############################################################################
class DbFileEof(Exception):
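    """Raised when the end of an observer DB file has been reached."""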
    pass

class DbFileReadError(Exception):
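    """Raised when a record's payload is shorter than announced by its
    length header (expectedSize vs. actualSize at file offset fpos)."""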
    def __init__(self, expectedSize, actualSize, fpos):
        self.expectedSize = expectedSize
        self.actualSize = actualSize
        self.fpos = fpos
### END Error classes



##############################################################################
#
# Class ServiceInfo
#
##############################################################################
class ServiceInfo():
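    """Keeps track of the result DB files on an observer that belong to one service."""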
    def __init__(self, servicename):
        self.servicename = servicename
        self.files = []
        self.pattern = r"^%s_[0-9]+\.[a-z]+$" % servicename
    
    def matchFileName(self, filename):
        return re.search(self.pattern, os.path.basename(filename)) is not None
        
    def addFile(self, filename):
        self.files.append(filename)
    
    def stripFileList(self, removelast=True):
        self.files.sort()
        if ((len(self.files) > 0) and removelast):
            self.files.pop()
### END ServiceInfo


##############################################################################
#
# sigterm_handler
#
##############################################################################
def sigterm_handler(signum, frame):
    """If the program is terminated by sending it the signal SIGTERM 
    (e.g. by executing 'kill') or SIGINT (pressing ctrl-c), 
    this signal handler is invoked for cleanup."""
    
    global mainloop_stop
    global FetchObsThread_stopEvent

    logger.info("Process received SIGTERM or SIGINT signal")
        
    # Signal all observer fetcher threads to stop:
    logger.debug("Stopping observer fetcher threads...")
    shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
    try:
        FetchObsThread_stopEvent.set()
    except:
        pass
    for thread in FetchObsThread_list:
        try:
            thread.join(shutdown_timeout)
            if thread.is_alive():
                logger.warning("Fetcher thread did not stop within %d seconds." % shutdown_timeout)
        except:
            pass
    # Set DB status:
    logger.debug("Setting test status in DB to 'syncing'...")
    try:
        (cn, cur) = flocklab.connect_to_db()
        flocklab.set_test_status(cur, cn, testid, 'syncing')
        cur.close()
        cn.close()
    except:
        logger.warning("Could not connect to database.")
    
    # Tell the main loop to stop:
    mainloop_stop = True
    logger.debug("Set stop signal for main loop.")
### END sigterm_handler


##############################################################################
#
# Functions for parsing observer DB files data
#
##############################################################################
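# Each parse_* function takes the payload of one framed record (as returned
# by read_from_db_file) and unpacks it according to the C struct layout
# given in the inline comment; timeval pairs are rendered as "<sec>.<usec>".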
def parse_gpio_setting(buf):
    _data = struct.unpack("<Iiiiii", buf)  # unsigned int gpio; int value; struct timeval time_planned; struct timeval time_executed;
    return (_data[0], str(_data[1]), "%i.%06i" % (_data[2],_data[3]), "%i.%06i" % (_data[4],_data[5]))

def parse_serial(buf):
    _data = struct.unpack("iii%ds" % (len(buf) - 12), buf)  # int service; struct timeval timestamp; char * data
    return (_data[0], _data[3], "%i.%06i" % (_data[1],_data[2]))

def parse_error_log(buf):
    _data = struct.unpack("<iii%ds" % (len(buf) - 12), buf)  # struct timeval timestamp; int service_fk; char errormessage[1024];
    return (str(_data[2]), _data[3], "%i.%06i" % (_data[0],_data[1]))


##############################################################################
#
# Functions for converting observer DB data
#
##############################################################################
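# Each convert_* function turns one parsed record into a single CSV line of
# the aggregated result file for the corresponding service.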
def convert_gpio_setting(obsdata, observer_id, node_id):
    return "%s,%s,%s,%s,%s,%s\n" %(obsdata[2], obsdata[3], observer_id, node_id, pindict[obsdata[0]][0], obsdata[1])

def convert_gpio_monitor(obsdata, observer_id, node_id):
    return "%s,%s,%s,%s,%s\n" %(obsdata[1], observer_id, node_id, obsdata[0], obsdata[2])

def convert_serial(obsdata, observer_id, node_id):
    return "%s,%s,%s,%s,%s\n" %(obsdata[2], observer_id, node_id, serialdict[obsdata[0]], obsdata[1])

def convert_error_log(obsdata, observer_id, node_id):
    return "%s,%s,%s,%s\n" %(obsdata[2], observer_id, node_id, obsdata[1])



##############################################################################
#
# read_from_db_file: Read from an open DB file from an observer
#
##############################################################################
def read_from_db_file(dbfile):
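    # Observer DB files consist of framed records:
    #   <uint32, little-endian: payload size> <payload bytes>
    # A short read of the size field means EOF (DbFileEof); a short read of
    # the payload means the file is truncated/corrupt (DbFileReadError).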
    _buf = dbfile.read(4)
    if len(_buf) < 4:
        dbfile.close()
        raise DbFileEof()
    else:
        _size = struct.unpack("<I",_buf)
        _buf = dbfile.read(_size[0])
        if len(_buf) != _size[0]:
            _fpos = dbfile.tell() - 4 - len(_buf)
            dbfile.close()
            raise DbFileReadError(_size[0], len(_buf), _fpos)
        return _buf
### END read_from_db_file


##############################################################################
#
# worker_convert_and_aggregate: Worker function for multiprocessing pools.
#        Parses observer DB files for all services, converts values (if needed)
#        and aggregates them into single test result files.
#
##############################################################################
def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, commitsize=1, vizimgdir=None, parse_f=None, convert_f=None, viz_f=None, logqueue=None):
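    # Records are parsed and buffered one by one; every `commitsize` rows the
    # buffered CSV lines are appended to the shared result file (guarded by
    # resultfile_lock), after which the buffers are cleared.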
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir,f)
        loggername = "(%s.Obs%d)" % (cur_p.name, obsid)
        #logqueue.put_nowait((loggername, logging.DEBUG, "Import file %s"%obsdbfile_path))
        # Open file:
        dbfile = open(obsdbfile_path, 'rb')
        rows = 0
        viz_values = []
        conv_values = []
        while not dbfile.closed:
            # Process DB file line by line:
            try:
                # Parse one line:
                buf = read_from_db_file(dbfile)
                obsdata = parse_f(buf)
                viz_values.append(obsdata)
                # Convert data if needed:
                if convert_f is not None:
                    conv_data = convert_f(obsdata, obsid, nodeid)
                    conv_values.append(conv_data)
                    rows += 1
                # Visualize data:
                if (commitsize > 0) and (rows >= commitsize):
                    if viz_f is not None:
                        #logqueue.put_nowait((loggername, logging.DEBUG, "Viz started..."))
                        viz_f(testid, owner_fk, viz_values, obsid, vizimgdir, logger)
                        #logqueue.put_nowait((loggername, logging.DEBUG, "Viz done."))
                    # Write data to file:
                    #logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for writing..." % (resultfile_path)))
                    resultfile_lock.acquire()
                    f = open(resultfile_path, 'a')
                    f.writelines(conv_values)
                    f.close()
                    resultfile_lock.release()
                    logqueue.put_nowait((loggername, logging.DEBUG, "Committed results to %s after %d rows" % (resultfile_path, rows)))
                    rows = 0
                    conv_values = []
                    viz_values = []
            except DbFileEof:
                # logqueue.put_nowait((loggername, logging.DEBUG, "DbFileEof has occurred."))
                break # dbfile has been closed in parser (most likely because EOF was reached)
            except DbFileReadError as err:
                msg = "%s: Packet size (%i) did not match payload size (%i) @ %d." %(obsdbfile_path, err.expectedSize, err.actualSize, err.fpos)
                _errors.append((msg, errno.EIO, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        if (len(conv_values) > 0):
            # There is still data left. Do a last commit:
            if (viz_f is not None) and (len(viz_values) > 0):
                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz started..."))
                viz_f(testid, owner_fk, viz_values, obsid, vizimgdir, logger)
                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz done."))
            # Write data to file:
            #logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for final writing..." % (resultfile_path)))
            resultfile_lock.acquire()
            f = open(resultfile_path, 'a')
            f.writelines(conv_values)
            f.close()
            resultfile_lock.release()
            logqueue.put_nowait((loggername, logging.DEBUG, "Committed final results to %s after %d rows" % (resultfile_path, rows)))
        # Remove processed file:
        #logqueue.put_nowait((loggername, logging.DEBUG, "Remove %s" % (obsdbfile_path)))
        os.unlink(obsdbfile_path)
    except:
        msg = "Error in worker process: %s: %s\n%s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_convert_and_aggregate


##############################################################################
#
# worker_gpiotracing: Worker function for converting and aggregating gpio
#               tracing data. Unlike for the other services, this function works on
#               whole observer DB files.
#
##############################################################################
def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, slotcalib_factor=1, slotcalib_offset=0, vizimgdir=None, viz_f=None, logqueue=None):
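    # GPIO tracing results already arrive as CSV lines "timestamp,pin,level";
    # this worker only inserts the observer and node IDs after the timestamp
    # and appends the records to the aggregated result file.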
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.Obs%d)" % (cur_p.name, obsid)

        with open(resultfile_path, "a") as outfile:
            with open(obsdbfile_path, "r") as infile:
                for line in infile:
                    try:
                        (timestamp, pin, level) = line.split(',')
                        outfile.write("%s,%s,%s,%s,%s" % (timestamp, obsid, nodeid, pin, level))
                    except ValueError:
                        logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
                        break
        os.remove(obsdbfile_path)

        # (the return value is produced in the finally block below)
    except:
        msg = "Error in gpiotracing worker process: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_gpiotracing


##############################################################################
#
# worker_powerprof: Worker function for converting and aggregating power
#        profiling data. Unlike for the other services, this function works on
#        whole observer DB files.
#
##############################################################################
def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, slotcalib_factor=1, slotcalib_offset=0, vizimgdir=None, viz_f=None, logqueue=None, PpStatsQueue=None):
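    # Power profiling results arrive as RocketLogger binary files: load the
    # file, merge its channels, and append the samples to the aggregated CSV
    # via a pandas DataFrame indexed by the sample timestamps.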
    try:
        channel_names = ['I1','V1','V2']
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.Obs%d) " % (cur_p.name, obsid)

        rld_data = RocketLoggerData(obsdbfile_path).merge_channels()
        rld_dataframe = pd.DataFrame(rld_data.get_data(channel_names), index=rld_data.get_time(), columns=channel_names)
        rld_dataframe.insert(0, 'observer_id', obsid)
        rld_dataframe.insert(1, 'node_id', nodeid)
        rld_dataframe.to_csv(resultfile_path, sep=',', index_label='time', header=False, mode='a')

        os.remove(obsdbfile_path)
        
        # (the return value is produced in the finally block below)
    except:
        msg = "Error in powerprof worker process: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_powerprof


##############################################################################
#
# worker_callback: Callback function which reports errors from worker processes
#        back to the main process
#
##############################################################################
def worker_callback(result):
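    # `result` is the (errors, processeditem) tuple returned by a worker:
    # accumulated error messages are appended to the global error list, and
    # the processed item is put back onto the queue read by the main loop.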
    global errors
    global FetchObsThread_queue
    
    if len(result[0]) > 0:
        for (err, eno, obsid) in result[0]:
            msg = "Error %d when processing results for Observer ID %s: %s" % (eno, obsid, err)
            errors.append(msg)
    
    try:
        FetchObsThread_queue.put(item=result[1], block=True, timeout=10)
    except queue.Full:
        msg = "Queue full after processing element"
        logger.error(msg)
    return 0
### END worker_callback


##############################################################################
#
# LogQueueThread
#
##############################################################################
class LogQueueThread(threading.Thread):
    """    Thread which logs from queue to logfile.
    """ 
    def __init__(self, logqueue, logger, stopEvent):
        threading.Thread.__init__(self)
        self._logger    = logger
        self._stopEvent = stopEvent
        self._logqueue  = logqueue

    def run(self):
        self._logger.info("LogQueueThread started")
            
        # Let thread run until someone calls terminate() on it:
        while not self._stopEvent.is_set():
            try:
                (loggername, loglevel, msg) = self._logqueue.get(block=True, timeout=1)
                self._logger.log(loglevel, loggername + msg)
            except queue.Empty:
                pass
        
        # Stop the process:
        self._logger.info("LogQueueThread stopped")
### END LogQueueThread


##############################################################################
#
# FetchObsThread
#
##############################################################################
class FetchObsThread(threading.Thread):
    """    Thread which downloads database files from an observer to the server.
    """ 
    def __init__(self, obsid, obsethernet, dirname, debugdirname, workQueue, stopEvent):
        threading.Thread.__init__(self) 
        self._obsid            = obsid
        self._obsethernet      = obsethernet
        self._obsfiledir       = dirname
        self._obsfiledebugdir  = debugdirname
        self._workQueue        = workQueue
        self._stopEvent        = stopEvent
        self._logger           = logger
        
        self._min_sleep        = flocklab.config.getint("fetcher", "min_sleeptime")
        self._max_randsleep    = flocklab.config.getint("fetcher", "max_rand_sleeptime")
        self._obstestresfolder = "%s/%d" % (flocklab.config.get("observer", "testresultfolder"), testid)
        
    def run(self):
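        # Periodically list the remote result directory, download complete DB
        # files via scp, hand them to the worker queue and delete them on the
        # observer. When the stop event is set, do one final pass that also
        # fetches the newest (possibly still open) file.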
        try:
            self._loggerprefix = "(FetchObsThread.Obs%d) "%self._obsid
            self._logger.info(self._loggerprefix + "FetchObsThread starting...")
            removelast = True
                
            # Let thread run until someone calls terminate() on it:
            while removelast == True:
                """ Get data from the observer over SCP. 
                Then request data from the observer and store it in the server's filesystem. 
                Then sleep some random time before fetching data again.
                """
                # Wait for some random time:
                waittime = self._min_sleep + random.randrange(0, self._max_randsleep)
                #DEBUG self._logger.debug(self._loggerprefix + "Going to wait for %d seconds" %(waittime))
                self._stopEvent.wait(waittime) # The wait will be interrupted if the stop signal has been set causing the thread to download all remaining files
                if self._stopEvent.is_set():
                    removelast = False
                #self._logger.debug(self._loggerprefix + "Woke up")
                # Get list of available files
                cmd = ['ssh', self._obsethernet, "ls %s/" % self._obstestresfolder]
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)  # universal_newlines makes sure that a string is returned instead of a byte object
                out, err = p.communicate(None)
                rs = p.returncode
                if (rs == flocklab.SUCCESS):
                    services = {}
                    for servicename in [ "gpio_setting", "gpio_monitor", "powerprofiling", "serial" ]:
                        services[servicename] = ServiceInfo(servicename)
                        services["error_%s"%servicename] = ServiceInfo("error_%s"%servicename)
                    # Read filenames
                    for dbfile in out.split():
                        # Check name and append to corresponding list
                        for service in services.values():
                            if service.matchFileName(dbfile):
                                service.addFile("%s/%s" % (self._obstestresfolder, dbfile))
                                break
                    copyfilelist = []
                    # Remove latest from each list as the observer might still be writing into it (unless stop event has been set).
                    for service in services.values():
                        service.stripFileList(removelast)
                        for dbfile in service.files:
                            copyfilelist.append(dbfile)
                        #if (len(service.files) > 0):
                        #    self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (" ".join(service.files), service.servicename))
                    
                    if len(copyfilelist) > 0:
                        # Download the database files:
                        self._logger.debug(self._loggerprefix + "Downloading results files %s" % (" ".join(copyfilelist)))
                        cmd = ['scp', '-q' ]
                        cmd.extend(["%s:%s" % (self._obsethernet, x) for x in copyfilelist])
                        cmd.append("%s/" % self._obsfiledir)
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if rs != 0:
                            self._logger.debug(self._loggerprefix + "Could not download all results files from observer. Dataloss occurred for this observer.")
                            self._logger.debug(self._loggerprefix + "Tried to execute %s, result was %d, stdout: %s, error: %s" % (str(cmd), rs, out, err))
                        else:
                            self._logger.debug("Downloaded results files from observer.")
                            # put a copy to the debug directory
                            for f in copyfilelist:
                                fname = os.path.basename(f)
                                shutil.copyfile("%s/%s" % (self._obsfiledir, fname), "%s/%s" % (self._obsfiledebugdir, fname))
                            # Tell the fetcher to start working on the files:
                            for f in list(copyfilelist):
                                fname = os.path.basename(f)
                                try:
                                    self._workQueue.put(item=(ITEM_TO_PROCESS, self._obsid, self._obsfiledir, fname, None), block=True, timeout=10)
                                except queue.Full:
                                    # Make sure the file is downloaded again at a later point
                                    # (iterate over a copy so that removing here is safe):
                                    copyfilelist.remove(f)
                                    os.unlink("%s/%s" % (self._obsfiledir, fname))
                                    self._logger.warning(self._loggerprefix + "FetchObsThread queue is full. Cannot put %s/%s on it." % (self._obsfiledir, fname))
                            # Remove remote files if any are left:
                            if (len(copyfilelist) > 0):
                                cmd = ['ssh', self._obsethernet, "cd %s;" % self._obstestresfolder, "rm"]
                                cmd.extend(copyfilelist)
                                self._logger.debug(self._loggerprefix + "Removing files on observer...")
                                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                                out, err = p.communicate(None)
                                rs = p.wait()
                                if (rs != flocklab.SUCCESS):
                                    self._logger.error(self._loggerprefix + "Could not remove files on observer, result was %d, error: %s" % (rs, err))
                            else:
                                self._logger.debug(self._loggerprefix + "No files left to delete on observer.")
                    else:
                        self._logger.debug(self._loggerprefix + "No files to download from observer.")

                    if removelast == False: # this is the last execution of the while loop
                        cmd = ['ssh', self._obsethernet, "rm -rf %s" % self._obstestresfolder]
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if (rs != flocklab.SUCCESS):
                            self._logger.error(self._loggerprefix + "Could not remove results directory from observer, result was %d, stdout: %s, error: %s" % (rs, out, err))

                else:
                    self._logger.error(self._loggerprefix + "SSH to observer did not succeed, result was %d, error: %s" % (rs, err))
                    break   # abort
        
        except:
            logger.error(self._loggerprefix + "FetchObsThread crashed: %s, %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
        
        # Stop the process:
        self._logger.info(self._loggerprefix + "FetchObsThread stopped")
### END FetchObsThread


##############################################################################
#
# Start Fetcher
#
##############################################################################
def start_fetcher():
    global obsfiledir
    global FetchObsThread_list
    global FetchObsThread_queue
    global FetchObsThread_stopEvent
    global obsfetcher_dict
    
    # Daemonize the process ---
    daemon.daemonize(None, closedesc=False)
    logger.info("Daemon started")
    logger.info("Going to fetch data for test ID %d" %testid)
        
    # Get needed metadata from database ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    try:
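        # One row per observer that has a target image mapped for this test: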
        cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address 
                       FROM `tbl_serv_observer` AS `a` 
                       LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk 
                       WHERE `b`.test_fk = %d GROUP BY `a`.observer_id;
                    """ % testid)
    except MySQLdb.Error as err:
        msg = str(err)
        flocklab.error_logandexit(msg, errno.EIO)
    except:
        logger.warning("Error %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
    rs = cur.fetchall()
    cur.close()
    cn.close()
    logger.debug("Got list of FlockLab observers from database.")
    if not rs:
        logger.info("No observers found for this test. Nothing has to be done, thus exiting...")
        return errno.ENODATA
        
    # Start fetcher threads ---
    # Create a directory structure to store the downloaded files from the DB:
    obsfiledir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_dir'), testid)
    if not os.path.exists(obsfiledir):
        os.makedirs(obsfiledir)
    obsfiledebugdir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_debug_dir'), testid)
    if not os.path.exists(obsfiledebugdir):
        os.makedirs(obsfiledebugdir)
        #DEBUG logger.debug("Created %s"%obsfiledir)
    # Start one fetching thread per observer
    FetchObsThread_stopEvent = threading.Event()
    FetchObsThread_queue = queue.Queue(maxsize=10000)
    for observer in rs:
        obsid = int(observer[0])
        # Create needed directories:
        dirname = "%s/%d" % (obsfiledir, obsid)
        if (not os.path.exists(dirname)):
            os.makedirs(dirname)
        debugdirname = "%s/%d" % (obsfiledebugdir, obsid)
        if (not os.path.exists(debugdirname)):
            os.makedirs(debugdirname)
        # Start thread: 
        try:
            thread = FetchObsThread(obsid, observer[1], dirname, debugdirname, FetchObsThread_queue, FetchObsThread_stopEvent)
            FetchObsThread_list.append(thread)
            thread.start()
            logger.debug("Started fetcher thread for observer %d" % (obsid))
        except:
            logger.warning("Error when starting fetcher thread for observer %d: %s, %s" % (obsid, str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            continue
    
    return flocklab.SUCCESS
### END start_fetcher


##############################################################################
#
# Stop Fetcher
#
##############################################################################
def stop_fetcher():
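    # Returns flocklab.SUCCESS if a running fetcher instance was signalled,
    # or errno.ENOPKG if no other instance was found; in the latter case the
    # test status is set to 'synced' so dispatcher and scheduler can go on.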
    # Get the oldest running instance of the fetcher for the selected test ID (this is the main process) and send it the terminate signal:
    try:
        pid = flocklab.get_fetcher_pid(testid)
        # Signal the process to stop:
        if (pid > 0):
            # Do not stop this instance if it is the only one running:
            if (pid == os.getpid()):
                raise ValueError
            logger.debug("Sending SIGTERM signal to process %d" % pid)
            try:
                os.kill(pid, signal.SIGTERM)
                # wait for process to finish (timeout..)
                shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
                pidpath = "/proc/%d" % pid
                while os.path.exists(pidpath) and (shutdown_timeout > 0):
                    time.sleep(1)
                    shutdown_timeout -= 1
                if os.path.exists(pidpath):
                    logger.error("Fetcher is still running, killing process...")
                    # send kill signal
                    os.kill(pid, signal.SIGKILL)
                    raise ValueError
            except:
                pass
        else:
            raise ValueError
    except ValueError:
        # Set DB status in order to allow dispatcher and scheduler to go on.:
        logger.debug("Setting test status in DB to 'synced'...")
        try:
            (cn, cur) = flocklab.connect_to_db()
            flocklab.set_test_status(cur, cn, testid, 'synced')
            cur.close()
            cn.close()
        except:
            logger.warning("Could not connect to database.")
        
        return errno.ENOPKG
    
    return flocklab.SUCCESS
### END stop_fetcher


##############################################################################
#
# Class WorkManager
#
##############################################################################
class WorkManager():
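    """Serializes the processing of DB files per (service, observer) pair:
    at most one item per pair is handed out at a time, and the worker state
    returned with a finished item is attached to the next one."""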
    def __init__(self):
        self.worklist = {}
        self.pattern = re.compile("_[0-9].*")
        self.workcount = 0
        
    def _next_item_with_state(self, service, obsid):
        stateitem = list(self.worklist[service][obsid][1][0])
        stateitem[4] = self.worklist[service][obsid][0]
        return tuple(stateitem)
        
    def add(self, item):
        service = self.pattern.sub("",item[3])
        obsid = item[1]
        if service not in self.worklist:
            self.worklist[service] = {}
        if obsid not in self.worklist[service]:
            self.worklist[service][obsid] = [None, []] # workerstate / worklist
        # if list is empty, we're good to process, otherwise just append it and return None
        if len(self.worklist[service][obsid][1]) == 0:
            self.worklist[service][obsid][1].append(item)
            self.workcount = self.workcount + 1
            return self._next_item_with_state(service, obsid)
        else:
            self.worklist[service][obsid][1].append(item)
            self.workcount = self.workcount + 1
            return None
        
    def done(self, item):
        service = self.pattern.sub("",item[3])
        obsid = item[1]
        if item[1:-1] == self.worklist[service][obsid][1][0][1:-1]:
            self.worklist[service][obsid][0] = item[4] # save state
            self.worklist[service][obsid][1].pop(0)
            self.workcount = self.workcount - 1
        else:
            logger.error("work done for item that was not enqueued: %s" % str(item))
        # if there is more work to do, return next item
        if len(self.worklist[service][obsid][1]) > 0:
            return self._next_item_with_state(service, obsid)
        else:
            return None
    
    def finished(self):
        return self.workcount == 0
    
### END WorkManager


##############################################################################
#
# Usage
#
##############################################################################
def usage():
    print("Usage: %s --testid=<int> [--stop] [--debug] [--help]" % __file__)
    print("Options:")
    print("  --testid=<int>\t\tTest ID of test to which incoming data belongs.")
    print("  --stop\t\t\tOptional. Causes the program to stop a possibly running instance of the fetcher.")
    print("  --debug\t\t\tOptional. Print debug messages to log.")
    print("  --help\t\t\tOptional. Print this help.")
### END usage()


##############################################################################
#
# Main
#
##############################################################################
def main(argv):
    
    ### Get global variables ###
    global logger
    global debug
    global testid
    global testresultsdir
    global testresultsfile_dict
    global owner_fk
    global pindict
    global obsdict_byid
    global servicedict
    global serialdict
    
    stop = False
    
    # Get logger:
    logger = flocklab.get_logger()
        
    # Get the config file ---
    flocklab.load_config()

    # Get command line parameters ---
    try:
        opts, args = getopt.getopt(argv, "hedt:", ["help", "stop", "debug", "testid="])
    except getopt.GetoptError as err:
        print(str(err))
        logger.warning(str(err))
        usage()
        sys.exit(errno.EINVAL)
    except:
        msg = "Error when getting arguments: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
        flocklab.error_logandexit(msg, errno.EAGAIN)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit(flocklab.SUCCESS)
        elif opt in ("-d", "--debug"):
            debug = True
            logger.setLevel(logging.DEBUG)
        elif opt in ("-t", "--testid"):
            try:
                testid = int(arg)
            except ValueError:
                err = "Wrong API usage: testid has to be integer"
                print(str(err))
                logger.warning(str(err))
                usage()
                sys.exit(errno.EINVAL)
        elif opt in ("-e", "--stop"):
            stop = True
        else:
            print("Wrong API usage")
            logger.warning("Wrong API usage")
            sys.exit(errno.EINVAL)
    
    # Check if the necessary parameters are set ---
    if not testid:
        print("Wrong API usage")
        logger.warning("Wrong API usage")
        sys.exit(errno.EINVAL)
        
    # Check if the Test ID exists in the database ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    rs = flocklab.check_test_id(cur, testid)
    cur.close()
    cn.close()
    if rs != 0:
        if rs == 3:
            msg = "Test ID %d does not exist in database." %testid
            flocklab.error_logandexit(msg, errno.EINVAL)
        else:
            msg = "Error when trying to get test ID from database: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
            flocklab.error_logandexit(msg, errno.EIO)
        
    # Add Test ID to logger name ---
    logger.name += " (Test %d)"%testid
    
    # Start / stop the fetcher ---
    ret = flocklab.SUCCESS
    if stop:
        ret = stop_fetcher()
        logger.info("FlockLab fetcher stopped.")
        sys.exit(ret)

    # Start the fetcher processes which download data from the observers: 
    ret = start_fetcher()
    if ret == flocklab.SUCCESS:
        logger.info("FlockLab fetcher started.")
    else:
        msg = "Start function returned error. Exiting..."
        os.kill(os.getpid(), signal.SIGTERM)
        rs = flocklab.error_logandexit(msg, ret)
        
    # Get needed metadata ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    rs = flocklab.get_test_owner(cur, testid)
    if isinstance(rs, tuple):
        owner_fk = rs[0]
    else:
        owner_fk = None
    rs = flocklab.get_pinmappings(cur)
    if isinstance(rs, dict):
        pindict = rs
    else:
        pindict = None
    rs = flocklab.get_test_obs(cur, testid)
    if isinstance(rs, tuple):
        obsdict_byid = rs[1]
    else:
        obsdict_byid = None
    # Dict for serial service: 'r' means reader (data read from the target), 'w' means writer (data written to the target):
    serialdict = {0: 'r', 1: 'w'}
    # Get calibration data for used slots and add it to obsdict ---
    ppstats = {}
    for (obsid, (obskey, nodeid)) in obsdict_byid.items():
        ppstats[obsid] = (0.0, 0)
        rs = flocklab.get_slot_calib(cur, int(obskey), testid)
        if isinstance(rs, tuple):
            obsdict_byid[obsid] = (nodeid, rs)
        else:
            obsdict_byid = None
            break
    rs = flocklab.get_servicemappings(cur)
    if isinstance(rs, dict):
        servicedict = rs
    else:
        servicedict = None
    
    # Find out the start and stop time of the test:
    cur.execute("SELECT `time_start_wish`, `time_end_wish` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" % testid)
    # Times are going to be of datetime type:
    ret = cur.fetchone() 
    teststarttime = ret[0]
    teststoptime  = ret[1]
    FlockDAQ = False
    
    # Find out which services are used to allocate working threads later on ---
    # Get the XML config from the database and check which services are used in the test.
    servicesUsed_dict = {'gpiotracing': 'gpioTracingConf', 'gpioactuation': 'gpioActuationConf', 'powerprofiling': 'powerProfilingConf', 'serial': 'serialConf'}
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
    ret = cur.fetchone()
    if not ret:
        msg = "No XML found in database for testid %d." %testid
        errors.append(msg)
        logger.error(msg)
        for service, xmlname in servicesUsed_dict.items():
            servicesUsed_dict[service] = True
    else:
        try:
            logger.debug("Got XML from database.")
            parser = lxml.etree.XMLParser(remove_comments=True)
            tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
            ns = {'d': flocklab.config.get('xml', 'namespace')}
            for service, xmlname in servicesUsed_dict.items():
                if tree.xpath('//d:%s' % xmlname, namespaces=ns):
                    servicesUsed_dict[service] = True
                    logger.debug("Usage of %s detected." % service)
                else:
                    servicesUsed_dict[service] = False
        except:
            msg = "XML parsing failed: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
            errors.append(msg)
            logger.error(msg)
    
    cur.close()
    cn.close()
    if (owner_fk is None) or (pindict is None) or (obsdict_byid is None) or (servicedict is None):
        msg = "Error when getting metadata.\n"
        msg += "owner_fk: %s\npindict: %s\nobsdict_byid: %s\nservicedict: %s\n" % (str(owner_fk), str(pindict), str(obsdict_byid), str(servicedict))
        msg += "Exiting..."
        logger.debug(msg)
        os.kill(os.getpid(), signal.SIGTERM)
        flocklab.error_logandexit(msg, errno.EAGAIN)
    else:
        logger.debug("Got all needed metadata.")
    
    # Start aggregating processes ---
    """ There is an infinite loop which gets files to process from the fetcher threads which download data from the observers.
        Downloaded data is then assigned to a worker process for the corresponding service and in the worker process parsed,
        converted (if needed) and aggregated into a single file per service.
        The loop is stopped if the program receives a stop signal. In this case, the loop runs until no more database files
        are left to be processed.
    """
    if __name__ == '__main__':
        # Create directory and files needed for test results:
        testresultsdir = "%s/%d" %(flocklab.config.get('fetcher', 'testresults_dir'), testid)
        if not os.path.exists(testresultsdir):
            os.makedirs(testresultsdir)
            logger.debug("Created %s" % testresultsdir)
        manager = multiprocessing.Manager()
        for service in ('errorlog', 'gpiotracing', 'gpioactuation', 'powerprofiling', 'serial', 'powerprofilingstats'):
            path = "%s/%s.csv" % (testresultsdir, service)
            lock = manager.Lock()
            testresultsfile_dict[service] = (path, lock)
            # Create file and write header:
            if service == 'errorlog':
                header = '# timestamp,observer_id,node_id,errormessage\n'
            elif service == 'gpiotracing':
                header = '# timestamp,observer_id,node_id,pin_name,value\n'
            elif service == 'gpioactuation':
                header = '# timestamp_planned,timestamp_executed,observer_id,node_id,pin_name,value\n'
            elif service == 'powerprofiling':
                header = 'timestamp,observer_id,node_id,I1,V1,V2\n'
            elif service == 'serial':
                header = '# timestamp,observer_id,node_id,direction,output\n'
            elif service == 'powerprofilingstats':
                header = '# observer_id,node_id,mean_mA\n'
            lock.acquire()
            f = open(path, 'w')
            f.write(header)
            f.close()
            lock.release()
        # Start logging thread:
        logqueue = manager.Queue(maxsize=10000)
        LogQueueThread_stopEvent = threading.Event()
        try:
            thread = LogQueueThread(logqueue, logger, LogQueueThread_stopEvent)
            thread.start()
        except:
            logger.warning("Error when starting log queue thread: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
        
        PpStatsQueue = manager.Queue(maxsize=1)
        PpStatsQueue.put(ppstats)
        # Determine the number of CPU's to be used for each aggregating process. If a service is not used, its CPUs are assigned to other services
        cpus_free = 0
        cpus_errorlog = flocklab.config.getint('fetcher', 'cpus_errorlog')
        # CPUs for serial service:
        if servicesUsed_dict['serial'] == True:
            cpus_serial    = flocklab.config.getint('fetcher', 'cpus_serial')
        else:
            cpus_serial    = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_serial')
        # CPUs for GPIO actuation. If the service is not used, assign a CPU anyhow since FlockLab always uses this service to determine start and stop times of a test.
        #cpus_errorlog = flocklab.config.getint('fetcher', 'cpus_errorlog')
        if servicesUsed_dict['gpioactuation'] == True:
            cpus_gpiosetting = flocklab.config.getint('fetcher', 'cpus_gpiosetting')
        else:
            cpus_gpiosetting = 1
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_gpiosetting') - cpus_gpiosetting
        # CPUs for GPIO tracing:
        if servicesUsed_dict['gpiotracing'] == True:
            cpus_gpiomonitoring    = flocklab.config.getint('fetcher', 'cpus_gpiomonitoring')
        else:
            cpus_gpiomonitoring = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_gpiomonitoring')
        # CPUs for powerprofiling:
        if servicesUsed_dict['powerprofiling'] == True:
            cpus_powerprofiling    = flocklab.config.getint('fetcher', 'cpus_powerprofiling')
        else:
            cpus_powerprofiling = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_powerprofiling')
        # If there are free CPUs left, give them to GPIO tracing and power profiling evenly as these services need the most CPU power:
        if cpus_free > 0:
            if (cpus_powerprofiling > 0) and (cpus_gpiomonitoring > 0):
                # Both services are used, distribute the free CPUS evenly:
                cpus_powerprofiling = cpus_powerprofiling + int(math.ceil(float(cpus_free)/2))
                cpus_gpiomonitoring = cpus_gpiomonitoring + int(math.floor(float(cpus_free)/2))
            elif cpus_powerprofiling > 0:
                # GPIO monitoring/tracing is not used, so give all CPUs to powerprofiling:
                cpus_powerprofiling = cpus_powerprofiling + cpus_free
            elif cpus_gpiomonitoring > 0:
                # Powerprofiling is not used, so give all CPUs to GPIO monitoring/tracing:
                cpus_gpiomonitoring = cpus_gpiomonitoring + cpus_free
            else:
                # Neither of the services is used, so give it to one of the other services:
                if cpus_serial > 0:
                    cpus_serial = cpus_serial + cpus_free
                elif cpus_gpiosetting > 0:
                    cpus_gpiosetting = cpus_gpiosetting + cpus_free
        cpus_total = cpus_errorlog + cpus_serial + cpus_gpiosetting + cpus_gpiomonitoring + cpus_powerprofiling
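        # Example with hypothetical config values: if 2 CPUs each are granted
        # to powerprofiling and gpiomonitoring but GPIO tracing is unused,
        # cpus_gpiomonitoring becomes 0, cpus_free becomes 2, and power
        # profiling ends up with 2 + 2 = 4 worker processes.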
        
        service_pools_dict = { 'errorlog': cpus_errorlog, 'serial': cpus_serial, 'gpioactuation': cpus_gpiosetting, 'gpiotracing': cpus_gpiomonitoring, 'powerprofiling': cpus_powerprofiling }
        if (cpus_total > multiprocessing.cpu_count()):