#! /usr/bin/env python3

import os, sys, getopt, traceback, MySQLdb, signal, random, time, errno, multiprocessing, subprocess, re, logging, __main__, threading, struct, types, queue, math, shutil, lxml.etree

import lib.daemon as daemon
import lib.flocklab as flocklab
from rocketlogger.data import RocketLoggerData
import pandas as pd
import numpy as np


logger                   = None
debug                    = False
testid                   = None 
errors                   = []
FetchObsThread_list      = []
FetchObsThread_stopEvent = None
FetchObsThread_queue     = None
obsfiledir               = None
testresultsdir           = None
testresultsfile_dict     = {}
mainloop_stop            = False
owner_fk                 = None
pindict                  = None
obsdict_byid             = None
servicedict              = None
serialdict               = None

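# State markers for items on the work queue: ITEM_TO_PROCESS marks freshly downloaded
# files, ITEM_PROCESSED marks items a worker process has finished.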
ITEM_TO_PROCESS = 0
ITEM_PROCESSED  = 1


##############################################################################
#
# Error classes
#
##############################################################################
class DbFileEof(Exception):
    pass

class DbFileReadError(Exception):
    def __init__(self, expectedSize, actualSize, fpos):
        self.expectedSize = expectedSize
        self.actualSize = actualSize
        self.fpos = fpos

### END Error classes



##############################################################################
#
# Class ServiceInfo
#
##############################################################################
class ServiceInfo():
    def __init__(self, servicename):
        self.servicename = servicename
        self.files = []
        self.pattern = r"^%s_[0-9]+\.[a-z]+$" % servicename
    
    def matchFileName(self, filename):
        return re.search(self.pattern, os.path.basename(filename)) is not None
        
    def addFile(self, filename):
        self.files.append(filename)
    
    def stripFileList(self, removelast=True):
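        """Sort the file list and drop the newest file, which the observer may still be writing to (skipped if removelast is False)."""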
        self.files.sort()
        if ((len(self.files) > 0) and removelast):
            self.files.pop()
### END ServiceInfo


##############################################################################
#
# sigterm_handler
#
##############################################################################
def sigterm_handler(signum, frame):
    """If the program is terminated by sending it the signal SIGTERM 
    (e.g. by executing 'kill') or SIGINT (pressing ctrl-c), 
    this signal handler is invoked for cleanup."""
    
    global mainloop_stop
    global FetchObsThread_stopEvent

    logger.info("Process received SIGTERM or SIGINT signal")
        
    # Signal all observer fetcher threads to stop:
    logger.debug("Stopping observer fetcher threads...")
    shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
    try:
        FetchObsThread_stopEvent.set()
    except:
        pass
    for thread in FetchObsThread_list:
        thread.join(shutdown_timeout)
        # join() does not raise on timeout, so check whether the thread is still alive:
        if thread.is_alive():
            logger.warn("Fetcher thread did not stop within %d seconds." % shutdown_timeout)
    # Set DB status:
    logger.debug("Setting test status in DB to 'syncing'...")
    try:
        (cn, cur) = flocklab.connect_to_db()
        flocklab.set_test_status(cur, cn, testid, 'syncing')
        cur.close()
        cn.close()
    except:
        logger.warn("Could not connect to database.")
    
    # Tell the main loop to stop:
    mainloop_stop = True
    logger.debug("Set stop signal for main loop.")
### END sigterm_handler


##############################################################################
#
# Functions for parsing observer DB files data
#
##############################################################################
def parse_serial(buf):
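    """Unpack a serial service record: an int service ID, a struct timeval (seconds and
    microseconds) and the raw payload; returns (service, data, "sec.usec")."""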
    _data = struct.unpack("iii%ds" % (len(buf) - 12), buf)  # int service; struct timeval timestamp; char *data
    return (_data[0], _data[3], "%i.%06i" % (_data[1], _data[2]))


##############################################################################
#
# Functions for converting observer DB data
#
##############################################################################
def convert_serial(obsdata, observer_id, node_id):
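    """Convert a parsed serial record into a CSV line of the form
    timestamp,observer_id,node_id,direction,output; falls back to the raw string
    representation if the payload is not valid UTF-8."""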
    try:
        result = "%s,%s,%s,%s,%s\n" % (obsdata[2], observer_id, node_id, serialdict[obsdata[0]], obsdata[1].decode('utf8').strip())
    except UnicodeDecodeError:
        result = "%s,%s,%s,%s,%s\n" % (obsdata[2], observer_id, node_id, serialdict[obsdata[0]], str(obsdata[1]))
    return result


##############################################################################
#
# read_from_db_file: Read from an open DB file from an observer
#
##############################################################################
def read_from_db_file(dbfile):
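    """Read one record from an observer DB file: a 4-byte little-endian size header
    followed by the payload. Raises DbFileEof at end of file and DbFileReadError if
    the payload is shorter than announced."""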
    _buf = dbfile.read(4)
    if len(_buf) < 4:
        dbfile.close()
        raise DbFileEof()
    else:
        _size = struct.unpack("<I", _buf)
        _buf = dbfile.read(_size[0])
        if len(_buf) != _size[0]:
            _fpos = dbfile.tell() - 4 - len(_buf)
            dbfile.close()
            raise DbFileReadError(_size[0], len(_buf), _fpos)
        return _buf
### END read_from_db_file


##############################################################################
#
# worker_convert_and_aggregate: Worker function for multiprocessing pools.
#        Parses observer DB files for all services, converts values (if needed)
#        and aggregates them into single test result files.
#
##############################################################################
def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, commitsize=1, vizimgdir=None, parse_f=None, convert_f=None, viz_f=None, logqueue=None):
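    """Process one queue item (itemtype, obsid, fdir, f, workerstate): parse the
    observer DB file record by record with parse_f, convert records with convert_f
    (if given) and append them to the shared result file in batches of commitsize
    rows, optionally visualizing them with viz_f."""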
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir,f)
        loggername = "(%s.Obs%d) " % (cur_p.name, obsid)
        #logqueue.put_nowait((loggername, logging.DEBUG, "Import file %s"%obsdbfile_path))
        # Open file:
        dbfile = open(obsdbfile_path, 'rb')
        rows = 0
        viz_values = []
        conv_values = []
        while not dbfile.closed:
            # Process DB file line by line:
            try:
                # Parse one line:
                buf = read_from_db_file(dbfile)
                obsdata = parse_f(buf)
                viz_values.append(obsdata)
                # Convert data if needed:
                if convert_f is not None:
                    conv_data = convert_f(obsdata, obsid, nodeid)
                    conv_values.append(conv_data)
                    rows += 1
                # Commit a batch once enough rows have accumulated:
                if (commitsize > 0) and (rows >= commitsize):
                    if viz_f is not None:
                        #logqueue.put_nowait((loggername, logging.DEBUG, "Viz started..."))
                        viz_f(testid, owner_fk, viz_values, obsid, vizimgdir, logger)
                        #logqueue.put_nowait((loggername, logging.DEBUG, "Viz done."))
                    # Write data to file:
                    #logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for writing..." % (resultfile_path)))
                    resultfile_lock.acquire()
                    f = open(resultfile_path, 'a')
                    f.writelines(conv_values)
                    f.close()
                    resultfile_lock.release()
                    logqueue.put_nowait((loggername, logging.DEBUG, "Committed results to %s after %d rows" % (resultfile_path, rows)))
                    rows = 0
                    conv_values = []
                    viz_values = []
            except DbFileEof:
                # logqueue.put_nowait((loggername, logging.DEBUG, "DbFileEof has occurred."))
                break # dbfile has been closed in parser (most likely because EOF was reached)
            except DbFileReadError as err:
                msg = "%s: Packet size (%i) did not match payload size (%i) @ %d." %(obsdbfile_path, err.expectedSize, err.actualSize, err.fpos)
                _errors.append((msg, errno.EIO, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        if (len(conv_values) > 0):
            # There is still data left. Do a last commit:
            if (viz_f is not None) and (len(viz_values) > 0):
                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz started..."))
                viz_f(testid, owner_fk, viz_values, obsid, vizimgdir, logger)
                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz done."))
            # Write data to file:
            #logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for final writing..." % (resultfile_path)))
            resultfile_lock.acquire()
            f = open(resultfile_path, 'a')
            f.writelines(conv_values)
            f.close()
            resultfile_lock.release()
            logqueue.put_nowait((loggername, logging.DEBUG, "Committed final results to %s after %d rows" % (resultfile_path, rows)))
        # Remove processed file:
        #logqueue.put_nowait((loggername, logging.DEBUG, "Remove %s" % (obsdbfile_path)))
        os.unlink(obsdbfile_path)
    except:
        msg = "Error in worker process: %s: %s\n%s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_convert_and_aggregate


##############################################################################
#
# worker_gpiotracing: Worker function for converting and aggregating gpio
#               tracing data. Unlike for the other services, this function works on
#               whole observer DB files.
#
##############################################################################
def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, vizimgdir=None, viz_f=None, logqueue=None):
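    """Aggregate one GPIO tracing DB file: each input line 'timestamp,pin,level' is
    rewritten as 'timestamp,observer_id,node_id,pin,level' and appended to the
    shared result file."""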
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.Obs%d) " % (cur_p.name, obsid)

        with open(resultfile_path, "a") as outfile, open(obsdbfile_path, "r") as infile:
            for line in infile:
                try:
                    (timestamp, pin, level) = line.split(',')
                    outfile.write("%s,%s,%s,%s,%s" % (timestamp, obsid, nodeid, pin, level))
                except ValueError:
                    logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
                    break
        os.remove(obsdbfile_path)
    except:
        msg = "Error in gpiotracing worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_gpiotracing


##############################################################################
#
# worker_powerprof: Worker function for converting and aggregating power
#        profiling data. Unlike for the other services, this function works on
#        whole observer DB files.
#
##############################################################################
def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, vizimgdir=None, viz_f=None, logqueue=None):
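    """Aggregate one power profiling file: load the RocketLogger data, merge its
    channels, convert the network time to UNIX timestamps and append the samples as
    CSV rows (time, observer_id, node_id, I1, V1, V2) to the shared result file."""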
    try:
        channel_names = ['I1','V1','V2']
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.Obs%d) " % (cur_p.name, obsid)

        rld_data = RocketLoggerData(obsdbfile_path).merge_channels()
        # get network time and convert to UNIX timestamp (UTC)
        timeidx = rld_data.get_time(absolute_time=True, time_reference='network')     # TODO adjust parameters for RL 1.99+
        timeidxunix = timeidx.astype('uint64') / 1e9
        rld_dataframe = pd.DataFrame(rld_data.get_data(channel_names), index=timeidxunix, columns=channel_names)
        rld_dataframe.insert(0, 'observer_id', obsid)
        rld_dataframe.insert(1, 'node_id', nodeid)
        rld_dataframe.to_csv(resultfile_path, sep=',', index_label='time', header=False, mode='a')

        os.remove(obsdbfile_path)
    except:
        msg = "Error in powerprof worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_powerprof


##############################################################################
#
# worker_errorlog
#
##############################################################################
def worker_errorlog(queueitem=None, nodeid=None, resultfile_path=None, vizimgdir=None, viz_f=None, logqueue=None):
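    """Aggregate one error log file: each input line 'timestamp,message' is rewritten
    as 'timestamp,observer_id,node_id,message' and appended to the shared result file."""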
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.Obs%d) " % (cur_p.name, obsid)

        with open(resultfile_path, "a") as outfile, open(obsdbfile_path, "r") as infile:
            for line in infile:
                # Split only on the first comma; the error message itself may contain commas:
                (timestamp, msg) = line.split(',', 1)
                outfile.write("%s,%s,%s,%s" % (timestamp, obsid, nodeid, msg))
        os.remove(obsdbfile_path)
    except:
        msg = "Error in errorlog worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_errorlog()


##############################################################################
#
# worker_callback: Callback function which reports errors from worker processes
#        back to the main process
#
##############################################################################
def worker_callback(result):
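    """Collect the errors reported by a worker process and put the processed item
    back onto the fetcher queue; result is the (errors, item) tuple returned by the
    worker functions."""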
    global errors
    global FetchObsThread_queue
    
    if len(result[0]) > 0:
        for (err, eno, obsid) in result[0]:
            msg = "Error %d when processing results for Observer ID %s: %s" % (eno, obsid, err)
            errors.append(msg)
    
    try:
        FetchObsThread_queue.put(item=result[1], block=True, timeout=10)
    except queue.Full:
        msg = "Queue full after processing element"
        logger.error(msg)
    return 0
### END worker_callback


##############################################################################
#
# LogQueueThread
#
##############################################################################
class LogQueueThread(threading.Thread):
    """    Thread which logs from queue to logfile.
    """ 
    def __init__(self, logqueue, logger, stopEvent):
        threading.Thread.__init__(self) 
        self._logger        = logger
        self._stopEvent        = stopEvent
        self._logqueue        = logqueue

    def run(self):
        self._logger.info("LogQueueThread started")
            
        # Let thread run until someone calls terminate() on it:
        while not self._stopEvent.is_set():
            try:
                (loggername, loglevel, msg) = self._logqueue.get(block=True, timeout=1)
                self._logger.log(loglevel, loggername + msg)
            except queue.Empty:
                pass
        
        # Stop the process:
        self._logger.info("LogQueueThread stopped")
### END LogQueueThread


##############################################################################
#
# FetchObsThread
#
##############################################################################
class FetchObsThread(threading.Thread):
    """    Thread which downloads database files from an observer to the server.
    """ 
    def __init__(self, obsid, obsethernet, dirname, debugdirname, workQueue, stopEvent):
        threading.Thread.__init__(self)
        self._obsid            = obsid
        self._obsethernet      = obsethernet
        self._obsfiledir       = dirname
        self._obsfiledebugdir  = debugdirname
        self._workQueue        = workQueue
        self._stopEvent        = stopEvent
        self._logger           = logger

        self._min_sleep        = flocklab.config.getint("fetcher", "min_sleeptime")
        self._max_randsleep    = flocklab.config.getint("fetcher", "max_rand_sleeptime")
        self._obstestresfolder = "%s/%d" % (flocklab.config.get("observer", "testresultfolder"), testid)

    def run(self):
        try:
            self._loggerprefix = "(FetchObsThread.Obs%d) "%self._obsid
            self._logger.info(self._loggerprefix + "FetchObsThread starting...")
            removelast = True
                
            # Let thread run until someone calls terminate() on it:
            while removelast:
                # Get data from the observer over SCP, store it in the server's filesystem,
                # then sleep some random time before fetching data again.
                waittime = self._min_sleep + random.randrange(0, self._max_randsleep)
                #DEBUG self._logger.debug(self._loggerprefix + "Going to wait for %d seconds" %(waittime))
                self._stopEvent.wait(waittime) # The wait is interrupted if the stop signal has been set, causing the thread to download all remaining files
                if self._stopEvent.is_set():
                    removelast = False
                #self._logger.debug(self._loggerprefix + "Woke up")
                # Get list of available files
                cmd = ['ssh', self._obsethernet, "ls %s/" % self._obstestresfolder]
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)  # universal_newlines makes sure that a string is returned instead of a byte object
                out, err = p.communicate(None)
                rs = p.returncode
                if (rs == flocklab.SUCCESS):
                    services = {}
                    for servicename in [ "gpio_monitor", "powerprofiling", "serial", "error" ]:
                        services[servicename] = ServiceInfo(servicename)
                    # Read filenames
                    for dbfile in out.split():
                        # Check name and append to corresponding list
                        for service in services.values():
                            if service.matchFileName(dbfile):
                                service.addFile("%s/%s" % (self._obstestresfolder, dbfile))
                                break
                    copyfilelist = []
                    # Remove latest from each list as the observer might still be writing into it (unless stop event has been set).
                    for service in services.values():
                        service.stripFileList(removelast)
                        for dbfile in service.files:
                            copyfilelist.append(dbfile)
                        #if (len(service.files) > 0):
                        #    self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (" ".join(service.files), service.servicename))
                    
                    if len(copyfilelist) > 0:
                        # Download the database files:
                        self._logger.debug(self._loggerprefix + "Downloading results files %s" % (" ".join(copyfilelist)))
                        cmd = ['scp', '-q']
                        cmd.extend(["%s:%s" % (self._obsethernet, x) for x in copyfilelist])
                        cmd.append("%s/" % self._obsfiledir)
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if rs != 0:
                            self._logger.debug(self._loggerprefix + "Could not download all results files from observer. Data loss occurred for this observer.")
                            self._logger.debug(self._loggerprefix + "Tried to execute %s, result was %d, stdout: %s, error: %s" % (str(cmd), rs, out, err))
                        else:
                            self._logger.debug("Downloaded results files from observer.")
                            # put a copy to the debug directory
                            for f in copyfilelist:
                                fname = os.path.basename(f)
                                shutil.copyfile("%s/%s" % (self._obsfiledir, fname), "%s/%s" % (self._obsfiledebugdir, fname))
                            # Tell the fetcher to start working on the files:
                            for f in copyfilelist:
                                fname = os.path.basename(f)
                                try:
                                    self._workQueue.put(item=(ITEM_TO_PROCESS, self._obsid, self._obsfiledir, fname, None), block=True, timeout=10)
                                except queue.Full:
                                    # Make sure the file is downloaded again at a later point:
                                    copyfilelist.remove(f)
                                    os.unlink("%s/%s" % (self._obsfiledir, fname))
                                    self._logger.warn(self._loggerprefix + "FetchObsThread queue is full. Cannot put %s/%s on it." % (self._obsfiledir, fname))
                            # Remove remote files if any are left:
                            if (len(copyfilelist) > 0):
                                cmd = ['ssh' ,'%s'%(self._obsethernet), "cd %s;"%self._obstestresfolder, "rm"]
                                cmd.extend(copyfilelist)
                                self._logger.debug(self._loggerprefix + "Removing files on observer...")
                                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                                out, err = p.communicate(None)
                                rs = p.wait()
                                if (rs != flocklab.SUCCESS):
                                    self._logger.error(self._loggerprefix + "Could not remove files on observer, result was %d, error: %s" % (rs, err))
                            else:
                                self._logger.debug(self._loggerprefix + "No files left to delete on observer.")
                    else:
                        self._logger.debug(self._loggerprefix + "No files to download from observer.")

                    if not removelast: # this is the last execution of the while loop
                        cmd = ['ssh' ,'%s'%(self._obsethernet), "rm -rf %s" % self._obstestresfolder]
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if (rs != flocklab.SUCCESS):
                            self._logger.error(self._loggerprefix + "Could not remove results directory from observer, result was %d. Error: %s" % (rs, err.strip()))

                else:
                    self._logger.error(self._loggerprefix + "SSH to observer did not succeed, fetcher thread terminated with code %d. Error: %s" % (rs, err.strip()))
                    break   # abort
        
        except:
            logger.error(self._loggerprefix + "FetchObsThread crashed: %s, %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
        
        # Stop the process:
        self._logger.info(self._loggerprefix + "FetchObsThread stopped")
### END FetchObsThread


##############################################################################
#
# Start Fetcher
#
##############################################################################
def start_fetcher():
    global obsfiledir
    global FetchObsThread_list
    global FetchObsThread_queue
    global FetchObsThread_stopEvent
    global obsfetcher_dict
    
    # Daemonize the process ---
    daemon.daemonize(None, closedesc=False)
    logger.info("Daemon started")
    logger.info("Going to fetch data for test ID %d" %testid)
        
    # Get needed metadata from database ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    try:
        cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address 
                       FROM `tbl_serv_observer` AS `a` 
                       LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk 
                       WHERE `b`.test_fk = %d GROUP BY `a`.observer_id;
                    """ % testid)
    except MySQLdb.Error as err:
        msg = str(err)
        flocklab.error_logandexit(msg, errno.EIO)
    except:
        logger.warn("Error %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
    rs = cur.fetchall()
    cur.close()
    cn.close()
    logger.debug("Got list of FlockLab observers from database.")
    if not rs:
        logger.info("No observers found for this test. Nothing has to be done, thus exiting...")
        return errno.ENODATA
        
    # Start fetcher threads ---
    # Create a directory structure to store the downloaded files from the DB:
    obsfiledir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_dir'), testid)
    if not os.path.exists(obsfiledir):
        os.makedirs(obsfiledir)
    obsfiledebugdir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_debug_dir'), testid)
    if not os.path.exists(obsfiledebugdir):
        os.makedirs(obsfiledebugdir)
        #DEBUG logger.debug("Created %s"%obsfiledir)
    # Start one fetching thread per observer
    FetchObsThread_stopEvent = threading.Event()
    FetchObsThread_queue = queue.Queue(maxsize=10000)
    for observer in rs:
        obsid = int(observer[0])
        # Create needed directories:
        dirname = "%s/%d" % (obsfiledir, obsid)
        if (not os.path.exists(dirname)):
            os.makedirs(dirname)
        debugdirname = "%s/%d" % (obsfiledebugdir, obsid)
        if (not os.path.exists(debugdirname)):
            os.makedirs(debugdirname)
        # Start thread: 
        try:
            thread = FetchObsThread(obsid, observer[1], dirname, debugdirname, FetchObsThread_queue, FetchObsThread_stopEvent)
            FetchObsThread_list.append(thread)
            thread.start()
            logger.debug("Started fetcher thread for observer %d" % (obsid))
        except:
            logger.warn("Error when starting fetcher thread for observer %d: %s, %s" % (obsid, str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            continue
    
    return flocklab.SUCCESS
### END start_fetcher


##############################################################################
#
# Stop Fetcher
#
##############################################################################
def stop_fetcher():
    # Get the oldest running instance of the fetcher for the selected test ID (which is the main process) and send it the terminate signal:
    try:
        pid = flocklab.get_fetcher_pid(testid)
        # Signal the process to stop:
        if (pid > 0):
            # Do not stop this instance if it is the only one running:
            if (pid == os.getpid()):
                raise ValueError
            logger.debug("Sending SIGTERM signal to process %d" % pid)
            os.kill(pid, signal.SIGTERM)
            # wait for process to finish (timeout..)
            shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
            pidpath = "/proc/%d" % pid
            while os.path.exists(pidpath) and (shutdown_timeout > 0):
                time.sleep(1)
                shutdown_timeout = shutdown_timeout - 1
            if os.path.exists(pidpath):
                logger.error("Fetcher with PID %d is still running, killing process..." % pid)
                # send kill signal
                os.kill(pid, signal.SIGKILL)
                time.sleep(3)
                # check if there is a remaining fetcher process
                pid = flocklab.get_fetcher_pid(testid)
                if pid > 0 and pid != os.getpid():
                    logger.warn("Found a remaining fetcher thread with PID %d, killing it now..." % (pid))
                    os.kill(pid, signal.SIGKILL)
                raise ValueError
        else:
            raise ValueError
    except ValueError:
        # Set DB status in order to allow dispatcher and scheduler to go on:
        logger.debug("Setting test status in DB to 'synced'...")
        try:
            (cn, cur) = flocklab.connect_to_db()
            flocklab.set_test_status(cur, cn, testid, 'synced')
            cur.close()
            cn.close()
        except:
            logger.warn("Could not connect to database.")
        return errno.ENOPKG
    
    return flocklab.SUCCESS
### END stop_fetcher


##############################################################################
#
# Class WorkManager
#
##############################################################################
class WorkManager():
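    """Serializes work per (service, observer) pair: at most one item per pair is in
    flight at any time; further items are queued and handed out in order by done(),
    together with the worker state saved from the previous item."""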
    def __init__(self):
        self.worklist = {}
        self.pattern = re.compile("_[0-9].*")
        self.workcount = 0
        
    def _next_item_with_state(self, service, obsid):
        stateitem = list(self.worklist[service][obsid][1][0])
        stateitem[4] = self.worklist[service][obsid][0]
        return tuple(stateitem)
        
    def add(self, item):
        service = self.pattern.sub("",item[3])
        obsid = item[1]
        if service not in self.worklist:
            self.worklist[service] = {}
        if obsid not in self.worklist[service]:
            self.worklist[service][obsid] = [None, []] # workerstate / worklist
        # if list is empty, we're good to process, otherwise just append it and return None
        if len(self.worklist[service][obsid][1]) == 0:
            self.worklist[service][obsid][1].append(item)
            self.workcount = self.workcount + 1
            return self._next_item_with_state(service, obsid)
        else:
            self.worklist[service][obsid][1].append(item)
            self.workcount = self.workcount + 1
            return None
        
    def done(self, item):
        service = self.pattern.sub("",item[3])
        obsid = item[1]
        if item[1:-1] == self.worklist[service][obsid][1][0][1:-1]:
            self.worklist[service][obsid][0] = item[4] # save state
            self.worklist[service][obsid][1].pop(0)
            self.workcount = self.workcount - 1
        else:
            logger.error("work done for item that was not enqueued: %s" % str(item))
        # if there is more work to do, return next item
        if len(self.worklist[service][obsid][1]) > 0:
            return self._next_item_with_state(service, obsid)
        else:
            return None
    
    def finished(self):
        return self.workcount == 0
    
### END WorkManager


##############################################################################
#
# Usage
#
##############################################################################
def usage():
    print("Usage: %s --testid=<int> [--stop] [--debug] [--help]" % __file__)
    print("Options:")
    print("  --testid=<int>\t\tTest ID of test to which incoming data belongs.")
    print("  --stop\t\t\tOptional. Causes the program to stop a possibly running instance of the fetcher.")
    print("  --debug\t\t\tOptional. Print debug messages to log.")
    print("  --help\t\t\tOptional. Print this help.")
### END usage()


##############################################################################
#
# Main
#
##############################################################################
def main(argv):
    
    ### Get global variables ###
    global logger
    global debug
    global testid
    global testresultsdir
    global testresultsfile_dict
    global owner_fk
    global pindict
    global obsdict_byid
    global servicedict
    global serialdict
    
    stop = False
    
    # Get logger:
    logger = flocklab.get_logger()

    # Get the config file ---
    flocklab.load_config()

    # Get command line parameters ---
    try:
        opts, args = getopt.getopt(argv, "hedt:", ["help", "stop", "debug", "testid="])
    except getopt.GetoptError as err:
        print(str(err))
        logger.warn(str(err))
        usage()
        sys.exit(errno.EINVAL)
    except:
        msg = "Error when getting arguments: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
        flocklab.error_logandexit(msg, errno.EAGAIN)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit(flocklab.SUCCESS)
        elif opt in ("-d", "--debug"):
            debug = True
            logger.setLevel(logging.DEBUG)
        elif opt in ("-t", "--testid"):
            try:
                testid = int(arg)
            except ValueError:
                err = "Wrong API usage: testid has to be integer"
                print(str(err))
                logger.warn(str(err))
                usage()
                sys.exit(errno.EINVAL)
        elif opt in ("-e", "--stop"):
            stop = True
        else:
            print("Wrong API usage")
            logger.warn("Wrong API usage")
            sys.exit(errno.EINVAL)
    
    # Check if the necessary parameters are set ---
    if not testid:
        print("Wrong API usage")
        logger.warn("Wrong API usage")
        sys.exit(errno.EINVAL)
        
    # Check if the Test ID exists in the database ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    rs = flocklab.check_test_id(cur, testid)
    cur.close()
    cn.close()
    if rs != 0:
        if rs == 3:
            msg = "Test ID %d does not exist in database." %testid
            flocklab.error_logandexit(msg, errno.EINVAL)
        else:
            msg = "Error when trying to get test ID from database: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
            flocklab.error_logandexit(msg, errno.EIO)
        
    # Add Test ID to logger name ---
    logger.name += " (Test %d)" % testid
    
    # Start / stop the fetcher ---
    ret = flocklab.SUCCESS
    if stop:
        ret = stop_fetcher()
        logger.info("FlockLab fetcher stopped.")
        sys.exit(ret)

    # Start the fetcher processes which download data from the observers: 
    ret = start_fetcher()
    if ret == flocklab.SUCCESS:
        logger.info("FlockLab fetcher started.")
    else:
        msg = "Start function returned error. Exiting..."
        os.kill(os.getpid(), signal.SIGTERM)
        rs = flocklab.error_logandexit(msg, ret)
        
    # Get needed metadata ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    rs = flocklab.get_test_owner(cur, testid)
    if isinstance(rs, tuple):
        owner_fk = rs[0]
    else:
        owner_fk = None
    rs = flocklab.get_pinmappings(cur)
    if isinstance(rs, dict):
        pindict = rs
    else:
        pindict = None
    rs = flocklab.get_test_obs(cur, testid)
    if isinstance(rs, tuple):
        obsdict_byid = rs[1]
    else:
        obsdict_byid = None
    # Dict for serial service: 'r' means reader (data read from the target), 'w' means writer (data written to the target):
    serialdict = {0: 'r', 1: 'w'}
    rs = flocklab.get_servicemappings(cur)
    if isinstance(rs, dict):
        servicedict = rs
    else:
        servicedict = None
    
    # Find out the start and stop time of the test:
    cur.execute("SELECT `time_start_wish`, `time_end_wish` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" % testid)
    # Times are going to be of datetime type:
    ret = cur.fetchone()
    teststarttime = ret[0]
    teststoptime  = ret[1]
    FlockDAQ = False
    
    # Find out which services are used to allocate working threads later on ---
    # Get the XML config from the database and check which services are used in the test.
    servicesUsed_dict = {'gpiotracing': 'gpioTracingConf', 'powerprofiling': 'powerProfilingConf', 'serial': 'serialConf'}
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
    ret = cur.fetchone()
    if not ret:
        msg = "No XML found in database for testid %d." %testid
        errors.append(msg)
        logger.error(msg)
        for service, xmlname in servicesUsed_dict.items():
            servicesUsed_dict[service] = True
    else:
        try:
            logger.debug("Got XML from database.")
            parser = lxml.etree.XMLParser(remove_comments=True)
            tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
            ns = {'d': flocklab.config.get('xml', 'namespace')}
            for service, xmlname in servicesUsed_dict.items():
                if tree.xpath('//d:%s' % xmlname, namespaces=ns):
                    servicesUsed_dict[service] = True
                    logger.debug("Usage of %s detected." % service)
                else:
                    servicesUsed_dict[service] = False
        except:
            msg = "XML parsing failed: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
            errors.append(msg)
            logger.error(msg)
    
    cur.close()
    cn.close()
    if (owner_fk is None) or (pindict is None) or (obsdict_byid is None) or (servicedict is None):
        msg = "Error when getting metadata.\n"
        msg += "owner_fk: %s\npindict: %s\nobsdict_byid: %s\nservicedict: %s\n" % (str(owner_fk), str(pindict), str(obsdict_byid), str(servicedict))
        msg += "Exiting..."
        logger.debug(msg)
        os.kill(os.getpid(), signal.SIGTERM)
        flocklab.error_logandexit(msg, errno.EAGAIN)
    else:
        logger.debug("Got all needed metadata.")
    
    # Start aggregating processes ---
    """    There is an infinite loop which gets files to process from the fetcher threads which download data from the observers.
        Downloaded data is then assigned to a worker process for the corresponding service and in the worker process parsed,
        converted (if needed) and aggregated into a single file per service.
        The loop is stopped if the program receives a stop signal. In this case, the loops runs until no more database files 
        are there to be processed.
    """
    if __name__ == '__main__':
        # Create directory and files needed for test results:
        testresultsdir = "%s/%d" %(flocklab.config.get('fetcher', 'testresults_dir'), testid)
        if not os.path.exists(testresultsdir):
            os.makedirs(testresultsdir)
            logger.debug("Created %s" % testresultsdir)
        manager = multiprocessing.Manager()
        for service in ('errorlog', 'gpiotracing', 'powerprofiling', 'serial'):
            path = "%s/%s.csv" % (testresultsdir, service)
            lock = manager.Lock()
            testresultsfile_dict[service] = (path, lock)
            # Create file and write header:
            if service == 'errorlog':
                header = 'timestamp,observer_id,node_id,errormessage\n'
            elif service == 'gpiotracing':
                header = 'timestamp,observer_id,node_id,pin_name,value\n'
            elif service == 'powerprofiling':
                header = 'timestamp,observer_id,node_id,I1,V1,V2\n'
            elif service == 'serial':
                header = 'timestamp,observer_id,node_id,direction,output\n'
            lock.acquire()
            f = open(path, 'w')
            f.write(header)
            f.close()
            lock.release()
        # Start logging thread:
        logqueue = manager.Queue(maxsize=10000)
        LogQueueThread_stopEvent = threading.Event()
        try:
            thread = LogQueueThread(logqueue, logger, LogQueueThread_stopEvent)
            thread.start()
        except:
            logger.warn("Error when starting log queue thread: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
        
        # Determine the number of CPUs to be used for each aggregating process. If a service is not used, its CPUs are assigned to other services.
        cpus_free = 0
        cpus_errorlog = flocklab.config.getint('fetcher', 'cpus_errorlog')
        # CPUs for serial service:
        if servicesUsed_dict['serial']:
            cpus_serial    = flocklab.config.getint('fetcher', 'cpus_serial')
        else:
            cpus_serial    = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_serial')
        # CPUs for GPIO tracing:
        if servicesUsed_dict['gpiotracing']:
            cpus_gpiomonitoring    = flocklab.config.getint('fetcher', 'cpus_gpiomonitoring')
        else:
            cpus_gpiomonitoring = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_gpiomonitoring')
        # CPUs for powerprofiling:
        if servicesUsed_dict['powerprofiling']:
            cpus_powerprofiling    = flocklab.config.getint('fetcher', 'cpus_powerprofiling')
        else:
            cpus_powerprofiling = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_powerprofiling')
        # If there are free CPUs left, give them to GPIO tracing and power profiling evenly as these services need the most CPU power:
        if cpus_free > 0:
            if (cpus_powerprofiling > 0) and (cpus_gpiomonitoring > 0):
                # Both services are used, distribute the free CPUS evenly:
                cpus_powerprofiling = cpus_powerprofiling + int(math.ceil(float(cpus_free)/2))
                cpus_gpiomonitoring = cpus_gpiomonitoring + int(math.floor(float(cpus_free)/2))
            elif cpus_powerprofiling > 0:
                # GPIO monitoring/tracing is not used, so give all CPUs to powerprofiling:
                cpus_powerprofiling = cpus_powerprofiling + cpus_free
            elif cpus_gpiomonitoring > 0:
                # Powerprofiling is not used, so give all CPUs to GPIO monitoring/tracing:
                cpus_gpiomonitoring = cpus_gpiomonitoring + cpus_free
            else:
                # Neither of the services is used, so give it to one of the other services:
                if cpus_serial > 0:
                    cpus_serial = cpus_serial + cpus_free
        cpus_total = cpus_errorlog + cpus_serial + cpus_gpiomonitoring + cpus_powerprofiling
        
        service_pools_dict = { 'errorlog': cpus_errorlog, 'serial': cpus_serial, 'gpiotracing': cpus_gpiomonitoring, 'powerprofiling': cpus_powerprofiling }
        if (cpus_total > multiprocessing.cpu_count()):
            logger.warn("Number of requested CPUs for all aggregating processes (%d) is higher than number of available CPUs (%d) on system." % (cpus_total, multiprocessing.cpu_count()))
        
        # Start a worker process pool for every service:
        for service, cpus in service_pools_dict.items():
            if cpus != 1:
                # currently only 1 CPU / process can be used per task since processing functions are NOT thread safe!
                logger.warning("%d is an invalid number of CPUs for service %s, using default value of 1." % (cpus, service))
                cpus = 1
            if cpus > 0:
                pool = multiprocessing.Pool(processes=cpus)
                logger.debug("Created pool for %s workers with %d processes" % (service, cpus))
                service_pools_dict[service] = pool
            else: