flocklab_dispatcher.py 69.2 KB
Newer Older
1
2
#! /usr/bin/env python3

3
import sys, os, getopt, errno, threading, shutil, time, datetime, subprocess, tempfile, queue, re, logging, traceback, __main__, types, hashlib, lxml.etree, MySQLdb, signal
4
5
6
import lib.flocklab as flocklab


7
8
# Module-wide state shared by the functions and threads of this script.
# logger: logging.Logger used everywhere below; None until it is assigned
#         (presumably by the script's main routine, which is not visible here).
logger = None
# debug: when True, the observer start/stop scripts are called with "--debug".
debug  = False
# abort: set to True by sigterm_handler() once SIGTERM has been received.
abort  = False


##############################################################################
#
# sigterm_handler
#
##############################################################################
def sigterm_handler(signum, frame):
    """Handle SIGTERM by requesting a graceful shutdown.

    Sets the module-level ``abort`` flag (presumably polled by the dispatcher's
    main logic, which is not visible here) and logs the event.

    Args:
        signum: Number of the received signal (unused).
        frame:  Interrupted stack frame (unused).
    """
    global abort
    # Raise the flag first so the shutdown request is never lost, even if
    # logging is not available (yet) or fails.
    abort = True
    # logger is None until it has been initialized (see module globals).
    if logger is not None:
        logger.info("Process received SIGTERM signal.")
### END sigterm_handler


##############################################################################
#
# StopTestThread
#
##############################################################################
class StopTestThread(threading.Thread):
    """    Thread which calls the test stop script on an observer.

    Problems are collected as (message, error code, observer ID) tuples and,
    if there are any, put on the errors queue as one (obskey, errors) item
    when the thread finishes. The thread can be interrupted with abort().
    """
    def __init__(self, obskey, obsdict_key, errors_queue, testid):
        threading.Thread.__init__(self)
        self._obskey        = obskey
        # obskey -> (observer key, observer ID, SSH host), as unpacked in start_test()
        self._obsdict_key   = obsdict_key
        self._errors_queue  = errors_queue
        self._abortEvent    = threading.Event()
        self._testid        = testid

    def _run_cmd(self, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, poll_interval=1.0):
        """Run cmd to completion unless abort() is called in the meantime.

        Output is collected with communicate() (a poll()/wait() loop on PIPEs
        can dead-lock once a pipe buffer is full); the abort flag is checked
        every poll_interval seconds.

        Returns:
            (returncode, stdout, stderr) of the finished process, or None if
            the process was killed because an abort was requested.
        """
        p = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, universal_newlines=True)
        try:
            while True:
                try:
                    out, err = p.communicate(timeout=poll_interval)
                    return (p.returncode, out, err)
                except subprocess.TimeoutExpired:
                    # Retrying communicate() after a timeout does not lose output.
                    if self._abortEvent.is_set():
                        p.kill()
                        p.communicate()
                        return None
        finally:
            # Never leave a child process running (e.g. after an unexpected exception).
            if p.poll() is None:
                p.kill()
                p.wait()

    def _report_abort(self, errors, obsid):
        """Record (and log) that this thread was aborted for observer obsid."""
        msg = "StopTestThread for observer ID %s aborted." % (obsid)
        errors.append((msg, errno.ECOMM, obsid))
        logger.error(msg)

    def run(self):
        # Initialize before the try so that except/finally can always use them.
        errors = []
        obsid = None
        try:
            obsid   = self._obsdict_key[self._obskey][1]
            obshost = self._obsdict_key[self._obskey][2]
            logger.debug("Start StopTestThread for observer ID %d" % (obsid))
            starttime = time.time()
            # First test if the observer is online and if the SD card is mounted:
            cmd = ['ssh', '%s' % (obshost), "mount | grep /dev/mmcblk0p1"]
            res = self._run_cmd(cmd)
            if res is None:
                self._report_abort(errors, obsid)
                return
            rs, out, err = res
            if (rs != 0):
                # NOTE(review): grep exits with 1 and an empty stderr when nothing
                # matches, so an unmounted SD card probably ends up in the
                # "not reachable" branch below -- confirm on an observer.
                if (rs == 1):
                    if ("No such file or directory" in err):
                        msg = "SD card on observer ID %s is not mounted, observer will thus be omitted for this test." % (obsid)
                    else:
                        msg = "Observer ID %s is not reachable (returned %d: %s, %s)." % (obsid, rs, out, err)
                else:
                    msg = "Observer ID %s is not responsive (SSH returned %d)." % (obsid, rs)
                errors.append((msg, errno.EHOSTUNREACH, obsid))
                logger.error(msg)
                return
            # Call the script on the observer which stops the test:
            remote_cmd = flocklab.config.get("observer", "stoptestscript") + " --testid=%d" % self._testid
            if debug:
                remote_cmd += " --debug"
            cmd = ['ssh', '%s' % (obshost), remote_cmd]
            res = self._run_cmd(cmd, stderr=subprocess.STDOUT)
            if res is None:
                self._report_abort(errors, obsid)
                return
            rs, out, _ = res
            if (rs == flocklab.SUCCESS):
                logger.debug("Test stop script on observer ID %s succeeded (took %us)." % (obsid, int(time.time() - starttime)))
            elif (rs == 255):
                # ssh itself exits with 255 when the connection fails.
                msg = "Observer ID %s is not reachable, thus not able to stop test. Dataloss occurred possibly for this observer." % (obsid)
                errors.append((msg, errno.EHOSTUNREACH, obsid))
                logger.error(msg)
            else:
                errors.append(("Test stop script on observer ID %s failed with error code %d." % (str(obsid), rs), rs, obsid))
                logger.error("Test stop script on observer ID %s failed with error code %d:\n%s" % (str(obsid), rs, str(out).strip()))
                logger.error("Tried to execute: %s" % (" ".join(cmd)))
        except Exception:
            logger.debug("Exception: %s, %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            # Any running child process has already been killed by _run_cmd().
            self._report_abort(errors, obsid)
        finally:
            if (len(errors) > 0):
                self._errors_queue.put((self._obskey, errors))

    def abort(self):
        """Ask the thread to stop; a running remote command is killed."""
        self._abortEvent.set()
### END StopTestThread



##############################################################################
#
# StartTestThread
#
##############################################################################
class StartTestThread(threading.Thread):
    """    Thread which uploads all config files to an observer and
        starts the test on the observer.

    Problems are collected as (message, error code, observer ID) tuples and,
    if there are any, put on the errors queue as one (obskey, errors) item
    when the thread finishes. The thread can be interrupted with abort().
    """
    def __init__(self, obskey, obsdict_key, xmldict_key, imagedict_key, errors_queue, testid):
        threading.Thread.__init__(self)
        self._obskey        = obskey
        # obskey -> (observer key, observer ID, SSH host), as unpacked in start_test()
        self._obsdict_key   = obsdict_key
        # obskey -> tuple whose [0] is the local path of the observer's XML config
        self._xmldict_key   = xmldict_key
        # obskey -> list of tuples whose [0] is the local path of a target image
        self._imagedict_key = imagedict_key
        self._errors_queue  = errors_queue
        self._abortEvent    = threading.Event()
        self._testid        = testid

    def _run_cmd(self, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, poll_interval=1.0):
        """Run cmd to completion unless abort() is called in the meantime.

        Output is collected with communicate() (a poll()/wait() loop on PIPEs
        can dead-lock once a pipe buffer is full); the abort flag is checked
        every poll_interval seconds.

        Returns:
            (returncode, stdout, stderr) of the finished process, or None if
            the process was killed because an abort was requested.
        """
        p = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, universal_newlines=True)
        try:
            while True:
                try:
                    out, err = p.communicate(timeout=poll_interval)
                    return (p.returncode, out, err)
                except subprocess.TimeoutExpired:
                    # Retrying communicate() after a timeout does not lose output.
                    if self._abortEvent.is_set():
                        p.kill()
                        p.communicate()
                        return None
        finally:
            # Never leave a child process running (e.g. after an unexpected exception).
            if p.poll() is None:
                p.kill()
                p.wait()

    def _report_abort(self, errors, obsid):
        """Record (and log) that this thread was aborted for observer obsid."""
        msg = "StartTestThread for observer ID %s aborted." % (obsid)
        errors.append((msg, errno.ECOMM, obsid))
        logger.error(msg)

    def run(self):
        # Initialize before the try so that except/finally can always use them.
        errors = []
        obsid = None
        testconfigfolder = "%s/%d" % (flocklab.config.get("observer", "testconfigfolder"), self._testid)
        obsdataport      = flocklab.config.getint('serialproxy', 'obsdataport')
        starttime        = time.time()
        try:
            obsid   = self._obsdict_key[self._obskey][1]
            obshost = self._obsdict_key[self._obskey][2]
            logger.debug("Start StartTestThread for observer ID %d" % (obsid))
            # First test if the observer is online and if the SD card is mounted:
            cmd = ['ssh', '%s' % (obshost), "ls ~/data/ && mkdir %s" % testconfigfolder]
            res = self._run_cmd(cmd)
            if res is None:
                self._report_abort(errors, obsid)
                return
            rs, out, err = res
            if (rs != 0):
                if (rs == 1):
                    if ("No such file or directory" in err):
                        msg = "SD card on observer ID %s is not mounted, observer will thus be omitted for this test." % (obsid)
                    else:
                        msg = "Observer ID %s is not reachable, it will thus be omitted for this test (returned: %d: %s, %s)." % (obsid, rs, out, err)
                else:
                    msg = "Observer ID %s is not responsive, it will thus be omitted for this test (SSH returned %d). Command: %s" % (obsid, rs, " ".join(cmd))
                errors.append((msg, errno.EHOSTUNREACH, obsid))
                logger.error(msg)
                return
            xmlpath = self._xmldict_key[self._obskey][0]
            imagepaths = [image[0] for image in self._imagedict_key.get(self._obskey, [])]
            # Now upload the image and XML config file (scp output is not captured):
            cmd = ['scp', '-q', xmlpath] + imagepaths + ['%s:%s/.' % (obshost, testconfigfolder)]
            res = self._run_cmd(cmd, stdout=None, stderr=None)
            if res is None:
                self._report_abort(errors, obsid)
                return
            rs = res[0]
            if (rs != flocklab.SUCCESS):
                msg = "Upload of target image and config XML to observer ID %s failed with error number %d\nTried to execute: %s" % (obsid, rs, (" ".join(cmd)))
                errors.append((msg, rs, obsid))
                logger.error(msg)
                return
            logger.debug("Upload of target image and config XML to observer ID %s succeeded." % (obsid))
            # Run the script on the observer which starts the test:
            remote_cmd = flocklab.config.get("observer", "starttestscript") + " --testid=%d --xml=%s/%s --serialport=%d" % (self._testid, testconfigfolder, os.path.basename(xmlpath), obsdataport)
            if debug:
                remote_cmd += " --debug"
            cmd = ['ssh', '%s' % (obshost), remote_cmd]
            res = self._run_cmd(cmd, stderr=subprocess.STDOUT)
            if res is None:
                logger.debug("Abort is set, start test process for observer %s killed." % (obsid))
                self._report_abort(errors, obsid)
                return
            rs, out, _ = res
            if rs != flocklab.SUCCESS:
                errors.append(("Test start script on observer ID %s failed with error code %d." % (obsid, rs), rs, obsid))
                logger.error("Test start script on observer ID %s failed with error code %d and message:\n%s" % (str(obsid), rs, str(out)))
            else:
                logger.debug("Test start script on observer ID %s succeeded (took %us)." % (obsid, int(time.time() - starttime)))
            # Remove image file and xml on server (they have been uploaded):
            os.remove(xmlpath)
            logger.debug("Removed XML config %s for observer ID %s" % (xmlpath, obsid))
            for imagepath in imagepaths:
                os.remove(imagepath)
                # Log the file that was actually removed (not the image tuple).
                logger.debug("Removed target image %s for observer ID %s" % (imagepath, obsid))
        except Exception:
            logger.debug("Exception: %s, %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            # Any running child process has already been killed by _run_cmd().
            self._report_abort(errors, obsid)
        finally:
            if (len(errors) > 0):
                self._errors_queue.put((self._obskey, errors))

    def abort(self):
        """Ask the thread to stop; a running upload or remote command is killed."""
        self._abortEvent.set()
### END StartTestThread



##############################################################################
#
# start_test
#
##############################################################################
def start_test(testid, cur, cn, obsdict_key, obsdict_id):
Reto Da Forno's avatar
Reto Da Forno committed
233
234
235
236
237
238
    errors = []
    warnings = []
    
    try:    
        logger.debug("Entering start_test() function...")
        # First, validate the XML file again. If validation fails, return immediately:
Reto Da Forno's avatar
Reto Da Forno committed
239
        cmd = [flocklab.config.get('dispatcher','validationscript'), '--testid=%d'%testid]
Reto Da Forno's avatar
Reto Da Forno committed
240
241
242
243
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        out, err = p.communicate()
        rs = p.returncode
        if rs != 0:
244
245
            logger.error("Error %s returned from %s" % (str(rs), flocklab.config.get('dispatcher','validationscript')))
            logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
246
247
248
249
250
251
252
253
254
255
            errors.append("Validation of XML failed. Output of script was: %s %s" % (str(out), str(err)))
        
        if len(errors) == 0:
            # Update DB status ---
            # Update the status of the test in the db:
            flocklab.set_test_status(cur, cn, testid, 'preparing')
            
            # Get start/stop time ---
            cur.execute("SELECT `time_start_wish`, `time_end_wish`, `owner_fk` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" %testid)
            # Times are going to be of datetime type:
256
            ret = cur.fetchone()
Reto Da Forno's avatar
Reto Da Forno committed
257
258
259
            starttime = ret[0]
            stoptime  = ret[1]
            owner_fk = ret[2]
260
261
            logger.debug("Got start time wish for test from database: %s" % starttime)
            logger.debug("Got end time wish for test from database: %s" % stoptime)
Reto Da Forno's avatar
Reto Da Forno committed
262
263
264
265
            
            # Image processing ---
            # Get all images from the database:
            imagedict_key = {}
266
            sql_image =     """ SELECT `t`.`binary`, `m`.`observer_fk`, `m`.`node_id`, LOWER(`a`.`architecture`), `t`.`serv_targetimages_key`, LOWER(`p`.`name`) AS `platname`, `a`.`core` AS `core`
Reto Da Forno's avatar
Reto Da Forno committed
267
268
269
270
271
272
273
274
                                FROM `tbl_serv_targetimages` AS `t` 
                                LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `m` 
                                    ON `t`.`serv_targetimages_key` = `m`.`targetimage_fk` 
                                LEFT JOIN `tbl_serv_platforms` AS `p`
                                    ON `t`.`platforms_fk` = `p`.`serv_platforms_key`
                                LEFT JOIN `tbl_serv_architectures` AS `a`
                                    ON `t`.`core` = `a`.`core` AND `p`.`serv_platforms_key` = `a`.`platforms_fk`
                                WHERE `m`.`test_fk` = %d
275
                            """
Reto Da Forno's avatar
Reto Da Forno committed
276
277
278
279
280
            cur.execute(sql_image%testid)
            ret = cur.fetchall()
            for r in ret:
                binary      = r[0]
                obs_fk      = r[1]
281
                obs_id      = obsdict_key[obs_fk][1]
Reto Da Forno's avatar
Reto Da Forno committed
282
283
                node_id     = r[2]
                arch        = r[3]
284
285
286
                tgimage_key = r[4]
                platname    = r[5]
                core        = r[6]
Reto Da Forno's avatar
Reto Da Forno committed
287
288
289
                
                # Prepare image ---
                (fd, imagepath) = tempfile.mkstemp()
290
                binpath = "%s.ihex" % (os.path.splitext(imagepath)[0])
Reto Da Forno's avatar
Reto Da Forno committed
291
292
293
                imagefile = os.fdopen(fd, 'w+b')
                imagefile.write(binary)
                imagefile.close()
294
                logger.debug("Got target image ID %s for observer ID %s with node ID %s from database and wrote it to temp file %s (hash %s)" % (str(tgimage_key), str(obs_id), str(node_id), imagepath, hashlib.sha1(binary).hexdigest()))
Reto Da Forno's avatar
Reto Da Forno committed
295
                
296
                logger.debug("Found %s target architecture on platform %s for observer ID %s (node ID to be used: %s)." % (arch, platname, str(obs_id), str(node_id)))
297
298
                
                # binary patching
299
                if (node_id != None):
300
301
302
303
304
305
306
307
308
309
310
311
                    # set node ID
                    if flocklab.patch_binary("FLOCKLAB_NODE_ID", node_id, imagepath, arch) != flocklab.SUCCESS:
                        msg = "Failed to patch symbol FLOCKLAB_NODE_ID in binary file %s." % (imagepath)
                        errors.append(msg)
                        logger.error(msg)
                    if flocklab.patch_binary("TOS_NODE_ID", node_id, imagepath, arch) != flocklab.SUCCESS:
                        msg = "Failed to patch symbol TOS_NODE_ID in binary file %s." % (imagepath)
                        errors.append(msg)
                        logger.error(msg)
                # convert elf to intel hex
                if flocklab.bin_to_hex(imagepath, arch, binpath) != flocklab.SUCCESS:
                    msg = "Failed to convert image file %s to Intel hex format." % (imagepath)
Reto Da Forno's avatar
Reto Da Forno committed
312
313
                    errors.append(msg)
                    logger.error(msg)
314
315
                    shutil.move(imagepath, binpath)
                    logger.debug("Copied binary file without modification.")
Reto Da Forno's avatar
Reto Da Forno committed
316
317
                
                # Remove the original file which is not used anymore:
318
                if os.path.exists(imagepath):
Reto Da Forno's avatar
Reto Da Forno committed
319
320
321
322
323
324
325
326
                    os.remove(imagepath)
                
                # Slot detection ---
                # Find out which slot number to use on the observer.
                #logger.debug("Detecting adapter for %s on observer ID %s" %(platname, obs_id))
                ret = flocklab.get_slot(cur, int(obs_fk), platname)
                if ret in range(1,5):
                    slot = ret
327
                    logger.debug("Found adapter for %s on observer ID %s in slot %d" % (platname, obs_id, slot))
Reto Da Forno's avatar
Reto Da Forno committed
328
329
330
331
332
333
334
335
336
337
338
339
340
341
                elif ret == 0:
                    slot = None
                    msg = "Could not find an adapter for %s on observer ID %s" %(platname, obs_id)
                    errors.append(msg)
                    logger.error(msg)
                else:
                    slot = None
                    msg = "Error when detecting adapter for %s on observer ID %s: function returned %d" %(platname, obs_id, ret)
                    errors.append(msg)
                    logger.error(msg)
                        
                # Write the dictionary for the image:
                if not obs_fk in imagedict_key:
                    imagedict_key[obs_fk] = []
342
                imagedict_key[obs_fk].append((binpath, slot, platname, 0.0, core))
Reto Da Forno's avatar
Reto Da Forno committed
343
344
345
346
347
348
349
350
351
352
353
354
                
            logger.info("Processed all target images from database.")
                
            # XML processing ---
            # Get the XML config from the database and generate a separate file for every observer used:
            cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
            ret = cur.fetchone()
            if not ret:
                msg = "No XML found in database for testid %d." %testid
                errors.append(msg)
                logger.error(msg)
            else:
355
356
                parser = lxml.etree.XMLParser(remove_comments=True)
                tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
Reto Da Forno's avatar
Reto Da Forno committed
357
                ns = {'d': flocklab.config.get('xml', 'namespace')}
Reto Da Forno's avatar
Reto Da Forno committed
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
                logger.debug("Got XML from database.")
                # Create XML files ---
                # Create an empty XML config file for every observer used and organize them in a dictionary:
                xmldict_key = {}
                for obs_key, obs_id, obs_ether in obsdict_key.values():
                    (fd, xmlpath) = tempfile.mkstemp()
                    xmlfhand = os.fdopen(fd, 'w+')
                    xmldict_key[obs_key] = (xmlpath, xmlfhand)
                    xmlfhand.write('<?xml version="1.0" encoding="UTF-8"?>\n\n<obsConf>\n\n')
                # Go through the blocks of the XML file and write the configs to the affected observer XML configs:
                # targetConf ---
                targetconfs = tree.xpath('//d:targetConf', namespaces=ns)
                if not targetconfs:
                    msg = "no <targetConf> element found in XML config (wrong namespace?)"
                    errors.append(msg)
                    logger.error(msg)
                for targetconf in targetconfs:
                    obsids = targetconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                    ret = targetconf.xpath('d:voltage', namespaces=ns)
                    if ret:
                        voltage = ret[0].text.strip()
                    else:
Reto Da Forno's avatar
Reto Da Forno committed
380
                        voltage = str(flocklab.config.get("dispatcher", "default_tg_voltage"))
Reto Da Forno's avatar
Reto Da Forno committed
381
382
383
384
385
386
387
388
389
390
391
392
                    ret = targetconf.xpath('d:noImage', namespaces=ns)
                    if ret:
                        noImageSlot = ret[0].text.strip()
                    else:
                        noImageSlot = None
                    for obsid in obsids:
                        obsid = int(obsid)
                        obskey = obsdict_id[obsid][0]
                        xmldict_key[obskey][1].write("<obsTargetConf>\n")
                        xmldict_key[obskey][1].write("\t<voltage>%s</voltage>\n"%voltage)
                        if noImageSlot:
                            slot = noImageSlot
393
                            xmldict_key[obskey][1].write("\t<slotnr>%s</slotnr>\n" % (slot))
Reto Da Forno's avatar
Reto Da Forno committed
394
                        else:
395
                            xmldict_key[obskey][1].write("\t<firmware>%s</firmware>\n" % (imagedict_key[obskey][0][3]))
Reto Da Forno's avatar
Reto Da Forno committed
396
                            for coreimage in imagedict_key[obskey]:
397
                                xmldict_key[obskey][1].write("\t<image core=\"%d\">%s/%d/%s</image>\n" % (coreimage[4], flocklab.config.get("observer", "testconfigfolder"),testid, os.path.basename(coreimage[0])))
398
399
                            xmldict_key[obskey][1].write("\t<slotnr>%s</slotnr>\n" % (imagedict_key[obskey][0][1]))
                            xmldict_key[obskey][1].write("\t<platform>%s</platform>\n" % (imagedict_key[obskey][0][2]))
400
                            xmldict_key[obskey][1].write("\t<os>%s</os>\n" % (imagedict_key[obskey][0][2]))
Reto Da Forno's avatar
Reto Da Forno committed
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
                            slot = imagedict_key[obskey][0][1]
                        xmldict_key[obskey][1].write("</obsTargetConf>\n\n")
                        #logger.debug("Wrote obsTargetConf XML for observer ID %s" %obsid)
                        # update test_image mapping with slot information
                        cur.execute("UPDATE `tbl_serv_map_test_observer_targetimages` SET `slot` = %s WHERE `observer_fk` = %d AND `test_fk`=%d" % (slot, obskey, testid))
                        cn.commit()
                
                # serialConf ---
                srconfs = tree.xpath('//d:serialConf', namespaces=ns)
                serialProxyUsed = False
                if srconfs:
                    # only use serialproxy if remote IP specified in xml
                    if tree.xpath('//d:serialConf/d:remoteIp', namespaces=ns):
                        serialProxyUsed = True
                    for srconf in srconfs:
                        obsids = srconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        xmlblock = "<obsSerialConf>\n"
                        port = srconf.xpath('d:port', namespaces=ns)
                        if port:
420
                            port = port[0].text.strip()
Reto Da Forno's avatar
Reto Da Forno committed
421
422
423
                            xmlblock += "\t<port>%s</port>\n" %port
                        baudrate = srconf.xpath('d:baudrate', namespaces=ns)
                        if baudrate:
424
                            baudrate = baudrate[0].text.strip()
Reto Da Forno's avatar
Reto Da Forno committed
425
426
427
                            xmlblock += "\t<baudrate>%s</baudrate>\n" %baudrate
                        mode = srconf.xpath('d:mode', namespaces=ns)
                        if mode:
428
                            mode = mode[0].text.strip()
Reto Da Forno's avatar
Reto Da Forno committed
429
430
431
432
433
434
435
436
                            xmlblock += "\t<mode>%s</mode>\n" %mode
                        xmlblock += "</obsSerialConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                            #logger.debug("Wrote obsSerialConf XML for observer ID %s" %obsid)
                else:
437
                    logger.debug("No <serialConf> found, not using serial service.")
Reto Da Forno's avatar
Reto Da Forno committed
438
                
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
                # debugConf ---
                srconfs = tree.xpath('//d:debugConf', namespaces=ns)
                if srconfs:
                    for srconf in srconfs:
                        obsids = srconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        xmlblock = "<obsDebugConf>\n"
                        remoteIp = srconf.xpath('d:remoteIp', namespaces=ns)
                        if remoteIp:
                            remoteIp = remoteIp[0].text.strip()
                            xmlblock += "\t<remoteIp>%s</remoteIp>\n" % (remoteIp)
                        gdbPort = srconf.xpath('d:gdbPort', namespaces=ns)
                        if gdbPort:
                            gdbPort = gdbPort[0].text.strip()
                            xmlblock += "\t<gdbPort>%s</gdbPort>\n" % (gdbPort)
                        xmlblock += "</obsDebugConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                            #logger.debug("Wrote obsDebugConf XML for observer ID %s" %obsid)
                else:
                    logger.debug("No <debugConf> found, not using debug service.")
                
Reto Da Forno's avatar
Reto Da Forno committed
462
463
                # gpioTracingConf ---
                gmconfs = tree.xpath('//d:gpioTracingConf', namespaces=ns)
464
465
466
467
                if gmconfs:
                    for gmconf in gmconfs:
                        obsids = gmconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        pinconfs = gmconf.xpath('d:pinConf', namespaces=ns)
468
                        pinlist = gmconf.xpath('d:pins', namespaces=ns)
469
                        xmlblock = "<obsGpioMonitorConf>\n"
470
471
                        if pinlist:
                            xmlblock += "\t<pins>" + pinlist[0].text.strip() + "</pins>\n"
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
                        for pinconf in pinconfs:
                            pin  = pinconf.xpath('d:pin', namespaces=ns)[0].text.strip()
                            edge = pinconf.xpath('d:edge', namespaces=ns)[0].text.strip()
                            mode = pinconf.xpath('d:mode', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<pinConf>\n\t\t<pin>%s</pin>\n\t\t<edge>%s</edge>\n\t\t<mode>%s</mode>\n" %(pin, edge, mode)
                            cb_gs_add = pinconf.xpath('d:callbackGpioActAdd', namespaces=ns)
                            if cb_gs_add:
                                pin = cb_gs_add[0].xpath('d:pin', namespaces=ns)[0].text.strip()
                                level = cb_gs_add[0].xpath('d:level', namespaces=ns)[0].text.strip()
                                offsets = cb_gs_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
                                offsetms = cb_gs_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
                                xmlblock += "\t\t<callbackGpioSetAdd>\n\t\t\t<pin>%s</pin>\n\t\t\t<level>%s</level>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackGpioSetAdd>\n" %(pin, level, offsets, offsetms)
                            cb_pp_add = pinconf.xpath('d:callbackPowerProfAdd', namespaces=ns)
                            if cb_pp_add:
                                duration = cb_pp_add[0].xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()
                                offsets = cb_pp_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
                                offsetms = cb_pp_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
                                xmlblock += "\t\t<callbackPowerprofAdd>\n\t\t\t<duration>%s</duration>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackPowerprofAdd>\n" %(duration, offsets, offsetms)
                            xmlblock += "\t</pinConf>\n"
                        xmlblock += "</obsGpioMonitorConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                          #logger.debug("Wrote obsGpioMonitorConf XML for observer ID %s" %obsid)
                else:
                    logger.debug("No <gpioTracingConf> found, not using GPIO tracing service.")
Reto Da Forno's avatar
Reto Da Forno committed
499
500
501
502
503
504
                        
                # gpioActuationConf ---
                # Create 2 pin settings for every observer used in the test: 
                #        1) Pull reset pin of target low when test is to start
                #        2) Pull reset pin of target high when test is to stop
                xmlblock = "<obsGpioSettingConf>\n"
Reto Da Forno's avatar
Reto Da Forno committed
505
                startdatetime = starttime.strftime(flocklab.config.get("observer", "timeformat"))
Reto Da Forno's avatar
Reto Da Forno committed
506
507
                startmicrosecs = starttime.microsecond
                xmlblock += "\t<pinConf>\n\t\t<pin>RST</pin>\n\t\t<level>low</level>\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%d</absoluteMicrosecs>\n\t\t</absoluteTime>\n\t\t<intervalMicrosecs>0</intervalMicrosecs>\n\t\t<count>1</count>\n\t</pinConf>\n" %(startdatetime, startmicrosecs)
Reto Da Forno's avatar
Reto Da Forno committed
508
                stopdatetime = stoptime.strftime(flocklab.config.get("observer", "timeformat"))
Reto Da Forno's avatar
Reto Da Forno committed
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
                stopmicrosecs = stoptime.microsecond
                xmlblock += "\t<pinConf>\n\t\t<pin>RST</pin>\n\t\t<level>high</level>\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%d</absoluteMicrosecs>\n\t\t</absoluteTime>\n\t\t<intervalMicrosecs>0</intervalMicrosecs>\n\t\t<count>1</count>\n\t</pinConf>\n" %(stopdatetime, stopmicrosecs)
                for obskey in obsdict_key.keys():
                    xmldict_key[obskey][1].write(xmlblock)
                # Now write the per-observer config:
                gsconfs = tree.xpath('//d:gpioActuationConf', namespaces=ns)
                for gsconf in gsconfs:
                    xmlblock = ""
                    obsids = gsconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                    pinconfs = gsconf.xpath('d:pinConf', namespaces=ns)
                    for pinconf in pinconfs:
                        pin  = pinconf.xpath('d:pin', namespaces=ns)[0].text.strip()
                        level = pinconf.xpath('d:level', namespaces=ns)[0].text.strip()
                        abs_tim = pinconf.xpath('d:absoluteTime', namespaces=ns)
                        if abs_tim:
                            absdatetime = absolute2absoluteUTC_time(abs_tim[0].xpath('d:absoluteDateTime', namespaces=ns)[0].text.strip())
                            ret = abs_tim[0].xpath('d:absoluteMicrosecs', namespaces=ns)
                            if ret:
                                absmicrosec = int(ret[0].text.strip())
                            else:
                                absmicrosec = 0
                        rel_tim = pinconf.xpath('d:relativeTime', namespaces=ns)
                        if rel_tim:
                            relsec = int(rel_tim[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip())
                            ret = rel_tim[0].xpath('d:offsetMicrosecs', namespaces=ns)
                            if ret:
                                relmicrosec = int(ret[0].text.strip())
                            else:
                                relmicrosec = 0
                            # Relative times need to be converted into absolute times:
                            absmicrosec, absdatetime = relative2absolute_time(starttime, relsec, relmicrosec)    
                        periodic = pinconf.xpath('d:periodic', namespaces=ns)
                        if periodic:
                            interval = int(periodic[0].xpath('d:intervalMicrosecs', namespaces=ns)[0].text.strip())
                            count = int(periodic[0].xpath('d:count', namespaces=ns)[0].text.strip())
                        else:
                            interval = 0
                            count = 1
                        xmlblock += "\t<pinConf>\n\t\t<pin>%s</pin>\n\t\t<level>%s</level>\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%s</absoluteMicrosecs>\n\t\t</absoluteTime>\n\t\t<intervalMicrosecs>%i</intervalMicrosecs>\n\t\t<count>%i</count>\n\t</pinConf>\n" %(pin, level, absdatetime, absmicrosec, interval, count)
                    for obsid in obsids:
                        obsid = int(obsid)
                        obskey = obsdict_id[obsid][0]
                        xmldict_key[obskey][1].write(xmlblock)
                        #logger.debug("Wrote obsGpioSettingConf XML for observer ID %s" %obsid)
                xmlblock = "</obsGpioSettingConf>\n\n"
                for obskey in obsdict_key.keys():
                    xmldict_key[obskey][1].write(xmlblock)
556
                
Reto Da Forno's avatar
Reto Da Forno committed
557
558
                # powerProfilingConf ---
                ppconfs = tree.xpath('//d:powerProfilingConf', namespaces=ns)
559
560
561
562
563
564
                if ppconfs:
                    for ppconf in ppconfs:
                        obsids = ppconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        profconfs = ppconf.xpath('d:profConf', namespaces=ns)
                        xmlblock = "<obsPowerprofConf>\n"
                        for profconf in profconfs:
Reto Da Forno's avatar
Reto Da Forno committed
565
566
567
568
                            duration = profconf.xpath('d:duration', namespaces=ns)
                            if duration:
                                duration = duration[0].text.strip()
                            else:
Reto Da Forno's avatar
Reto Da Forno committed
569
570
571
572
573
574
575
576
                                try:
                                    duration = int(profconf.xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()) / 1000
                                except:
                                    duration = 0
                            #xmlblock += "\t<profConf>\n"
                            xmlblock += "\t<duration>%s</duration>" % duration
                            # calculate the sampling start
                            offset  = profconf.xpath('d:offset', namespaces=ns)
577
                            rel_tim = profconf.xpath('d:relativeTime', namespaces=ns)
Reto Da Forno's avatar
Reto Da Forno committed
578
579
580
581
582
583
584
                            abs_tim = profconf.xpath('d:absoluteTime', namespaces=ns)
                            if offset:
                                offset = int(offset[0].text.strip())
                                tstart = datetime.datetime.timestamp(starttime + datetime.timedelta(seconds=offset))
                            elif abs_tim:
                                tstart = datetime.datetime.timestamp(flocklab.get_xml_timestamp(abs_tim[0].xpath('d:absoluteDateTime', namespaces=ns)[0].text.strip()))
                            elif rel_tim:
585
                                relsec = int(rel_tim[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip())
Reto Da Forno's avatar
Reto Da Forno committed
586
587
588
589
                                tstart = datetime.datetime.timestamp(starttime + datetime.timedelta(seconds=relsec))
                            xmlblock += "\n\t<starttime>%s</starttime>" % (tstart)
                            # check if config contains samplingRate:
                            samplingrate    = profconf.xpath('d:samplingRate', namespaces=ns)
590
                            samplingdivider = profconf.xpath('d:samplingDivider', namespaces=ns)
Reto Da Forno's avatar
Reto Da Forno committed
591
592
593
594
                            if samplingrate:
                                samplingrate = samplingrate[0].text.strip()
                                xmlblock += "\n\t<samplingRate>%s</samplingRate>" % samplingrate
                            elif samplingdivider:
595
                                samplingdivider = samplingdivider[0].text.strip()
Reto Da Forno's avatar
Reto Da Forno committed
596
                                xmlblock += "\n\t<samplingDivider>%s</samplingDivider>" % samplingdivider
Reto Da Forno's avatar
Reto Da Forno committed
597
                            else:
Reto Da Forno's avatar
Reto Da Forno committed
598
599
600
601
602
                                samplingdivider = flocklab.config.get('dispatcher', 'default_sampling_divider')
                                xmlblock += "\n\t<samplingDivider>%s</samplingDivider>" % samplingdivider
                            #xmlblock += "\n\t</profConf>\n"
                            break    # for now, only parse the first block
                        xmlblock += "\n</obsPowerprofConf>\n\n"
603
604
605
606
607
608
609
610
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                            #logger.debug("Wrote obsPowerprofConf XML for observer ID %s" %obsid)
                else:
                    logger.debug("No <powerProfilingConf> found, not using power profiling service.")
                 
Reto Da Forno's avatar
Reto Da Forno committed
611
                logger.debug("Wrote all observer XML configs.")
612
                
Reto Da Forno's avatar
Reto Da Forno committed
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
                # Close XML files ---
                for xmlpath, xmlfhand in xmldict_key.values():
                    xmlfhand.write("</obsConf>\n")
                    xmlfhand.close()
                    #logger.debug("Closed observer XML config %s"%xmlpath)
                #logger.debug("Closed all observer XML configs.")
                
        # Upload configs to observers and start test ---
        if len(errors) == 0:
            if not db_register_activity(testid, cur, cn, 'start', iter(obsdict_key.keys())):
                msg = "Could not access all needed observers for testid %d." %testid
                errors.append(msg)
                logger.error(msg)
        if len(errors) == 0:
            # -- START OF CRITICAL SECTION where dispatcher accesses used observers
            # Start a thread for each observer which uploads the config and calls the test start script on the observer
            thread_list = []
            errors_queue = queue.Queue()
            for obskey in obsdict_key.keys():
Reto Da Forno's avatar
Reto Da Forno committed
632
                thread = StartTestThread(obskey, obsdict_key, xmldict_key, imagedict_key, errors_queue, testid)
Reto Da Forno's avatar
Reto Da Forno committed
633
634
635
636
637
638
                thread_list.append((thread, obskey))
                thread.start()
                #DEBUG logger.debug("Started thread for test start on observer ID %s" %(str(obsdict_key[obskey][1])))
            # Wait for all threads to finish:
            for (thread, obskey) in thread_list:
                # Wait max 75% of the setuptime:
639
                thread.join(timeout=(flocklab.config.getint('tests','setuptime')*0.75))
Reto Da Forno's avatar
Reto Da Forno committed
640
641
                if thread.isAlive():
                    # Timeout occurred. Signal the thread to abort:
Reto Da Forno's avatar
Reto Da Forno committed
642
                    logger.error("Telling thread for test start on observer ID %s to abort..." % (str(obsdict_key[obskey][1])))
Reto Da Forno's avatar
Reto Da Forno committed
643
644
                    thread.abort()
            # Wait again for the aborted threads:
645
            for (thread, obskey) in thread_list:
Reto Da Forno's avatar
Reto Da Forno committed
646
647
                thread.join(timeout=10)
                if thread.isAlive():
Reto Da Forno's avatar
Reto Da Forno committed
648
                    msg = "Thread for test start on observer ID %s timed out and will be aborted now." % (str(obsdict_key[obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
649
650
651
652
                    errors.append(msg)
                    logger.error(msg)
            # -- END OF CRITICAL SECTION where dispatcher accesses used observers
            db_unregister_activity(testid, cur, cn, 'start')
653
            
Reto Da Forno's avatar
Reto Da Forno committed
654
655
656
            # Get all errors (if any). Observers which return errors are not regarded as a general error. In this
            # case, the test is just started without the faulty observers if there is at least 1 observer that succeeded:
            obs_error = []
657
658
            #if not errors_queue.empty():
                #logger.error("Queue with errors from test start thread is not empty. Getting errors...")
Reto Da Forno's avatar
Reto Da Forno committed
659
660
661
            while not errors_queue.empty():
                errs = errors_queue.get()
                for err in errs[1]:
662
                    #logger.error("Error from test start thread for observer %s: %s" %(str(err[2]), str(err[0])))
Reto Da Forno's avatar
Reto Da Forno committed
663
664
665
                    obs_error.append(err[2])
                    warnings.append(err[0])
            if len(obs_error) > 0:
Reto Da Forno's avatar
Reto Da Forno committed
666
667
668
669
670
671
672
                # Abort or continue?
                if not flocklab.config.get("dispatcher", "continue_on_error"):
                    msg = "At least one observer failed to start the test, going to abort..."
                    errors.append(msg)
                    logger.error(msg)
                # Check if there is at least 1 observer which succeeded:
                elif (len(obsdict_id) == len(set(obs_error))):
Reto Da Forno's avatar
Reto Da Forno committed
673
674
675
676
677
678
679
680
                    msg = "None of the requested observers could successfully start the test."
                    errors.append(msg)
                    logger.error(msg)
        
        # Start proxy for serial service ---
        if len(errors) == 0:
            if serialProxyUsed:
                # Start serial proxy:
Reto Da Forno's avatar
Reto Da Forno committed
681
                logger.debug("Starting serial proxy...")
Reto Da Forno's avatar
Reto Da Forno committed
682
                cmd = [flocklab.config.get("dispatcher", "serialproxyscript"), "--notify"]
Reto Da Forno's avatar
Reto Da Forno committed
683
684
685
686
687
                if debug: 
                    cmd.append("--debug")
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                rs = p.wait()
                if (rs != 0):
688
                    msg = "Serial proxy for test ID %d could not be started (error code %d)." % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
689
690
                    errors.append(msg)
                    logger.error(msg)
691
                    logger.debug("Executed command was: %s" % (str(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
692
693
694
                else:
                    logger.debug("Started serial proxy.")
    
Reto Da Forno's avatar
Reto Da Forno committed
695
        # Start fetcher ---
Reto Da Forno's avatar
Reto Da Forno committed
696
        if len(errors) == 0:
Reto Da Forno's avatar
Reto Da Forno committed
697
698
            logger.debug("Starting fetcher...")
            cmd = [flocklab.config.get("dispatcher", "fetcherscript"), "--testid=%d" % testid]
Reto Da Forno's avatar
Reto Da Forno committed
699
700
701
702
703
            if debug:
                cmd.append("--debug")
            p = subprocess.Popen(cmd)
            rs = p.wait()
            if rs != 0:
Reto Da Forno's avatar
Reto Da Forno committed
704
                msg = "Could not start fetcher for test ID %d. Fetcher returned error %d" % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
705
706
                errors.append(msg)
                logger.error(msg)
707
                logger.error("Tried to execute: %s" % (" ".join(cmd)))
708

Reto Da Forno's avatar
Reto Da Forno committed
709
710
711
712
713
714
715
716
        # check if we're still in time ---
        if len(errors) == 0:
            now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
            cur.execute("SELECT `serv_tests_key` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d AND `time_start_wish` <= '%s'" % (testid, now))
            if cur.fetchone() is not None:
                msg = "Setup for test ID %d took too much time." % (testid)
                errors.append(msg)
                logger.error(msg)
717

Reto Da Forno's avatar
Reto Da Forno committed
718
719
720
721
722
723
724
725
726
727
728
729
        # Update DB status, set start time ---
        if len(errors) == 0:
            logger.debug("Setting test status in DB to running...")
            flocklab.set_test_status(cur, cn, testid, 'running')
            cur.execute("UPDATE `tbl_serv_tests` SET `time_start_act` = `time_start_wish` WHERE `serv_tests_key` = %d" %testid)
            cn.commit()
        else:
            logger.debug("Setting test status in DB to aborting...")
            flocklab.set_test_status(cur, cn, testid, 'aborting')
            cur.execute("UPDATE `tbl_serv_tests` SET `time_start_act` = `time_start_wish`, `time_end_act` = UTC_TIMESTAMP() WHERE `serv_tests_key` = %d" %testid)
            cn.commit()
        logger.debug("At end of start_test(). Returning...")
730

Reto Da Forno's avatar
Reto Da Forno committed
731
732
        return (errors, warnings)
    except Exception:
733
        msg = "Unexpected error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
Reto Da Forno's avatar
Reto Da Forno committed
734
        logger.error(msg)
Reto Da Forno's avatar
Reto Da Forno committed
735
        raise
736
737
738
739
740
741
742
743
744
745
### END start_test()



##############################################################################
#
# stop_test
#
##############################################################################
def stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort=False):
    """Stop a test and clean up everything associated with it.

    Steps, in order:
      1. Set the test status in the DB to 'aborting' (abort=True) or 'cleaning up'.
      2. If the test's XML config contains <serialConf><remoteIp>, call the serial
         proxy script with --notify.
      3. Register a 'stop' activity for the used observers, run one StopTestThread
         per observer, wait for them, then unregister the activity.
      4. Delete expired resource allocations and the ones of this test.
      5. Stop the fetcher (done regardless of earlier errors).
      6. Collect per-observer errors as warnings and store the actual end time.

    Args:
        testid: ID of the test (`serv_tests_key` in `tbl_serv_tests`).
        cur: database cursor.
        cn: database connection (used for commits).
        obsdict_key: observers keyed by observer key; element [1] of each value is
            printed as the observer ID in log messages.
        obsdict_id: observers keyed by observer ID (not used in this function).
        abort: True if the test is aborted rather than ending normally.

    Returns:
        Tuple (errors, warnings), both lists of message strings. Errors reported by
        individual observers only end up in `warnings`.

    Raises:
        Any unexpected exception is logged with its traceback and re-raised.
    """
    errors = []
    warnings = []

    try:
        logger.info("Stopping test %d..." % testid)

        # Update DB status ---
        status = 'aborting' if abort else 'cleaning up'
        logger.debug("Setting test status in DB to %s..." % status)
        flocklab.set_test_status(cur, cn, testid, status)

        # Stop serial proxy ---
        # Get the XML config from the database and check if the serial service was used in the test:
        cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" % testid)
        ret = cur.fetchone()
        if not ret:
            msg = "No XML found in database for testid %d." % testid
            errors.append(msg)
            logger.error(msg)
        else:
            parser = lxml.etree.XMLParser(remove_comments=True)
            tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding='utf-8')), parser)
            ns = {'d': flocklab.config.get('xml', 'namespace')}
            logger.debug("Got XML from database.")
            # Only stop the serial proxy if a remote IP is specified in the XML.
            if tree.xpath('//d:serialConf/d:remoteIp', namespaces=ns):
                logger.debug("Usage of serial service detected. Stopping serial proxy...")
                cmd = [flocklab.config.get("dispatcher", "serialproxyscript"), "--notify"]
                if debug:
                    cmd.append("--debug")
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                # communicate() drains both pipes; wait() with PIPEs can deadlock as soon
                # as the child fills the OS pipe buffer.
                _, proxy_stderr = p.communicate()
                rs = p.returncode
                if rs != 0:
                    msg = "Serial proxy for test ID %d could not be stopped. Serial proxy returned %d." % (testid, rs)
                    errors.append(msg)
                    logger.error(msg)
                    logger.debug("Executed command was: %s" % (str(cmd)))
                    if proxy_stderr:
                        logger.debug("Serial proxy stderr: %s" % proxy_stderr.decode('utf-8', errors='replace').strip())
                else:
                    logger.debug("Stopped serial proxy.")

        # Stop test on observers ---
        if not db_register_activity(testid, cur, cn, 'stop', iter(obsdict_key.keys())):
            msg = "Some observers were occupied while stopping test."
            logger.warning(msg)
            warnings.append(msg)
        # Start a thread for each observer which calls the test stop script on the observer:
        logger.info("Stopping test on observers...")
        thread_list = []
        errors_queue = queue.Queue()
        for obskey in obsdict_key.keys():
            thread = StopTestThread(obskey, obsdict_key, errors_queue, testid)
            thread_list.append((thread, obskey))
            thread.start()
            logger.debug("Started thread for test stop on observer ID %s" % (str(obsdict_key[obskey][1])))
        # Wait for all threads to finish. Joins are sequential; each one waits at most
        # 75% of the configured cleanup time.
        join_timeout = flocklab.config.getint('tests', 'cleanuptime') * 0.75
        for (thread, obskey) in thread_list:
            thread.join(timeout=join_timeout)
            # Thread.isAlive() no longer exists since Python 3.9; is_alive() is the supported name.
            if thread.is_alive():
                # Timeout occurred. Signal the thread to abort:
                msg = "Telling thread for test stop on observer ID %s to abort..." % (str(obsdict_key[obskey][1]))
                logger.error(msg)
                warnings.append(msg)
                thread.abort()
        # Wait again (max. 10 s each) for the aborted threads:
        for (thread, obskey) in thread_list:
            thread.join(timeout=10)
            if thread.is_alive():
                msg = "Thread for test stop on observer ID %s is still alive but should be aborted now." % (str(obsdict_key[obskey][1]))
                errors.append(msg)
                logger.error(msg)
        db_unregister_activity(testid, cur, cn, 'stop')

        # Clean up resource allocation: drop expired entries and all entries of this test.
        now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
        cur.execute("DELETE FROM tbl_serv_resource_allocation where `time_end` < '%s' OR `test_fk` = %d" % (now, testid))
        cn.commit()

        # Stop fetcher ---
        # This has to be done regardless of previous errors.
        logger.info("Stopping fetcher...")
        cmd = [flocklab.config.get("dispatcher", "fetcherscript"), "--testid=%d" % testid, "--stop"]
        if debug:
            cmd.append("--debug")
        rs = subprocess.Popen(cmd).wait()
        # flocklab.SUCCESS (0) is a successful stop, ENOPKG means the service was not running.
        if rs not in (flocklab.SUCCESS, errno.ENOPKG):
            msg = "Could not stop fetcher for test ID %d. Fetcher returned error %d" % (testid, rs)
            errors.append(msg)
            logger.error(msg)
            logger.error("Tried to execute: %s" % (" ".join(cmd)))

        # Get all errors (if any). Observers which return errors are not regarded as a
        # general error, so their messages are reported as warnings only.
        # NOTE(review): queue items appear to be tuples whose element [1] is a list of
        # errors with the message at [0]; confirm against StopTestThread.
        while not errors_queue.empty():
            errs = errors_queue.get()
            warnings.extend(err[0] for err in errs[1])

        # Set stop time in DB ---
        cur.execute("UPDATE `tbl_serv_tests` SET `time_end_act` = UTC_TIMESTAMP() WHERE `serv_tests_key` = %d" % testid)
        cn.commit()

        return (errors, warnings)
    except Exception:
        msg = "Unexpected error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        logger.error(msg)
        raise
### END stop_test()


##############################################################################
#
# prepare_testresults
#
##############################################################################
def prepare_testresults(testid, cur):
    """Prepare the test results for the user by calling the archiver.

    The function proceeds as follows:
      1. Read the test's XML config from the DB to find out whether results should
         be emailed (<generalConf><emailResults>yes</emailResults>).
      2. Optionally copy that config into the results directory.
      3. Run the archiver script.

    While the archiver exits with EUSERS (maximum number of concurrent instances
    reached), it is re-run after `archiver_waittime` seconds. If several archiver
    instances are running, this function may therefore block for a long time.

    Args:
        testid: ID of the test (`serv_tests_key` in `tbl_serv_tests`).
        cur: database cursor.

    Returns:
        List of error messages; empty on success.
    """
    errors = []
    tree = None

    # Check if results directory exists
    testresultsdir = "%s/%d" % (flocklab.config.get('fetcher', 'testresults_dir'), testid)
    if not os.path.isdir(testresultsdir):
        errors.append("Test results directory does not exist.")
        return errors

    logger.debug("Preparing testresults...")

    # Check if user wants test results as email ---
    logger.debug("Check if user wants testresults as email...")
    emailResults = False
    # Get the XML config from the database:
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" % testid)
    ret = cur.fetchone()
    if ret:
        parser = lxml.etree.XMLParser(remove_comments=True)
        tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding='utf-8')), parser)
        ns = {'d': flocklab.config.get('xml', 'namespace')}
        logger.debug("Got XML from database.")
        # Check if user wants results as email
        ret = tree.xpath('//d:generalConf/d:emailResults', namespaces=ns)
        if not ret:
            logger.debug("Could not get relevant XML value <emailResults>, thus not emailing results to user.")
        # An empty <emailResults/> has .text == None; treat it like any value other than 'yes'.
        elif (ret[0].text or '').strip().lower() == 'yes':
            emailResults = True
    if not emailResults:
        logger.debug("User does not want test results as email.")
    else:
        logger.debug("User wants test results as email. Will trigger the email.")

    # Add config XML to results directory ---
    # NOTE(review): assumes flocklab.config.get() returns the raw option string (as a
    # ConfigParser does). A plain truthiness test would treat e.g. "false" or "0" as
    # enabled, so the usual false spellings are honoured explicitly.
    include_xml = str(flocklab.config.get('archiver', 'include_xmlconfig')).strip().lower()
    if include_xml not in ('', '0', 'no', 'false', 'off'):
        # Compare with None explicitly: the truth value of an lxml element reflects
        # whether it has children, not whether it exists.
        if tree is not None:
            et = lxml.etree.ElementTree(tree)
            et.write("%s/testconfig.xml" % testresultsdir, pretty_print=True)
            logger.debug("XML config copied to results folder.")
        else:
            logger.warning("Could not copy XML config to test results directory.")

    # Archive test results ---
    cmd = [flocklab.config.get('dispatcher', 'archiverscript'), "--testid=%d" % testid]
    if emailResults:
        cmd.append("--email")
    if debug:
        cmd.append("--debug")
    # Call the script until it succeeds. flocklab.SUCCESS (0) means success; EUSERS means
    # the maximum number of allowed archiver instances is reached, so wait and retry.
    waittime = flocklab.config.getint('dispatcher', 'archiver_waittime')
    while True:
        rs = subprocess.Popen(cmd).wait()
        if rs == flocklab.SUCCESS:
            break
        if rs != errno.EUSERS:
            msg = "Could not trigger archiver. Archiver returned error %d" % (rs)
            logger.error(msg)
            logger.error("Tried to execute: %s" % (" ".join(cmd)))
            errors.append(msg)
            return errors
        # Maximum number of instances is reached. Wait some time before calling again.
        logger.info("Archiver returned EUSERS. Wait for %d s before trying again..." % waittime)
        time.sleep(waittime)
    logger.debug("Call to archiver successful.")

    logger.debug("Prepared testresults.")
    return errors
### END prepare_testresults()


##############################################################################
#
# evalute_linkmeasurement
#
##############################################################################
def evalute_linkmeasurement(testid, cur):
    """Run the link-map evaluation script if the test is a link measurement.

    Looks up the username of the owner of test `testid` (joining
    `tbl_serv_tests` with `tbl_serv_users`). If that username equals the
    `[linktests] user` config value, the script configured as
    `[dispatcher] testtolinkmapscript` is executed and waited for.

    Parameters:
        testid: ID of the test (matched against `serv_tests_key`).
        cur: open database cursor; presumably a MySQLdb cursor, since the
            query uses `%s` placeholders (MySQLdb "format" paramstyle).

    Returns:
        List of error messages. It is empty if the script succeeded, or if
        the test is not owned by the link-test user (nothing is run then).
    """
    errors = []
    # Only tests owned by the dedicated link-test user are link measurements.
    # testid is passed as a query parameter rather than interpolated into the
    # SQL string, so the driver handles quoting/escaping.
    cur.execute(
        "SELECT `username` FROM `tbl_serv_tests` LEFT JOIN `tbl_serv_users` "
        "ON (`serv_users_key`=`owner_fk`) WHERE (`serv_tests_key` = %s)",
        (testid,),
    )
    ret = cur.fetchone()
    if ret and ret[0] == flocklab.config.get('linktests', 'user'):
        logger.debug("Evaluating link measurements.")
        cmd = [flocklab.config.get('dispatcher', 'testtolinkmapscript')]
        # Blocks until the script exits and yields its return code.
        rs = subprocess.call(cmd)
        if rs != flocklab.SUCCESS:
            msg = "Error %s returned from testtolinkmap script" % str(rs)
            logger.error(msg)
            errors.append(msg)
        else:
            logger.debug("Link measurement evaluations finished.")
    return errors
965
966
967
968
969
970
971
972
973
### END evalute_linkmeasurement()


##############################################################################
#
# inform_user
#
##############################################################################
def inform_user(testid, cur, job, errors, warnings):
Reto Da Forno's avatar
Reto Da Forno committed
974
975
976
977
978
979
980
981
982
983
984
985
    if len(errors) != 0:
        subj = "Error notification"
        if job == 'start':
            msg = "The test with ID %d could not be started as planned because of the following errors:\n\n" %testid
        elif job == 'stop':
            msg = "The test with ID %d could not be stopped as planned because of the following errors:\n\n" %testid
        elif job == 'abort':
            msg = "The test with ID %d could not be aborted as requested because of the following errors:\n\n" %testid
        for error in errors:
            msg += "\t * %s\n" %error
        for warn in warnings:
            msg += "\t * %s\n" %warn
986
        ret = flocklab.FAILED
Reto Da Forno's avatar
Reto Da Forno committed
987
988
989
990
991
992
993
994
995
996
997
998
    elif len(warnings) != 0:
        if job == 'start':
            subj = "Test %d starting with warnings" %testid
            msg  = "Your test has been prepared and is going to start as planned, but consider the following warnings:\n\n" 
        elif job == 'stop':
            subj = "Test %d stopped with warnings" %testid
            msg = "Your test has been stopped as planned and the results will be available on the website soon.\nTest results are also accessible using webdav: webdavs://www.flocklab.ethz.ch/user/webdav/\nConsider the following warnings:\n\n"
        elif job == 'abort':
            subj = "Test %d aborted with warnings" %testid
            msg = "Your test has been aborted as requested and the results (if any) will be available on the website soon\nTest results are also accessible using webdav: webdavs://www.flocklab.ethz.ch/user/webdav/\nConsider the following warnings:\n\n"
        for warn in warnings:
            msg += "\t * %s\n" %warn
999
        ret = flocklab.SUCCESS
Reto Da Forno's avatar
Reto Da Forno committed
1000
    else: