#! /usr/bin/env python3

import __main__
import datetime
import errno
import getopt
import hashlib
import logging
import os
import queue
import re
import shutil
import signal
import subprocess
import sys
import tempfile
import threading
import time
import traceback
import types

import lxml.etree
import MySQLdb

import lib.flocklab as flocklab
import flocklab as fltools


# Module-wide logger. It is None here and must be assigned before any of the
# functions/threads below log anything (presumably done by the script entry
# point, which is outside this view -- verify).
logger = None
# When True, the remote observer start/stop scripts are invoked with "--debug".
debug  = False
# Set to True by sigterm_handler when the process receives SIGTERM.
abort  = False

##############################################################################
#
# sigterm_handler
#
##############################################################################
def sigterm_handler(signum, frame):
    """Handle SIGTERM by requesting a graceful shutdown.

    The handler only logs the event and raises the module-level ``abort``
    flag. Acting on the flag is left to code that polls it (not visible
    here). ``signum`` and ``frame`` are the standard signal-handler
    arguments and are not used.
    """
    global abort
    logger.info("Process received SIGTERM signal.")
    abort = True
### END sigterm_handler


##############################################################################
#
# StopTestThread
#
##############################################################################
class StopTestThread(threading.Thread):
    """Thread which calls the test stop script on an observer.

    Problems are collected as ``(message, error_code, observer_id)`` tuples.
    If any occurred, they are put on ``errors_queue`` as a single
    ``(obskey, errors)`` item when the thread finishes.
    """

    class _AbortRequested(Exception):
        """Internal signal: abort() was called while a subprocess was running."""

    def __init__(self, obskey, obsdict_key, errors_queue, testid):
        """
        obskey       -- key of the observer in ``obsdict_key``.
        obsdict_key  -- mapping obskey -> tuple; index 1 is the observer ID,
                        index 2 is the host name passed to ``ssh``.
        errors_queue -- queue receiving ``(obskey, errors)`` if errors occur.
        testid       -- numeric test ID passed to the remote stop script.
        """
        threading.Thread.__init__(self)
        self._obskey        = obskey
        self._obsdict_key   = obsdict_key
        self._errors_queue  = errors_queue
        self._abortEvent    = threading.Event()
        self._testid        = testid

    def _communicate(self, p, interval=1.0):
        """Wait for subprocess ``p`` to finish while honoring abort requests.

        The output pipes are drained while waiting (via ``communicate``), so a
        child producing a lot of output cannot block on a full pipe buffer.
        Every ``interval`` seconds the abort event is checked.

        Returns the ``(out, err)`` tuple of ``p.communicate()``.
        Raises ``_AbortRequested`` after killing and reaping ``p`` if abort()
        was called before ``p`` finished.
        """
        while True:
            try:
                return p.communicate(timeout=interval)
            except subprocess.TimeoutExpired:
                if self._abortEvent.is_set():
                    p.kill()
                    p.wait()
                    raise self._AbortRequested()

    def run(self):
        """Check that the observer is reachable, then run the test stop script on it."""
        errors = []
        p = None    # most recently started subprocess, killed on unexpected errors
        obsid   = self._obsdict_key[self._obskey][1]
        obshost = self._obsdict_key[self._obskey][2]
        starttime = time.time()
        try:
            logger.debug("Start StopTestThread for observer ID %d" % (obsid))
            # First test if the observer is online and if the SD card is mounted:
            cmd = ['ssh', '%s' % (obshost), "mount | grep /dev/mmcblk0p1"]
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            out, err = self._communicate(p)
            rs = p.returncode
            if rs != 0:
                if rs == 1:
                    if "No such file or directory" in err:
                        msg = "SD card on observer ID %s is not mounted, observer will thus be omitted for this test." % (obsid)
                    else:
                        msg = "Observer ID %s is not reachable (returned %d: %s, %s)." % (obsid, rs, out, err)
                else:
                    msg = "Observer ID %s is not responsive (SSH returned %d)." % (obsid, rs)
                errors.append((msg, errno.EHOSTUNREACH, obsid))
                logger.error(msg)
                return
            # Call the script on the observer which stops the test:
            remote_cmd = flocklab.config.get("observer", "stoptestscript") + " --testid=%d" % self._testid
            if debug:
                remote_cmd += " --debug"
            cmd = ['ssh', '%s' % (obshost), remote_cmd]
            # stderr is merged into stdout, so 'out' holds the whole script output.
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
            out, _ = self._communicate(p)
            rs = p.returncode
            if rs == flocklab.SUCCESS:
                logger.debug("Test stop script on observer ID %s succeeded (took %us)." % (obsid, int(time.time() - starttime)))
            elif rs == 255:
                # 255 is what ssh itself returns on connection failures.
                msg = "Observer ID %s is not reachable, thus not able to stop test. Dataloss occurred possibly for this observer." % (obsid)
                errors.append((msg, errno.EHOSTUNREACH, obsid))
                logger.error(msg)
            else:
                errors.append(("Test stop script on observer ID %s failed with error code %d." % (str(obsid), rs), rs, obsid))
                logger.error("Test stop script on observer ID %s failed with error code %d:\n%s" % (str(obsid), rs, str(out).strip()))
                logger.error("Tried to execute: %s" % (" ".join(cmd)))
        except Exception:
            logger.debug("Exception: %s, %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            # Either the main thread requested an abort or something failed:
            # make sure no subprocess is left running.
            if (p is not None) and (p.poll() is None):
                p.kill()
                p.wait()
            msg = "StopTestThread for observer ID %d aborted." % (obsid)
            errors.append((msg, errno.ECOMM, obsid))
            logger.error(msg)
        finally:
            if errors:
                self._errors_queue.put((self._obskey, errors))

    def abort(self):
        """Request the thread to kill its running subprocess and stop."""
        self._abortEvent.set()
### END StopTestThread



##############################################################################
#
# StartTestThread
#
##############################################################################
class StartTestThread(threading.Thread):
    """Thread which uploads all config files to an observer and
    starts the test on the observer.

    Problems are collected as ``(message, error_code, observer_id)`` tuples.
    If any occurred, they are put on ``errors_queue`` as a single
    ``(obskey, errors)`` item when the thread finishes.
    """

    class _AbortRequested(Exception):
        """Internal signal: abort() was called while a subprocess was running."""

    def __init__(self, obskey, obsdict_key, xmldict_key, imagedict_key, errors_queue, testid, serialproxy):
        """
        obskey        -- key of the observer in the dictionaries below.
        obsdict_key   -- mapping obskey -> tuple; index 1 is the observer ID,
                         index 2 is the host name passed to ``ssh``/``scp``.
        xmldict_key   -- mapping obskey -> tuple; index 0 is the local path of
                         the observer's XML config file.
        imagedict_key -- mapping obskey -> list of tuples whose index 0 is a
                         local target image path (observer may be absent).
        errors_queue  -- queue receiving ``(obskey, errors)`` if errors occur.
        testid        -- numeric test ID.
        serialproxy   -- if truthy, ``--serialport`` is passed to the start script.
        """
        threading.Thread.__init__(self)
        self._obskey        = obskey
        self._obsdict_key   = obsdict_key
        self._xmldict_key   = xmldict_key
        self._imagedict_key = imagedict_key
        self._errors_queue  = errors_queue
        self._abortEvent    = threading.Event()
        self._testid        = testid
        self._serialproxy   = serialproxy

    def _communicate(self, p, interval=1.0):
        """Wait for subprocess ``p`` to finish while honoring abort requests.

        The output pipes (if any) are drained while waiting (via
        ``communicate``), so a child producing a lot of output cannot block
        on a full pipe buffer. Every ``interval`` seconds the abort event is
        checked.

        Returns the ``(out, err)`` tuple of ``p.communicate()``.
        Raises ``_AbortRequested`` after killing and reaping ``p`` if abort()
        was called before ``p`` finished.
        """
        while True:
            try:
                return p.communicate(timeout=interval)
            except subprocess.TimeoutExpired:
                if self._abortEvent.is_set():
                    p.kill()
                    p.wait()
                    raise self._AbortRequested()

    def run(self):
        """Check the observer, upload XML config and images, then start the test."""
        errors = []
        p = None    # most recently started subprocess, killed on unexpected errors
        testconfigfolder = "%s/%d" % (flocklab.config.get("observer", "testconfigfolder"), self._testid)
        obsdataport      = flocklab.config.getint('serialproxy', 'obsdataport')
        starttime        = time.time()
        obsid   = self._obsdict_key[self._obskey][1]
        obshost = self._obsdict_key[self._obskey][2]
        try:
            logger.debug("Start StartTestThread for observer ID %d" % (obsid))
            # First test if the observer is online and if the SD card is mounted:
            cmd = ['ssh', '%s' % (obshost), "ls ~/data/ && mkdir %s" % testconfigfolder]
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            out, err = self._communicate(p)
            rs = p.returncode
            if rs != 0:
                if rs == 1:
                    if "No such file or directory" in err:
                        msg = "SD card on observer ID %s is not mounted, observer will thus be omitted for this test." % (obsid)
                    else:
                        msg = "Observer ID %s is not reachable, it will thus be omitted for this test (returned: %d: %s, %s)." % (obsid, rs, out, err)
                else:
                    msg = "Observer ID %s is not responsive, it will thus be omitted for this test (SSH returned %d). Command: %s" % (obsid, rs, " ".join(cmd))
                errors.append((msg, errno.EHOSTUNREACH, obsid))
                logger.error(msg)
                return

            xmlpath = self._xmldict_key[self._obskey][0]
            images = self._imagedict_key[self._obskey] if self._obskey in self._imagedict_key else []
            fileuploadlist = [xmlpath] + [image[0] for image in images]
            # Now upload the image and XML config file:
            cmd = ['scp', '-q']
            cmd.extend(fileuploadlist)
            cmd.append('%s:%s/.' % (obshost, testconfigfolder))
            p = subprocess.Popen(cmd)
            self._communicate(p)
            rs = p.returncode
            if rs != flocklab.SUCCESS:
                msg = "Upload of target image and config XML to observer ID %s failed with error number %d\nTried to execute: %s" % (obsid, rs, (" ".join(cmd)))
                errors.append((msg, rs, obsid))
                logger.error(msg)
                return
            logger.debug("Upload of target image and config XML to observer ID %s succeeded." % (obsid))

            # Run the script on the observer which starts the test:
            remote_cmd = flocklab.config.get("observer", "starttestscript") + " --testid=%d --xml=%s/%s" % (self._testid, testconfigfolder, os.path.basename(xmlpath))
            if self._serialproxy:
                remote_cmd += " --serialport=%d" % (obsdataport)
            if debug:
                remote_cmd += " --debug"
            cmd = ['ssh', '%s' % (obshost), remote_cmd]
            # stderr is merged into stdout, so 'out' holds the whole script output.
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
            try:
                out, _ = self._communicate(p)
            except self._AbortRequested:
                logger.debug("Abort is set, start test process for observer %s killed." % (obsid))
                raise
            rs = p.returncode
            if rs != flocklab.SUCCESS:
                errors.append(("Test start script on observer ID %s failed with error code %d." % (obsid, rs), rs, obsid))
                logger.error("Test start script on observer ID %s failed with error code %d and message:\n%s" % (str(obsid), rs, str(out)))
            else:
                logger.debug("Test start script on observer ID %s succeeded (took %us)." % (obsid, int(time.time() - starttime)))
            # Remove image file and xml on server:
            os.remove(xmlpath)
            logger.debug("Removed XML config %s for observer ID %s" % (xmlpath, obsid))
            for image in images:
                os.remove(image[0])
                logger.debug("Removed target image %s for observer ID %s" % (image[0], obsid))
        except Exception:
            logger.debug("Exception: %s, %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            # Either the main thread requested an abort or something failed:
            # make sure no subprocess is left running.
            if (p is not None) and (p.poll() is None):
                p.kill()
                p.wait()
            msg = "StartTestThread for observer ID %d aborted." % (obsid)
            errors.append((msg, errno.ECOMM, obsid))
            logger.error(msg)
        finally:
            if errors:
                self._errors_queue.put((self._obskey, errors))

    def abort(self):
        """Request the thread to kill its running subprocess and stop."""
        self._abortEvent.set()
### END StartTestThread



##############################################################################
#
# start_test
#
##############################################################################
def start_test(testid, cur, cn, obsdict_key, obsdict_id):
Reto Da Forno's avatar
Reto Da Forno committed
237
238
239
240
241
242
    errors = []
    warnings = []
    
    try:    
        logger.debug("Entering start_test() function...")
        # First, validate the XML file again. If validation fails, return immediately:
Reto Da Forno's avatar
Reto Da Forno committed
243
        cmd = [flocklab.config.get('dispatcher','validationscript'), '--testid=%d' % testid]
Reto Da Forno's avatar
Reto Da Forno committed
244
245
246
247
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        out, err = p.communicate()
        rs = p.returncode
        if rs != 0:
248
249
            logger.error("Error %s returned from %s" % (str(rs), flocklab.config.get('dispatcher','validationscript')))
            logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
250
251
252
            msg = "Validation of XML failed. Output of script was: %s %s" % (str(out), str(err))
            logger.error(msg)
            errors.append(msg)
Reto Da Forno's avatar
Reto Da Forno committed
253
254
255
256
257
258
259
260
261
        
        if len(errors) == 0:
            # Update DB status ---
            # Update the status of the test in the db:
            flocklab.set_test_status(cur, cn, testid, 'preparing')
            
            # Get start/stop time ---
            cur.execute("SELECT `time_start_wish`, `time_end_wish`, `owner_fk` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" %testid)
            # Times are going to be of datetime type:
262
            ret = cur.fetchone()
Reto Da Forno's avatar
Reto Da Forno committed
263
264
265
            starttime = ret[0]
            stoptime  = ret[1]
            owner_fk = ret[2]
266
267
            logger.debug("Got start time wish for test from database: %s" % starttime)
            logger.debug("Got end time wish for test from database: %s" % stoptime)
Reto Da Forno's avatar
Reto Da Forno committed
268
            
Reto Da Forno's avatar
wip    
Reto Da Forno committed
269
270
271
272
273
274
275
276
            # Check whether datatrace debug feature is used
            datatrace_used = False
            cur.execute("SELECT serv_tests_key FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s) AND (`testconfig_xml` LIKE '%%<dataTraceConf>%%')" % testid)
            ret = cur.fetchone()
            if ret:
                datatrace_used = True
                logger.debug("Use of data trace detected.")
            
Reto Da Forno's avatar
Reto Da Forno committed
277
278
279
            # Image processing ---
            # Get all images from the database:
            imagedict_key = {}
Reto Da Forno's avatar
wip    
Reto Da Forno committed
280
            symtable = {}
281
            sql_image =     """ SELECT `t`.`binary`, `m`.`observer_fk`, `m`.`node_id`, LOWER(`a`.`architecture`), `t`.`serv_targetimages_key`, LOWER(`p`.`name`) AS `platname`, `a`.`core` AS `core`
Reto Da Forno's avatar
Reto Da Forno committed
282
283
284
285
286
287
288
289
                                FROM `tbl_serv_targetimages` AS `t` 
                                LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `m` 
                                    ON `t`.`serv_targetimages_key` = `m`.`targetimage_fk` 
                                LEFT JOIN `tbl_serv_platforms` AS `p`
                                    ON `t`.`platforms_fk` = `p`.`serv_platforms_key`
                                LEFT JOIN `tbl_serv_architectures` AS `a`
                                    ON `t`.`core` = `a`.`core` AND `p`.`serv_platforms_key` = `a`.`platforms_fk`
                                WHERE `m`.`test_fk` = %d
290
                            """
Reto Da Forno's avatar
Reto Da Forno committed
291
292
293
294
295
            cur.execute(sql_image%testid)
            ret = cur.fetchall()
            for r in ret:
                binary      = r[0]
                obs_fk      = r[1]
296
                obs_id      = obsdict_key[obs_fk][1]
Reto Da Forno's avatar
Reto Da Forno committed
297
298
                node_id     = r[2]
                arch        = r[3]
299
300
301
                tgimage_key = r[4]
                platname    = r[5]
                core        = r[6]
Reto Da Forno's avatar
Reto Da Forno committed
302
303
304
                
                # Prepare image ---
                (fd, imagepath) = tempfile.mkstemp()
Reto Da Forno's avatar
Reto Da Forno committed
305
                binpath = "%s.hex" % (os.path.splitext(imagepath)[0])
Reto Da Forno's avatar
Reto Da Forno committed
306
                
307
308
309
310
311
312
313
314
315
316
317
318
319
                # First, check if image is already in hex format ---
                if flocklab.is_hex_file(data=binary):
                    f = open(binpath, "wb")
                    f.write(binary)
                    f.close()
                else:
                    imagefile = os.fdopen(fd, 'w+b')
                    imagefile.write(binary)
                    imagefile.close()
                    logger.debug("Got target image ID %s for observer ID %s with node ID %s from database and wrote it to temp file %s (hash %s)" % (str(tgimage_key), str(obs_id), str(node_id), imagepath, hashlib.sha1(binary).hexdigest()))
                    
                    logger.debug("Found %s target architecture on platform %s for observer ID %s (node ID to be used: %s)." % (arch, platname, str(obs_id), str(node_id)))
                    
Reto Da Forno's avatar
wip    
Reto Da Forno committed
320
321
322
323
                    # get symbols table if necessary
                    if datatrace_used and not obs_id in symtable:    # allow only one entry per observer
                        symtable[obs_id] = flocklab.extract_variables_from_symtable(flocklab.get_symtable_from_binary(imagepath))
                    
324
325
326
327
328
329
330
331
332
333
334
335
336
337
                    # binary patching
                    if (node_id != None):
                        # set node ID
                        if flocklab.patch_binary("FLOCKLAB_NODE_ID", node_id, imagepath, arch) != flocklab.SUCCESS:
                            msg = "Failed to patch symbol FLOCKLAB_NODE_ID in binary file %s." % (imagepath)
                            errors.append(msg)
                            logger.error(msg)
                        if flocklab.patch_binary("TOS_NODE_ID", node_id, imagepath, arch) != flocklab.SUCCESS:
                            msg = "Failed to patch symbol TOS_NODE_ID in binary file %s." % (imagepath)
                            errors.append(msg)
                            logger.error(msg)
                    # convert elf to intel hex
                    if flocklab.bin_to_hex(imagepath, arch, binpath) != flocklab.SUCCESS:
                        msg = "Failed to convert image file %s to Intel hex format." % (imagepath)
338
339
                        errors.append(msg)
                        logger.error(msg)
340
341
                        shutil.move(imagepath, binpath)
                        logger.debug("Copied binary file without modification.")
Reto Da Forno's avatar
Reto Da Forno committed
342
343
                
                # Remove the original file which is not used anymore:
344
                if os.path.exists(imagepath):
Reto Da Forno's avatar
Reto Da Forno committed
345
346
347
348
349
350
351
352
                    os.remove(imagepath)
                
                # Slot detection ---
                # Find out which slot number to use on the observer.
                #logger.debug("Detecting adapter for %s on observer ID %s" %(platname, obs_id))
                ret = flocklab.get_slot(cur, int(obs_fk), platname)
                if ret in range(1,5):
                    slot = ret
353
                    logger.debug("Found adapter for %s on observer ID %s in slot %d" % (platname, obs_id, slot))
Reto Da Forno's avatar
Reto Da Forno committed
354
355
                elif ret == 0:
                    slot = None
356
                    msg = "Could not find an adapter for %s on observer ID %s" % (platname, obs_id)
Reto Da Forno's avatar
Reto Da Forno committed
357
358
359
360
                    errors.append(msg)
                    logger.error(msg)
                else:
                    slot = None
361
                    msg = "Error when detecting adapter for %s on observer ID %s: function returned %d" % (platname, obs_id, ret)
Reto Da Forno's avatar
Reto Da Forno committed
362
363
                    errors.append(msg)
                    logger.error(msg)
364
                
Reto Da Forno's avatar
Reto Da Forno committed
365
366
367
                # Write the dictionary for the image:
                if not obs_fk in imagedict_key:
                    imagedict_key[obs_fk] = []
Reto Da Forno's avatar
Reto Da Forno committed
368
                imagedict_key[obs_fk].append((binpath, slot, platname, core))
Reto Da Forno's avatar
Reto Da Forno committed
369
370
371
372
373
                
            logger.info("Processed all target images from database.")
                
            # XML processing ---
            # Get the XML config from the database and generate a separate file for every observer used:
374
            cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" % testid)
Reto Da Forno's avatar
Reto Da Forno committed
375
376
377
378
379
380
            ret = cur.fetchone()
            if not ret:
                msg = "No XML found in database for testid %d." %testid
                errors.append(msg)
                logger.error(msg)
            else:
381
382
                parser = lxml.etree.XMLParser(remove_comments=True)
                tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
Reto Da Forno's avatar
Reto Da Forno committed
383
                ns = {'d': flocklab.config.get('xml', 'namespace')}
Reto Da Forno's avatar
Reto Da Forno committed
384
                logger.debug("Got XML from database.")
Reto Da Forno's avatar
wip    
Reto Da Forno committed
385
                # Create XML files for observers ---
Reto Da Forno's avatar
Reto Da Forno committed
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
                # Create an empty XML config file for every observer used and organize them in a dictionary:
                xmldict_key = {}
                for obs_key, obs_id, obs_ether in obsdict_key.values():
                    (fd, xmlpath) = tempfile.mkstemp()
                    xmlfhand = os.fdopen(fd, 'w+')
                    xmldict_key[obs_key] = (xmlpath, xmlfhand)
                    xmlfhand.write('<?xml version="1.0" encoding="UTF-8"?>\n\n<obsConf>\n\n')
                # Go through the blocks of the XML file and write the configs to the affected observer XML configs:
                # targetConf ---
                targetconfs = tree.xpath('//d:targetConf', namespaces=ns)
                if not targetconfs:
                    msg = "no <targetConf> element found in XML config (wrong namespace?)"
                    errors.append(msg)
                    logger.error(msg)
                for targetconf in targetconfs:
                    obsids = targetconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                    ret = targetconf.xpath('d:voltage', namespaces=ns)
                    if ret:
                        voltage = ret[0].text.strip()
                    else:
Reto Da Forno's avatar
Reto Da Forno committed
406
                        voltage = str(flocklab.config.get("dispatcher", "default_tg_voltage"))
Reto Da Forno's avatar
Reto Da Forno committed
407
408
409
410
411
412
413
414
415
416
417
418
                    ret = targetconf.xpath('d:noImage', namespaces=ns)
                    if ret:
                        noImageSlot = ret[0].text.strip()
                    else:
                        noImageSlot = None
                    for obsid in obsids:
                        obsid = int(obsid)
                        obskey = obsdict_id[obsid][0]
                        xmldict_key[obskey][1].write("<obsTargetConf>\n")
                        xmldict_key[obskey][1].write("\t<voltage>%s</voltage>\n"%voltage)
                        if noImageSlot:
                            slot = noImageSlot
419
                            xmldict_key[obskey][1].write("\t<slotnr>%s</slotnr>\n" % (slot))
Reto Da Forno's avatar
Reto Da Forno committed
420
421
                        else:
                            for coreimage in imagedict_key[obskey]:
Reto Da Forno's avatar
Reto Da Forno committed
422
                                xmldict_key[obskey][1].write("\t<image core=\"%d\">%s/%d/%s</image>\n" % (coreimage[3], flocklab.config.get("observer", "testconfigfolder"),testid, os.path.basename(coreimage[0])))
423
424
                            xmldict_key[obskey][1].write("\t<slotnr>%s</slotnr>\n" % (imagedict_key[obskey][0][1]))
                            xmldict_key[obskey][1].write("\t<platform>%s</platform>\n" % (imagedict_key[obskey][0][2]))
425
                            xmldict_key[obskey][1].write("\t<os>%s</os>\n" % (imagedict_key[obskey][0][2]))
Reto Da Forno's avatar
Reto Da Forno committed
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
                            slot = imagedict_key[obskey][0][1]
                        xmldict_key[obskey][1].write("</obsTargetConf>\n\n")
                        #logger.debug("Wrote obsTargetConf XML for observer ID %s" %obsid)
                        # update test_image mapping with slot information
                        cur.execute("UPDATE `tbl_serv_map_test_observer_targetimages` SET `slot` = %s WHERE `observer_fk` = %d AND `test_fk`=%d" % (slot, obskey, testid))
                        cn.commit()
                
                # serialConf ---
                srconfs = tree.xpath('//d:serialConf', namespaces=ns)
                serialProxyUsed = False
                if srconfs:
                    # only use serialproxy if remote IP specified in xml
                    if tree.xpath('//d:serialConf/d:remoteIp', namespaces=ns):
                        serialProxyUsed = True
                    for srconf in srconfs:
                        obsids = srconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        xmlblock = "<obsSerialConf>\n"
                        port = srconf.xpath('d:port', namespaces=ns)
                        if port:
445
                            port = port[0].text.strip()
Reto Da Forno's avatar
Reto Da Forno committed
446
447
448
                            xmlblock += "\t<port>%s</port>\n" %port
                        baudrate = srconf.xpath('d:baudrate', namespaces=ns)
                        if baudrate:
449
                            baudrate = baudrate[0].text.strip()
Reto Da Forno's avatar
Reto Da Forno committed
450
451
452
                            xmlblock += "\t<baudrate>%s</baudrate>\n" %baudrate
                        mode = srconf.xpath('d:mode', namespaces=ns)
                        if mode:
453
                            mode = mode[0].text.strip()
Reto Da Forno's avatar
Reto Da Forno committed
454
455
456
457
458
459
460
                            xmlblock += "\t<mode>%s</mode>\n" %mode
                        xmlblock += "</obsSerialConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                else:
461
                    logger.debug("No <serialConf> found, not using serial service.")
Reto Da Forno's avatar
Reto Da Forno committed
462
                
463
                # debugConf ---
Reto Da Forno's avatar
wip    
Reto Da Forno committed
464
465
466
467
                dbgconfs = tree.xpath('//d:debugConf', namespaces=ns)
                if dbgconfs:
                    for dbgconf in dbgconfs:
                        obsids = dbgconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
468
                        xmlblock = "<obsDebugConf>\n"
Reto Da Forno's avatar
wip    
Reto Da Forno committed
469
                        remoteIp = dbgconf.xpath('d:remoteIp', namespaces=ns)
470
471
472
                        if remoteIp:
                            remoteIp = remoteIp[0].text.strip()
                            xmlblock += "\t<remoteIp>%s</remoteIp>\n" % (remoteIp)
Reto Da Forno's avatar
wip    
Reto Da Forno committed
473
                        gdbPort = dbgconf.xpath('d:gdbPort', namespaces=ns)
474
475
476
                        if gdbPort:
                            gdbPort = gdbPort[0].text.strip()
                            xmlblock += "\t<gdbPort>%s</gdbPort>\n" % (gdbPort)
Reto Da Forno's avatar
wip    
Reto Da Forno committed
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
                        dwtconfs = dbgconf.xpath('d:dataTraceConf', namespaces=ns)
                        for dwtconf in dwtconfs:
                            var  = dwtconf.xpath('d:variable', namespaces=ns)[0].text.strip()
                            # convert variable name to address
                            obskey = int(float(obsids[0]))
                            if obskey in symtable:
                                if var in symtable[obskey]:
                                    logger.debug("Variable %s replaced with address 0x%x." % (var, symtable[obskey][var][0]))
                                    var = "0x%x" % symtable[obskey][var][0]
                                else:
                                    logger.warning("Variable %s not found in symbol table." % var)
                                    continue
                            else: 
                                logger.debug("Key %u not found in symbol table." % (obskey))
                            mode = dwtconf.xpath('d:mode', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<dataTraceConf>\n\t\t<variable>%s</variable>\n\t\t<mode>%s</mode>\n\t</dataTraceConf>\n" % (var, mode)
493
494
495
496
497
498
499
500
                        xmlblock += "</obsDebugConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                else:
                    logger.debug("No <debugConf> found, not using debug service.")
                
Reto Da Forno's avatar
Reto Da Forno committed
501
502
                # gpioTracingConf ---
                gmconfs = tree.xpath('//d:gpioTracingConf', namespaces=ns)
503
504
505
                if gmconfs:
                    for gmconf in gmconfs:
                        obsids = gmconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
506
                        pinlist = gmconf.xpath('d:pins', namespaces=ns)
507
                        offset = gmconf.xpath('d:offset', namespaces=ns)
508
                        xmlblock = "<obsGpioMonitorConf>\n"
509
510
                        if pinlist:
                            xmlblock += "\t<pins>" + pinlist[0].text.strip() + "</pins>\n"
511
512
                        if offset:
                            xmlblock += "\t<offset>" + offset[0].text.strip() + "</offset>\n"
513
514
515
516
517
518
519
                        xmlblock += "</obsGpioMonitorConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                else:
                    logger.debug("No <gpioTracingConf> found, not using GPIO tracing service.")
520
                
Reto Da Forno's avatar
Reto Da Forno committed
521
                # gpioActuationConf ---
522
                # Create 2 pin settings for every observer used in the test:
523
524
                #   1) Pull reset pin of target high when test is to start
                #   2) Pull reset pin of target low when test is to stop
Reto Da Forno's avatar
Reto Da Forno committed
525
                xmlblock = "<obsGpioSettingConf>\n"
526
527
528
529
                startdatetime = starttime.replace(tzinfo=datetime.timezone.utc).timestamp()      #.strftime(flocklab.config.get("observer", "timeformat"))
                xmlblock += "\t<pinConf>\n\t\t<pin>RST</pin>\n\t\t<level>high</level>\n\t\t<timestamp>%s</timestamp>\n\t</pinConf>\n" % (startdatetime)
                stopdatetime = stoptime.replace(tzinfo=datetime.timezone.utc).timestamp()        #.strftime(flocklab.config.get("observer", "timeformat"))
                xmlblock += "\t<pinConf>\n\t\t<pin>RST</pin>\n\t\t<level>low</level>\n\t\t<timestamp>%s</timestamp>\n\t</pinConf>\n" % (stopdatetime)
Reto Da Forno's avatar
Reto Da Forno committed
530
531
532
533
534
535
536
537
538
539
540
                for obskey in obsdict_key.keys():
                    xmldict_key[obskey][1].write(xmlblock)
                # Now write the per-observer config:
                gsconfs = tree.xpath('//d:gpioActuationConf', namespaces=ns)
                for gsconf in gsconfs:
                    xmlblock = ""
                    obsids = gsconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                    pinconfs = gsconf.xpath('d:pinConf', namespaces=ns)
                    for pinconf in pinconfs:
                        pin  = pinconf.xpath('d:pin', namespaces=ns)[0].text.strip()
                        level = pinconf.xpath('d:level', namespaces=ns)[0].text.strip()
541
542
                        ofs = pinconf.xpath('d:offset', namespaces=ns)[0].text.strip()
                        xmlblock += "\t<pinConf>\n\t\t<pin>%s</pin>\n\t\t<level>%s</level>\n\t\t<offset>%s</offset>\n\t</pinConf>\n" % (pin, level, ofs)
Reto Da Forno's avatar
Reto Da Forno committed
543
544
545
546
547
548
549
                    for obsid in obsids:
                        obsid = int(obsid)
                        obskey = obsdict_id[obsid][0]
                        xmldict_key[obskey][1].write(xmlblock)
                xmlblock = "</obsGpioSettingConf>\n\n"
                for obskey in obsdict_key.keys():
                    xmldict_key[obskey][1].write(xmlblock)
550
                
Reto Da Forno's avatar
Reto Da Forno committed
551
552
                # powerProfilingConf ---
                ppconfs = tree.xpath('//d:powerProfilingConf', namespaces=ns)
553
554
555
556
                if ppconfs:
                    for ppconf in ppconfs:
                        obsids = ppconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        xmlblock = "<obsPowerprofConf>\n"
557
558
559
560
561
562
563
564
565
566
567
568
569
                        duration = ppconf.xpath('d:duration', namespaces=ns)
                        if duration:
                            duration = duration[0].text.strip()
                        else:
                            # if duration not given, run power profiling for the duration of the test
                            duration = (stoptime - starttime).total_seconds()
                            logger.debug("Power profiling duration set to %ds." % (duration))
                        xmlblock += "\t<duration>%s</duration>" % duration
                        # calculate the sampling start
                        offset  = ppconf.xpath('d:offset', namespaces=ns)
                        if offset:
                            offset = int(offset[0].text.strip())
                            tstart = datetime.datetime.timestamp(starttime + datetime.timedelta(seconds=offset))
570
571
                        else:
                            tstart = datetime.datetime.timestamp(starttime)
572
573
574
575
576
577
                        xmlblock += "\n\t<starttime>%s</starttime>" % (tstart)
                        # check if config contains samplingRate:
                        samplingrate    = ppconf.xpath('d:samplingRate', namespaces=ns)
                        if samplingrate:
                            samplingrate = samplingrate[0].text.strip()
                            xmlblock += "\n\t<samplingRate>%s</samplingRate>" % samplingrate
Reto Da Forno's avatar
Reto Da Forno committed
578
                        xmlblock += "\n</obsPowerprofConf>\n\n"
579
580
581
582
583
584
585
586
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                            #logger.debug("Wrote obsPowerprofConf XML for observer ID %s" %obsid)
                else:
                    logger.debug("No <powerProfilingConf> found, not using power profiling service.")
                 
Reto Da Forno's avatar
Reto Da Forno committed
587
                logger.debug("Wrote all observer XML configs.")
588
                
Reto Da Forno's avatar
Reto Da Forno committed
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
                # Close XML files ---
                for xmlpath, xmlfhand in xmldict_key.values():
                    xmlfhand.write("</obsConf>\n")
                    xmlfhand.close()
                    #logger.debug("Closed observer XML config %s"%xmlpath)
                #logger.debug("Closed all observer XML configs.")
                
        # Upload configs to observers and start test ---
        if len(errors) == 0:
            if not db_register_activity(testid, cur, cn, 'start', iter(obsdict_key.keys())):
                msg = "Could not access all needed observers for testid %d." %testid
                errors.append(msg)
                logger.error(msg)
        if len(errors) == 0:
            # -- START OF CRITICAL SECTION where dispatcher accesses used observers
            # Start a thread for each observer which uploads the config and calls the test start script on the observer
            thread_list = []
            errors_queue = queue.Queue()
            for obskey in obsdict_key.keys():
608
                thread = StartTestThread(obskey, obsdict_key, xmldict_key, imagedict_key, errors_queue, testid, serialProxyUsed)
Reto Da Forno's avatar
Reto Da Forno committed
609
610
611
612
613
614
                thread_list.append((thread, obskey))
                thread.start()
                #DEBUG logger.debug("Started thread for test start on observer ID %s" %(str(obsdict_key[obskey][1])))
            # Wait for all threads to finish:
            for (thread, obskey) in thread_list:
                # Wait max 75% of the setuptime:
615
                thread.join(timeout=(flocklab.config.getint('tests','setuptime')*0.75))
Reto Da Forno's avatar
Reto Da Forno committed
616
617
                if thread.isAlive():
                    # Timeout occurred. Signal the thread to abort:
Reto Da Forno's avatar
Reto Da Forno committed
618
                    logger.error("Telling thread for test start on observer ID %s to abort..." % (str(obsdict_key[obskey][1])))
Reto Da Forno's avatar
Reto Da Forno committed
619
620
                    thread.abort()
            # Wait again for the aborted threads:
621
            for (thread, obskey) in thread_list:
Reto Da Forno's avatar
Reto Da Forno committed
622
623
                thread.join(timeout=10)
                if thread.isAlive():
Reto Da Forno's avatar
Reto Da Forno committed
624
                    msg = "Thread for test start on observer ID %s timed out and will be aborted now." % (str(obsdict_key[obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
625
626
627
628
                    errors.append(msg)
                    logger.error(msg)
            # -- END OF CRITICAL SECTION where dispatcher accesses used observers
            db_unregister_activity(testid, cur, cn, 'start')
629
            
Reto Da Forno's avatar
Reto Da Forno committed
630
631
632
            # Get all errors (if any). Observers which return errors are not regarded as a general error. In this
            # case, the test is just started without the faulty observers if there is at least 1 observer that succeeded:
            obs_error = []
633
634
            #if not errors_queue.empty():
                #logger.error("Queue with errors from test start thread is not empty. Getting errors...")
Reto Da Forno's avatar
Reto Da Forno committed
635
636
637
            while not errors_queue.empty():
                errs = errors_queue.get()
                for err in errs[1]:
638
                    #logger.error("Error from test start thread for observer %s: %s" %(str(err[2]), str(err[0])))
Reto Da Forno's avatar
Reto Da Forno committed
639
640
641
                    obs_error.append(err[2])
                    warnings.append(err[0])
            if len(obs_error) > 0:
Reto Da Forno's avatar
Reto Da Forno committed
642
643
644
645
646
647
648
                # Abort or continue?
                if not flocklab.config.get("dispatcher", "continue_on_error"):
                    msg = "At least one observer failed to start the test, going to abort..."
                    errors.append(msg)
                    logger.error(msg)
                # Check if there is at least 1 observer which succeeded:
                elif (len(obsdict_id) == len(set(obs_error))):
Reto Da Forno's avatar
Reto Da Forno committed
649
650
651
652
653
654
655
656
                    msg = "None of the requested observers could successfully start the test."
                    errors.append(msg)
                    logger.error(msg)
        
        # Start proxy for serial service ---
        if len(errors) == 0:
            if serialProxyUsed:
                # Start serial proxy:
Reto Da Forno's avatar
Reto Da Forno committed
657
                logger.debug("Starting serial proxy...")
Reto Da Forno's avatar
Reto Da Forno committed
658
                cmd = [flocklab.config.get("dispatcher", "serialproxyscript"), "--notify"]
Reto Da Forno's avatar
Reto Da Forno committed
659
660
661
662
663
                if debug: 
                    cmd.append("--debug")
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                rs = p.wait()
                if (rs != 0):
664
                    msg = "Serial proxy for test ID %d could not be started (error code %d)." % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
665
666
                    errors.append(msg)
                    logger.error(msg)
667
                    logger.debug("Executed command was: %s" % (str(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
668
669
670
                else:
                    logger.debug("Started serial proxy.")
    
Reto Da Forno's avatar
Reto Da Forno committed
671
        # Start fetcher ---
Reto Da Forno's avatar
Reto Da Forno committed
672
        if len(errors) == 0:
Reto Da Forno's avatar
Reto Da Forno committed
673
674
            logger.debug("Starting fetcher...")
            cmd = [flocklab.config.get("dispatcher", "fetcherscript"), "--testid=%d" % testid]
Reto Da Forno's avatar
Reto Da Forno committed
675
676
677
678
679
            if debug:
                cmd.append("--debug")
            p = subprocess.Popen(cmd)
            rs = p.wait()
            if rs != 0:
Reto Da Forno's avatar
Reto Da Forno committed
680
                msg = "Could not start fetcher for test ID %d. Fetcher returned error %d" % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
681
682
                errors.append(msg)
                logger.error(msg)
683
                logger.error("Tried to execute: %s" % (" ".join(cmd)))
684

Reto Da Forno's avatar
Reto Da Forno committed
685
686
687
688
689
690
691
692
        # check if we're still in time ---
        if len(errors) == 0:
            now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
            cur.execute("SELECT `serv_tests_key` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d AND `time_start_wish` <= '%s'" % (testid, now))
            if cur.fetchone() is not None:
                msg = "Setup for test ID %d took too much time." % (testid)
                errors.append(msg)
                logger.error(msg)
693

Reto Da Forno's avatar
Reto Da Forno committed
694
695
696
697
698
699
700
701
702
703
704
705
        # Update DB status, set start time ---
        if len(errors) == 0:
            logger.debug("Setting test status in DB to running...")
            flocklab.set_test_status(cur, cn, testid, 'running')
            cur.execute("UPDATE `tbl_serv_tests` SET `time_start_act` = `time_start_wish` WHERE `serv_tests_key` = %d" %testid)
            cn.commit()
        else:
            logger.debug("Setting test status in DB to aborting...")
            flocklab.set_test_status(cur, cn, testid, 'aborting')
            cur.execute("UPDATE `tbl_serv_tests` SET `time_start_act` = `time_start_wish`, `time_end_act` = UTC_TIMESTAMP() WHERE `serv_tests_key` = %d" %testid)
            cn.commit()
        logger.debug("At end of start_test(). Returning...")
706

Reto Da Forno's avatar
Reto Da Forno committed
707
708
        return (errors, warnings)
    except Exception:
709
        msg = "Unexpected error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
Reto Da Forno's avatar
Reto Da Forno committed
710
        logger.error(msg)
Reto Da Forno's avatar
Reto Da Forno committed
711
        raise
712
713
714
715
716
717
718
719
720
721
### END start_test()



##############################################################################
#
# stop_test
#
##############################################################################
def stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort=False):
    """Stop a test: serial proxy, observers and fetcher, then record the stop time.

    The steps below are all executed even if an earlier step reported an error:
      1. Set the test status in the DB to 'aborting' (abort=True) or 'cleaning up'.
      2. Load the test's XML config from the DB. If it contains
         <serialConf><remoteIp>, call the serial proxy script with --notify.
      3. Register a 'stop' activity for the observers and start one
         StopTestThread per observer. Wait for the threads (aborting those that
         exceed 75% of the configured cleanup time), then unregister the activity.
      4. Delete expired resource allocations and those belonging to this test.
      5. Stop the fetcher.
      6. Collect per-observer errors reported by the threads as warnings and
         set `time_end_act` in the DB.

    Parameters:
        testid:      test ID (key of tbl_serv_tests).
        cur, cn:     DB cursor and connection.
        obsdict_key: observers used by the test, keyed by observer key; element
                     [1] of each value is the observer ID used in log messages.
        obsdict_id:  not used by this function (kept for interface compatibility).
        abort:       True if the test is aborted instead of stopped as planned.

    Returns:
        Tuple (errors, warnings), each a list of message strings.

    Raises:
        Any unexpected exception is logged and re-raised.
    """
    errors = []
    warnings = []

    try:
        logger.info("Stopping test %d..."%testid)

        # Update DB status ---
        status = 'aborting' if abort else 'cleaning up'
        logger.debug("Setting test status in DB to %s..." %status)
        if flocklab.set_test_status(cur, cn, testid, status) != flocklab.SUCCESS:
            msg = "Failed to set test status in DB."
            errors.append(msg)
            logger.error(msg)

        # Stop serial proxy ---
        # Get the XML config from the database and check if the serial service was used in the test:
        cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
        ret = cur.fetchone()
        if not ret:
            msg = "No XML found in database for testid %d." %testid
            errors.append(msg)
            logger.error(msg)
        else:
            parser = lxml.etree.XMLParser(remove_comments=True)
            tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
            ns = {'d': flocklab.config.get('xml', 'namespace')}
            logger.debug("Got XML from database.")
            # only stop serialproxy if remote IP specified in xml
            if tree.xpath('//d:serialConf/d:remoteIp', namespaces=ns):
                logger.debug("Usage of serial service detected. Stopping serial proxy...")
                cmd = [flocklab.config.get("dispatcher", "serialproxyscript"), "--notify"]
                if debug:
                    cmd.append("--debug")
                # communicate() instead of wait(): with stdout/stderr=PIPE, wait()
                # deadlocks if the child fills the OS pipe buffer.
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                _, stderr_data = p.communicate()
                rs = p.returncode
                if rs != 0:
                    msg = "Serial proxy for test ID %d could not be stopped. Serial proxy returned %d." % (testid, rs)
                    errors.append(msg)
                    logger.error(msg)
                    logger.debug("Executed command was: %s" % (str(cmd)))
                    if stderr_data:
                        logger.debug("Serial proxy stderr: %s" % stderr_data.decode(errors='replace').strip())
                else:
                    logger.debug("Stopped serial proxy.")

        # Stop test on observers ---
        if not db_register_activity(testid, cur, cn, 'stop', iter(obsdict_key.keys())):
            msg = "Some observers were occupied while stopping test."
            logger.warning(msg)
            warnings.append(msg)
        # Start a thread for each observer which calls the test stop script on the observer
        logger.info("Stopping test on observers...")
        thread_list = []
        errors_queue = queue.Queue()
        for obskey in obsdict_key.keys():
            thread = StopTestThread(obskey, obsdict_key, errors_queue, testid)
            thread_list.append((thread, obskey))
            thread.start()
            logger.debug("Started thread for test stop on observer ID %s" %(str(obsdict_key[obskey][1])))
        # Wait for all threads to finish. The threads run concurrently, so they
        # share one time budget (75% of the cleanup time); a per-thread timeout
        # would add up with the number of observers.
        deadline = time.monotonic() + flocklab.config.getint('tests', 'cleanuptime') * 0.75
        for (thread, obskey) in thread_list:
            thread.join(timeout=max(0.0, deadline - time.monotonic()))
            if thread.is_alive():
                # Timeout occurred. Signal the thread to abort:
                msg = "Telling thread for test stop on observer ID %s to abort..." %(str(obsdict_key[obskey][1]))
                logger.error(msg)
                warnings.append(msg)
                thread.abort()
        # Wait again for the aborted threads (again with one shared budget):
        deadline = time.monotonic() + 10
        for (thread, obskey) in thread_list:
            thread.join(timeout=max(0.0, deadline - time.monotonic()))
            if thread.is_alive():
                msg = "Thread for test stop on observer ID %s is still alive but should be aborted now." %(str(obsdict_key[obskey][1]))
                errors.append(msg)
                logger.error(msg)
        db_unregister_activity(testid, cur, cn, 'stop')

        # cleanup resource allocation (expired entries and those of this test)
        now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
        cur.execute("DELETE FROM tbl_serv_resource_allocation where `time_end` < '%s' OR `test_fk` = %d" % (now, testid))
        cn.commit()

        # Stop fetcher ---
        # This has to be done regardless of previous errors.
        logger.info("Stopping fetcher...")
        cmd = [flocklab.config.get("dispatcher", "fetcherscript"),"--testid=%d" % testid, "--stop"]
        if debug:
            cmd.append("--debug")
        p = subprocess.Popen(cmd)
        rs = p.wait()
        # flocklab.SUCCESS (0) is a successful stop, ENOPKG means the service was not running.
        if rs not in (flocklab.SUCCESS, errno.ENOPKG):
            msg = "Could not stop fetcher for test ID %d. Fetcher returned error %d" % (testid, rs)
            errors.append(msg)
            logger.error(msg)
            logger.error("Tried to execute: %s" % (" ".join(cmd)))

        # Get all errors (if any). Observers which return errors are not regarded as a general error.
        # Each queue entry's element [1] is a list of errors whose element [0] is the message.
        while not errors_queue.empty():
            errs = errors_queue.get()
            warnings.extend(err[0] for err in errs[1])

        # Set stop time in DB ---
        cur.execute("UPDATE `tbl_serv_tests` SET `time_end_act` = UTC_TIMESTAMP() WHERE `serv_tests_key` = %d" %testid)
        cn.commit()

        return (errors, warnings)
    except Exception:
        msg = "Unexpected error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        logger.error(msg)
        raise
### END stop_test()


##############################################################################
#
# prepare_testresults
#
##############################################################################
def prepare_testresults(testid, cur):
    """Prepare the results of a finished test for the user by calling the archiver.

    Steps:
      1. Return an error if the test results directory does not exist.
      2. Read the test's XML config from the DB. Results are emailed if
         <generalConf><emailResults> is 'yes' (case-insensitive).
      3. If [archiver] include_xmlconfig is enabled, write the XML config to
         <testresultsdir>/testconfig.xml.
      4. If [viz] generate_plots is enabled, generate plots into [viz] dir.
         Plot failures are logged and do not abort the function.
      5. Call the archiver script. While it returns EUSERS (maximum number of
         archiver instances reached), sleep [dispatcher] archiver_waittime
         seconds and retry. This function may therefore block for a long time.

    Parameters:
        testid: test ID (key of tbl_serv_tests).
        cur:    DB cursor.

    Returns:
        List of error message strings (empty on success).
    """
    errors = []
    tree = None

    # Check if results directory exists
    testresultsdir = "%s/%d" % (flocklab.config.get('fetcher', 'testresults_dir'), testid)
    if not os.path.isdir(testresultsdir):
        errors.append("Test results directory does not exist.")
        return errors

    logger.debug("Preparing testresults...")

    # Check if user wants test results as email ---
    logger.debug("Check if user wants testresults as email...")
    emailResults = False
    # Get the XML config from the database:
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" % testid)
    row = cur.fetchone()
    if row:
        parser = lxml.etree.XMLParser(remove_comments=True)
        tree = lxml.etree.fromstring(row[0].encode('utf-8'), parser)
        ns = {'d': flocklab.config.get('xml', 'namespace')}
        logger.debug("Got XML from database.")
        emailnodes = tree.xpath('//d:generalConf/d:emailResults', namespaces=ns)
        if not emailnodes:
            logger.debug("Could not get relevant XML value <emailResults>, thus not emailing results to user.")
        # .text is None for an empty element; guard against it.
        elif (emailnodes[0].text or "").strip().lower() == 'yes':
            emailResults = True
    if not emailResults:
        logger.debug("User does not want test results as email.")
    else:
        logger.debug("User wants test results as email. Will trigger the email.")

    # Add config XML to results directory.
    # getboolean: config.get() returns a string, so "0"/"false" would be truthy.
    if flocklab.config.getboolean('archiver', 'include_xmlconfig'):
        # Compare with None explicitly: an lxml element's truth value depends on
        # whether it has children, not on whether it exists.
        if tree is not None:
            et = lxml.etree.ElementTree(tree)
            et.write("%s/testconfig.xml" % testresultsdir, pretty_print=True)
            logger.debug("XML config copied to results folder.")
        else:
            logger.warning("Could not copy XML config to test results directory.")

    # Generate plot ---
    if flocklab.config.getint('viz', 'generate_plots'):
        vizdir = flocklab.config.get('viz', 'dir')
        os.makedirs(vizdir, exist_ok=True)
        logger.debug("Generating plots...")
        try:
            fltools.visualizeFlocklabTrace(testresultsdir, outputDir=vizdir, interactive=False)
            logger.debug("Plots generated.")
        except Exception:
            logger.error("Failed to generate results plot for test %d. %s: %s" % (testid, str(sys.exc_info()[0]), str(sys.exc_info()[1])))
        except SystemExit:
            # the plotting tool may call sys.exit(); must not terminate the dispatcher
            pass

    # Archive test results ---
    cmd = [flocklab.config.get('dispatcher', 'archiverscript'),"--testid=%d" % testid]
    if emailResults:
        cmd.append("--email")
    if debug:
        cmd.append("--debug")
    # Call the script until it succeeds:
    waittime = flocklab.config.getint('dispatcher', 'archiver_waittime')
    while True:
        rs = subprocess.Popen(cmd).wait()
        if rs == flocklab.SUCCESS:
            break
        if rs != errno.EUSERS:
            msg = "Could not trigger archiver. Archiver returned error %d" % (rs)
            logger.error(msg)
            logger.error("Tried to execute: %s" % (" ".join(cmd)))
            errors.append(msg)
            return errors
        # EUSERS: maximum number of archiver instances reached. Wait some time before calling again.
        logger.info("Archiver returned EUSERS. Wait for %d s before trying again..." % waittime)
        time.sleep(waittime)
    logger.debug("Call to archiver successful.")

    logger.debug("Prepared testresults.")
    return errors
### END prepare_testresults()


##############################################################################
#
# evalute_linkmeasurement
#
##############################################################################
def evalute_linkmeasurement(testid, cur):
    """Run the test-to-link-map script if the test is a link measurement.

    A test counts as a link measurement when its owner's username equals the
    configured [linktests] user. In that case the [dispatcher]
    testtolinkmapscript is executed and waited for.

    Parameters:
        testid: test ID (key of tbl_serv_tests).
        cur:    DB cursor.

    Returns:
        List of error message strings. It is empty if the test is not a link
        measurement or if the script returned flocklab.SUCCESS.
    """
    found_errors = []
    # Link measurements are recognised by the owner of the test.
    cur.execute("SELECT `username` FROM `tbl_serv_tests` LEFT JOIN `tbl_serv_users` ON (`serv_users_key`=`owner_fk`) WHERE (`serv_tests_key` = %s)" %testid)
    owner = cur.fetchone()
    if not owner or owner[0] != flocklab.config.get('linktests', 'user'):
        return found_errors

    logger.debug("Evaluating link measurements.")
    script = flocklab.config.get('dispatcher', 'testtolinkmapscript')
    returncode = subprocess.Popen([script]).wait()
    if returncode == flocklab.SUCCESS:
        logger.debug("Link measurement evaluations finished.")
    else:
        msg = "Error %s returned from testtolinkmap script" % str(returncode)
        logger.error(msg)
        found_errors.append(msg)
    return found_errors
### END evalute_linkmeasurement()


##############################################################################
#
# inform_user
#
##############################################################################
def inform_user(testid, cur, job, errors, warnings):
Reto Da Forno's avatar
Reto Da Forno committed
966
967
968
969
970
971
972
973
974
975
976
977
    if len(errors) != 0:
        subj = "Error notification"
        if job == 'start':
            msg = "The test with ID %d could not be started as planned because of the following errors:\n\n" %testid
        elif job == 'stop':
            msg = "The test with ID %d could not be stopped as planned because of the following errors:\n\n" %testid
        elif job == 'abort':
            msg = "The test with ID %d could not be aborted as requested because of the following errors:\n\n" %testid
        for error in errors:
            msg += "\t * %s\n" %error
        for warn in warnings:
            msg += "\t * %s\n" %warn
978
        ret = flocklab.FAILED
Reto Da Forno's avatar
Reto Da Forno committed
979
980
981
982
983
984
985
986
987
988
989
990
    elif len(warnings) != 0:
        if job == 'start':
            subj = "Test %d starting with warnings" %testid
            msg  = "Your test has been prepared and is going to start as planned, but consider the following warnings:\n\n" 
        elif job == 'stop':
            subj = "Test %d stopped with warnings" %testid
            msg = "Your test has been stopped as planned and the results will be available on the website soon.\nTest results are also accessible using webdav: webdavs://www.flocklab.ethz.ch/user/webdav/\nConsider the following warnings:\n\n"
        elif job == 'abort':
            subj = "Test %d aborted with warnings" %testid
            msg = "Your test has been aborted as requested and the results (if any) will be available on the website soon\nTest results are also accessible using webdav: webdavs://www.flocklab.ethz.ch/user/webdav/\nConsider the following warnings:\n\n"
        for warn in warnings:
            msg += "\t * %s\n" %warn
991
        ret = flocklab.SUCCESS
Reto Da Forno's avatar
Reto Da Forno committed
992
993
994
995
996
997
998
999
1000
    else:
        if job == 'start':
            subj = "Test %d starting as planned" %testid
            msg  = "Your test has been prepared and is going to start as planned." 
        elif job == 'stop':
            subj = "Test %d stopped as planned" %testid
            msg = "Your test has been stopped as planned. The results will be available on the website soon.\nTest results are also accessible using webdav: webdavs://www.flocklab.ethz.ch/user/webdav/"
        elif job == 'abort':
            subj = "Test %d aborted as requested" %testid
For faster browsing, not all history is shown. View entire blame