#! /usr/bin/env python3
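
"""FlockLab fetcher: downloads test result DB files from the observers and
aggregates them into per-service result files on the server."""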

import os, sys, getopt, traceback, MySQLdb, signal, random, time, errno, multiprocessing, subprocess, re, logging, __main__, threading, struct, types, queue, math, shutil, lxml.etree
import lib.daemon as daemon
import lib.flocklab as flocklab
from rocketlogger.data import RocketLoggerData
import pandas as pd
import numpy as np


logger                   = None
debug                    = False
testid                   = None 
errors                   = []
FetchObsThread_list      = []
FetchObsThread_stopEvent = None
FetchObsThread_queue     = None
obsfiledir               = None
testresultsdir           = None
testresultsfile_dict     = {}
mainloop_stop            = False
owner_fk                 = None
pindict                  = None
obsdict_byid             = None
servicedict              = None
serialdict               = None

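# States of work queue items, used to track processing progress: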
ITEM_TO_PROCESS = 0
ITEM_PROCESSED  = 1


##############################################################################
#
# Error classes
#
##############################################################################
class DbFileEof(Exception):
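    """Raised when the end of an observer DB file has been reached."""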
    pass

class DbFileReadError(Exception):
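    """Raised when a record's payload is shorter than its length field announces."""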
    def __init__(self, expectedSize, actualSize, fpos):
        self.expectedSize = expectedSize
        self.actualSize = actualSize
        self.fpos = fpos
### END Error classes



##############################################################################
#
# Class ServiceInfo
#
##############################################################################
class ServiceInfo():
    def __init__(self, servicename):
        self.servicename = servicename
        self.files = []
        self.pattern = r"^%s_[0-9]+\.[a-z]+$" % servicename
    
    def matchFileName(self, filename):
        return re.search(self.pattern, os.path.basename(filename)) is not None
        
    def addFile(self, filename):
        self.files.append(filename)
    
    def stripFileList(self, removelast=True):
        self.files.sort()
        if ((len(self.files) > 0) and removelast):
            self.files.pop()
### END ServiceInfo


##############################################################################
#
# sigterm_handler
#
##############################################################################
def sigterm_handler(signum, frame):
    """If the program is terminated by sending it the signal SIGTERM 
    (e.g. by executing 'kill') or SIGINT (pressing ctrl-c), 
    this signal handler is invoked for cleanup."""
    
    global mainloop_stop
    global FetchObsThread_stopEvent

    logger.info("Process received SIGTERM or SIGINT signal")
        
    # Signal all observer fetcher threads to stop:
    logger.debug("Stopping observer fetcher threads...")
    shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
    try:
        FetchObsThread_stopEvent.set()
    except:
        pass
    for thread in FetchObsThread_list:
        thread.join(shutdown_timeout)
        # join() returns silently on timeout, so check whether the thread actually stopped:
        if thread.is_alive():
            logger.warn("Fetcher thread did not stop within %d seconds." % shutdown_timeout)
    # Set DB status:
    logger.debug("Setting test status in DB to 'syncing'...")
    try:
        (cn, cur) = flocklab.connect_to_db()
        flocklab.set_test_status(cur, cn, testid, 'syncing')
        cur.close()
        cn.close()
    except:
        logger.warn("Could not connect to database.")
    
    # Tell the main loop to stop:
    mainloop_stop = True
    logger.debug("Set stop signal for main loop.")
### END sigterm_handler


##############################################################################
#
# Functions for parsing observer DB files data
#
##############################################################################
def parse_serial(buf):
    _data = struct.unpack("iii%ds" % (len(buf) - 12), buf)  # int service; struct timeval timestamp; char *data
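    # returns a tuple (service ID, payload bytes, "seconds.microseconds" timestamp string)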
    return (_data[0], _data[3], "%i.%06i" % (_data[1], _data[2]))


##############################################################################
#
# Functions for converting observer DB data
#
##############################################################################
def convert_serial(obsdata, observer_id, node_id):
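    # Produces one CSV line per record: timestamp,observer_id,node_id,direction,output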
    try:
        result = "%s,%s,%s,%s,%s\n" % (obsdata[2], observer_id, node_id, serialdict[obsdata[0]], obsdata[1].decode('utf8').rstrip())
    except UnicodeDecodeError:
        result = "%s,%s,%s,%s,%s\n" % (obsdata[2], observer_id, node_id, serialdict[obsdata[0]], str(obsdata[1]))
    return result


##############################################################################
#
# read_from_db_file: Read from an open DB file from an observer
#
##############################################################################
def read_from_db_file(dbfile):
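    """Reads one record from an open observer DB file. Each record consists of
    a 4-byte little-endian length field followed by the payload of that length.
    Raises DbFileEof at end of file and DbFileReadError on a truncated payload."""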
    _buf = dbfile.read(4)
    if len(_buf) < 4:
        dbfile.close()
        raise DbFileEof()
    else:
        _size = struct.unpack("<I", _buf)
        _buf = dbfile.read(_size[0])
        if len(_buf) != _size[0]:
            _fpos = dbfile.tell() - 4 - len(_buf)
            dbfile.close()
            raise DbFileReadError(_size[0], len(_buf), _fpos)
        return _buf
### END read_from_db_file


##############################################################################
#
# worker_convert_and_aggregate: Worker function for multiprocessing pools.
#        Parses observer DB files for all services, converts values (if needed)
#        and aggregates them into single test result files.
#
##############################################################################
def worker_convert_and_aggregate(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, commitsize=1, vizimgdir=None, parse_f=None, convert_f=None, viz_f=None, logqueue=None):
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
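        # queueitem layout: (item state, observer ID, source directory, filename, worker state)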
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.%d) " % (cur_p.name, obsid)
        #logqueue.put_nowait((loggername, logging.DEBUG, "Import file %s"%obsdbfile_path))
        # Open file:
        dbfile = open(obsdbfile_path, 'rb')
        rows = 0
        viz_values = []
        conv_values = []
        while not dbfile.closed:
            # Process DB file line by line:
            try:
                # Parse one line:
                buf = read_from_db_file(dbfile)
                obsdata = parse_f(buf)
                viz_values.append(obsdata)
                # Convert data if needed:
                if convert_f is not None:
                    conv_data = convert_f(obsdata, obsid, nodeid)
                    conv_values.append(conv_data)
                    rows += 1
                # Visualize data:
                if (commitsize > 0) and (rows >= commitsize):
                    if viz_f is not None:
                        #logqueue.put_nowait((loggername, logging.DEBUG, "Viz started..."))
                        viz_f(testid, owner_fk, viz_values, obsid, vizimgdir, logger)
                        #logqueue.put_nowait((loggername, logging.DEBUG, "Viz done."))
                    # Write data to file:
                    #logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for writing..." % (resultfile_path)))
                    resultfile_lock.acquire()
                    f = open(resultfile_path, 'a')
                    f.writelines(conv_values)
                    f.close()
                    resultfile_lock.release()
                    logqueue.put_nowait((loggername, logging.DEBUG, "Committed results to %s after %d rows" % (resultfile_path, rows)))
                    rows = 0
                    conv_values = []
                    viz_values = []
            except DbFileEof:
                # logqueue.put_nowait((loggername, logging.DEBUG, "DbFileEof has occurred."))
                break # dbfile has been closed in parser (most likely because EOF was reached)
            except DbFileReadError as err:
                msg = "%s: Packet size (%i) did not match payload size (%i) @ %d." %(obsdbfile_path, err.expectedSize, err.actualSize, err.fpos)
                _errors.append((msg, errno.EIO, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        if (len(conv_values) > 0):
            # There is still data left. Do a last commit:
            if (viz_f is not None) and (len(viz_values) > 0):
                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz started..."))
                viz_f(testid, owner_fk, viz_values, obsid, vizimgdir, logger)
                #logqueue.put_nowait((loggername, logging.DEBUG, "Viz done."))
            # Write data to file:
            #logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for final writing..." % (resultfile_path)))
            resultfile_lock.acquire()
            f = open(resultfile_path, 'a')
            f.writelines(conv_values)
            f.close()
            resultfile_lock.release()
            logqueue.put_nowait((loggername, logging.DEBUG, "Committed final results to %s after %d rows" % (resultfile_path, rows)))
        # Remove processed file:
        #logqueue.put_nowait((loggername, logging.DEBUG, "Remove %s" % (obsdbfile_path)))
        os.unlink(obsdbfile_path)
    except:
        msg = "Error in worker process: %s: %s\n%s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_convert_and_aggregate


##############################################################################
#
# worker_gpiotracing: Worker function for converting and aggregating gpio
#               tracing data. Unlike for the other services, this function works on
#               whole observer DB files.
#
##############################################################################
def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.%d) " % (cur_p.name, obsid)

        with open(resultfile_path, "a") as outfile:
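            # each line of the observer file is expected to have the format "timestamp,pin,level"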
            with open(obsdbfile_path, "r") as infile:
                for line in infile:
                    try:
                        (timestamp, pin, level) = line.strip().split(',', 2)
                        outfile.write("%s,%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, pin, level))
                    except ValueError:
                        logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
                        break
        os.remove(obsdbfile_path)
    except:
        msg = "Error in gpiotracing worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_gpiotracing


##############################################################################
#
# worker_powerprof: Worker function for converting and aggregating power
#        profiling data. Unlike for the other services, this function works on
#        whole observer DB files.
#
##############################################################################
def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.%d) " % (cur_p.name, obsid)

        if arg == 'rld':
            # RLD file format
            # simply move the file into the results directory
            try:
                resfilename = "%s.%s.%s.rld" % (os.path.splitext(resultfile_path)[0], obsid, nodeid)
                if os.path.exists(resfilename):
                    # os.rename() silently overwrites existing files on POSIX, so check explicitly
                    raise FileExistsError(resfilename)
                os.rename(obsdbfile_path, resfilename)
            except FileExistsError:
                # TODO: properly handle case where file already exists (several rld files per observer)
                msg = "File '%s' already exists, dropping test results." % (resfilename)
                _errors.append((msg, errno.EEXIST, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        else:
            # CSV file format
            rld_data = RocketLoggerData(obsdbfile_path).merge_channels()
            # get network time and convert to UNIX timestamp (UTC)
            timeidx = rld_data.get_time(absolute_time=True, time_reference='network')     # TODO adjust parameters for RL 1.99+
            timeidxunix = timeidx.astype('uint64') / 1e9   # convert to s
            current_ch = rld_data.get_data('I1') * 1000    # convert to mA
            voltage_ch = rld_data.get_data('V2') - rld_data.get_data('V1')    # voltage difference
            rld_dataframe = pd.DataFrame(np.hstack((current_ch, voltage_ch)), index=timeidxunix, columns=['I', 'V'])
            rld_dataframe.insert(0, 'observer_id', obsid)
            rld_dataframe.insert(1, 'node_id', nodeid)
            rld_dataframe.to_csv(resultfile_path, sep=',', index_label='time', header=False, mode='a')

            os.remove(obsdbfile_path)
    except:
        msg = "Error in powerprof worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_powerprof


##############################################################################
#
# worker_logs
#
##############################################################################
def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        obsdbfile_path = "%s/%s" % (fdir, f)
        loggername = "(%s.%d) " % (cur_p.name, obsid)

        with open(resultfile_path, "a") as outfile:
            with open(obsdbfile_path, "r") as infile:
                for line in infile:
                    (timestamp, msg) = line.strip().split(',', 1)
                    outfile.write("%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, msg))
        os.remove(obsdbfile_path)
    except:
        msg = "Error in logs worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_logs()


##############################################################################
#
# worker_callback: Callback function which reports errors from worker processes
#        back to the main process
#
##############################################################################
def worker_callback(result):
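    # result is the tuple returned by a worker function: (list of errors, processed queue item)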
    global errors
    global FetchObsThread_queue
    
    if len(result[0]) > 0:
        for (err, eno, obsid) in result[0]:
            msg = "Error %d when processing results for Observer ID %s: %s" % (eno, obsid, err)
            errors.append(msg)
    
    try:
        FetchObsThread_queue.put(item=result[1], block=True, timeout=10)
    except queue.Full:
        msg = "Queue full after processing element"
        logger.error(msg)
    return 0
### END worker_callback


##############################################################################
#
# LogQueueThread
#
##############################################################################
class LogQueueThread(threading.Thread):
    """    Thread which logs from queue to logfile.
    """ 
    def __init__(self, logqueue, logger, stopEvent):
        threading.Thread.__init__(self)
        self._logger    = logger
        self._stopEvent = stopEvent
        self._logqueue  = logqueue

    def run(self):
        self._logger.info("LogQueueThread started")
            
        # Let thread run until someone calls terminate() on it:
        while not self._stopEvent.is_set():
            try:
                (loggername, loglevel, msg) = self._logqueue.get(block=True, timeout=1)
                self._logger.log(loglevel, loggername + msg)
            except queue.Empty:
                pass
        
        # Stop the process:
        self._logger.info("LogQueueThread stopped")
### END LogQueueThread


##############################################################################
#
# FetchObsThread
#
##############################################################################
class FetchObsThread(threading.Thread):
    """    Thread which downloads database files from an observer to the server.
    """ 
    def __init__(self, obsid, obsethernet, dirname, debugdirname, workQueue, stopEvent):
        threading.Thread.__init__(self) 
        self._obsid            = obsid
        self._obsethernet      = obsethernet
        self._obsfiledir       = dirname
        self._obsfiledebugdir  = debugdirname
        self._workQueue        = workQueue
        self._stopEvent        = stopEvent
        self._logger           = logger
        
        self._min_sleep        = flocklab.config.getint("fetcher", "min_sleeptime")
        self._max_randsleep    = flocklab.config.getint("fetcher", "max_rand_sleeptime")
        self._obstestresfolder = "%s/%d" % (flocklab.config.get("observer", "testresultfolder"), testid)
        
    def run(self):
        try:
            self._loggerprefix = "(FetchObsThread.%d) "%self._obsid
            self._logger.info(self._loggerprefix + "FetchObsThread starting...")
            removelast = True
                
            # Let thread run until someone calls terminate() on it:
            while removelast:
                """ Get data from the observer over SCP. 
                Then request data from the observer and store it in the server's filesystem. 
                Then sleep some random time before fetching data again.
                """
                # Wait for some random time:
                waittime = self._min_sleep + random.randrange(0, self._max_randsleep)
                #DEBUG self._logger.debug(self._loggerprefix + "Going to wait for %d seconds" %(waittime))
                self._stopEvent.wait(waittime) # The wait will be interrupted if the stop signal has been set causing the thread to download all remaining files
                if self._stopEvent.is_set():
                    removelast = False
                #self._logger.debug(self._loggerprefix + "Woke up")
                # Get list of available files
                cmd = ['ssh' , self._obsethernet, "ls %s/" % self._obstestresfolder]
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)  # universal_newlines makes sure that a string is returned instead of a byte object
                out, err = p.communicate(None)
                rs = p.returncode
                if (rs == flocklab.SUCCESS):
                    services = {}
                    for servicename in [ "gpio_monitor", "powerprofiling", "serial", "error", "timesync" ]:
                        services[servicename] = ServiceInfo(servicename)
                    # Read filenames
                    for dbfile in out.split():
                        # Check name and append to corresponding list
                        for service in services.values():
                            if service.matchFileName(dbfile):
                                service.addFile("%s/%s" % (self._obstestresfolder, dbfile))
                                break
                    copyfilelist = []
                    # Remove latest from each list as the observer might still be writing into it (unless stop event has been set).
                    for service in services.values():
                        service.stripFileList(removelast)
                        for dbfile in service.files:
                            copyfilelist.append(dbfile)
                        #if (len(service.files) > 0):
                        #    self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (" ".join(service.files), service.servicename))
                    
                    if len(copyfilelist) > 0:
                        # Download the database files:
                        self._logger.debug(self._loggerprefix + "Downloading results files %s" % (" ".join(copyfilelist)))
                        cmd = ['scp', '-q' ]
                        cmd.extend(["%s:%s" % (self._obsethernet, x) for x in copyfilelist])
                        cmd.append("%s/" % self._obsfiledir)
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if rs != 0:
                            self._logger.debug(self._loggerprefix + "Could not download all results files from observer. Data loss occurred for this observer.")
                            self._logger.debug(self._loggerprefix + "Tried to execute %s, result was %d, stdout: %s, error: %s" % (str(cmd), rs, out, err))
                        else:
                            self._logger.debug(self._loggerprefix + "Downloaded results files from observer.")
                            # put a copy to the debug directory
                            for f in copyfilelist:
                                fname = os.path.basename(f)
                                shutil.copyfile("%s/%s" % (self._obsfiledir, fname), "%s/%s" % (self._obsfiledebugdir, fname))
                            # Tell the fetcher to start working on the files:
                            for f in copyfilelist[:]:  # iterate over a copy since entries may be removed below
                                fname = os.path.basename(f)
                                try:
                                    self._workQueue.put(item=(ITEM_TO_PROCESS, self._obsid, self._obsfiledir, fname, None), block=True, timeout=10)
                                except queue.Full:
                                    # Make sure the file is downloaded again at a later point:
                                    copyfilelist.remove(f)
                                    os.unlink("%s/%s" % (self._obsfiledir, fname))
                                    self._logger.warn(self._loggerprefix + "FetchObsThread queue is full. Cannot put %s/%s on it." % (self._obsfiledir, fname))
                            # Remove remote files if any are left:
                            if (len(copyfilelist) > 0):
                                cmd = ['ssh', self._obsethernet, "cd %s;" % self._obstestresfolder, "rm"]
                                cmd.extend(copyfilelist)
                                self._logger.debug(self._loggerprefix + "Removing files on observer...")
                                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                                out, err = p.communicate(None)
                                rs = p.wait()
                                if (rs != flocklab.SUCCESS):
                                    self._logger.error(self._loggerprefix + "Could not remove files on observer, result was %d, error: %s" % (rs, err))
                            else:
                                self._logger.debug(self._loggerprefix + "No files left to delete on observer.")
                    else:
                        self._logger.debug(self._loggerprefix + "No files to download from observer.")

                    if not removelast: # this is the last execution of the while loop
                        cmd = ['ssh', self._obsethernet, "rm -rf %s" % self._obstestresfolder]
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if (rs != flocklab.SUCCESS):
                            self._logger.error(self._loggerprefix + "Could not remove results directory from observer, result was %d. Error: %s" % (rs, err.strip()))

                else:
                    self._logger.error(self._loggerprefix + "SSH to observer did not succeed, fetcher thread terminated with code %d. Error: %s" % (rs, err.strip()))
                    break   # abort
        
        except:
            logger.error(self._loggerprefix + "FetchObsThread crashed: %s, %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
        
        # Stop the process:
        self._logger.info(self._loggerprefix + "FetchObsThread stopped")
### END FetchObsThread


##############################################################################
#
# Start Fetcher
#
##############################################################################
def start_fetcher():
    global obsfiledir
    global FetchObsThread_list
    global FetchObsThread_queue
    global FetchObsThread_stopEvent
    global obsfetcher_dict
    
    # Daemonize the process ---
    daemon.daemonize(None, closedesc=False)
    logger.info("Daemon started")
    logger.info("Going to fetch data for test ID %d" %testid)
        
    # Get needed metadata from database ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    try:
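        # Select all observers for which a target image is mapped in this test: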
        cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address 
                       FROM `tbl_serv_observer` AS `a` 
                       LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk 
                       WHERE `b`.test_fk = %d GROUP BY `a`.observer_id;
                    """ % testid)
    except MySQLdb.Error as err:
        msg = str(err)
        flocklab.error_logandexit(msg, errno.EIO)
    except:
        logger.warn("Error %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
    rs = cur.fetchall()
    cur.close()
    cn.close()
    logger.debug("Got list of FlockLab observers from database.")
    if not rs:
        logger.info("No observers found for this test. Nothing has to be done, thus exiting...")
        return errno.ENODATA
        
    # Start fetcher threads ---
    # Create a directory structure to store the downloaded files from the DB:
    obsfiledir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_dir'), testid)
    if not os.path.exists(obsfiledir):
        os.makedirs(obsfiledir)
    obsfiledebugdir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_debug_dir'), testid)
    if not os.path.exists(obsfiledebugdir):
        os.makedirs(obsfiledebugdir)
        #DEBUG logger.debug("Created %s"%obsfiledir)
    # Start one fetching thread per observer
    FetchObsThread_stopEvent = threading.Event()
    FetchObsThread_queue = queue.Queue(maxsize=10000)
    for observer in rs:
        obsid = int(observer[0])
        # Create needed directories:
        dirname = "%s/%d" % (obsfiledir, obsid)
        if (not os.path.exists(dirname)):
            os.makedirs(dirname)
        debugdirname = "%s/%d" % (obsfiledebugdir, obsid)
        if (not os.path.exists(debugdirname)):
            os.makedirs(debugdirname)
        # Start thread: 
        try:
            thread = FetchObsThread(obsid, observer[1], dirname, debugdirname, FetchObsThread_queue, FetchObsThread_stopEvent)
            FetchObsThread_list.append(thread)
            thread.start()
            logger.debug("Started fetcher thread for observer %d" % (obsid))
        except:
            logger.warn("Error when starting fetcher thread for observer %d: %s, %s" % (obsid, str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            continue
    
    return flocklab.SUCCESS
### END start_fetcher


##############################################################################
#
# Stop Fetcher
#
##############################################################################
def stop_fetcher():
    # Get oldest running instance of the fetcher for the selected test ID which is the main process and send it the terminate signal:
    try:
        pid = flocklab.get_fetcher_pid(testid)
        # Signal the process to stop:
        if (pid > 0):
            # Do not stop this instance if it is the only one running:
            if (pid == os.getpid()):
                raise ValueError
            logger.debug("Sending SIGTERM signal to process %d" % pid)
            os.kill(pid, signal.SIGTERM)
            # wait for process to finish (timeout..)
            shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
            pidpath = "/proc/%d" % pid
            while os.path.exists(pidpath) and (shutdown_timeout > 0):
                time.sleep(1)
                shutdown_timeout = shutdown_timeout - 1
            if os.path.exists(pidpath):
                logger.error("Fetcher with PID %d is still running, killing process..." % pid)
                # send kill signal
                os.kill(pid, signal.SIGKILL)
                time.sleep(3)
                # check if there is a remaining fetcher process
                pid = flocklab.get_fetcher_pid(testid)
                if pid > 0 and pid != os.getpid():
                    logger.warn("Found a remaining fetcher thread with PID %d, killing it now..." % (pid))
                    os.kill(pid, signal.SIGKILL)
                raise ValueError
        else:
            raise ValueError
    except ValueError:
        # Set DB status in order to allow dispatcher and scheduler to go on:
        logger.debug("Setting test status in DB to 'synced'...")
        try:
            (cn, cur) = flocklab.connect_to_db()
            flocklab.set_test_status(cur, cn, testid, 'synced')
            cur.close()
            cn.close()
        except:
            logger.warn("Could not connect to database.")
        return errno.ENOPKG
    
    return flocklab.SUCCESS
### END stop_fetcher


##############################################################################
#
# Class WorkManager
#
##############################################################################
class WorkManager():
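    """ Keeps track of pending work items per (service, observer ID) pair and
        serializes their processing: add() hands out an item only if no other
        file of the same service/observer pair is in flight, and done() returns
        the next queued item, carrying over the saved worker state.
    """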
    def __init__(self):
        self.worklist = {}
        self.pattern = re.compile("_[0-9].*")
        self.workcount = 0
        
    def _next_item_with_state(self, service, obsid):
        stateitem = list(self.worklist[service][obsid][1][0])
        stateitem[4] = self.worklist[service][obsid][0]
        return tuple(stateitem)
        
    def add(self, item):
        service = self.pattern.sub("",item[3])
        obsid = item[1]
        if service not in self.worklist:
            self.worklist[service] = {}
        if obsid not in self.worklist[service]:
            self.worklist[service][obsid] = [None, []] # workerstate / worklist
        # if list is empty, we're good to process, otherwise just append it and return None
        if len(self.worklist[service][obsid][1]) == 0:
            self.worklist[service][obsid][1].append(item)
            self.workcount = self.workcount + 1
            return self._next_item_with_state(service, obsid)
        else:
            self.worklist[service][obsid][1].append(item)
            self.workcount = self.workcount + 1
            return None
        
    def done(self, item):
        service = self.pattern.sub("",item[3])
        obsid = item[1]
        if item[1:-1] == self.worklist[service][obsid][1][0][1:-1]:
            self.worklist[service][obsid][0] = item[4] # save state
            self.worklist[service][obsid][1].pop(0)
            self.workcount = self.workcount - 1
        else:
            logger.error("work done for item that was not enqueued: %s" % str(item))
        # if there is more work to do, return next item
        if len(self.worklist[service][obsid][1]) > 0:
            return self._next_item_with_state(service, obsid)
        else:
            return None
    
    def finished(self):
        return self.workcount == 0
    
### END WorkManager


##############################################################################
#
# Usage
#
##############################################################################
def usage():
    print("Usage: %s --testid=<int> [--stop] [--debug] [--help]" % __file__)
    print("Options:")
    print("  --testid=<int>\t\tTest ID of test to which incoming data belongs.")
    print("  --stop\t\t\tOptional. Causes the program to stop a possibly running instance of the fetcher.")
    print("  --debug\t\t\tOptional. Print debug messages to log.")
    print("  --help\t\t\tOptional. Print this help.")
### END usage()


##############################################################################
#
# Main
#
##############################################################################
def main(argv):
    
    ### Get global variables ###
    global logger
    global debug
    global testid
    global testresultsdir
    global testresultsfile_dict
    global owner_fk
    global pindict
    global obsdict_byid
    global servicedict
    global serialdict
    
    stop = False
    
    # Get logger:
    logger = flocklab.get_logger()
        
    # Get the config file ---
    flocklab.load_config()

    # Get command line parameters ---
    try:
        opts, args = getopt.getopt(argv, "hedt:", ["help", "stop", "debug", "testid="])
    except getopt.GetoptError as err:
        print(str(err))
        logger.warn(str(err))
        usage()
        sys.exit(errno.EINVAL)
    except:
        msg = "Error when getting arguments: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
        flocklab.error_logandexit(msg, errno.EAGAIN)
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit(flocklab.SUCCESS)
        elif opt in ("-d", "--debug"):
            debug = True
            logger.setLevel(logging.DEBUG)
        elif opt in ("-t", "--testid"):
            try:
                testid = int(arg)
            except ValueError:
                err = "Wrong API usage: testid has to be integer"
                print(str(err))
                logger.warn(str(err))
                usage()
                sys.exit(errno.EINVAL)
        elif opt in ("-e", "--stop"):
            stop = True
        else:
            print("Wrong API usage")
            logger.warn("Wrong API usage")
            sys.exit(errno.EINVAL)
    
    # Check if the necessary parameters are set ---
    if not testid:
        print("Wrong API usage")
        logger.warn("Wrong API usage")
        sys.exit(errno.EINVAL)
        
    # Check if the Test ID exists in the database ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    rs = flocklab.check_test_id(cur, testid)
    cur.close()
    cn.close()
    if rs != 0:
        if rs == 3:
            msg = "Test ID %d does not exist in database." %testid
            flocklab.error_logandexit(msg, errno.EINVAL)
        else:
            msg = "Error when trying to get test ID from database: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
            flocklab.error_logandexit(msg, errno.EIO)
        
    # Add Test ID to logger name ---
    logger.name += " (Test %d)"%testid
    
    # Start / stop the fetcher ---
    ret = flocklab.SUCCESS
    if stop:
        ret = stop_fetcher()
        logger.info("FlockLab fetcher stopped.")
        sys.exit(ret)

    # Start the fetcher processes which download data from the observers: 
    ret = start_fetcher()
    if ret == flocklab.SUCCESS:
        logger.info("FlockLab fetcher started.")
    else:
        msg = "Start function returned error. Exiting..."
        os.kill(os.getpid(), signal.SIGTERM)
        rs = flocklab.error_logandexit(msg, ret)
        
    # Get needed metadata ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    rs = flocklab.get_test_owner(cur, testid)
    if isinstance(rs, tuple):
        owner_fk = rs[0]
    else:
        owner_fk = None
    rs = flocklab.get_pinmappings(cur)
    if isinstance(rs, dict):
        pindict = rs
    else:
        pindict = None
    rs = flocklab.get_test_obs(cur, testid)
    if isinstance(rs, tuple):
        obsdict_byid = rs[1]
    else:
        obsdict_byid = None
    # Dict for serial service: 'r' means reader (data read from the target), 'w' means writer (data written to the target):
    serialdict = {0: 'r', 1: 'w'}
    rs = flocklab.get_servicemappings(cur)
    if isinstance(rs, dict):
        servicedict = rs
    else:
        servicedict = None
    
    # Find out the start and stop time of the test:
    cur.execute("SELECT `time_start_wish`, `time_end_wish` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" %testid)
    # Times are going to be of datetime type:
    ret = cur.fetchone() 
    teststarttime = ret[0]
    teststoptime  = ret[1]
    ppFileFormat  = None
    
    # Find out which services are used to allocate working threads later on ---
    # Get the XML config from the database and check which services are used in the test.
    servicesUsed_dict = {'gpiotracing': 'gpioTracingConf', 'powerprofiling': 'powerProfilingConf', 'serial': 'serialConf'}
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
    ret = cur.fetchone()
    if not ret:
        msg = "No XML found in database for testid %d." %testid
        errors.append(msg)
        logger.error(msg)
        for service, xmlname in servicesUsed_dict.items():
            servicesUsed_dict[service] = True
    else:
        try:
            logger.debug("Got XML from database.")
            parser = lxml.etree.XMLParser(remove_comments=True)
            tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
            ns = {'d': flocklab.config.get('xml', 'namespace')}
            for service, xmlname in servicesUsed_dict.items():
                if tree.xpath('//d:%s' % xmlname, namespaces=ns):
                    servicesUsed_dict[service] = True
                    logger.debug("Usage of %s detected." % service)
                else:
                    servicesUsed_dict[service] = False
            # check which file format the user wants for the power profiling
            if servicesUsed_dict['powerprofiling']:
                if tree.xpath('//d:powerProfilingConf/d:fileFormat', namespaces=ns):
                    ppFileFormat = tree.xpath('//d:powerProfilingConf/d:fileFormat', namespaces=ns)[0].text
                    logger.debug("User wants file format %s for power profiling." % (ppFileFormat))
                else:
                    logger.debug("Element <fileFormat> not detected.")
        except:
            msg = "XML parsing failed: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
            errors.append(msg)
            logger.error(msg)
    
    cur.close()
    cn.close()
    if ((owner_fk is None) or (pindict is None) or (obsdict_byid is None) or (servicedict is None)):
        msg = "Error when getting metadata.\n"
        msg += "owner_fk: %s\npindict: %s\nobsdict_byid: %s\nservicedict: %s\n" % (str(owner_fk), str(pindict), str(obsdict_byid), str(servicedict))
        msg += "Exiting..."
        logger.debug(msg)
        os.kill(os.getpid(), signal.SIGTERM)
        flocklab.error_logandexit(msg, errno.EAGAIN)
    else:
        logger.debug("Got all needed metadata.")
    
    # Start aggregating processes ---
    """    There is an infinite loop which gets files to process from the fetcher threads which download data from the observers.
        Downloaded data is then assigned to a worker process for the corresponding service and in the worker process parsed,
        converted (if needed) and aggregated into a single file per service.
        The loop is stopped if the program receives a stop signal. In this case, the loop runs until no more database
        files are left to be processed.
    """
    if __name__ == '__main__':
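        # Note: this guard is presumably here so that processes spawned by the
        # multiprocessing pools below do not re-execute the aggregation setup
        # when the module is re-imported (relevant with the 'spawn' start method).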
        # Create directory and files needed for test results:
        testresultsdir = "%s/%d" %(flocklab.config.get('fetcher', 'testresults_dir'), testid)
        if not os.path.exists(testresultsdir):
            os.makedirs(testresultsdir)
            logger.debug("Created %s" % testresultsdir)
        manager = multiprocessing.Manager()
        for service in ('errorlog', 'gpiotracing', 'powerprofiling', 'serial', 'timesynclog'):
            path = "%s/%s.csv" % (testresultsdir, service)
            lock = manager.Lock()
            testresultsfile_dict[service] = (path, lock)
            # Create file and write header:
            if service in ('errorlog', 'timesynclog'):
                header = 'timestamp,observer_id,node_id,message\n'
            elif service == 'gpiotracing':
                header = 'timestamp,observer_id,node_id,pin_name,value\n'
            elif service == 'powerprofiling':
                header = 'timestamp,observer_id,node_id,current[mA],voltage[V]\n'
            elif service == 'serial':
                header = 'timestamp,observer_id,node_id,direction,output\n'
            lock.acquire()
            f = open(path, 'w')
            f.write(header)
            f.close()
            lock.release()
        # Start logging thread:
        logqueue = manager.Queue(maxsize=10000)
        LogQueueThread_stopEvent = threading.Event()
        try:
            thread = LogQueueThread(logqueue, logger, LogQueueThread_stopEvent)
            thread.start()
        except:
            logger.warn("Error when starting log queue thread: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
        
        # Determine the number of CPU's to be used for each aggregating process. If a service is not used, its CPUs are assigned to other services
        cpus_free = 0
        cpus_logs = flocklab.config.getint('fetcher', 'cpus_errorlog')
        # CPUs for serial service:
        if servicesUsed_dict['serial'] == True:
            cpus_serial = flocklab.config.getint('fetcher', 'cpus_serial')
        else:
            cpus_serial = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_serial')
        # CPUs for GPIO tracing:
        if servicesUsed_dict['gpiotracing'] == True:
            cpus_gpiomonitoring    = flocklab.config.getint('fetcher', 'cpus_gpiomonitoring')
        else:
            cpus_gpiomonitoring = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_gpiomonitoring')
        # CPUs for powerprofiling:
        if servicesUsed_dict['powerprofiling'] == True:
            cpus_powerprofiling    = flocklab.config.getint('fetcher', 'cpus_powerprofiling')
        else:
            cpus_powerprofiling = 0
            cpus_free = cpus_free + flocklab.config.getint('fetcher', 'cpus_powerprofiling')
        # If there are free CPUs left, give them to GPIO tracing and power profiling evenly as these services need the most CPU power:
        if cpus_free > 0:
            if (cpus_powerprofiling > 0) and (cpus_gpiomonitoring > 0):
                # Both services are used, distribute the free CPUs evenly:
                cpus_powerprofiling = cpus_powerprofiling + int(math.ceil(float(cpus_free)/2))
                cpus_gpiomonitoring = cpus_gpiomonitoring + int(math.floor(float(cpus_free)/2))
            elif cpus_powerprofiling > 0:
                # GPIO monitoring/tracing is not used, so give all CPUs to powerprofiling:
                cpus_powerprofiling = cpus_powerprofiling + cpus_free
            elif cpus_gpiomonitoring > 0:
                # Powerprofiling is not used, so give all CPUs to GPIO monitoring/tracing:
                cpus_gpiomonitoring = cpus_gpiomonitoring + cpus_free
            else:
                # Neither of the two services is used, so give the free CPUs to the serial service if it is in use:
                if cpus_serial > 0:
                    cpus_serial = cpus_serial + cpus_free
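        # Worked example: with cpus_free = 3 and both services in use, powerprofiling
        # gains ceil(3/2) = 2 extra CPUs and GPIO monitoring floor(3/2) = 1, so all
        # freed CPUs are reassigned.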
        cpus_total = cpus_logs + cpus_serial + cpus_gpiomonitoring + cpus_powerprofiling
        
        service_pools_dict = { 'logs': cpus_logs, 'serial': cpus_serial, 'gpiotracing': cpus_gpiomonitoring, 'powerprofiling': cpus_powerprofiling }
        if (cpus_total > multiprocessing.cpu_count()):
            logger.warning("Number of requested CPUs for all aggregating processes (%d) is higher than the number of available CPUs (%d) on this system." % (cpus_total, multiprocessing.cpu_count()))
        
        # Start a worker process pool for every service:
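        # Note that the dict values are replaced in place: the CPU count of each
        # service becomes either a multiprocessing.Pool or None. Mutating the values
        # (not the keys) while iterating over items() is safe.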
        for service, cpus in service_pools_dict.items():
            if cpus > 1:
                # Currently only 1 process can be used per service since the processing functions are NOT thread-safe!
                logger.warning("%d is an invalid number of CPUs for service %s, using default value of 1." % (cpus, service))
                cpus = 1
            if cpus > 0:
                pool = multiprocessing.Pool(processes=cpus)
                logger.debug("Created pool for %s workers with %d processes" % (service, cpus))
                service_pools_dict[service] = pool
            else:
                service_pools_dict[service] = None
        logger.debug("Created all worker pools for services.")
        # Catch kill signals ---
        signal.signal(signal.SIGTERM, sigterm_handler)
        signal.signal(signal.SIGINT,  sigterm_handler)
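        # The handler (sigterm_handler above) only requests a graceful shutdown: the
        # main loop below keeps draining the queue until the work manager reports
        # that all outstanding items have been processed.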
        # Loop through the folders and assign work to the worker processes:
        vizimgdir = flocklab.config.get('viz','imgdir')
        commitsize = flocklab.config.getint('fetcher', 'commitsize')
        enableviz = flocklab.config.getint('viz','enablepreview')
        loggerprefix = "(Mainloop) "
        workmanager = WorkManager()
        
        # Main loop ---
        while True:
            if mainloop_stop:
                if workmanager.finished() and FetchObsThread_queue.empty():
                    # exit main loop
                    logger.debug("Work manager has nothing more to do, finishing up..")
                    break

            # Wait for FetchObsThreads to put items on queue:
            try:
                item = FetchObsThread_queue.get(block=True, timeout=5)
                (itemtype, obsid, fdir, f) = item[:4]
                logger.debug(loggerprefix + "Got element from queue: %d, %s, %s/%s" % (itemtype, str(obsid), fdir, f))
            except queue.Empty:
                # Nothing was put onto the queue within the timeout.
                # In normal operation this is expected; just try again:
                continue
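            # The 5 s timeout on get() keeps the loop responsive: it falls through
            # periodically so the mainloop_stop flag set by sigterm_handler is
            # re-checked even when no observer data is arriving.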
            if itemtype == ITEM_TO_PROCESS:
                nextitem = workmanager.add(item)
            else: # type is ITEM_PROCESSED
                nextitem = workmanager.done(item)
            if nextitem is None:
                logger.debug(loggerprefix + "Next item is None.")
                continue
            (itemtype, obsid, fdir, f, workerstate) = nextitem
            #logger.debug(loggerprefix + "Next item is %s/%s (Obs%s)." % (fdir, f, str(obsid)))
            nodeid = obsdict_byid[obsid][1]
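            # obsdict_byid maps the observer ID to a tuple whose second element is
            # the node ID (inferred from this usage).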
            callback_f = worker_callback
            worker_f = worker_convert_and_aggregate
            # Match the filename against the patterns and schedule an appropriate worker function:
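            # Example file names (the 14-digit field is presumably a YYYYMMDDHHMMSS
            # timestamp): gpio_monitor_20200101120000.csv,
            # powerprofiling_20200101120000.rld, serial_20200101120000.db,
            # error_20200101120000.log, timesync_20200101120000.log.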
            if (re.search("^gpio_monitor_[0-9]{14}\.csv$", f) != None):
                pool        = service_pools_dict['gpiotracing']
                worker_args = [nextitem, nodeid, testresultsfile_dict['gpiotracing'][0], logqueue, None]
                worker_f    = worker_gpiotracing
                logger.debug(loggerprefix + "resultfile_path: %s" % str(testresultsfile_dict['gpiotracing'][0]))
                #if (enableviz == 1):
                #    worker_args[6] = flocklab.viz_gpio_monitor
            elif (re.search("^powerprofiling_[0-9]{14}\.rld$", f) != None):
                pool        = service_pools_dict['powerprofiling']
                worker_args = [nextitem, nodeid, testresultsfile_dict['powerprofiling'][0], logqueue, ppFileFormat]
                worker_f    = worker_powerprof
                #if (enableviz == 1):
                #    worker_args[6] = flocklab.viz_powerprofiling
            elif (re.search("^serial_[0-9]{14}\.db$", f) != None):
                pool        = service_pools_dict['serial']
                worker_args = [nextitem, nodeid, testresultsfile_dict['serial'][0], testresultsfile_dict['serial'][1], commitsize, vizimgdir, parse_serial, convert_serial, None, logqueue]
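                # Serial keeps the default worker_convert_and_aggregate, which takes
                # parse_serial/convert_serial as per-record callbacks.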
            elif (re.search("^error_[0-9]{14}\.log$", f) != None):
                pool        = service_pools_dict['logs']
                worker_args = [nextitem, nodeid, testresultsfile_dict['errorlog'][0], logqueue, None]
                worker_f    = worker_logs
            elif (re.search("^timesync_[0-9]{14}\.log$", f) != None):
                pool        = service_pools_dict['logs']
                worker_args = [nextitem, nodeid, testresultsfile_dict['timesynclog'][0], logqueue, None]
                worker_f    = worker_logs
            else:
                logger.warning(loggerprefix + "Results file %s/%s from observer %s did not match any of the known patterns." % (fdir, f, obsid))
                continue
            # Schedule worker function from the service's pool. The result will be reported to the callback function.
            pool.apply_async(func=worker_f, args=tuple(worker_args), callback=callback_f)
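            # Pool.apply_async() runs the callback in a result-handler thread of
            # this process, so worker_callback must return quickly or it will stall
            # the delivery of further results.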
        # Stop signal for main loop has been set ---
        # Stop worker pools:
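        # (Two passes on purpose: close() all pools first so their outstanding tasks
        # drain in parallel, then join() each one.)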
        for service, pool in service_pools_dict.items():
            if pool:
                logger.debug("Closing pool for %s..." % service)
                pool.close()
        for service, pool in service_pools_dict.items():
            if pool:
                logger.debug("Waiting for pool for %s to close..." % service)
                pool.join()
        logger.debug("Closed all pools.")
        
        # Stop logging:
        logger.debug("Stopping log queue thread...")
        LogQueueThread_stopEvent.set()
        
        # Set DB status:
        logger.debug("Setting test status in DB to 'synced'...")
        try:
            (cn, cur) = flocklab.connect_to_db()
            flocklab.set_test_status(cur, cn, testid, 'synced')
            cur.close()
            cn.close()
        except Exception:
            logger.warning("Could not set test status in database to 'synced'.")
        
        # Delete the obsfile directories as they are not needed anymore:
        if (obsfiledir is not None) and os.path.exists(obsfiledir):
            shutil.rmtree(obsfiledir)
        # Delete old debug files:
        debugdir = flocklab.config.get('fetcher', 'obsfile_debug_dir')
        maxage   = int(flocklab.config.get('fetcher', 'obsfile_debug_dir_max_age_days')) * 24 * 3600
        if os.path.exists(debugdir):
            for d in [fn for fn in os.listdir(debugdir) if os.stat("%s/%s" % (debugdir, fn)).st_mtime < int(time.time()) - maxage]:
                shutil.rmtree("%s/%s" % (debugdir, d))
        if len(errors) > 0:
            msg = ""
            for error in errors:
                msg += error
            flocklab.error_logandexit(msg, errno.EBADMSG)
        else:
            ret = flocklab.SUCCESS
    
    sys.exit(ret)
### END main()

if __name__ == "__main__":
    try:
        main(sys.argv[1:])
    except Exception:
        msg = "Encountered error: %s: %s\n%s\nCommand line was: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc(), str(sys.argv))
        flocklab.error_logandexit(msg, errno.EAGAIN)