#! /usr/bin/env python3

import os, sys, getopt, traceback, MySQLdb, signal, random, time, errno, multiprocessing, subprocess, re, logging, __main__, threading, struct, types, queue, math, shutil, lxml.etree
import lib.daemon as daemon
import lib.flocklab as flocklab


logger                   = None
debug                    = False
testid                   = None 
errors                   = []
FetchObsThread_list      = []
FetchObsThread_stopEvent = None
FetchObsThread_queue     = None
obsfiledir               = None
testresultsdir           = None
testresultsfile_dict     = {}
mainloop_stop            = False
owner_fk                 = None
pindict                  = None
obsdict_byid             = None
servicedict              = None
serialdict               = None

ITEM_TO_PROCESS = 0
ITEM_PROCESSED  = 1
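
# The work items exchanged between the fetcher threads and the worker processes
# are 5-tuples. A sketch of the convention used throughout this module (derived
# from FetchObsThread.run() and the worker functions below):
#
#   item = (ITEM_TO_PROCESS,  # itemtype: one of the two constants above
#           obsid,            # ID of the observer the file was fetched from
#           fdir,             # local directory the file was downloaded to
#           fname,            # name of the observer DB file
#           None)             # workerstate, managed by WorkManager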


##############################################################################
#
# Error classes
#
##############################################################################
class DbFileEof(Exception):
    pass

class DbFileReadError(Exception):
    def __init__(self, expectedSize, actualSize, fpos):
        self.expectedSize = expectedSize
        self.actualSize = actualSize
        self.fpos = fpos
### END Error classes



##############################################################################
#
# Class ServiceInfo
#
##############################################################################
class ServiceInfo():
    def __init__(self, servicename):
        self.servicename = servicename
        self.files = []
        self.pattern = r"^%s_[0-9]+\.[a-z]+$" % servicename
    
    def matchFileName(self, filename):
        return re.search(self.pattern, os.path.basename(filename)) is not None
        
    def addFile(self, filename):
        self.files.append(filename)
    
    def stripFileList(self, removelast=True):
        self.files.sort()
        if ((len(self.files) > 0) and removelast):
            self.files.pop()
### END ServiceInfo
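
# Illustrative use of ServiceInfo (not executed here; the file name is
# hypothetical): observer files are named "<service>_<timestamp>.<ext>", and
# the newest file is dropped while the observer may still be writing to it.
#
#   info = ServiceInfo("serial")
#   if info.matchFileName("serial_1522656000.db"):
#       info.addFile("/some/dir/serial_1522656000.db")
#   info.stripFileList(removelast=True)  # drops the newest file in the list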


##############################################################################
#
# sigterm_handler
#
##############################################################################
def sigterm_handler(signum, frame):
    """If the program is terminated by sending it the signal SIGTERM 
    (e.g. by executing 'kill') or SIGINT (pressing ctrl-c), 
    this signal handler is invoked for cleanup."""
    
    global mainloop_stop
    global FetchObsThread_stopEvent

    logger.info("Process received SIGTERM or SIGINT signal")
        
    # Signal all observer fetcher threads to stop:
    logger.debug("Stopping observer fetcher threads...")
    shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
    try:
        FetchObsThread_stopEvent.set()
    except:
        pass
    for thread in FetchObsThread_list:
        thread.join(shutdown_timeout)
        # join() does not raise on timeout, so check explicitly:
        if thread.is_alive():
            logger.warn("Fetcher thread did not stop within %d seconds." % shutdown_timeout)
    # Set DB status:
    logger.debug("Setting test status in DB to 'syncing'...")
    try:
        (cn, cur) = flocklab.connect_to_db()
        flocklab.set_test_status(cur, cn, testid, 'syncing')
        cur.close()
        cn.close()
    except:
        logger.warn("Could not connect to database")
    
    # Tell the main loop to stop:
    mainloop_stop = True
    logger.debug("Set stop signal for main loop.")
### END sigterm_handler


##############################################################################
#
# Functions for parsing observer DB files data
#
##############################################################################
def parse_gpio_setting(buf):
    _data = struct.unpack("<Iiiiii", buf)  # unsigned int gpio; int value; struct timeval time_planned; struct timeval time_executed
    return (_data[0], str(_data[1]), "%i.%06i" % (_data[2],_data[3]), "%i.%06i" % (_data[4],_data[5]))

def parse_gpio_monitor(buf):
    _data = str(buf).split(";")
    logger.debug("BUFFER CONTENT: %s" % str(buf))
    return (_data[0], _data[1], _data[2])
    #_data = struct.unpack("<Iiii",buf) #unsigned int gpio;enum en_edge edge;struct timeval timestamp;
    #return (_data[0], str(_data[1]), "%i.%06i" % (_data[2],_data[3]))

def parse_serial(buf):
    _data = struct.unpack("iii%ds" % (len(buf) - 12), buf)  # int service; struct timeval timestamp; char *data
    return (_data[0], _data[3], "%i.%06i" % (_data[1],_data[2]))
    
def parse_error_log(buf):
    _data = struct.unpack("<iii%ds" % (len(buf) - 12), buf)  # struct timeval timestamp; int service_fk; char errormessage[1024]
    return (str(_data[2]), _data[3], "%i.%06i" % (_data[0],_data[1]))
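
# Example (illustrative only): a gpio_setting record is 24 bytes packed as
# "<Iiiiii"; packing a hypothetical record and parsing it round-trips as:
#
#   buf = struct.pack("<Iiiiii", 3, 1, 1522656000, 500000, 1522656000, 500123)
#   parse_gpio_setting(buf)
#   # -> (3, '1', '1522656000.500000', '1522656000.500123')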


##############################################################################
#
# Functions for converting observer DB data
#
##############################################################################
def convert_gpio_setting(obsdata, observer_id, node_id):
    return "%s,%s,%s,%s,%s,%s\n" %(obsdata[2], obsdata[3], observer_id, node_id, pindict[obsdata[0]][0], obsdata[1])

def convert_gpio_monitor(obsdata, observer_id, node_id):
    return "%s,%s,%s,%s,%s\n" %(obsdata[1], observer_id, node_id, obsdata[0], obsdata[2])

def convert_serial(obsdata, observer_id, node_id):
    return "%s,%s,%s,%s,%s\n" %(obsdata[2], observer_id, node_id, serialdict[obsdata[0]], obsdata[1])

def convert_error_log(obsdata, observer_id, node_id):
    return "%s,%s,%s,%s\n" %(obsdata[2], observer_id, node_id, obsdata[1])
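
# Example (illustrative; assumes a hypothetical pindict entry mapping GPIO 3 to
# ('LED1', ...)): a parsed gpio_setting record converts to one CSV row matching
# the gpioactuation result file header written in main():
#
#   convert_gpio_setting((3, '1', '1522656000.500000', '1522656000.500123'), 7, 12)
#   # -> "1522656000.500000,1522656000.500123,7,12,LED1,1\n"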



##############################################################################
#
# read_from_db_file: Read from an open DB file from an observer
#
##############################################################################
def read_from_db_file(dbfile):
    _buf = dbfile.read(4)
    if len(_buf) < 4:
        dbfile.close()
        raise DbFileEof()
    else:
        _size = struct.unpack("<I",_buf)
        _buf = dbfile.read(_size[0])
        if len(_buf) != _size[0]:
            _fpos = dbfile.tell() - 4 - len(_buf)
            dbfile.close()
            raise DbFileReadError(_size[0], len(_buf), _fpos)
        return _buf
### END read_from_db_file
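
# The observer DB files read above are flat sequences of length-prefixed
# records: a 4-byte little-endian payload size followed by the payload itself.
# A minimal sketch of a compatible writer (for illustration, not used here):
#
#   def write_db_record(dbfile, payload):
#       dbfile.write(struct.pack("<I", len(payload)))  # 4-byte size prefix
#       dbfile.write(payload)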


##############################################################################
#
# worker_convert_and_aggregate: Worker function for multiprocessing pools.
#        Parses observer DB files for all services, converts values (if needed)
#        and aggregates them into single test result files.
#
##############################################################################
def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, commitsize=1, vizimgdir=None, parse_f=None, convert_f=None, viz_f=None, logqueue=None):
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir,f)
        loggername = "(%s).(Observer %d)" % (cur_p.name, obsid)
        #logqueue.put_nowait((loggername, logging.DEBUG, "Import file %s"%obsdbfile_path))
        # Open file:
        dbfile = open(obsdbfile_path, 'rb')
        rows = 0
        viz_values = []
        conv_values = []
        while not dbfile.closed:
            # Process DB file line by line:
            try:
                # Parse one line:
                buf = read_from_db_file(dbfile)
                obsdata = parse_f(buf)
                viz_values.append(obsdata)
                # Convert data if needed:
                if convert_f is not None:
                    conv_data = convert_f(obsdata, obsid, nodeid)
                    conv_values.append(conv_data)
                    rows += 1
                # Visualize data:
                if (commitsize > 0) and (rows >= commitsize):
                    if viz_f is not None:
                        #logqueue.put_nowait((loggername, logging.DEBUG, "Viz started..."))
                        viz_f(testid, owner_fk, viz_values, obsid, vizimgdir, logger)
                        #logqueue.put_nowait((loggername, logging.DEBUG, "Viz done."))
                    # Write data to file:
                    #logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for writing..." % (resultfile_path)))
                    with resultfile_lock:
                        with open(resultfile_path, 'a') as outfile:
                            outfile.writelines(conv_values)
                    logqueue.put_nowait((loggername, logging.DEBUG, "Committed results to %s after %d rows" % (resultfile_path, rows)))
                    rows = 0
                    conv_values = []
                    viz_values = []
            except DbFileEof:
                # logqueue.put_nowait((loggername, logging.DEBUG, "DbFileEof has occurred."))
                break # dbfile has been closed in parser (most likely because EOF was reached)
            except DbFileReadError as err:
                msg = "%s: Packet size (%i) did not match payload size (%i) @ %d." %(obsdbfile_path, err.expectedSize, err.actualSize, err.fpos)
                _errors.append((msg, errno.EIO, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        if (len(conv_values) > 0):
            # There is still data left. Do a last commit:
            if (viz_f is not None) and (len(viz_values) > 0):
                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz started..."))
                viz_f(testid, owner_fk, viz_values, obsid, vizimgdir, logger)
                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz done."))
            # Write data to file:
            #logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for final writing..." % (resultfile_path)))
            with resultfile_lock:
                with open(resultfile_path, 'a') as outfile:
                    outfile.writelines(conv_values)
            logqueue.put_nowait((loggername, logging.DEBUG, "Committed final results to %s after %d rows" % (resultfile_path, rows)))
        # Remove processed file:
        #logqueue.put_nowait((loggername, logging.DEBUG, "Remove %s" % (obsdbfile_path)))
        os.unlink(obsdbfile_path)
    except:
        msg = "Error in worker process: %s: %s\n%s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_convert_and_aggregate
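
# Sketch of how this worker is dispatched (the pool setup is outside this
# excerpt, so `pool`, `item` and the keyword values are assumptions): each
# queue item is handed to a multiprocessing pool together with the
# service-specific parse/convert functions, and worker_callback() collects the
# returned errors and marks the item as processed.
#
#   pool.apply_async(worker_convert_and_aggregate,
#                    kwds={'queueitem': item, 'nodeid': nodeid,
#                          'resultfile_path': path, 'resultfile_lock': lock,
#                          'commitsize': 10000, 'parse_f': parse_serial,
#                          'convert_f': convert_serial, 'logqueue': logqueue},
#                    callback=worker_callback)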


##############################################################################
#
# worker_gpiotracing: Worker function for converting and aggregating gpio
#               tracing data. Unlike for the other services, this function works on
#               whole observer DB files.
#
##############################################################################
def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, slotcalib_factor=1, slotcalib_offset=0, vizimgdir=None, viz_f=None, logqueue=None):
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s).(Observer %d)" % (cur_p.name, obsid)

        with open(resultfile_path, "a") as outfile:
            with open(obsdbfile_path, "r") as infile:
                for line in infile:
                    try:
                        (timestamp, pin, level) = line.split(',')
                        outfile.write("%s,%s,%s,%s,%s" % (timestamp, obsid, nodeid, pin, level))
                    except ValueError:
                        logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
                        break
        os.remove(obsdbfile_path)

    except:
        msg = "Error in gpiotracing worker process: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_gpiotracing


##############################################################################
#
# worker_powerprof: Worker function for converting and aggregating power
#        profiling data. Unlike for the other services, this function works on
#        whole observer DB files.
#
##############################################################################
def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, slotcalib_factor=1, slotcalib_offset=0, vizimgdir=None, viz_f=None, logqueue=None, PpStatsQueue=None):
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.Obs%d) " % (cur_p.name, obsid)
        resultsfile = "%s_%s.%s" % (os.path.splitext(resultfile_path)[0], obsid, os.path.splitext(resultfile_path)[1])
        # TODO for now, just move the file without modifying it
        os.rename(obsdbfile_path, resultsfile)
        logger.debug("File '%s' moved to '%s'." % (obsdbfile_path, resultsfile))
        '''
        with open(resultfile_path, "a") as outfile:
            infile = open(obsdbfile_path, "r")
            for line in infile:
                outfile.write("%s,%s,%s" % (obsid, nodeid, str(line)))
            infile.close()
        
        os.remove(obsdbfile_path)
        '''
    except:
        msg = "Error in powerprof worker process: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_powerprof


##############################################################################
#
# worker_callback: Callback function which reports errors from worker processes
#        back to the main process
#
##############################################################################
def worker_callback(result):
    global errors
    global FetchObsThread_queue
    
    if len(result[0]) > 0:
        for (err, eno, obsid) in result[0]:
            msg = "Error %d when processing results for Observer ID %s: %s" % (eno, obsid, err)
            errors.append(msg)
    
    try:
        FetchObsThread_queue.put(item=result[1], block=True, timeout=10)
    except queue.Full:
        msg = "Queue full after processing element"
        logger.error(msg)
    return 0
### END worker_callback


##############################################################################
#
# LogQueueThread
#
##############################################################################
class LogQueueThread(threading.Thread):
    """Thread which logs messages from the log queue to the logfile."""
    def __init__(self, logqueue, logger, stopEvent):
        threading.Thread.__init__(self)
        self._logger    = logger
        self._stopEvent = stopEvent
        self._logqueue  = logqueue

    def run(self):
        self._logger.info("LogQueueThread started")
            
        # Let thread run until someone calls terminate() on it:
        while not self._stopEvent.is_set():
            try:
                (loggername, loglevel, msg) = self._logqueue.get(block=True, timeout=1)
                self._logger.log(loglevel, loggername + msg)
            except queue.Empty:
                pass
        
        # Stop the process:
        self._logger.info("LogQueueThread stopped")
### END LogQueueThread
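
# Worker processes do not log directly; they enqueue (loggername, loglevel,
# message) tuples which this thread writes out, e.g.:
#
#   logqueue.put_nowait(("(Worker-1).(Observer 7)", logging.DEBUG, "message"))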


##############################################################################
#
# FetchObsThread
#
##############################################################################
class FetchObsThread(threading.Thread):
    """Thread which downloads database files from an observer to the server."""
    def __init__(self, obsid, obsethernet, dirname, debugdirname, workQueue, stopEvent):
        threading.Thread.__init__(self) 
        self._obsid            = obsid
        self._obsethernet      = obsethernet
        self._obsfiledir       = dirname
        self._obsfiledebugdir  = debugdirname
        self._workQueue        = workQueue
        self._stopEvent        = stopEvent
        self._logger           = logger
        
        self._min_sleep        = flocklab.config.getint("fetcher", "min_sleeptime")
        self._max_randsleep    = flocklab.config.getint("fetcher", "max_rand_sleeptime")
        self._obstestresfolder = "%s/%d" % (flocklab.config.get("observer", "testresultfolder"), testid)
        
    def run(self):
        try:
            self._loggerprefix = "(FetchObsThread.Obs%d) "%self._obsid
            self._logger.info(self._loggerprefix + "FetchObsThread starting...")
            removelast = True
                
            # Let thread run until someone calls terminate() on it:
            while removelast:
                """ Get data from the observer over SCP. 
                Then request data from the observer and store it in the server's filesystem. 
                Then sleep some random time before fetching data again.
                """
                # Wait for some random time:
                waittime = self._min_sleep + random.randrange(0, self._max_randsleep)
                #DEBUG self._logger.debug(self._loggerprefix + "Going to wait for %d seconds" %(waittime))
                self._stopEvent.wait(waittime) # The wait will be interrupted if the stop signal has been set causing the thread to download all remaining files
                if self._stopEvent.is_set():
                    removelast = False
                #self._logger.debug(self._loggerprefix + "Woke up")
                # Get list of available files
                cmd = ['ssh', self._obsethernet, "ls %s/" % self._obstestresfolder]
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)  # universal_newlines makes sure that a string is returned instead of a byte object
                out, err = p.communicate(None)
                rs = p.returncode
                if (rs == flocklab.SUCCESS):
                    services = {}
                    for servicename in [ "gpio_setting", "gpio_monitor", "powerprofiling", "serial" ]:
                        services[servicename] = ServiceInfo(servicename)
                        services["error_%s"%servicename] = ServiceInfo("error_%s"%servicename)
                    # Read filenames
                    for dbfile in out.split():
                        # Check name and append to corresponding list
                        for service in services.values():
                            if service.matchFileName(dbfile):
                                service.addFile("%s/%s" % (self._obstestresfolder, dbfile))
                                break
                    copyfilelist = []
                    # Remove latest from each list as the observer might still be writing into it (unless stop event has been set).
                    for service in services.values():
                        service.stripFileList(removelast)
                        for dbfile in service.files:
                            copyfilelist.append(dbfile)
                        #if (len(service.files) > 0):
                        #    self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (" ".join(service.files), service.servicename))
                    
                    if len(copyfilelist) > 0:
                        # Download the database files:
                        self._logger.debug(self._loggerprefix + "Downloading database files %s" % (" ".join(copyfilelist)))
                        cmd = ['scp', '-q' ]
                        cmd.extend(["%s:%s" % (self._obsethernet, x) for x in copyfilelist])
                        cmd.append("%s/" % self._obsfiledir)
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if rs != 0:
                            self._logger.debug(self._loggerprefix + "Could not download all DB files from observer. Dataloss occurred for this observer.")
                            self._logger.debug(self._loggerprefix + "Tried to execute %s, result was %d, stdout: %s, error: %s" % (str(cmd), rs, out, err))
                        else:
                            self._logger.debug(self._loggerprefix + "Downloaded result DB files from observer.")
                            # put a copy to the debug directory
                            for f in copyfilelist:
                                fname = os.path.basename(f)
                                shutil.copyfile("%s/%s" % (self._obsfiledir, fname), "%s/%s" % (self._obsfiledebugdir, fname))
                            # Tell the fetcher to start working on the files:
                            for f in list(copyfilelist):  # iterate over a copy since items may be removed below
                                fname = os.path.basename(f)
                                try:
                                    self._workQueue.put(item=(ITEM_TO_PROCESS, self._obsid, self._obsfiledir, fname, None), block=True, timeout=10)
                                except queue.Full:
                                    # Make sure the file is downloaded again at a later point:
                                    copyfilelist.remove(f)
                                    os.unlink("%s/%s" % (self._obsfiledir, fname))
                                    self._logger.warn(self._loggerprefix + "FetchObsThread queue is full. Cannot put %s/%s on it." % (self._obsfiledir, fname))
                            # Remove remote files if any are left:
                            if (len(copyfilelist) > 0):
                                cmd = ['ssh', self._obsethernet, "cd %s;" % self._obstestresfolder, "rm"]
                                cmd.extend(copyfilelist)
                                self._logger.debug(self._loggerprefix + "Removing files on observer...")
                                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                                out, err = p.communicate(None)
                                rs = p.wait()
                                if (rs != flocklab.SUCCESS):
                                    self._logger.error(self._loggerprefix + "Could not remove files on observer, result was %d, error: %s" % (rs, err))
                            else:
                                self._logger.debug(self._loggerprefix + "No files left to delete on observer.")
                    else:
                        self._logger.debug(self._loggerprefix + "No files to download from observer.")
                    
                    if not removelast:  # this is the last execution of the while loop
                        cmd = ['ssh', self._obsethernet, "rm -rf %s" % self._obstestresfolder]
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if (rs != flocklab.SUCCESS):
                            self._logger.error(self._loggerprefix + "Could not remove results directory from observer, result was %d, stdout: %s, error: %s" % (rs, out, err))
                    
                else:
                    self._logger.error(self._loggerprefix + "SSH to observer did not succeed, result was %d, error: %s" % (rs, err))
                    break   # abort
        
        except:
            logger.error(self._loggerprefix + "FetchObsThread crashed: %s, %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
        
        # Stop the process:
        self._logger.info(self._loggerprefix + "FetchObsThread stopped")
### END FetchObsThread


##############################################################################
#
# Start Fetcher
#
##############################################################################
def start_fetcher():
    global obsfiledir
    global FetchObsThread_list
    global FetchObsThread_queue
    global FetchObsThread_stopEvent
    global obsfetcher_dict
    
    # Daemonize the process ---
    daemon.daemonize(None, closedesc=False)
    logger.info("Daemon started")
    logger.info("Going to fetch data for test ID %d" %testid)
        
    # Get needed metadata from database ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database", errno.EAGAIN)
    try:
        cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address 
                       FROM `tbl_serv_observer` AS `a` 
                       LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk 
                       WHERE `b`.test_fk = %d GROUP BY `a`.observer_id;
                    """ % testid)
    except MySQLdb.Error as err:
        msg = str(err)
        flocklab.error_logandexit(msg, errno.EIO)
    except:
        logger.warn("Error %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
    rs = cur.fetchall()
    cur.close()
    cn.close()
    logger.debug("Got list of FlockLab observers from database.")
    if not rs:
        logger.info("No observers found for this test. Nothing has to be done, thus exiting...")
        return errno.ENODATA
        
    # Start fetcher threads ---
    # Create a directory structure to store the downloaded files from the DB:
    obsfiledir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_dir'), testid)
    if not os.path.exists(obsfiledir):
        os.makedirs(obsfiledir)
    obsfiledebugdir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_debug_dir'), testid)
    if not os.path.exists(obsfiledebugdir):
        os.makedirs(obsfiledebugdir)
        #DEBUG logger.debug("Created %s"%obsfiledir)
    # Start one fetching thread per observer
    FetchObsThread_stopEvent = threading.Event()
    FetchObsThread_queue = queue.Queue(maxsize=10000)
    for observer in rs:
        obsid = int(observer[0])
        # Create needed directories:
        dirname = "%s/%d" % (obsfiledir, obsid)
        if (not os.path.exists(dirname)):
            os.makedirs(dirname)
        debugdirname = "%s/%d" % (obsfiledebugdir, obsid)
        if (not os.path.exists(debugdirname)):
            os.makedirs(debugdirname)
        # Start thread: 
        try:
            thread = FetchObsThread(obsid, observer[1], dirname, debugdirname, FetchObsThread_queue, FetchObsThread_stopEvent)
            FetchObsThread_list.append(thread)
            thread.start()
            logger.debug("Started fetcher thread for observer %d" % (obsid))
        except:
            logger.warn("Error when starting fetcher thread for observer %d: %s, %s" % (obsid, str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            continue
    
    return flocklab.SUCCESS
### END start_fetcher


##############################################################################
#
# Stop Fetcher
#
##############################################################################
def stop_fetcher():
    # Get the oldest running instance of the fetcher for the selected test ID (this is the main process) and send it the terminate signal:
    try:
        pid = flocklab.get_fetcher_pid(testid)
        # Signal the process to stop:
        if (pid > 0):
            # Do not stop this instance if it is the only one running:
            if (pid == os.getpid()):
                raise ValueError
            logger.debug("Sending SIGTERM signal to process %d" % pid)
            try:
                os.kill(pid, signal.SIGTERM)
                # wait for process to finish (timeout..)
                shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
                pidpath = "/proc/%d" % pid
                while os.path.exists(pidpath) and (shutdown_timeout > 0):
                    time.sleep(1)
                    shutdown_timeout = shutdown_timeout - 1
                if os.path.exists(pidpath):
                    logger.error("Fetcher is still running, killing process...")
                    # send kill signal
                    os.kill(pid, signal.SIGKILL)
                    raise ValueError
            except:
                pass
        else:
            raise ValueError
    except ValueError:
        # Set DB status in order to allow dispatcher and scheduler to go on:
        logger.debug("Setting test status in DB to 'synced'...")
        try:
            (cn, cur) = flocklab.connect_to_db()
            flocklab.set_test_status(cur, cn, testid, 'synced')
            cur.close()
            cn.close()
        except:
            logger.warn("Could not connect to database")
        
        return errno.ENOPKG
    
    return flocklab.SUCCESS
### END stop_fetcher


##############################################################################
#
# Class WorkManager
#
##############################################################################
class WorkManager():
    def __init__(self):
        self.worklist = {}
        self.pattern = re.compile("_[0-9].*")
        self.workcount = 0
        
    def _next_item_with_state(self, service, obsid):
        stateitem = list(self.worklist[service][obsid][1][0])
        stateitem[4] = self.worklist[service][obsid][0]
        return tuple(stateitem)
        
    def add(self, item):
        service = self.pattern.sub("",item[3])
        obsid = item[1]
        if service not in self.worklist:
            self.worklist[service] = {}
        if obsid not in self.worklist[service]:
            self.worklist[service][obsid] = [None, []] # workerstate / worklist
        # if list is empty, we're good to process, otherwise just append it and return None
        if len(self.worklist[service][obsid][1]) == 0:
            self.worklist[service][obsid][1].append(item)
            self.workcount = self.workcount + 1
            return self._next_item_with_state(service, obsid)
        else:
            self.worklist[service][obsid][1].append(item)
            self.workcount = self.workcount + 1
            return None
        
    def done(self, item):
        service = self.pattern.sub("",item[3])
        obsid = item[1]
        if item[1:-1] == self.worklist[service][obsid][1][0][1:-1]:
            self.worklist[service][obsid][0] = item[4] # save state
            self.worklist[service][obsid][1].pop(0)
            self.workcount = self.workcount - 1
        else:
            logger.error("work done for item that was not enqueued: %s" % str(item))
        # if there is more work to do, return next item
        if len(self.worklist[service][obsid][1]) > 0:
            return self._next_item_with_state(service, obsid)
        else:
            return None
    
    def finished(self):
        return self.workcount == 0
    
### END WorkManager
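
# Sketch of the WorkManager protocol (illustrative): items for the same
# (service, observer) pair are serialized so that at most one is in flight at a
# time, which preserves per-observer file order and carries worker state along.
#
#   wm = WorkManager()
#   next_item = wm.add(item)        # returns the item only if the pair was idle
#   ...                             # hand next_item to a worker, await result
#   next_item = wm.done(processed)  # returns the next queued item, if any
#   if wm.finished():
#       pass                        # all enqueued work has been processed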


##############################################################################
#
# Usage
#
##############################################################################
def usage():
    print("Usage: %s --testid=<int> [--stop] [--debug] [--help]" % __file__)
    print("Options:")
    print("  --testid=<int>\t\tTest ID of test to which incoming data belongs.")
    print("  --stop\t\t\tOptional. Causes the program to stop a possibly running instance of the fetcher.")
    print("  --debug\t\t\tOptional. Print debug messages to log.")
    print("  --help\t\t\tOptional. Print this help.")
### END usage()


##############################################################################
#
# Main
#
##############################################################################
def main(argv):
    
    ### Get global variables ###
    global logger
    global debug
    global testid
    global testresultsdir
    global testresultsfile_dict
    global owner_fk
    global pindict
    global obsdict_byid
    global servicedict
    global serialdict
    
    stop = False
    
    # Get logger:
    logger = flocklab.get_logger()
        
    # Get the config file ---
    flocklab.load_config()

    # Get command line parameters ---
    try:
        opts, args = getopt.getopt(argv, "hedt:", ["help", "stop", "debug", "testid="])
    except getopt.GetoptError as err:
        print(str(err))
        logger.warn(str(err))
        usage()
        sys.exit(errno.EINVAL)
    except:
        msg = "Error when getting arguments: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
        flocklab.error_logandexit(msg, errno.EAGAIN)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit(flocklab.SUCCESS)
        elif opt in ("-d", "--debug"):
            debug = True
            logger.setLevel(logging.DEBUG)
        elif opt in ("-t", "--testid"):
            try:
                testid = int(arg)
            except ValueError:
                err = "Wrong API usage: testid has to be integer"
                print(str(err))
                logger.warn(str(err))
                usage()
                sys.exit(errno.EINVAL)
        elif opt in ("-e", "--stop"):
            stop = True
        else:
            print("Wrong API usage")
            logger.warn("Wrong API usage")
            sys.exit(errno.EINVAL)
    
    # Check if the necessary parameters are set ---
    if not testid:
        print("Wrong API usage")
        logger.warn("Wrong API usage")
        sys.exit(errno.EINVAL)
        
    # Check if the Test ID exists in the database ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        msg = "Could not connect to database"
        flocklab.error_logandexit(msg, errno.EAGAIN)
    rs = flocklab.check_test_id(cur, testid)
    cur.close()
    cn.close()
    if rs != 0:
        if rs == 3:
            msg = "Test ID %d does not exist in database." %testid
            flocklab.error_logandexit(msg, errno.EINVAL)
        else:
            msg = "Error when trying to get test ID from database: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
            flocklab.error_logandexit(msg, errno.EIO)
        
    # Add Test ID to logger name ---
    logger.name += " (Test %d)"%testid
    
    # Start / stop the fetcher ---
    ret = flocklab.SUCCESS
    if stop:
        ret = stop_fetcher()
        logger.info("FlockLab fetcher stopped.")
        sys.exit(ret)

    # Start the fetcher processes which download data from the observers: 
    ret = start_fetcher()
    if ret == flocklab.SUCCESS:
        logger.info("FlockLab fetcher started.")
    else:
        msg = "Start function returned error. Exiting..."
        os.kill(os.getpid(), signal.SIGTERM)
        rs = flocklab.error_logandexit(msg, ret)
        
    # Get needed metadata ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        msg = "Could not connect to database"
        flocklab.error_logandexit(msg, errno.EAGAIN)
    rs = flocklab.get_test_owner(cur, testid)
    if isinstance(rs, tuple):
        owner_fk = rs[0]
    else:
        owner_fk = None
    rs = flocklab.get_pinmappings(cur)
    if isinstance(rs, dict):
        pindict = rs
    else:
        pindict = None
    rs = flocklab.get_test_obs(cur, testid)
    if isinstance(rs, tuple):
        obsdict_byid = rs[1]
    else:
        obsdict_byid = None
    # Dict for serial service: 'r' means reader (data read from the target), 'w' means writer (data written to the target):
    serialdict = {0: 'r', 1: 'w'}
    # Get calibration data for used slots and add it to obsdict ---
    ppstats={}
    for (obsid, (obskey, nodeid)) in obsdict_byid.items():
        ppstats[obsid]=(0.0,0)
        rs = flocklab.get_slot_calib(cur, int(obskey), testid)
        if isinstance(rs, tuple):
            obsdict_byid[obsid] = (nodeid, rs)
        else:
            obsdict_byid = None
            break
    rs = flocklab.get_servicemappings(cur)
    if isinstance(rs, dict):
        servicedict = rs
    else:
        servicedict = None
    
    # Find out the start and stop time of the test:
    cur.execute("SELECT `time_start_wish`, `time_end_wish` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" %testid)
    # Times are going to be of datetime type:
    ret = cur.fetchone() 
    teststarttime = ret[0]
    teststoptime  = ret[1]
    FlockDAQ = False
    
    # Find out which services are used to allocate working threads later on ---
    # Get the XML config from the database and check which services are used in the test.
    servicesUsed_dict = {'gpiotracing': 'gpioTracingConf', 'gpioactuation': 'gpioActuationConf', 'powerprofiling': 'powerProfilingConf', 'serial': 'serialConf'}
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
    ret = cur.fetchone()
    if not ret:
        msg = "No XML found in database for testid %d." %testid
        errors.append(msg)
        logger.error(msg)
        for service, xmlname in servicesUsed_dict.items():
            servicesUsed_dict[service] = True
    else:
        try:
            logger.debug("Got XML from database.")
            parser = lxml.etree.XMLParser(remove_comments=True)
            tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
            ns = {'d': flocklab.config.get('xml', 'namespace')}
            for service, xmlname in servicesUsed_dict.items():
                if tree.xpath('//d:%s' % xmlname, namespaces=ns):
                    servicesUsed_dict[service] = True
                    logger.debug("Usage of %s detected." % service)
                else:
                    servicesUsed_dict[service] = False
        except:
            msg = "XML parsing failed: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
            errors.append(msg)
            logger.error(msg)
    
    cur.close()
    cn.close()
    if (owner_fk is None) or (pindict is None) or (obsdict_byid is None) or (servicedict is None):
        msg = "Error when getting metadata.\n"
        msg += "owner_fk: %s\npindict: %s\nobsdict_byid: %s\nservicedict: %s\n" % (str(owner_fk), str(pindict), str(obsdict_byid), str(servicedict))
        msg += "Exiting..."
        logger.debug(msg)
        os.kill(os.getpid(), signal.SIGTERM)
        flocklab.error_logandexit(msg, errno.EAGAIN)
    else:
        logger.debug("Got all needed metadata.")        
    
    # Start aggregating processes ---
    """ There is an infinite loop which gets files to process from the fetcher threads which download data from the observers.
        Downloaded data is then assigned to a worker process for the corresponding service and in the worker process parsed,
        converted (if needed) and aggregated into a single file per service.
        The loop is stopped if the program receives a stop signal. In this case, the loop runs until no more database files
        are left to be processed.
    """
    if __name__ == '__main__':
        # Create directory and files needed for test results:
        testresultsdir = "%s/%d" %(flocklab.config.get('fetcher', 'testresults_dir'), testid)
        if not os.path.exists(testresultsdir):
            os.makedirs(testresultsdir)
            logger.debug("Created %s" % testresultsdir)
        manager = multiprocessing.Manager()
        for service in ('errorlog', 'gpiotracing', 'gpioactuation', 'powerprofiling', 'serial', 'powerprofilingstats'):
            path = "%s/%s.csv" % (testresultsdir, service)
            lock = manager.Lock()
            testresultsfile_dict[service] = (path, lock)
            # Create file and write header:
            if service == 'errorlog':
                header = '# timestamp,observer_id,node_id,errormessage\n'
            elif service == 'gpiotracing':
                header = 'observer_id,node_id,pin_name,# timestamp,value\n'
            elif service == 'gpioactuation':
                header = '# timestamp_planned,timestamp_executed,observer_id,node_id,pin_name,value\n'
            elif service == 'powerprofiling':
                header = 'This is a RocketLogger File with additional information about the observer id and the node id.\n'
            elif service == 'serial':
                header = '# timestamp,observer_id,node_id,direction,output\n'
            elif service == 'powerprofilingstats':
                header = '# observer_id,node_id,mean_mA\n'
            with lock:
                with open(path, 'w') as f:
                    f.write(header)
        # Start logging thread:
        logqueue = manager.Queue(maxsize=10000)
        LogQueueThread_stopEvent = threading.Event()
        try:
            thread = LogQueueThread(logqueue, logger, LogQueueThread_stopEvent)
            thread.start()
        except:
            logger.warn("Error when starting log queue thread: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
        
        PpStatsQueue = manager.Queue(maxsize=1)
        PpStatsQueue.put(ppstats)
        # Determine the number of CPU's to be used for each aggregating process. If a service is not used, its CPUs are assigned to other services
        cpus_free = 0
        cpus_errorlog = flocklab.config.getint('fetcher', 'cpus_errorlog')
        # CPUs for serial service:
        if servicesUsed_dict['serial']:
            cpus_serial = flocklab.config.getint('fetcher', 'cpus_serial')
        else:
            cpus_serial    = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_serial')
        # CPUs for GPIO actuation. If the service is not used, assign a CPU anyhow since FlockLab always uses this service to determine start and stop times of a test.
        #cpus_errorlog = flocklab.config.getint('fetcher', 'cpus_errorlog')
        if servicesUsed_dict['gpioactuation']:
            cpus_gpiosetting = flocklab.config.getint('fetcher', 'cpus_gpiosetting')
        else:
            cpus_gpiosetting = 1
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_gpiosetting') - cpus_gpiosetting
        # CPUs for GPIO tracing:
        if servicesUsed_dict['gpiotracing']:
            cpus_gpiomonitoring = flocklab.config.getint('fetcher', 'cpus_gpiomonitoring')
        else:
            cpus_gpiomonitoring = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_gpiomonitoring')
        # CPUs for powerprofiling:
        if servicesUsed_dict['powerprofiling']:
            cpus_powerprofiling = flocklab.config.getint('fetcher', 'cpus_powerprofiling')
        else:
            cpus_powerprofiling = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_powerprofiling')
        # If there are free CPUs left, give them to GPIO tracing and power profiling evenly as these services need the most CPU power:
        if cpus_free > 0:
            if (cpus_powerprofiling > 0) and (cpus_gpiomonitoring > 0):
                # Both services are used, distribute the free CPUS evenly:
                cpus_powerprofiling = cpus_powerprofiling + int(math.ceil(float(cpus_free)/2))
                cpus_gpiomonitoring = cpus_gpiomonitoring + int(math.floor(float(cpus_free)/2))
            elif cpus_powerprofiling > 0:
                # GPIO monitoring/tracing is not used, so give all CPUs to powerprofiling:
                cpus_powerprofiling = cpus_powerprofiling + cpus_free
            elif cpus_gpiomonitoring > 0:
                # Powerprofiling is not used, so give all CPUs to GPIO monitoring/tracing:
                cpus_gpiomonitoring = cpus_gpiomonitoring + cpus_free