flocklab_fetcher.py 56.8 KB
Newer Older
1
2
#! /usr/bin/env python3

3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
"""
Copyright (c) 2010 - 2020, ETH Zurich, Computer Engineering Group
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its
  contributors may be used to endorse or promote products derived from
  this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

"""

import os, sys, getopt, traceback, MySQLdb, signal, random, time, errno, multiprocessing, subprocess, re, logging, __main__, threading, struct, types, queue, math, shutil, lxml.etree, tempfile
36
37
import lib.daemon as daemon
import lib.flocklab as flocklab
38
import lib.dwt_parse as dwt
Reto Da Forno's avatar
Reto Da Forno committed
39
40
from rocketlogger.data import RocketLoggerData
import pandas as pd
Reto Da Forno's avatar
Reto Da Forno committed
41
import numpy as np
Reto Da Forno's avatar
Reto Da Forno committed
42

43

Reto Da Forno's avatar
Reto Da Forno committed
44
45
46
47
48
49
50
51
52
53
54
55
56
57
logger                   = None
debug                    = False
testid                   = None 
errors                   = []
FetchObsThread_list      = []
FetchObsThread_stopEvent = None
FetchObsThread_queue     = None
obsfiledir               = None
testresultsdir           = None
testresultsfile_dict     = {}
mainloop_stop            = False
owner_fk                 = None
obsdict_byid             = None
serialdict               = None
58
59

ITEM_TO_PROCESS = 0
Reto Da Forno's avatar
Reto Da Forno committed
60
ITEM_PROCESSED  = 1
61

62

63
64
65
66
67
68
##############################################################################
#
# Error classes
#
##############################################################################
class DbFileEof(Exception):
Reto Da Forno's avatar
Reto Da Forno committed
69
    pass
70
71

class DbFileReadError(Exception):
Reto Da Forno's avatar
Reto Da Forno committed
72
73
74
75
    def __init__(self, expectedSize, actualSize, fpos):
        self.expectedSize = expectedSize
        self.actualSize = actualSize
        self.fpos = fpos
76
77
78
79
80
81
82
83
84
85
### END Error classes



##############################################################################
#
# Class ServiceInfo
#
##############################################################################
class ServiceInfo():
Reto Da Forno's avatar
Reto Da Forno committed
86
87
88
    def __init__(self, servicename):
        self.servicename = servicename
        self.files = []
89
        self.pattern = "^%s_[0-9]+\.[a-z]+$" % servicename
Reto Da Forno's avatar
Reto Da Forno committed
90
91
92
93
94
95
96
97
98
99
100
    
    def matchFileName(self, filename):
        return re.search(self.pattern, os.path.basename(filename)) is not None
        
    def addFile(self, filename):
        self.files.append(filename)
    
    def stripFileList(self, removelast=True):
        self.files.sort()
        if ((len(self.files) > 0) and removelast):
            self.files.pop()
101
102
103
104
105
106
107
108
109
### END ServiceInfo


##############################################################################
#
# sigterm_handler
#
##############################################################################
def sigterm_handler(signum, frame):
Reto Da Forno's avatar
Reto Da Forno committed
110
111
112
113
114
115
116
117
118
119
120
    """If the program is terminated by sending it the signal SIGTERM 
    (e.g. by executing 'kill') or SIGINT (pressing ctrl-c), 
    this signal handler is invoked for cleanup."""
    
    global mainloop_stop
    global FetchObsThread_stopEvent

    logger.info("Process received SIGTERM or SIGINT signal")
        
    # Signal all observer fetcher threads to stop:
    logger.debug("Stopping observer fetcher threads...")
Reto Da Forno's avatar
Reto Da Forno committed
121
    shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
Reto Da Forno's avatar
Reto Da Forno committed
122
123
124
125
126
127
128
129
    try:
        FetchObsThread_stopEvent.set()
    except:
        pass
    for thread in FetchObsThread_list:
        try:
            thread.join(shutdown_timeout)
        except:
130
            logger.warning("Fetcher thread did not stop within %d seconds." % shutdown_timeout)
Reto Da Forno's avatar
Reto Da Forno committed
131
132
133
    # Set DB status:
    logger.debug("Setting test status in DB to 'syncing'...")
    try:
Reto Da Forno's avatar
Reto Da Forno committed
134
        (cn, cur) = flocklab.connect_to_db()
Reto Da Forno's avatar
Reto Da Forno committed
135
136
137
138
        flocklab.set_test_status(cur, cn, testid, 'syncing')
        cur.close()
        cn.close()
    except:
139
        logger.warning("Could not connect to database.")
Reto Da Forno's avatar
Reto Da Forno committed
140
141
142
143
    
    # Tell the main loop to stop:
    mainloop_stop = True
    logger.debug("Set stop signal for main loop.")
144
145
146
147
148
149
150
151
152
### END sigterm_handler


##############################################################################
#
# Functions for parsing observer DB files data
#
##############################################################################
def parse_serial(buf):
Reto Da Forno's avatar
Reto Da Forno committed
153
154
    _data = struct.unpack("iii%ds" % (len(buf) - 12), buf) #int service; struct timeval timestamp;char * data
    return (_data[0], _data[3], "%i.%06i" % (_data[1], _data[2]))
155
156
157
158
159
160
161
162


##############################################################################
#
# Functions for converting observer DB data
#
##############################################################################
def convert_serial(obsdata, observer_id, node_id):
Reto Da Forno's avatar
Reto Da Forno committed
163
    try:
164
        result = "%s,%s,%s,%s,%s\n" % (obsdata[2], observer_id, node_id, serialdict[obsdata[0]], obsdata[1].decode('utf8').rstrip())
Reto Da Forno's avatar
Reto Da Forno committed
165
    except UnicodeDecodeError:
Reto Da Forno's avatar
Reto Da Forno committed
166
        # discard result, return empty string
167
        result = "%s,%s,%s,%s,\n" % (obsdata[2], observer_id, node_id, serialdict[obsdata[0]])
Reto Da Forno's avatar
Reto Da Forno committed
168
    return result
169
170
171
172
173
174


##############################################################################
#
# read_from_db_file: Read from an open DB file from an observer
#
175
##############################################################################
176
def read_from_db_file(dbfile):
Reto Da Forno's avatar
Reto Da Forno committed
177
178
179
180
181
182
183
184
185
186
187
188
    _buf = dbfile.read(4)
    if len(_buf) < 4:
        dbfile.close()
        raise DbFileEof()
    else:
        _size = struct.unpack("<I",_buf)
        _buf = dbfile.read(_size[0])
        if len(_buf) != _size[0]:
            _fpos = dbfile.tell() - 4 - len(_buf)
            dbfile.close()
            raise DbFileReadError(_size[0], len(_buf), _fpos)
        return _buf
189
190
191
192
193
### END read_from_db_file


##############################################################################
#
194
# worker_dbfiles: Parses observer DB files.
195
196
#
##############################################################################
197
def worker_dbfiles(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, commitsize=1, parse_f=None, convert_f=None, logqueue=None):
Reto Da Forno's avatar
Reto Da Forno committed
198
199
200
201
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
Reto Da Forno's avatar
Reto Da Forno committed
202
        input_filename = "%s/%s" % (fdir,f)
Reto Da Forno's avatar
Reto Da Forno committed
203
        loggername = "(%s.%d) " % (cur_p.name, obsid)
Reto Da Forno's avatar
Reto Da Forno committed
204
        # Open file:
Reto Da Forno's avatar
Reto Da Forno committed
205
        dbfile = open(input_filename, 'rb')
Reto Da Forno's avatar
Reto Da Forno committed
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
        rows = 0
        conv_values = []
        while not dbfile.closed:
            # Process DB file line by line:
            try:
                # Parse one line:
                buf = read_from_db_file(dbfile)
                obsdata = parse_f(buf)
                # Convert data if needed:
                if convert_f != None:
                    conv_data = convert_f(obsdata, obsid, nodeid)
                    conv_values.append(conv_data)
                    rows += 1
                # Visualize data:
                if (commitsize > 0) & (rows >= commitsize):
                    # Write data to file:
Reto Da Forno's avatar
Reto Da Forno committed
222
                    #logqueue.put_nowait((loggername, logging.DEBUG, "Opening file %s for writing..." % (resultfile_path)))
Reto Da Forno's avatar
Reto Da Forno committed
223
224
225
226
227
                    resultfile_lock.acquire()
                    f = open(resultfile_path, 'a')
                    f.writelines(conv_values)
                    f.close()
                    resultfile_lock.release()
Reto Da Forno's avatar
Reto Da Forno committed
228
                    logqueue.put_nowait((loggername, logging.DEBUG, "Committed results to %s after %d rows" % (resultfile_path, rows)))
Reto Da Forno's avatar
Reto Da Forno committed
229
230
231
232
233
234
                    rows = 0
                    conv_values = []
            except DbFileEof:
                # logqueue.put_nowait((loggername, logging.DEBUG, "DbFileEof has occurred."))
                break # dbfile has been closed in parser (most likely because EOF was reached)
            except DbFileReadError as err:
Reto Da Forno's avatar
Reto Da Forno committed
235
                msg = "%s: Packet size (%i) did not match payload size (%i) @ %d." %(input_filename, err.expectedSize, err.actualSize, err.fpos)
Reto Da Forno's avatar
Reto Da Forno committed
236
237
                _errors.append((msg, errno.EIO, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
238
        if (len(conv_values) > 0):
Reto Da Forno's avatar
Reto Da Forno committed
239
            # There is still data left. Do a last commit
240
241
242
243
244
245
            # Write data to file:
            resultfile_lock.acquire()
            f = open(resultfile_path, 'a')
            f.writelines(conv_values)
            f.close()
            resultfile_lock.release()
Reto Da Forno's avatar
Reto Da Forno committed
246
            logqueue.put_nowait((loggername, logging.DEBUG, "Committed final results to %s after %d rows" % (resultfile_path, rows)))
247
        # Remove processed file:
Reto Da Forno's avatar
Reto Da Forno committed
248
        os.unlink(input_filename)
Reto Da Forno's avatar
Reto Da Forno committed
249
250
251
252
253
254
255
256
    except:
        msg = "Error in worker process: %s: %s\n%s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
257
### END worker_dbfiles
258
259


Reto Da Forno's avatar
Reto Da Forno committed
260
261
262
263
264
265
266
##############################################################################
#
# worker_gpiotracing: Worker function for converting and aggregating gpio
#               tracing data. Unlike for the other services, this function works on
#               whole observer DB files.
#
##############################################################################
Reto Da Forno's avatar
Reto Da Forno committed
267
def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
Reto Da Forno's avatar
Reto Da Forno committed
268
269
    try:
        _errors = []
270
        cur_p = multiprocessing.current_process()
Reto Da Forno's avatar
Reto Da Forno committed
271
        (itemtype, obsid, fdir, f, workerstate) = queueitem
272
        inputfilename = "%s/%s" % (fdir, f)
Reto Da Forno's avatar
Reto Da Forno committed
273
        loggername = "(%s.%d) " % (cur_p.name, obsid)
Reto Da Forno's avatar
Reto Da Forno committed
274
275

        with open(resultfile_path, "a") as outfile:
276
            infile = open(inputfilename, "r")
277
278
            for line in infile:
                try:
279
280
                    (timestamp, ticks, pin, level) = line.strip().split(',', 3)
                    outfile.write("%s,%s,%s,%s,%s,%s\n" % (timestamp, ticks, obsid, nodeid, pin, level))
281
282
283
284
                except ValueError:
                    logqueue.put_nowait((loggername, logging.ERROR, "Could not parse line '%s' in gpiotracing worker process." % line))
                    break
            infile.close()
285
        os.remove(inputfilename)
Reto Da Forno's avatar
Reto Da Forno committed
286
    except:
Reto Da Forno's avatar
Reto Da Forno committed
287
        msg = "Error in gpiotracing worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
Reto Da Forno's avatar
Reto Da Forno committed
288
289
290
291
292
293
294
295
296
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_gpiotracing


297
298
299
##############################################################################
#
# worker_powerprof: Worker function for converting and aggregating power
Reto Da Forno's avatar
Reto Da Forno committed
300
301
#        profiling data. Unlike for the other services, this function works on
#        whole observer DB files.
302
303
#
##############################################################################
Reto Da Forno's avatar
Reto Da Forno committed
304
def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
Reto Da Forno's avatar
Reto Da Forno committed
305
306
307
308
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
309
        inputfilename = "%s/%s" % (fdir, f)
Reto Da Forno's avatar
Reto Da Forno committed
310
        loggername = "(%s.%d) " % (cur_p.name, obsid)
Reto Da Forno's avatar
Reto Da Forno committed
311

Reto Da Forno's avatar
Reto Da Forno committed
312
313
314
315
316
        if arg and arg == 'rld':
            # RLD file format
            # simply move the file into the results directory
            try:
                resfilename = "%s.%s.%s.rld" % (os.path.splitext(resultfile_path)[0], obsid, nodeid)
317
                os.rename(inputfilename, resfilename)
Reto Da Forno's avatar
Reto Da Forno committed
318
319
320
321
322
323
324
            except FileExistsError:
                # TODO: properly handle case where file already exists (several rld files per observer)
                msg = "File '%s' already exists, dropping test results." % (resfilename)
                _errors.append((msg, errno.EEXIST, obsid))
                logqueue.put_nowait((loggername, logging.ERROR, msg))
        else:
            # CSV file format
325
            rld_data = RocketLoggerData(inputfilename).merge_channels()
Reto Da Forno's avatar
Reto Da Forno committed
326
327
328
329
330
331
332
333
334
            # get network time and convert to UNIX timestamp (UTC)
            timeidx = rld_data.get_time(absolute_time=True, time_reference='network')     # TODO adjust parameters for RL 1.99+
            timeidxunix = timeidx.astype('uint64') / 1e9   # convert to s
            current_ch = rld_data.get_data('I1') * 1000    # convert to mA
            voltage_ch = rld_data.get_data('V2') - rld_data.get_data('V1')    # voltage difference
            rld_dataframe = pd.DataFrame(np.hstack((current_ch, voltage_ch)), index=timeidxunix, columns=['I', 'V'])
            rld_dataframe.insert(0, 'observer_id', obsid)
            rld_dataframe.insert(1, 'node_id', nodeid)
            rld_dataframe.to_csv(resultfile_path, sep=',', index_label='time', header=False, mode='a')
Reto Da Forno's avatar
Reto Da Forno committed
335

336
            os.remove(inputfilename)
Reto Da Forno's avatar
Reto Da Forno committed
337
338
339
340
341
    except:
        msg = "Error in powerprof worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
Reto Da Forno's avatar
Reto Da Forno committed
342
343
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
Reto Da Forno's avatar
Reto Da Forno committed
344
345
346
347
348
349
        return (_errors, tuple(processeditem))
### END worker_powerprof


##############################################################################
#
350
# worker_logs
Reto Da Forno's avatar
Reto Da Forno committed
351
352
#
##############################################################################
Reto Da Forno's avatar
Reto Da Forno committed
353
def worker_logs(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
Reto Da Forno's avatar
Reto Da Forno committed
354
355
356
357
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
358
        inputfilename = "%s/%s" % (fdir, f)
Reto Da Forno's avatar
Reto Da Forno committed
359
        loggername = "(%s.%d) " % (cur_p.name, obsid)
Reto Da Forno's avatar
Reto Da Forno committed
360
361

        with open(resultfile_path, "a") as outfile:
362
            infile = open(inputfilename, "r")
Reto Da Forno's avatar
Reto Da Forno committed
363
            for line in infile:
364
365
                (timestamp, msg) = line.strip().split(',', 1)
                outfile.write("%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, msg))
Reto Da Forno's avatar
Reto Da Forno committed
366
            infile.close()
367
        os.remove(inputfilename)
Reto Da Forno's avatar
Reto Da Forno committed
368
    except:
369
        msg = "Error in logs worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
Reto Da Forno's avatar
Reto Da Forno committed
370
371
372
373
374
375
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
376
### END worker_logs()
377
378


379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
##############################################################################
#
# worker_serial
#
##############################################################################
def worker_serial(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
        inputfilename = "%s/%s" % (fdir, f)
        loggername = "(%s.%d) " % (cur_p.name, obsid)

        with open(resultfile_path, "a") as outfile:
            infile = open(inputfilename, "r")
394
395
396
397
398
            # read line by line and check for decode errors
            while True:
                try:
                    line = infile.readline()
                except UnicodeDecodeError:
399
                    continue      # ignore invalid lines
400
401
                if not line:
                    break
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
                try:
                    (timestamp, msg) = line.strip().split(',', 1)
                except:
                    continue
                result = "%s,%s,%s,r,%s\n" % (timestamp, obsid, nodeid, msg.rstrip())
                outfile.write(result)
            infile.close()
        os.remove(inputfilename)
    except:
        msg = "Error in serial worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_serial()


Reto Da Forno's avatar
wip    
Reto Da Forno committed
421
422
423
424
425
426
427
428
429
430
##############################################################################
#
# worker_datatrace
#
##############################################################################
def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue=None, arg=None):
    try:
        _errors = []
        cur_p = multiprocessing.current_process()
        (itemtype, obsid, fdir, f, workerstate) = queueitem
Reto Da Forno's avatar
Reto Da Forno committed
431
        input_filename = "%s/%s" % (fdir, f)
Reto Da Forno's avatar
wip    
Reto Da Forno committed
432
        loggername = "(%s.%d) " % (cur_p.name, obsid)
433
        # parse the file
Reto Da Forno's avatar
Reto Da Forno committed
434
435
436
437
438
439
440
        # first line of the log file contains the variable names
        varnames = ""
        with open(input_filename, "r") as f:
            varnames = f.readline().strip().split()
        (fd1, tmpfile1) = tempfile.mkstemp()
        (fd2, tmpfile2) = tempfile.mkstemp()
        dwt.parse_dwt_output(input_filename, tmpfile1)
441
        # apply linear regression to correct the timestamps
Reto Da Forno's avatar
Reto Da Forno committed
442
        dwt.correct_ts_with_regression(tmpfile1, tmpfile2)
Reto Da Forno's avatar
wip    
Reto Da Forno committed
443
        with open(resultfile_path, "a") as outfile:
444
            infile = open(tmpfile2, "r")
Reto Da Forno's avatar
wip    
Reto Da Forno committed
445
            for line in infile:
Reto Da Forno's avatar
Reto Da Forno committed
446
447
448
449
450
451
452
453
                # input format: global_ts, comparator, data, PC, operation, local_ts
                (timestamp, var, val, pc, access, localts) = line.strip().split(',')
                if access == 'operation' or access == '' or var == '':
                    continue
                if flocklab.parse_int(var) < len(varnames):
                    var = varnames[flocklab.parse_int(var)]
                # output format: timestamp,observer_id,node_id,variable,value,access,pc
                outfile.write("%s,%s,%s,%s,%s,%s,%s\n" % (timestamp, obsid, nodeid, var, val, access, pc))
Reto Da Forno's avatar
wip    
Reto Da Forno committed
454
            infile.close()
455
456
457
        # debug
        #shutil.copyfile(input_filename, "%s_raw" % resultfile_path)
        #shutil.copyfile(tmpfile1, "%s_uncorrected.csv" % resultfile_path)
Reto Da Forno's avatar
wip    
Reto Da Forno committed
458
459
    except:
        msg = "Error in datatrace worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
Reto Da Forno's avatar
Reto Da Forno committed
460
        # for some reason, the logging below does not work properly -> print msg into the log directly
461
462
        #logger = flocklab.get_logger()
        #logger.error(msg)
Reto Da Forno's avatar
wip    
Reto Da Forno committed
463
464
465
        _errors.append((msg, errno.ECOMM, obsid))
        logqueue.put_nowait((loggername, logging.ERROR, msg))
    finally:
466
467
468
469
        # delete files
        os.remove(input_filename)
        os.remove(tmpfile1)
        os.remove(tmpfile2)
Reto Da Forno's avatar
wip    
Reto Da Forno committed
470
471
472
473
474
475
        processeditem = list(queueitem)
        processeditem[0] = ITEM_PROCESSED
        return (_errors, tuple(processeditem))
### END worker_datatrace()


476
477
478
##############################################################################
#
# worker_callback: Callback function which reports errors from worker processes
Reto Da Forno's avatar
Reto Da Forno committed
479
#        back to the main process
480
481
482
#
##############################################################################
def worker_callback(result):
Reto Da Forno's avatar
Reto Da Forno committed
483
484
485
486
487
    global errors
    global FetchObsThread_queue
    
    if len(result[0]) > 0:
        for (err, eno, obsid) in result:
Reto Da Forno's avatar
Reto Da Forno committed
488
            msg = "Error %d when processing results for Observer ID %s: %s" % (eno, obsid, err)
Reto Da Forno's avatar
Reto Da Forno committed
489
490
491
492
493
494
495
496
            errors.append(msg)
    
    try:
        FetchObsThread_queue.put(item=result[1], block=True, timeout=10)
    except queue.Full:
        msg = "Queue full after processing element"
        logger.error(msg)
    return 0
497
498
499
500
501
502
503
504
505
### END worker_callback


##############################################################################
#
# LogQueueThread
#
##############################################################################
class LogQueueThread(threading.Thread):
Reto Da Forno's avatar
Reto Da Forno committed
506
507
508
509
    """    Thread which logs from queue to logfile.
    """ 
    def __init__(self, logqueue, logger, stopEvent):
        threading.Thread.__init__(self) 
Reto Da Forno's avatar
Reto Da Forno committed
510
511
512
        self._logger    = logger
        self._stopEvent = stopEvent
        self._logqueue  = logqueue
Reto Da Forno's avatar
Reto Da Forno committed
513
514
515
516
517
518
519
520
521
522
523

    def run(self):
        self._logger.info("LogQueueThread started")
            
        # Let thread run until someone calls terminate() on it:
        while not self._stopEvent.is_set():
            try:
                (loggername, loglevel, msg) = self._logqueue.get(block=True, timeout=1)
                self._logger.log(loglevel, loggername + msg)
            except queue.Empty:
                pass
524
        
Reto Da Forno's avatar
Reto Da Forno committed
525
526
        # Stop the process:
        self._logger.info("LogQueueThread stopped")
527
528
529
530
531
532
533
534
535
### END LogQueueThread


##############################################################################
#
# FetchObsThread
#
##############################################################################
class FetchObsThread(threading.Thread):
Reto Da Forno's avatar
Reto Da Forno committed
536
537
    """    Thread which downloads database files from an observer to the server.
    """ 
Reto Da Forno's avatar
Reto Da Forno committed
538
    def __init__(self, obsid, obsethernet, dirname, debugdirname, workQueue, stopEvent):
Reto Da Forno's avatar
Reto Da Forno committed
539
540
        threading.Thread.__init__(self) 
        self._obsid            = obsid
541
542
543
        self._obsethernet      = obsethernet
        self._obsfiledir       = dirname
        self._obsfiledebugdir  = debugdirname
Reto Da Forno's avatar
Reto Da Forno committed
544
545
        self._workQueue        = workQueue
        self._stopEvent        = stopEvent
546
        self._logger           = logger
Reto Da Forno's avatar
Reto Da Forno committed
547
        
Reto Da Forno's avatar
Reto Da Forno committed
548
549
        self._min_sleep        = flocklab.config.getint("fetcher", "min_sleeptime")
        self._max_randsleep    = flocklab.config.getint("fetcher", "max_rand_sleeptime")
550
        self._obstestresfolder = "%s/%d" % (flocklab.config.get("observer", "testresultfolder"), testid)
Reto Da Forno's avatar
Reto Da Forno committed
551
552
553
        
    def run(self):
        try:
Reto Da Forno's avatar
Reto Da Forno committed
554
            self._loggerprefix = "(FetchObsThread.%d) "%self._obsid
Reto Da Forno's avatar
Reto Da Forno committed
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
            self._logger.info(self._loggerprefix + "FetchObsThread starting...")
            removelast = True
                
            # Let thread run until someone calls terminate() on it:
            while removelast == True:
                """ Get data from the observer over SCP. 
                Then request data from the observer and store it in the server's filesystem. 
                Then sleep some random time before fetching data again.
                """
                # Wait for some random time:
                waittime =self._min_sleep + random.randrange(0,self._max_randsleep)
                #DEBUG self._logger.debug(self._loggerprefix + "Going to wait for %d seconds" %(waittime))
                self._stopEvent.wait(waittime) # The wait will be interrupted if the stop signal has been set causing the thread to download all remaining files
                if self._stopEvent.is_set():
                    removelast = False
                #self._logger.debug(self._loggerprefix + "Woke up")
                # Get list of available files
Reto Da Forno's avatar
Reto Da Forno committed
572
                cmd = ['ssh' , self._obsethernet, "ls %s/" % self._obstestresfolder]
Reto Da Forno's avatar
Reto Da Forno committed
573
574
575
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)  # universal_newlines makes sure that a string is returned instead of a byte object
                out, err = p.communicate(None)
                rs = p.returncode
576
                if (rs == flocklab.SUCCESS):
Reto Da Forno's avatar
Reto Da Forno committed
577
                    services = {}
578
                    for servicename in [ "gpio_monitor", "powerprofiling", "serial", "error", "timesync", "datatrace" ]:
Reto Da Forno's avatar
Reto Da Forno committed
579
580
                        services[servicename] = ServiceInfo(servicename)
                    # Read filenames
Reto Da Forno's avatar
Reto Da Forno committed
581
                    for resfile in out.split():
Reto Da Forno's avatar
Reto Da Forno committed
582
583
                        # Check name and append to corresponding list
                        for service in services.values():
Reto Da Forno's avatar
Reto Da Forno committed
584
585
                            if service.matchFileName(resfile):
                                service.addFile("%s/%s" % (self._obstestresfolder, resfile))
Reto Da Forno's avatar
Reto Da Forno committed
586
587
588
589
590
                                break
                    copyfilelist = []
                    # Remove latest from each list as the observer might still be writing into it (unless stop event has been set).
                    for service in services.values():
                        service.stripFileList(removelast)
Reto Da Forno's avatar
Reto Da Forno committed
591
592
                        for resfile in service.files:
                            copyfilelist.append(resfile)
Reto Da Forno's avatar
Reto Da Forno committed
593
                        #if (len(service.files) > 0):
594
                        #    self._logger.debug(self._loggerprefix + "Will process files %s for service %s" % (" ".join(service.files), service.servicename))
Reto Da Forno's avatar
Reto Da Forno committed
595
596
597
                    
                    if len(copyfilelist) > 0:
                        # Download the database files:
Reto Da Forno's avatar
Reto Da Forno committed
598
                        self._logger.debug(self._loggerprefix + "Downloading results files %s" % (" ".join(copyfilelist)))
Reto Da Forno's avatar
Reto Da Forno committed
599
                        cmd = ['scp', '-q' ]
Reto Da Forno's avatar
Reto Da Forno committed
600
                        cmd.extend(["%s:%s" % (self._obsethernet, x) for x in copyfilelist])
601
                        cmd.append("%s/" % self._obsfiledir)
Reto Da Forno's avatar
Reto Da Forno committed
602
603
604
605
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
                        if rs != 0:
Reto Da Forno's avatar
Reto Da Forno committed
606
                            self._logger.debug(self._loggerprefix + "Could not download all results files from observer. Dataloss occurred for this observer.")
Reto Da Forno's avatar
Reto Da Forno committed
607
                            self._logger.debug(self._loggerprefix + "Tried to execute %s, result was %d, stdout: %s, error: %s" % (str(cmd), rs, out, err))
Reto Da Forno's avatar
Reto Da Forno committed
608
                        else:
Reto Da Forno's avatar
Reto Da Forno committed
609
                            self._logger.debug("Downloaded results files from observer.")
Reto Da Forno's avatar
Reto Da Forno committed
610
611
612
                            # put a copy to the debug directory
                            for f in copyfilelist:
                                fname = os.path.basename(f)
Reto Da Forno's avatar
Reto Da Forno committed
613
                                shutil.copyfile("%s/%s" % (self._obsfiledir, fname), "%s/%s" % (self._obsfiledebugdir, fname))
Reto Da Forno's avatar
Reto Da Forno committed
614
615
616
617
618
619
620
621
                            # Tell the fetcher to start working on the files:
                            for f in copyfilelist:
                                fname = os.path.basename(f)
                                try:
                                    self._workQueue.put(item=(ITEM_TO_PROCESS, self._obsid, self._obsfiledir, fname, None), block=True, timeout=10)
                                except queue.Full:
                                    # Make sure the file is downloaded again at a later point:
                                    copyfilelist.remove(f)
Reto Da Forno's avatar
Reto Da Forno committed
622
                                    os.unlink("%s/%s" % (self._obsfiledir, fname))
623
                                    self._logger.warning(self._loggerprefix + "FetchObsThread queue is full. Cannot put %s/%s on it." % (self._obsfiledir, fname))
Reto Da Forno's avatar
Reto Da Forno committed
624
625
                            # Remove remote files if any are left:
                            if (len(copyfilelist) > 0):
626
                                cmd = ['ssh' ,'%s'%(self._obsethernet), "cd %s;"%self._obstestresfolder, "rm"]
Reto Da Forno's avatar
Reto Da Forno committed
627
628
629
630
631
                                cmd.extend(copyfilelist)
                                self._logger.debug(self._loggerprefix + "Removing files on observer...")
                                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                                out, err = p.communicate(None)
                                rs = p.wait()
632
                                if (rs != flocklab.SUCCESS):
Reto Da Forno's avatar
Reto Da Forno committed
633
                                    self._logger.error(self._loggerprefix + "Could not remove files on observer, result was %d, error: %s" % (rs, err))
Reto Da Forno's avatar
Reto Da Forno committed
634
635
636
637
                            else:
                                self._logger.debug(self._loggerprefix + "No files left to delete on observer.")
                    else:
                        self._logger.debug(self._loggerprefix + "No files to download from observer.")
Reto Da Forno's avatar
Reto Da Forno committed
638

Reto Da Forno's avatar
Reto Da Forno committed
639
                    if removelast == False: # this is the last execution of the while loop
640
                        cmd = ['ssh' ,'%s'%(self._obsethernet), "rm -rf %s" % self._obstestresfolder]
Reto Da Forno's avatar
Reto Da Forno committed
641
642
643
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                        out, err = p.communicate(None)
                        rs = p.wait()
644
                        if (rs != flocklab.SUCCESS):
Reto Da Forno's avatar
Reto Da Forno committed
645
                            self._logger.error(self._loggerprefix + "Could not remove results directory from observer, result was %d. Error: %s" % (rs, err.strip()))
Reto Da Forno's avatar
Reto Da Forno committed
646

Reto Da Forno's avatar
Reto Da Forno committed
647
                else:
Reto Da Forno's avatar
Reto Da Forno committed
648
                    self._logger.error(self._loggerprefix + "SSH to observer did not succeed, fetcher thread terminated with code %d. Error: %s" % (rs, err.strip()))
Reto Da Forno's avatar
Reto Da Forno committed
649
                    break   # abort
Reto Da Forno's avatar
Reto Da Forno committed
650
651
        
        except:
Reto Da Forno's avatar
Reto Da Forno committed
652
            logger.error(self._loggerprefix + "FetchObsThread crashed: %s, %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
Reto Da Forno's avatar
Reto Da Forno committed
653
654
655
        
        # Stop the process:
        self._logger.info(self._loggerprefix + "FetchObsThread stopped")
656
657
658
659
660
661
662
663
664
### END FetchObsThread


##############################################################################
#
# Start Fetcher
#
##############################################################################
def start_fetcher():
Reto Da Forno's avatar
Reto Da Forno committed
665
666
667
668
669
670
671
672
673
674
675
676
677
    global obsfiledir
    global FetchObsThread_list
    global FetchObsThread_queue
    global FetchObsThread_stopEvent
    global obsfetcher_dict
    
    # Daemonize the process ---
    daemon.daemonize(None, closedesc=False)
    logger.info("Daemon started")
    logger.info("Going to fetch data for test ID %d" %testid)
        
    # Get needed metadata from database ---
    try:
Reto Da Forno's avatar
Reto Da Forno committed
678
        (cn, cur) = flocklab.connect_to_db()
Reto Da Forno's avatar
Reto Da Forno committed
679
    except:
Reto Da Forno's avatar
Reto Da Forno committed
680
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
Reto Da Forno's avatar
Reto Da Forno committed
681
    try:
682
683
684
685
686
        cur.execute("""SELECT `a`.observer_id, `a`.ethernet_address 
                       FROM `tbl_serv_observer` AS `a` 
                       LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `b` ON `a`.serv_observer_key = `b`.observer_fk 
                       WHERE `b`.test_fk = %d GROUP BY `a`.observer_id;
                    """ % testid)
Reto Da Forno's avatar
Reto Da Forno committed
687
688
    except MySQLdb.Error as err:
        msg = str(err)
Reto Da Forno's avatar
Reto Da Forno committed
689
        flocklab.error_logandexit(msg, errno.EIO)
Reto Da Forno's avatar
Reto Da Forno committed
690
    except:
691
        logger.warning("Error %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1])))
Reto Da Forno's avatar
Reto Da Forno committed
692
693
694
695
696
697
698
699
700
701
    rs = cur.fetchall()
    cur.close()
    cn.close()
    logger.debug("Got list of FlockLab observers from database.")
    if not rs:
        logger.info("No observers found for this test. Nothing has to be done, thus exiting...")
        return errno.ENODATA
        
    # Start fetcher threads ---
    # Create a directory structure to store the downloaded files from the DB:
702
    obsfiledir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_dir'), testid)
Reto Da Forno's avatar
Reto Da Forno committed
703
704
    if not os.path.exists(obsfiledir):
        os.makedirs(obsfiledir)
705
    obsfiledebugdir = "%s/%d" % (flocklab.config.get('fetcher', 'obsfile_debug_dir'), testid)
Reto Da Forno's avatar
Reto Da Forno committed
706
707
708
709
710
711
712
713
714
    if not os.path.exists(obsfiledebugdir):
        os.makedirs(obsfiledebugdir)
        #DEBUG logger.debug("Created %s"%obsfiledir)
    # Start one fetching thread per observer
    FetchObsThread_stopEvent = threading.Event()
    FetchObsThread_queue = queue.Queue(maxsize=10000)
    for observer in rs:
        obsid = int(observer[0])
        # Create needed directories:
Reto Da Forno's avatar
Reto Da Forno committed
715
        dirname = "%s/%d" % (obsfiledir, obsid)
Reto Da Forno's avatar
Reto Da Forno committed
716
717
        if (not os.path.exists(dirname)):
            os.makedirs(dirname)
Reto Da Forno's avatar
Reto Da Forno committed
718
        debugdirname = "%s/%d" % (obsfiledebugdir, obsid)
Reto Da Forno's avatar
Reto Da Forno committed
719
720
721
722
        if (not os.path.exists(debugdirname)):
            os.makedirs(debugdirname)
        # Start thread: 
        try:
Reto Da Forno's avatar
Reto Da Forno committed
723
            thread = FetchObsThread(obsid, observer[1], dirname, debugdirname, FetchObsThread_queue, FetchObsThread_stopEvent)
Reto Da Forno's avatar
Reto Da Forno committed
724
725
            FetchObsThread_list.append(thread)
            thread.start()
Reto Da Forno's avatar
Reto Da Forno committed
726
            logger.debug("Started fetcher thread for observer %d" % (obsid))
Reto Da Forno's avatar
Reto Da Forno committed
727
        except:
728
            logger.warning("Error when starting fetcher thread for observer %d: %s, %s" % (obsid, str(sys.exc_info()[0]), str(sys.exc_info()[1])))
Reto Da Forno's avatar
Reto Da Forno committed
729
            continue
Reto Da Forno's avatar
Reto Da Forno committed
730
    
731
    return flocklab.SUCCESS
732
733
734
735
736
737
738
739
740
### END start_fetcher


##############################################################################
#
# Stop Fetcher
#
##############################################################################
def stop_fetcher():
Reto Da Forno's avatar
Reto Da Forno committed
741
742
743
744
745
746
747
    # Get oldest running instance of the fetcher for the selected test ID which is the main process and send it the terminate signal:
    try:
        pid = flocklab.get_fetcher_pid(testid)
        # Signal the process to stop:
        if (pid > 0):
            # Do not stop this instance if it is the only one running:
            if (pid == os.getpid()):
Reto Da Forno's avatar
Reto Da Forno committed
748
749
                raise ValueError
            logger.debug("Sending SIGTERM signal to process %d" % pid)
Reto Da Forno's avatar
Reto Da Forno committed
750
751
752
753
754
755
756
757
758
759
760
761
762
763
            os.kill(pid, signal.SIGTERM)
            # wait for process to finish (timeout..)
            shutdown_timeout = flocklab.config.getint("fetcher", "shutdown_timeout")
            pidpath = "/proc/%d" % pid
            while os.path.exists(pidpath) & (shutdown_timeout > 0):
                time.sleep(1)
                shutdown_timeout = shutdown_timeout - 1
            if os.path.exists(pidpath):
                logger.error("Fetcher with PID %d is still running, killing process..." % pid)
                # send kill signal
                os.kill(pid, signal.SIGKILL)
                time.sleep(3)
                # check if there is a remaining fetcher process
                pid = flocklab.get_fetcher_pid(testid)
Reto Da Forno's avatar
Reto Da Forno committed
764
                while pid > 0 and pid != os.getpid():
765
                    logger.warning("Found a remaining fetcher thread with PID %d, killing it now..." % (pid))
766
                    os.kill(pid, signal.SIGKILL)
Reto Da Forno's avatar
Reto Da Forno committed
767
                    pid = flocklab.get_fetcher_pid(testid)
Reto Da Forno's avatar
Reto Da Forno committed
768
                raise ValueError
Reto Da Forno's avatar
Reto Da Forno committed
769
        else:
Reto Da Forno's avatar
Reto Da Forno committed
770
            raise ValueError
771
    except ValueError:
Reto Da Forno's avatar
Reto Da Forno committed
772
773
774
        # Set DB status in order to allow dispatcher and scheduler to go on.:
        logger.debug("Setting test status in DB to 'synced'...")
        try:
Reto Da Forno's avatar
Reto Da Forno committed
775
            (cn, cur) = flocklab.connect_to_db()
Reto Da Forno's avatar
Reto Da Forno committed
776
777
778
779
            flocklab.set_test_status(cur, cn, testid, 'synced')
            cur.close()
            cn.close()
        except:
780
            logger.warning("Could not connect to database.")
Reto Da Forno's avatar
Reto Da Forno committed
781
782
        return errno.ENOPKG
    
783
    return flocklab.SUCCESS
784
785
786
787
788
789
790
791
792
### END stop_fetcher


##############################################################################
#
# Class WorkManager
#
##############################################################################
class WorkManager():
Reto Da Forno's avatar
Reto Da Forno committed
793
794
795
796
797
798
799
800
801
802
803
    def __init__(self):
        self.worklist = {}
        self.pattern = re.compile("_[0-9].*")
        self.workcount = 0
        
    def _next_item_with_state(self, service, obsid):
        stateitem = list(self.worklist[service][obsid][1][0])
        stateitem[4] = self.worklist[service][obsid][0]
        return tuple(stateitem)
        
    def add(self, item):
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
        try:
            service = self.pattern.sub("",item[3])
            obsid = item[1]
            if service not in self.worklist:
                self.worklist[service] = {}
            if obsid not in self.worklist[service]:
                self.worklist[service][obsid] = [None, []] # workerstate / worklist
            # if list is empty, we're good to process, otherwise just append it and return None
            if len(self.worklist[service][obsid][1]) == 0:
                self.worklist[service][obsid][1].append(item)
                self.workcount = self.workcount + 1
                return self._next_item_with_state(service, obsid)
            else:
                self.worklist[service][obsid][1].append(item)
                self.workcount = self.workcount + 1
                return None
        except:
            logger = flocklab.get_logger()
            logger.error("Error in WorkManager.add(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
Reto Da Forno's avatar
Reto Da Forno committed
823
824
        
    def done(self, item):
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
        try:
            service = self.pattern.sub("",item[3])
            obsid = item[1]
            if item[1:-1] == self.worklist[service][obsid][1][0][1:-1]:
                self.worklist[service][obsid][0] = item[4] # save state
                self.worklist[service][obsid][1].pop(0)
                self.workcount = self.workcount - 1
            else:
                logger.error("work done for item that was not enqueued: %s" % str(item))
            # if there is more work to do, return next item
            if len(self.worklist[service][obsid][1]) > 0:
                return self._next_item_with_state(service, obsid)
            else:
                return None
        except:
            logger = flocklab.get_logger()
            logger.error("Error in WorkManager.done(): %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc()))
Reto Da Forno's avatar
Reto Da Forno committed
842
843
844
845
    
    def finished(self):
        return self.workcount == 0
    
846
847
848
849
850
851
852
853
854
### END WorkManager


##############################################################################
#
# Usage
#
##############################################################################
def usage():
Reto Da Forno's avatar
Reto Da Forno committed
855
    print("Usage: %s --testid=<int> [--stop] [--debug] [--help]" % __file__)
Reto Da Forno's avatar
Reto Da Forno committed
856
857
858
859
860
    print("Options:")
    print("  --testid=<int>\t\tTest ID of test to which incoming data belongs.")
    print("  --stop\t\t\tOptional. Causes the program to stop a possibly running instance of the fetcher.")
    print("  --debug\t\t\tOptional. Print debug messages to log.")
    print("  --help\t\t\tOptional. Print this help.")
861
862
863
864
865
866
867
868
869
### END usage()


##############################################################################
#
# Main
#
##############################################################################
def main(argv):
Reto Da Forno's avatar
Reto Da Forno committed
870
871
872
873
874
875
876
877
878
879
880
881
882
883
    
    ### Get global variables ###
    global logger
    global debug
    global testid
    global testresultsdir
    global testresultsfile_dict
    global owner_fk
    global obsdict_byid
    global serialdict
    
    stop = False
    
    # Get logger:
Reto Da Forno's avatar
Reto Da Forno committed
884
    logger = flocklab.get_logger()
Reto Da Forno's avatar
Reto Da Forno committed
885
886
        
    # Get the config file ---
887
    flocklab.load_config()
Reto Da Forno's avatar
Reto Da Forno committed
888
889

    # Get command line parameters ---
890
    try:
Reto Da Forno's avatar
Reto Da Forno committed
891
892
893
        opts, args = getopt.getopt(argv, "hedt:", ["help", "stop", "debug", "testid="])
    except getopt.GetoptError as err:
        print(str(err))
894
        logger.warning(str(err))
Reto Da Forno's avatar
Reto Da Forno committed
895
896
897
898
        usage()
        sys.exit(errno.EINVAL)
    except:
        msg = "Error when getting arguments: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
Reto Da Forno's avatar
Reto Da Forno committed
899
        flocklab.error_logandexit(msg, errno.EAGAIN)
Reto Da Forno's avatar
Reto Da Forno committed
900
901
902
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
903
            sys.exit(flocklab.SUCCESS)
Reto Da Forno's avatar
Reto Da Forno committed
904
905
906
907
908
909
910
911
912
        elif opt in ("-d", "--debug"):
            debug = True
            logger.setLevel(logging.DEBUG)
        elif opt in ("-t", "--testid"):
            try:
                testid = int(arg)
            except ValueError:
                err = "Wrong API usage: testid has to be integer"
                print(str(err))
913
                logger.warning(str(err))
Reto Da Forno's avatar
Reto Da Forno committed
914
915
916
917
918
919
                usage()
                sys.exit(errno.EINVAL)
        elif opt in ("-e", "--stop"):
            stop = True
        else:
            print("Wrong API usage")
920
            logger.warning("Wrong API usage")
Reto Da Forno's avatar
Reto Da Forno committed
921
922
923
924
925
            sys.exit(errno.EINVAL)
    
    # Check if the necessary parameters are set ---
    if not testid:
        print("Wrong API usage")
926
        logger.warning("Wrong API usage")
Reto Da Forno's avatar
Reto Da Forno committed
927
928
929
930
        sys.exit(errno.EINVAL)
        
    # Check if the Test ID exists in the database ---
    try:
Reto Da Forno's avatar
Reto Da Forno committed
931
        (cn, cur) = flocklab.connect_to_db()
Reto Da Forno's avatar
Reto Da Forno committed
932
    except:
Reto Da Forno's avatar
Reto Da Forno committed
933
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
Reto Da Forno's avatar
Reto Da Forno committed
934
935
936
937
938
939
    rs = flocklab.check_test_id(cur, testid)
    cur.close()
    cn.close()
    if rs != 0:
        if rs == 3:
            msg = "Test ID %d does not exist in database." %testid
Reto Da Forno's avatar
Reto Da Forno committed
940
            flocklab.error_logandexit(msg, errno.EINVAL)
Reto Da Forno's avatar
Reto Da Forno committed
941
        else:
Reto Da Forno's avatar
Reto Da Forno committed
942
            msg = "Error when trying to get test ID from database: %s: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]))
Reto Da Forno's avatar
Reto Da Forno committed
943
            flocklab.error_logandexit(msg, errno.EIO)
Reto Da Forno's avatar
Reto Da Forno committed
944
945
946
947
948
        
    # Add Test ID to logger name ---
    logger.name += " (Test %d)"%testid
    
    # Start / stop the fetcher ---
949
    ret = flocklab.SUCCESS
Reto Da Forno's avatar
Reto Da Forno committed
950
951
952
    if stop:
        ret = stop_fetcher()
        logger.info("FlockLab fetcher stopped.")
Reto Da Forno's avatar
Reto Da Forno committed
953
954
955
956
957
958
        sys.exit(ret)

    # Start the fetcher processes which download data from the observers: 
    ret = start_fetcher()
    if ret == flocklab.SUCCESS:
        logger.info("FlockLab fetcher started.")
Reto Da Forno's avatar
Reto Da Forno committed
959
    else:
Reto Da Forno's avatar
Reto Da Forno committed
960
961
962
963
964
965
966
967
        msg = "Start function returned error. Exiting..."
        os.kill(os.getpid(), signal.SIGTERM)
        rs = flocklab.error_logandexit(msg, ret)
        
    # Get needed metadata ---
    try:
        (cn, cur) = flocklab.connect_to_db()
    except:
Reto Da Forno's avatar
Reto Da Forno committed
968
        flocklab.error_logandexit("Could not connect to database.", errno.EAGAIN)
Reto Da Forno's avatar
Reto Da Forno committed
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
    rs = flocklab.get_test_owner(cur, testid)
    if isinstance(rs, tuple):
        owner_fk = rs[0]
    else:
        owner_fk = None
    rs = flocklab.get_test_obs(cur, testid)
    if isinstance(rs, tuple):
        obsdict_byid = rs[1]
    else:
        obsdict_byid = None
    # Dict for serial service: 'r' means reader (data read from the target), 'w' means writer (data written to the target):
    serialdict = {0: 'r', 1: 'w'}
    
    #find out the start and stoptime of the test
    cur.execute("SELECT `time_start_wish`, `time_end_wish` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" %testid)
    # Times are going to be of datetime type:
    ret = cur.fetchone() 
    teststarttime = ret[0]
    teststoptime  = ret[1]
Reto Da Forno's avatar
Reto Da Forno committed
988
    ppFileFormat  = None
Reto Da Forno's avatar
Reto Da Forno committed
989
990
991
    
    # Find out which services are used to allocate working threads later on ---
    # Get the XML config from the database and check which services are used in the test.
Reto Da Forno's avatar
wip    
Reto Da Forno committed
992
    servicesUsed_dict = {'gpiotracing': 'gpioTracingConf', 'powerprofiling': 'powerProfilingConf', 'serial': 'serialConf', 'datatrace': 'dataTraceConf'}
Reto Da Forno's avatar
Reto Da Forno committed
993
994
995
996
997
998
999
1000
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
    ret = cur.fetchone()
    if not ret:
        msg = "No XML found in database for testid %d." %testid
        errors.append(msg)
        logger.error(msg)
        for service, xmlname in servicesUsed_dict.items():
            servicesUsed_dict[service] = True
For faster browsing, not all history is shown. View entire blame