flocklab_fetcher.py 56.2 KB
Newer Older
1
2
#! /usr/bin/env python3

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
"""
Copyright (c) 2010 - 2020, ETH Zurich, Computer Engineering Group
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its
  contributors may be used to endorse or promote products derived from
  this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

"""

import os, sys, getopt, traceback, MySQLdb, signal, random, time, errno, multiprocessing, subprocess, re, logging, __main__, threading, struct, types, queue, math, shutil, lxml.etree, tempfile
36
37
import lib.daemon as daemon
import lib.flocklab as flocklab
38
import lib.dwt_parse as dwt
Reto Da Forno's avatar
Reto Da Forno committed
39
40
from rocketlogger.data import RocketLoggerData
import pandas as pd
Reto Da Forno's avatar
Reto Da Forno committed
41
import numpy as np
Reto Da Forno's avatar
Reto Da Forno committed
42

43

Reto Da Forno's avatar
Reto Da Forno committed
44
45
# --- Global module state, initialized by the daemon's main() at startup ---
logger                   = None    # logging.Logger for this daemon, set up in main()
debug                    = False   # verbose-logging flag
testid                   = None    # ID of the test whose results are being fetched
errors                   = []      # accumulated error messages (filled by worker_callback)
FetchObsThread_list      = []      # one FetchObsThread per observer
FetchObsThread_stopEvent = None    # threading.Event used to stop all fetcher threads
FetchObsThread_queue     = None    # work queue shared between fetcher threads and workers
obsfiledir               = None    # local directory for files downloaded from observers
testresultsdir           = None    # directory for the aggregated test result files
testresultsfile_dict     = {}      # per-service result file info; presumably maps service
                                   # name -> (resultfile path, lock) as passed to the
                                   # worker_* functions — TODO confirm against main()
mainloop_stop            = False   # set by sigterm_handler to terminate the main loop
owner_fk                 = None    # DB foreign key of the test owner
obsdict_byid             = None    # maps observer ID -> observer info — verify in main()
serialdict               = None    # maps serial service number -> name (see convert_serial)

# State markers for the first element of a queue item: freshly fetched and
# still to be processed vs. finished by a worker process.
ITEM_TO_PROCESS = 0
ITEM_PROCESSED  = 1
61

62

63
64
65
66
67
68
##############################################################################
#
# Error classes
#
##############################################################################
class DbFileEof(Exception):
    """Raised when the end of an observer DB file has been reached."""
70
71

class DbFileReadError(Exception):
    """Raised when a record in an observer DB file is truncated or corrupt.

    Attributes:
        expectedSize -- payload size announced by the record's length prefix
        actualSize   -- number of bytes actually read
        fpos         -- file position where the broken record starts
    """
    def __init__(self, expectedSize, actualSize, fpos):
        # Pass a readable message to the base class so str(exc) is not empty
        # (the original implementation never called Exception.__init__).
        super().__init__("expected %d bytes but read %d at file position %d"
                         % (expectedSize, actualSize, fpos))
        self.expectedSize = expectedSize
        self.actualSize = actualSize
        self.fpos = fpos
76
77
78
79
80
81
82
83
84
85
### END Error classes



##############################################################################
#
# Class ServiceInfo
#
##############################################################################
class ServiceInfo():
    """Bookkeeping for one observer service: the naming pattern of its
    result files and the list of files collected so far."""

    def __init__(self, servicename):
        self.servicename = servicename
        self.files = []
        # Service files are named <service>_<digits>.<ext>. Use a raw string
        # (the original "\." triggers an invalid-escape warning) and escape
        # the service name in case it contains regex metacharacters.
        self.pattern = r"^%s_[0-9]+\.[a-z]+$" % re.escape(servicename)

    def matchFileName(self, filename):
        """Return True if the basename of filename matches this service's naming scheme."""
        return re.search(self.pattern, os.path.basename(filename)) is not None

    def addFile(self, filename):
        """Register a file belonging to this service."""
        self.files.append(filename)

    def stripFileList(self, removelast=True):
        """Sort the registered files and, if removelast is set, drop the
        newest one (presumably because it may still be written to — TODO confirm)."""
        self.files.sort()
        if self.files and removelast:
            self.files.pop()
101
102
103
104
105
106
107
108
109
### END ServiceInfo


##############################################################################
#
# sigterm_handler
#
##############################################################################
def sigterm_handler(signum, frame):
    """If the program is terminated by sending it the signal SIGTERM
    (e.g. by executing 'kill') or SIGINT (pressing ctrl-c),
    this signal handler is invoked for cleanup.

    Stops the fetcher threads, marks the test as 'syncing' in the DB and
    asks the main loop to terminate via the mainloop_stop flag.
    """
    global mainloop_stop
    global FetchObsThread_stopEvent

    logger.info("Process received SIGTERM or SIGINT signal")

    # Signal all observer fetcher threads to stop:
    logger.debug("Stopping observer fetcher threads...")
    shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
    try:
        FetchObsThread_stopEvent.set()
    except:
        # stop event may not have been created yet
        pass
    for thread in FetchObsThread_list:
        try:
            thread.join(shutdown_timeout)
        except:
            # NOTE(review): threading.Thread.join() does not raise on timeout,
            # so this warning is most likely never emitted; checking
            # thread.is_alive() after join() would be needed — TODO confirm.
            logger.warning("Fetcher thread did not stop within %d seconds." % shutdown_timeout)

    # Set DB status:
    logger.debug("Setting test status in DB to 'syncing'...")
    try:
        (cn, cur) = flocklab.connect_to_db()
        flocklab.set_test_status(cur, cn, testid, 'syncing')
        cur.close()
        cn.close()
    except:
        logger.warning("Could not connect to database.")

    # Tell the main loop to stop:
    mainloop_stop = True
    logger.debug("Set stop signal for main loop.")
144
145
146
147
148
149
150
151
152
### END sigterm_handler


##############################################################################
#
# Functions for parsing observer DB files data
#
##############################################################################
def parse_serial(buf):
    """Decode one raw serial record from an observer DB file.

    Record layout: int32 service id, struct timeval (seconds, microseconds),
    followed by the payload bytes.

    Returns (service, payload_bytes, "sec.usec" timestamp string).
    """
    payload_len = len(buf) - 12
    service, secs, usecs, payload = struct.unpack("iii%ds" % payload_len, buf)
    return (service, payload, "%i.%06i" % (secs, usecs))
155
156
157
158
159
160
161
162


##############################################################################
#
# Functions for converting observer DB data
#
##############################################################################
def convert_serial(obsdata, observer_id, node_id):
    """Format one parsed serial record (see parse_serial) as a CSV line:
    timestamp,observer_id,node_id,service_name,text.

    Payloads that are not valid UTF-8 are discarded: the line is emitted
    with an empty data field.
    """
    timestamp = obsdata[2]
    service = serialdict[obsdata[0]]
    try:
        text = obsdata[1].decode('utf8').rstrip()
    except UnicodeDecodeError:
        # discard undecodable payload, keep the metadata
        text = ""
    return "%s,%s,%s,%s,%s\n" % (timestamp, observer_id, node_id, service, text)
169
170
171
172
173
174


##############################################################################
#
# read_from_db_file: Read from an open DB file from an observer
#
175
##############################################################################
176
def read_from_db_file(dbfile):
    """Read one length-prefixed record from an open observer DB file.

    Each record is a little-endian uint32 payload size followed by the
    payload itself. Returns the payload bytes.

    Raises DbFileEof (and closes the file) when no complete length prefix
    is left, and DbFileReadError (and closes the file) when the payload is
    truncated.
    """
    header = dbfile.read(4)
    if len(header) < 4:
        # No (complete) length prefix left -> end of file.
        dbfile.close()
        raise DbFileEof()
    (payload_size,) = struct.unpack("<I", header)
    payload = dbfile.read(payload_size)
    if len(payload) != payload_size:
        # Truncated record: report the position where it started.
        start_pos = dbfile.tell() - 4 - len(payload)
        dbfile.close()
        raise DbFileReadError(payload_size, len(payload), start_pos)
    return payload
189
190
191
192
193
### END read_from_db_file


##############################################################################
#
194
# worker_dbfiles: Parses observer DB files.
195
196
#
##############################################################################
197
def worker_dbfiles(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, commitsize=1, parse_f=None, convert_f=None, logqueue=None):
    """Parse one observer DB file record by record and append the converted
    data to the shared result file.

    queueitem       -- (itemtype, obsid, fdir, fname, workerstate) work item
    nodeid          -- node ID corresponding to the observer
    resultfile_path -- aggregated result file to append to
    resultfile_lock -- lock protecting resultfile_path across workers
    commitsize      -- flush converted rows to the result file in batches of this size
    parse_f         -- record parser (e.g. parse_serial)
    convert_f       -- optional converter (e.g. convert_serial); None disables output
    logqueue        -- queue for log messages to the main process

    Returns (error_list, processeditem) where processeditem is the queue
    item with its state set to ITEM_PROCESSED.

    Fixes vs. the previous version: logical 'and' instead of bitwise '&',
    no shadowing of the queue-item filename by the output file handle, and
    context-managed / try-finally protected file and lock handling.
    """
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, fname, workerstate) = queueitem
        input_filename = "%s/%s" % (fdir, fname)
        loggername = "(%s.%d) " % (cur_p.name, obsid)

        def _commit(values, rows, final=False):
            # Append the accumulated rows to the shared result file under the lock.
            resultfile_lock.acquire()
            try:
                with open(resultfile_path, 'a') as outfile:
                    outfile.writelines(values)
            finally:
                resultfile_lock.release()
            logqueue.put_nowait((loggername, logging.DEBUG,
                                 "Committed %sresults to %s after %d rows"
                                 % ("final " if final else "", resultfile_path, rows)))

        rows = 0
        conv_values = []
        dbfile = open(input_filename, 'rb')
        while not dbfile.closed:
            # Process the DB file record by record:
            try:
                buf = read_from_db_file(dbfile)
                obsdata = parse_f(buf)
                # Convert data if needed:
                if convert_f is not None:
                    conv_values.append(convert_f(obsdata, obsid, nodeid))
                    rows += 1
                if commitsize > 0 and rows >= commitsize:
                    _commit(conv_values, rows)
                    rows = 0
                    conv_values = []
            except DbFileEof:
                break  # dbfile has been closed in read_from_db_file (EOF reached)
            except DbFileReadError as err:
                msg = "%s: Packet size (%i) did not match payload size (%i) @ %d." % (input_filename, err.expectedSize, err.actualSize, err.fpos)
                _errors.append((msg, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        if len(conv_values) > 0:
            # There is still data left. Do a last commit.
            _commit(conv_values, rows, final=True)
        # Remove the processed input file:
        os.unlink(input_filename)
    except:
        msg = "Error in worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
257
### END worker_dbfiles
258
259


Reto Da Forno's avatar
Reto Da Forno committed
260
261
262
263
264
265
266
##############################################################################
#
# worker_gpiotracing: Worker function for converting and aggregating gpio
#               tracing data. Unlike for the other services, this function works on
#               whole observer DB files.
#
##############################################################################
Reto Da Forno's avatar
Reto Da Forno committed
267
def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    """Convert one observer GPIO-tracing file and append it to the shared
    result file, inserting the observer and node IDs into each line.

    Input lines are "timestamp,ticks,pin,level"; output lines are
    "timestamp,ticks,obsid,nodeid,pin,level". Processing stops at the first
    unparsable line. Returns (error_list, processeditem).
    """
    try:
        _errors = []
        worker = multiprocessing.current_process()
        (itemtype, obsid, fdir, fname, workerstate) = queueitem
        inputfilename = "%s/%s" % (fdir, fname)
        loggername = "(%s.%d) " % (worker.name, obsid)

        with open(resultfile_path, "a") as outfile, open(inputfilename, "r") as infile:
            for line in infile:
                try:
                    timestamp, ticks, pin, level = line.strip().split(',', 3)
                except ValueError:
                    logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
                    break
                outfile.write("%s,%s,%d,%s,%s,%s\n" % (timestamp, ticks, obsid, nodeid, pin, level))
        os.remove(inputfilename)
    except:
        msg = "Error in gpiotracing worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_gpiotracing


297
298
299
##############################################################################
#
# worker_powerprof: Worker function for converting and aggregating power
Reto Da Forno's avatar
Reto Da Forno committed
300
301
#        profiling data. Unlike for the other services, this function works on
#        whole observer DB files.
302
303
#
##############################################################################
Reto Da Forno's avatar
Reto Da Forno committed
304
def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    """Worker function for converting and aggregating power profiling data.
    Unlike for the other services, this function works on whole observer DB
    files.

    If arg == 'rld', the RocketLogger binary file is moved into the results
    directory unchanged; otherwise it is converted to CSV rows
    (time, observer_id, node_id, I [mA], V) appended to resultfile_path.

    Returns (error_list, processeditem) with the item marked ITEM_PROCESSED.
    """
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        inputfilename = "%s/%s" % (fdir, f)
        loggername = "(%s.%d) " % (cur_p.name, obsid)

        if arg and arg == 'rld':
            # RLD file format:
            # simply move the file into the results directory
            try:
                resfilename = "%s.%s.%s.rld" % (os.path.splitext(resultfile_path)[0], obsid, nodeid)
                os.rename(inputfilename, resfilename)
            except FileExistsError:
                # NOTE(review): on POSIX os.rename() silently overwrites an
                # existing target and never raises FileExistsError, so this
                # branch is likely dead on Linux — confirm intended behavior.
                # TODO: properly handle case where file already exists (several rld files per observer)
                msg = "File '%s' already exists, dropping test results." % (resfilename)
                _errors.append((msg, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        else:
            # CSV file format: decode with the RocketLogger library.
            rld_data = RocketLoggerData(inputfilename).merge_channels()
            # get network time and convert to UNIX timestamp (UTC)
            timeidx = rld_data.get_time(absolute_time=True, time_reference='network')     # TODO adjust parameters for RL 1.99+
            timeidxunix = timeidx.astype('uint64') / 1e9   # convert to s
            current_ch = rld_data.get_data('I1') * 1000    # convert to mA
            voltage_ch = rld_data.get_data('V2') - rld_data.get_data('V1')    # voltage difference
            rld_dataframe = pd.DataFrame(np.hstack((current_ch, voltage_ch)), index=timeidxunix, columns=['I', 'V'])
            rld_dataframe.insert(0, 'observer_id', obsid)
            rld_dataframe.insert(1, 'node_id', nodeid)
            rld_dataframe.to_csv(resultfile_path, sep=',', index_label='time', header=False, mode='a')

            os.remove(inputfilename)
    except:
        msg = "Error in powerprof worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_powerprof


##############################################################################
#
350
# worker_logs
Reto Da Forno's avatar
Reto Da Forno committed
351
352
#
##############################################################################
Reto Da Forno's avatar
Reto Da Forno committed
353
def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    """Append one observer log file to the aggregated log result file,
    inserting the observer and node IDs into each "timestamp,message" line.

    Returns (error_list, processeditem) with the item marked ITEM_PROCESSED.

    Fix vs. the previous version: a single malformed line (no comma) used to
    raise ValueError out of the loop, aborting the whole file and leaving the
    input file behind; now such lines are logged and skipped, consistent with
    the other workers. Files are also handled with context managers.
    """
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, fname, workerstate) = queueitem
        inputfilename = "%s/%s" % (fdir, fname)
        loggername = "(%s.%d) " % (cur_p.name, obsid)

        with open(resultfile_path, "a") as outfile, open(inputfilename, "r") as infile:
            for line in infile:
                try:
                    (timestamp, msg) = line.strip().split(',', 1)
                except ValueError:
                    # tolerate malformed lines instead of aborting the whole file
                    logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in logs worker process." % line))
                    continue
                outfile.write("%s,%d,%s,%s\n" % (timestamp, obsid, nodeid, msg))
        os.remove(inputfilename)
    except:
        msg = "Error in logs worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
376
### END worker_logs()
377
378


379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
##############################################################################
#
# worker_serial
#
##############################################################################
def worker_serial(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    """Append the serial log of one observer to the aggregated serial result
    file as "timestamp,obsid,nodeid,r,message" lines.

    Lines that cannot be decoded as text or lack a comma separator are
    silently skipped. Returns (error_list, processeditem).
    """
    try:
        _errors = []
        worker = multiprocessing.current_process()
        (itemtype, obsid, fdir, fname, workerstate) = queueitem
        inputfilename = "%s/%s" % (fdir, fname)
        loggername = "(%s.%d) " % (worker.name, obsid)

        with open(resultfile_path, "a") as outfile:
            infile = open(inputfilename, "r")
            # Read line by line; readline() may raise on undecodable bytes.
            while True:
                try:
                    line = infile.readline()
                except UnicodeDecodeError:
                    continue      # ignore invalid lines
                if not line:
                    break         # EOF reached
                try:
                    timestamp, msg = line.strip().split(',', 1)
                except:
                    continue      # no comma separator -> skip line
                outfile.write("%s,%d,%s,r,%s\n" % (timestamp, obsid, nodeid, msg.rstrip()))
            infile.close()
        os.remove(inputfilename)
    except:
        msg = "Error in serial worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_serial()


Reto Da Forno's avatar
wip    
Reto Da Forno committed
421
422
423
424
425
426
427
428
429
430
##############################################################################
#
# worker_datatrace
#
##############################################################################
def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    """Parse one observer data-trace (DWT) log, tag it with observer/node IDs
    and variable names, and append it to the aggregated datatrace CSV.

    Returns (error_list, processeditem) with the item marked ITEM_PROCESSED.
    """
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        input_filename = "%s/%s" % (fdir, f)
        loggername = "(%s.%d) " % (cur_p.name, obsid)
        # DEBUG: keep a raw copy of the input next to the result file.
        shutil.copyfile(input_filename, "%s_raw" % resultfile_path)
        # parse the file
        # first line of the log file contains the variable names
        varnames = ""
        # NOTE(review): 'f' is reused here as a file handle, shadowing the
        # queue-item filename unpacked above — works, but fragile.
        with open(input_filename, "r") as f:
            varnames = f.readline().strip().split()
        try:
            # process raw datatrace log (parse & apply time correction)
            dfData, dfLocalTs, dfOverflow = dwt.processDatatraceOutput(input_filename)
        except ValueError:
            logqueue.put_nowait((loggername, logging.WARNING, "Empty data trace results file."))
        else:
            # add observer and node ID
            dfData['obsid'] = obsid
            dfData['nodeid'] = nodeid
            # convert comparator ID to variable name (fall back to the raw
            # comparator index if no name is available)
            dfData['varname'] = dfData.comparator.apply(lambda x: (varnames[x] if x < len(varnames) else str(x)))
            # append datatrace elements from observer to datatrace log file
            with open(resultfile_path, "a") as outfile:
                dfData.to_csv(
                  outfile,
                  columns=['global_ts', 'obsid', 'nodeid', 'varname', 'data', 'operation', 'PC'],
                  index=False,
                  header=False
                )
    except:
        msg = "Error in datatrace worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        # delete files
        # NOTE(review): runs even when an exception occurred above; if the
        # queue-item unpack failed, input_filename is undefined here — confirm
        # whether that case can happen in practice.
        os.remove(input_filename)
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_datatrace()


472
473
474
##############################################################################
#
# worker_callback: Callback function which reports errors from worker processes
Reto Da Forno's avatar
Reto Da Forno committed
475
#        back to the main process
476
477
478
#
##############################################################################
def worker_callback(result):
    """Callback run in the main process when a worker finishes.

    result is a 2-tuple:
      result[0] -- list of (error, obsid) pairs produced by the worker
      result[1] -- the processed queue item

    Errors are appended to the global error list; the processed item is put
    back onto the fetcher queue. Always returns 0.
    """
    global errors
    global FetchObsThread_queue

    worker_errors = result[0]
    processeditem = result[1]

    # 1st element: collect the worker's errors into the global list.
    if len(worker_errors) > 0:
        try:
            for (err, obsid) in worker_errors:
                errors.append("An error occurred when processing the results for Observer %d: %s" % (obsid, str(err)))
        except:
            errors.append("Failed to convert the error list in worker_callback (%s)." % str(worker_errors))

    # 2nd element: hand the processed item back to the fetcher thread.
    try:
        FetchObsThread_queue.put(item=processeditem, block=True, timeout=10)
    except queue.Full:
        logger.error("Queue full after processing element")
    return 0
499
500
501
502
503
504
505
506
507
### END worker_callback


##############################################################################
#
# LogQueueThread
#
##############################################################################
class LogQueueThread(threading.Thread):
    """Thread that drains (loggername, loglevel, msg) tuples from a queue
    and forwards them to the given logger until the stop event is set."""

    def __init__(self, logqueue, logger, stopEvent):
        """Store the source queue, the target logger and the stop event."""
        threading.Thread.__init__(self)
        self._logger    = logger
        self._stopEvent = stopEvent
        self._logqueue  = logqueue

    def run(self):
        """Forward queued log records; poll the stop event once per second."""
        self._logger.info("LogQueueThread started")

        while not self._stopEvent.is_set():
            try:
                entry = self._logqueue.get(block=True, timeout=1)
            except queue.Empty:
                continue
            (loggername, loglevel, msg) = entry
            self._logger.log(loglevel, loggername + msg)

        self._logger.info("LogQueueThread stopped")
529
530
531
532
533
534
535
536
537
### END LogQueueThread


##############################################################################
#
# FetchObsThread
#
##############################################################################
class FetchObsThread(threading.Thread):
    """Thread which downloads result files from one observer to the server.

    Periodically (min_sleeptime plus a random backoff) lists the remote test
    result folder over ssh, downloads new files over scp into the server-side
    directory, keeps a copy in the debug directory and enqueues each file on
    the shared work queue for processing. While running, the newest file of
    each service is skipped (the observer may still be writing it); once the
    stop event is set, one final round is performed that includes it.
    """
    def __init__(self, obsid, obsethernet, dirname, debugdirname, workQueue, stopEvent):
        """
        Args:
            obsid:        numeric observer ID (used in log prefixes and work items)
            obsethernet:  ssh/scp host name or address of the observer
            dirname:      server-side directory receiving the fetched files
            debugdirname: server-side directory receiving a copy of each file
            workQueue:    queue.Queue taking (ITEM_TO_PROCESS, obsid, dir, fname, None) tuples
            stopEvent:    threading.Event signalling the thread to finish up

        Relies on module-level `logger` and `testid` globals.
        """
        threading.Thread.__init__(self)
        self._obsid            = obsid
        self._obsethernet      = obsethernet
        self._obsfiledir       = dirname
        self._obsfiledebugdir  = debugdirname
        self._workQueue        = workQueue
        self._stopEvent        = stopEvent
        self._logger           = logger

        # Sleep parameters and the remote result folder (per test ID):
        self._min_sleep        = flocklab.config.getint("fetcher", "min_sleeptime")
        self._max_randsleep    = flocklab.config.getint("fetcher", "max_rand_sleeptime")
        self._obstestresfolder = "%s/%d" % (flocklab.config.get("observer", "testresultfolder"), testid)

    def run(self):
        try:
            self._loggerprefix = "(FetchObsThread.%d) "%self._obsid
            self._logger.info(self._loggerprefix + "FetchObsThread starting...")
            removelast = True

            # Let thread run until someone calls terminate() on it:
            # `removelast` doubles as the loop condition: it stays True until
            # the stop event is observed, after which one last iteration runs
            # (with removelast == False) that also fetches the newest files.
            while removelast == True:
                """ Get data from the observer over SCP.
                Then request data from the observer and store it in the server's filesystem.
                Then sleep some random time before fetching data again.
                """
                # Wait for some random time:
                waittime =self._min_sleep + random.randrange(0,self._max_randsleep)
                #DEBUG self._logger.debug(self._loggerprefix + "Going to wait for %d seconds" %(waittime))
                self._stopEvent.wait(waittime) # The wait will be interrupted if the stop signal has been set causing the thread to download all remaining files
                if self._stopEvent.is_set():
                    removelast = False
                #self._logger.debug(self._loggerprefix + "Woke up")
                # Get list of available files
                cmd = ['ssh' , self._obsethernet, "ls %s/" % self._obstestresfolder]
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)  # universal_newlines makes sure that a string is returned instead of a byte object
                out, err = p.communicate(None)
                rs = p.returncode
                if (rs == flocklab.SUCCESS):
                    # Sort the listed files into per-service buckets
                    # (ServiceInfo is defined elsewhere in this file).
                    services = {}
                    for servicename in [ "gpio_monitor", "powerprofiling", "serial", "error", "timesync", "datatrace" ]:
                        services[servicename] = ServiceInfo(servicename)
                    # Read filenames
                    for resfile in out.split():
                        # Check name and append to corresponding list
                        for service in services.values():
                            if service.matchFileName(resfile):
                                service.addFile("%s/%s" % (self._obstestresfolder, resfile))
                                break
                    copyfilelist = []
                    # Remove latest from each list as the observer might still be writing into it (unless stop event has been set).
                    for service in services.values():
                        service.stripFileList(removelast)
                        for resfile in service.files:
                            copyfilelist.append(resfile)
                        #if (len(service.files) > 0):
                        #    self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (" ".join(service.files), service.servicename))

                    if len(copyfilelist) > 0:
                        # Download the database files:
                        self._logger.debug(self._loggerprefix + "Downloading results files %s" % (" ".join(copyfilelist)))
                        cmd = ['scp', '-q' ]
                        cmd.extend(["%s:%s" % (self._obsethernet, x) for x in copyfilelist])
                        cmd.append("%s/" % self._obsfiledir)
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if rs != 0:
                            self._logger.debug(self._loggerprefix + "Could not download all results files from observer. Dataloss occurred for this observer.")
                            self._logger.debug(self._loggerprefix + "Tried to execute %s, result was %d, stdout: %s, error: %s" % (str(cmd), rs, out, err))
                        else:
                            self._logger.debug("Downloaded results files from observer.")
                            # put a copy to the debug directory
                            for f in copyfilelist:
                                fname = os.path.basename(f)
                                shutil.copyfile("%s/%s" % (self._obsfiledir, fname), "%s/%s" % (self._obsfiledebugdir, fname))
                            # Tell the fetcher to start working on the files:
                            for f in copyfilelist:
                                fname = os.path.basename(f)
                                try:
                                    self._workQueue.put(item=(ITEM_TO_PROCESS, self._obsid, self._obsfiledir, fname, None), block=True, timeout=10)
                                except queue.Full:
                                    # Make sure the file is downloaded again at a later point:
                                    # remove it from copyfilelist so it is not deleted remotely,
                                    # and drop the local copy so the re-download is clean.
                                    copyfilelist.remove(f)
                                    os.unlink("%s/%s" % (self._obsfiledir, fname))
                                    self._logger.warning(self._loggerprefix + "FetchObsThread queue is full. Cannot put %s/%s on it." % (self._obsfiledir, fname))
                            # Remove remote files if any are left:
                            if (len(copyfilelist) > 0):
                                cmd = ['ssh' ,'%s'%(self._obsethernet), "cd %s;"%self._obstestresfolder, "rm"]
                                cmd.extend(copyfilelist)
                                self._logger.debug(self._loggerprefix + "Removing files on observer...")
                                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                                out, err = p.communicate(None)
                                rs = p.wait()
                                if (rs != flocklab.SUCCESS):
                                    self._logger.error(self._loggerprefix + "Could not remove files on observer, result was %d, error: %s" % (rs, err))
                            else:
                                self._logger.debug(self._loggerprefix + "No files left to delete on observer.")
                    else:
                        self._logger.debug(self._loggerprefix + "No files to download from observer.")

                    # NOTE(review): this cleanup branch is deliberately disabled
                    # by the leading `False and`, so the remote results folder
                    # is never removed here — confirm whether this is intended.
                    if False and removelast == False: # this is the last execution of the while loop
                        cmd = ['ssh' ,'%s'%(self._obsethernet), "rm -rf %s" % self._obstestresfolder]
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if (rs != flocklab.SUCCESS):
                            self._logger.error(self._loggerprefix + "Could not remove results directory from observer, result was %d. Error: %s" % (rs, err.strip()))

                else:
                    self._logger.error(self._loggerprefix + "SSH to observer did not succeed, fetcher thread terminated with code %d. Error: %s" % (rs, err.strip()))
                    break   # abort

        except:
            logger.error(self._loggerprefix + "FetchObsThread crashed: %s, %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))

        # Stop the process:
        self._logger.info(self._loggerprefix + "FetchObsThread stopped")
### END FetchObsThread


##############################################################################
#
# Start Fetcher
#
##############################################################################
def start_fetcher():
    """Daemonize this process and start one FetchObsThread per observer.

    Queries the database for the observers mapped to the global `testid`,
    creates the per-observer download and debug directories, and launches a
    fetcher thread for each observer.

    Returns:
        flocklab.SUCCESS on success, errno.ENODATA when no observers are
        assigned to the test. Exits the process via error_logandexit() when
        the database connection fails.
    """
    global obsfiledir
    global FetchObsThread_list
    global FetchObsThread_queue
    global FetchObsThread_stopEvent
    global obsfetcher_dict

    # Daemonize the process ---
    daemon.daemonize(None, closedesc=False)
    logger.info("Daemon started")
    logger.info("Going to fetch data for test ID %d" %testid)

    # Get needed metadata from database ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
    try:
        # One row per observer used in this test: (observer_id, ethernet_address)
        cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address
                       FROM `tbl_serv_observer` AS `a`
                       LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk
                       WHERE `b`.test_fk = %d GROUP BY `a`.observer_id;
                    """ % testid)
    except MySQLdb.Error as err:
        msg = str(err)
        flocklab.error_logandexit(msg, errno.EIO)
    except:
        # Non-MySQL errors are only logged; fetchall() below may then fail.
        logger.warning("Error %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
    rs = cur.fetchall()
    cur.close()
    cn.close()
    logger.debug("Got list of FlockLab observers from database.")
    if not rs:
        logger.info("No observers found for this test. Nothing has to be done, thus exiting...")
        return errno.ENODATA

    # Start fetcher threads ---
    # Create a directory structure to store the downloaded files from the DB:
    obsfiledir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_dir'), testid)
    if not os.path.exists(obsfiledir):
        os.makedirs(obsfiledir)
    obsfiledebugdir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_debug_dir'), testid)
    if not os.path.exists(obsfiledebugdir):
        os.makedirs(obsfiledebugdir)
        #DEBUG logger.debug("Created %s"%obsfiledir)
    # Start one fetching thread per observer
    FetchObsThread_stopEvent = threading.Event()
    FetchObsThread_queue = queue.Queue(maxsize=10000)
    for observer in rs:
        obsid = int(observer[0])
        # Create needed directories:
        dirname = "%s/%d" % (obsfiledir, obsid)
        if (not os.path.exists(dirname)):
            os.makedirs(dirname)
        debugdirname = "%s/%d" % (obsfiledebugdir, obsid)
        if (not os.path.exists(debugdirname)):
            os.makedirs(debugdirname)
        # Start thread:
        try:
            thread = FetchObsThread(obsid, observer[1], dirname, debugdirname, FetchObsThread_queue, FetchObsThread_stopEvent)
            FetchObsThread_list.append(thread)
            thread.start()
            logger.debug("Started fetcher thread for observer %d" % (obsid))
        except:
            # A failed thread start is logged but does not abort the others.
            logger.warning("Error when starting fetcher thread for observer %d: %s, %s" % (obsid, str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            continue

    return flocklab.SUCCESS
### END start_fetcher


##############################################################################
#
# Stop Fetcher
#
##############################################################################
def stop_fetcher():
    """Stop a running fetcher instance for the global `testid`.

    Sends SIGTERM to the main fetcher process and waits up to the configured
    'shutdown_timeout' seconds; if it does not exit, escalates to SIGKILL
    (including any remaining fetcher processes). ValueError is (ab)used as
    internal control flow: it is raised whenever no other instance could be
    stopped cleanly (none found, only this process, or processes had to be
    killed), in which case the test status in the DB is set to 'synced' and
    errno.ENOPKG is returned. flocklab.SUCCESS is returned only when the
    signalled process terminated within the timeout on its own.
    """
    # Get oldest running instance of the fetcher for the selected test ID which is the main process and send it the terminate signal:
    try:
        pid = flocklab.get_fetcher_pid(testid)
        # Signal the process to stop:
        if (pid > 0):
            # Do not stop this instance if it is the only one running:
            if (pid == os.getpid()):
                raise ValueError
            logger.debug("Sending SIGTERM signal to process %d" % pid)
            os.kill(pid, signal.SIGTERM)
            # wait for process to finish (timeout..)
            shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
            pidpath = "/proc/%d" % pid
            # NOTE(review): bitwise '&' works here because both operands are
            # bool, but the logical 'and' would be the idiomatic choice.
            while os.path.exists(pidpath) & (shutdown_timeout > 0):
                time.sleep(1)
                shutdown_timeout = shutdown_timeout - 1
            if os.path.exists(pidpath):
                logger.error("Fetcher with PID %d is still running, killing process..." % pid)
                # send kill signal
                os.kill(pid, signal.SIGKILL)
                time.sleep(3)
                # check if there is a remaining fetcher process
                pid = flocklab.get_fetcher_pid(testid)
                while pid > 0 and pid != os.getpid():
                    logger.warning("Found a remaining fetcher thread with PID %d, killing it now..." % (pid))
                    os.kill(pid, signal.SIGKILL)
                    pid = flocklab.get_fetcher_pid(testid)
                raise ValueError
        else:
            raise ValueError
    except ValueError:
        # Set DB status in order to allow dispatcher and scheduler to go on.:
        logger.debug("Setting test status in DB to 'synced'...")
        try:
            (cn, cur) = flocklab.connect_to_db()
            flocklab.set_test_status(cur, cn, testid, 'synced')
            cur.close()
            cn.close()
        except:
            logger.warning("Could not connect to database.")
        return errno.ENOPKG

    return flocklab.SUCCESS
### END stop_fetcher


##############################################################################
#
# Class WorkManager
#
##############################################################################
class WorkManager():
    """Serializes work items per (service, observer) pair.

    Work items are 5-tuples whose slot 1 is the observer ID, slot 3 the
    file name (the service name is derived from it by stripping the
    trailing "_<digits>..." suffix) and slot 4 a worker-state slot. For
    each service/observer pair at most one item is handed out at a time;
    the state returned by a finished item is patched into the next one.
    """

    def __init__(self):
        self.worklist  = {}                     # service -> obsid -> [saved state, pending items]
        self.pattern   = re.compile("_[0-9].*") # strips the numeric suffix off file names
        self.workcount = 0                      # total number of pending items

    def _next_item_with_state(self, service, obsid):
        # Head of the pending list with the saved worker state in slot 4.
        entry = self.worklist[service][obsid]
        head  = list(entry[1][0])
        head[4] = entry[0]
        return tuple(head)

    def add(self, item):
        """Enqueue an item; return it (with state) if it can run right away,
        or None if work for its service/observer pair is already in flight."""
        try:
            service = self.pattern.sub("",item[3])
            obsid   = item[1]
            per_service = self.worklist.setdefault(service, {})
            entry = per_service.setdefault(obsid, [None, []])  # workerstate / worklist
            ready = not entry[1]  # empty queue -> caller may process immediately
            entry[1].append(item)
            self.workcount += 1
            return self._next_item_with_state(service, obsid) if ready else None
        except:
            logger.error("Error in WorkManager.add(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))

    def done(self, item):
        """Mark the head item of its service/observer queue as finished,
        saving its state; return the next runnable item or None."""
        try:
            service = self.pattern.sub("",item[3])
            obsid   = item[1]
            entry   = self.worklist[service][obsid]
            if item[1:-1] == entry[1][0][1:-1]:
                entry[0] = item[4] # save state
                entry[1].pop(0)
                self.workcount -= 1
            else:
                logger.error("work done for item that was not enqueued: %s" % str(item))
            # if there is more work to do, return next item
            return self._next_item_with_state(service, obsid) if entry[1] else None
        except:
            logger.error("Error in WorkManager.done(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))

    def finished(self):
        """True when no work items remain in any queue."""
        return self.workcount == 0
### END WorkManager


##############################################################################
#
# Usage
#
##############################################################################
def usage():
Reto Da Forno's avatar
Reto Da Forno committed
855
    print("Usage: %s --testid=<int> [--stop] [--debug] [--help]" % __file__)
Reto Da Forno's avatar
Reto Da Forno committed
856
857
858
859
860
    print("Options:")
    print("  --testid=<int>\t\tTest ID of test to which incoming data belongs.")
    print("  --stop\t\t\tOptional. Causes the program to stop a possibly running instance of the fetcher.")
    print("  --debug\t\t\tOptional. Print debug messages to log.")
    print("  --help\t\t\tOptional. Print this help.")
861
862
863
864
865
866
867
868
869
### END usage()


##############################################################################
#
# Main
#
##############################################################################
def main(argv):
870

Reto Da Forno's avatar
Reto Da Forno committed
871
872
873
874
875
876
877
878
879
    ### Get global variables ###
    global logger
    global debug
    global testid
    global testresultsdir
    global testresultsfile_dict
    global owner_fk
    global obsdict_byid
    global serialdict
880

Reto Da Forno's avatar
Reto Da Forno committed
881
    stop = False
882

Reto Da Forno's avatar
Reto Da Forno committed
883
    # Get logger:
Reto Da Forno's avatar
Reto Da Forno committed
884
    logger = flocklab.get_logger()
885

Reto Da Forno's avatar
Reto Da Forno committed
886
    # Get the config file ---
887
    flocklab.load_config()
Reto Da Forno's avatar
Reto Da Forno committed
888
889

    # Get command line parameters ---
890
    try:
Reto Da Forno's avatar
Reto Da Forno committed
891
892
893
        opts, args = getopt.getopt(argv, "hedt:", ["help", "stop", "debug", "testid="])
    except getopt.GetoptError as err:
        print(str(err))
894
        logger.warning(str(err))
Reto Da Forno's avatar
Reto Da Forno committed
895
896
897
898
        usage()
        sys.exit(errno.EINVAL)
    except:
        msg = "Error when getting arguments: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
Reto Da Forno's avatar
Reto Da Forno committed
899
        flocklab.error_logandexit(msg, errno.EAGAIN)
Reto Da Forno's avatar
Reto Da Forno committed
900
901
902
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
903
            sys.exit(flocklab.SUCCESS)
Reto Da Forno's avatar
Reto Da Forno committed
904
905
906
907
908
909
910
911
912
        elif opt in ("-d", "--debug"):
            debug = True
            logger.setLevel(logging.DEBUG)
        elif opt in ("-t", "--testid"):
            try:
                testid = int(arg)
            except ValueError:
                err = "Wrong API usage: testid has to be integer"
                print(str(err))
913
                logger.warning(str(err))
Reto Da Forno's avatar
Reto Da Forno committed
914
915
916
917
918
919
                usage()
                sys.exit(errno.EINVAL)
        elif opt in ("-e", "--stop"):
            stop = True
        else:
            print("Wrong API usage")
920
            logger.warning("Wrong API usage")
Reto Da Forno's avatar
Reto Da Forno committed
921
            sys.exit(errno.EINVAL)
922

Reto Da Forno's avatar
Reto Da Forno committed
923
924
925
    # Check if the necessary parameters are set ---
    if not testid:
        print("Wrong API usage")
926
        logger.warning("Wrong API usage")
Reto Da Forno's avatar
Reto Da Forno committed
927
        sys.exit(errno.EINVAL)
928

Reto Da Forno's avatar
Reto Da Forno committed
929
930
    # Check if the Test ID exists in the database ---
    try:
Reto Da Forno's avatar
Reto Da Forno committed
931
        (cn, cur) = flocklab.connect_to_db()
Reto Da Forno's avatar
Reto Da Forno committed
932
    except:
Reto Da Forno's avatar
Reto Da Forno committed
933
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
Reto Da Forno's avatar
Reto Da Forno committed
934
935
936
937
938
939
    rs = flocklab.check_test_id(cur, testid)
    cur.close()
    cn.close()
    if rs != 0:
        if rs == 3:
            msg = "Test ID %d does not exist in database." %testid
Reto Da Forno's avatar
Reto Da Forno committed
940
            flocklab.error_logandexit(msg, errno.EINVAL)
Reto Da Forno's avatar
Reto Da Forno committed
941
        else:
Reto Da Forno's avatar
Reto Da Forno committed
942
            msg = "Error when trying to get test ID from database: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
Reto Da Forno's avatar
Reto Da Forno committed
943
            flocklab.error_logandexit(msg, errno.EIO)
944

Reto Da Forno's avatar
Reto Da Forno committed
945
946
    # Add Test ID to logger name ---
    logger.name += " (Test %d)"%testid
947

Reto Da Forno's avatar
Reto Da Forno committed
948
    # Start / stop the fetcher ---
949
    ret = flocklab.SUCCESS
Reto Da Forno's avatar
Reto Da Forno committed
950
951
952
    if stop:
        ret = stop_fetcher()
        logger.info("FlockLab fetcher stopped.")
Reto Da Forno's avatar
Reto Da Forno committed
953
954
        sys.exit(ret)

955
    # Start the fetcher processes which download data from the observers:
Reto Da Forno's avatar
Reto Da Forno committed
956
957
958
    ret = start_fetcher()
    if ret == flocklab.SUCCESS:
        logger.info("FlockLab fetcher started.")
Reto Da Forno's avatar
Reto Da Forno committed
959
    else:
Reto Da Forno's avatar
Reto Da Forno committed
960
961
962
        msg = "Start function returned error. Exiting..."
        os.kill(os.getpid(), signal.SIGTERM)
        rs = flocklab.error_logandexit(msg, ret)
963

Reto Da Forno's avatar
Reto Da Forno committed
964
965
966
967
    # Get needed metadata ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
Reto Da Forno's avatar
Reto Da Forno committed
968
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
Reto Da Forno's avatar
Reto Da Forno committed
969
970
971
972
973
974
975
976
977
978
979
980
    rs = flocklab.get_test_owner(cur, testid)
    if isinstance(rs, tuple):
        owner_fk = rs[0]
    else:
        owner_fk = None
    rs = flocklab.get_test_obs(cur, testid)
    if isinstance(rs, tuple):
        obsdict_byid = rs[1]
    else:
        obsdict_byid = None
    # Dict for serial service: 'r' means reader (data read from the target), 'w' means writer (data written to the target):
    serialdict = {0: 'r', 1: 'w'}
981

Reto Da Forno's avatar
Reto Da Forno committed
982
983
984
    #find out the start and stoptime of the test
    cur.execute("SELECT `time_start_wish`, `time_end_wish` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" %testid)
    # Times are going to be of datetime type:
985
    ret = cur.fetchone()
Reto Da Forno's avatar
Reto Da Forno committed
986
987
    teststarttime = ret[0]
    teststoptime  = ret[1]
Reto Da Forno's avatar
Reto Da Forno committed
988
    ppFileFormat  = None
989

Reto Da Forno's avatar
Reto Da Forno committed
990
991
    # Find out which services are used to allocate working threads later on ---
    # Get the XML config from the database and check which services are used in the test.
Reto Da Forno's avatar
wip    
Reto Da Forno committed
992
    servicesUsed_dict = {'gpiotracing': 'gpioTracingConf', 'powerprofiling': 'powerProfilingConf', 'serial': 'serialConf', 'datatrace': 'dataTraceConf'}
Reto Da Forno's avatar
Reto Da Forno committed
993
994
995
996
997
998
999
1000
1001
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
    ret = cur.fetchone()
    if not ret:
        msg = "No XML found in database for testid %d." %testid
        errors.append(msg)
        logger.error(msg)
        for service, xmlname in servicesUsed_dict.items():
            servicesUsed_dict[service] = True
    else:
Reto Da Forno's avatar
Reto Da Forno committed
1002
        try:
Reto Da Forno's avatar
Reto Da Forno committed
1003
1004
1005
1006
1007
            logger.debug("Got XML from database.")
            parser = lxml.etree.XMLParser(remove_comments=True)
            tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
            ns = {'d': flocklab.config.get('xml', 'namespace')}
            for service, xmlname in servicesUsed_dict.items():
1008
                if tree.xpath('//d:%s' % xmlname, namespaces=ns):
Reto Da Forno's avatar
Reto Da Forno committed
1009
                    servicesUsed_dict[service] = True
1010
                    logger.debug("Usage of %s detected." % service)