#! /usr/bin/env python3

3
import sys, os, getopt, errno, threading, shutil, time, datetime, subprocess, tempfile, queue, re, logging, traceback, __main__, types, hashlib, lxml.etree, MySQLdb
4
5
6
import lib.flocklab as flocklab


7
8
# Module-wide logger object. The thread classes in this file call
# logger.debug() / logger.error() on it, so it has to be replaced by a real
# logger before any of them runs (the assignment is not visible in this part
# of the file -- presumably done during start-up; TODO confirm).
logger = None
# When true, " --debug" is appended to the test start/stop commands that are
# executed on the observers.
debug  = False
9
10
11
12
13
14
15
16


##############################################################################
#
# StopTestThread
#
##############################################################################
class StopTestThread(threading.Thread):
    """Thread which calls the test stop script on an observer.

    The thread first checks over SSH whether the observer answers and whether
    the SD card is mounted, and then runs the configured stop script for the
    given test ID on the observer.

    Every problem is recorded as a tuple (message, error code, observer ID).
    If at least one problem occurred, the tuple (obskey, errors) is put on the
    errors queue when the thread finishes.

    Parameters:
        obskey:       key of the observer in obsdict_key
        obsdict_key:  mapping obskey -> sequence whose element [1] is used as
                      the observer ID and element [2] as the SSH host
        errors_queue: queue on which (obskey, errors) is reported
        testid:       ID of the test to stop
    """

    # Interval (seconds) at which a running subprocess is checked for a
    # pending abort request.
    _POLL_INTERVAL = 1.0

    def __init__(self, obskey, obsdict_key, errors_queue, testid):
        super().__init__()
        self._obskey        = obskey
        self._obsdict_key   = obsdict_key
        self._errors_queue  = errors_queue
        self._abortEvent    = threading.Event()
        self._testid        = testid

    def _run_cmd(self, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
        """Run cmd as a subprocess until it exits or an abort is requested.

        Returns a tuple (returncode, out, err). If the process had to be
        killed because abort() was called while it was still running,
        returncode is None.
        """
        p = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, universal_newlines=True)
        try:
            while True:
                try:
                    # communicate() keeps draining the pipes while waiting, so the
                    # child can not block on a full pipe buffer.
                    out, err = p.communicate(timeout=self._POLL_INTERVAL)
                    return (p.returncode, out, err)
                except subprocess.TimeoutExpired:
                    if self._abortEvent.is_set():
                        p.kill()
                        out, err = p.communicate()
                        return (None, out, err)
        finally:
            # Never leave a child process behind, whatever happened above.
            if p.poll() is None:
                p.kill()
                p.wait()

    def _append_abort_error(self, errors, obsid):
        """Record and log that this thread has been aborted."""
        msg = "StopTestThread for observer ID %s aborted." % (obsid)
        errors.append((msg, errno.ECOMM, obsid))
        logger.error(msg)

    def run(self):
        # Initialised before the try block so that the except/finally clauses
        # can always rely on these names.
        errors = []
        obsid = None
        try:
            obsid = self._obsdict_key[self._obskey][1]
            obshost = self._obsdict_key[self._obskey][2]
            logger.debug("Start StopTestThread for observer ID %d" % (obsid))
            # First test if the observer is online and if the SD card is mounted:
            cmd = ['ssh', '%s' % (obshost), "mount | grep /dev/mmcblk0p1"]
            rs, out, err = self._run_cmd(cmd)
            if rs is None:
                self._append_abort_error(errors, obsid)
                return
            if rs != 0:
                if rs == 1:
                    if "No such file or directory" in err:
                        msg = "SD card on observer ID %s is not mounted, observer will thus be omitted for this test." % (obsid)
                    else:
                        msg = "Observer ID %s is not reachable (returned %d: %s, %s)." % (obsid, rs, out, err)
                else:
                    msg = "Observer ID %s is not responsive (SSH returned %d)." % (obsid, rs)
                errors.append((msg, errno.EHOSTUNREACH, obsid))
                logger.error(msg)
                return
            # Call the script on the observer which stops the test:
            remote_cmd = flocklab.config.get("observer", "stoptestscript") + " --testid=%d" % self._testid
            if debug:
                remote_cmd += " --debug"
            cmd = ['ssh', '%s' % (obshost), remote_cmd]
            # stderr is merged into stdout, so 'out' holds the complete script output.
            rs, out, err = self._run_cmd(cmd, stderr=subprocess.STDOUT)
            if rs is None:
                self._append_abort_error(errors, obsid)
            elif rs == flocklab.SUCCESS:
                logger.debug("Test stop script on observer ID %s succeeded." % (obsid))
            elif rs == 255:
                msg = "Observer ID %s is not reachable, thus not able to stop test. Dataloss occurred possibly for this observer." % (obsid)
                errors.append((msg, errno.EHOSTUNREACH, obsid))
                logger.error(msg)
            else:
                errors.append(("Test stop script on observer ID %s failed with error code %d." % (str(obsid), rs), rs, obsid))
                logger.error("Test stop script on observer ID %s failed with error code %d:\n%s" % (str(obsid), rs, str(out).strip()))
                logger.error("Tried to execute: %s" % (" ".join(cmd)))
        except Exception:
            # Any unexpected failure is reported like an abort; the subprocess
            # (if any) has already been cleaned up by _run_cmd().
            logger.debug("Exception: %s, %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            self._append_abort_error(errors, obsid)
        finally:
            if errors:
                self._errors_queue.put((self._obskey, errors))

    def abort(self):
        """Request the thread to stop; a running subprocess gets killed."""
        self._abortEvent.set()
### END StopTestThread



##############################################################################
#
# StartTestThread
#
##############################################################################
class StartTestThread(threading.Thread):
    """Thread which uploads all config files to an observer and
    starts the test on the observer.

    Steps performed by run():
      1. check over SSH that ~/data/ exists on the observer and create the
         test config folder,
      2. upload the XML config and all target images with scp,
      3. run the configured start script on the observer,
      4. remove the uploaded files on the server.

    Every problem is recorded as a tuple (message, error code, observer ID).
    If at least one problem occurred, the tuple (obskey, errors) is put on the
    errors queue when the thread finishes.

    Parameters:
        obskey:        key of the observer in the dictionaries below
        obsdict_key:   mapping obskey -> sequence whose element [1] is used as
                       the observer ID and element [2] as the SSH host
        xmldict_key:   mapping obskey -> sequence whose element [0] is the
                       path of the XML config file
        imagedict_key: mapping obskey -> list of sequences whose element [0]
                       is the path of a target image
        errors_queue:  queue on which (obskey, errors) is reported
        testid:        ID of the test to start
    """

    # Interval (seconds) at which a running subprocess is checked for a
    # pending abort request.
    _POLL_INTERVAL = 1.0

    def __init__(self, obskey, obsdict_key, xmldict_key, imagedict_key, errors_queue, testid):
        super().__init__()
        self._obskey        = obskey
        self._obsdict_key   = obsdict_key
        self._xmldict_key   = xmldict_key
        self._imagedict_key = imagedict_key
        self._errors_queue  = errors_queue
        self._abortEvent    = threading.Event()
        self._testid        = testid

    def _run_cmd(self, cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
        """Run cmd as a subprocess until it exits or an abort is requested.

        Returns a tuple (returncode, out, err). If the process had to be
        killed because abort() was called while it was still running,
        returncode is None.
        """
        p = subprocess.Popen(cmd, stdout=stdout, stderr=stderr, universal_newlines=True)
        try:
            while True:
                try:
                    # communicate() keeps draining the pipes while waiting, so the
                    # child can not block on a full pipe buffer.
                    out, err = p.communicate(timeout=self._POLL_INTERVAL)
                    return (p.returncode, out, err)
                except subprocess.TimeoutExpired:
                    if self._abortEvent.is_set():
                        p.kill()
                        out, err = p.communicate()
                        return (None, out, err)
        finally:
            # Never leave a child process behind, whatever happened above.
            if p.poll() is None:
                p.kill()
                p.wait()

    def _append_abort_error(self, errors, obsid):
        """Record and log that this thread has been aborted."""
        msg = "StartTestThread for observer ID %s aborted." % (obsid)
        errors.append((msg, errno.ECOMM, obsid))
        logger.error(msg)

    def run(self):
        # Initialised before the try block so that the except/finally clauses
        # can always rely on these names.
        errors = []
        obsid = None
        try:
            obsid = self._obsdict_key[self._obskey][1]
            obshost = self._obsdict_key[self._obskey][2]
            xmlpath = self._xmldict_key[self._obskey][0]
            testconfigfolder = "%s/%d" % (flocklab.config.get("observer", "testconfigfolder"), self._testid)
            obsdataport      = flocklab.config.getint('serialproxy', 'obsdataport')
            logger.debug("Start StartTestThread for observer ID %d" % (obsid))
            # First test if the observer is online and if the SD card is mounted:
            cmd = ['ssh', '%s' % (obshost), "ls ~/data/ && mkdir %s" % testconfigfolder]
            rs, out, err = self._run_cmd(cmd)
            if rs is None:
                self._append_abort_error(errors, obsid)
                return
            if rs != 0:
                if rs == 1:
                    if "No such file or directory" in err:
                        msg = "SD card on observer ID %s is not mounted, observer will thus be omitted for this test." % (obsid)
                    else:
                        msg = "Observer ID %s is not reachable, it will thus be omitted for this test (returned: %d: %s, %s)." % (obsid, rs, out, err)
                else:
                    msg = "Observer ID %s is not responsive, it will thus be omitted for this test (SSH returned %d). Command: %s" % (obsid, rs, " ".join(cmd))
                errors.append((msg, errno.EHOSTUNREACH, obsid))
                logger.error(msg)
                return

            # Collect the files to upload: XML config plus all target images.
            images = []
            if self._obskey in self._imagedict_key:
                images = self._imagedict_key[self._obskey]
            fileuploadlist = [xmlpath]
            fileuploadlist.extend(image[0] for image in images)
            # Now upload the image and XML config file:
            cmd = ['scp', '-q']
            cmd.extend(fileuploadlist)
            cmd.append('%s:%s/.' % (obshost, testconfigfolder))
            # scp output is not captured (as before), it goes to the inherited streams.
            rs, out, err = self._run_cmd(cmd, stdout=None, stderr=None)
            if rs is None:
                self._append_abort_error(errors, obsid)
                return
            if rs != flocklab.SUCCESS:
                msg = "Upload of target image and config XML to observer ID %s failed with error number %d\nTried to execute: %s" % (obsid, rs, (" ".join(cmd)))
                errors.append((msg, rs, obsid))
                logger.error(msg)
                return
            logger.debug("Upload of target image and config XML to observer ID %s succeeded." % (obsid))

            # Run the script on the observer which starts the test:
            remote_cmd = flocklab.config.get("observer", "starttestscript") + " --testid=%d --xml=%s/%s --serialport=%d" % (self._testid, testconfigfolder, os.path.basename(xmlpath), obsdataport)
            if debug:
                remote_cmd += " --debug"
            cmd = ['ssh', '%s' % (obshost), remote_cmd]
            # stderr is merged into stdout, so 'out' holds the complete script output.
            rs, out, err = self._run_cmd(cmd, stderr=subprocess.STDOUT)
            if rs is None:
                logger.debug("Abort is set, start test process for observer %s killed." % (obsid))
                self._append_abort_error(errors, obsid)
            elif rs != flocklab.SUCCESS:
                errors.append(("Test start script on observer ID %s failed with error code %d." % (obsid, rs), rs, obsid))
                logger.error("Test start script on observer ID %s failed with error code %d and message:\n%s" % (str(obsid), rs, str(out)))
            else:
                logger.debug("Test start script on observer ID %s succeeded." % (obsid))
            # Remove image file and xml on server (they have been uploaded,
            # independent of the outcome of the start script):
            os.remove(xmlpath)
            logger.debug("Removed XML config %s for observer ID %s" % (xmlpath, obsid))
            for image in images:
                os.remove(image[0])
                logger.debug("Removed target image %s for observer ID %s" % (image[0], obsid))
        except Exception:
            # Any unexpected failure is reported like an abort; the subprocess
            # (if any) has already been cleaned up by _run_cmd().
            logger.debug("Exception: %s, %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
            self._append_abort_error(errors, obsid)
        finally:
            if errors:
                self._errors_queue.put((self._obskey, errors))

    def abort(self):
        """Request the thread to stop; a running subprocess gets killed."""
        self._abortEvent.set()
    
### END StartTestThread



##############################################################################
#
# start_test
#
##############################################################################
def start_test(testid, cur, cn, obsdict_key, obsdict_id):
Reto Da Forno's avatar
Reto Da Forno committed
218
219
220
221
222
223
    errors = []
    warnings = []
    
    try:    
        logger.debug("Entering start_test() function...")
        # First, validate the XML file again. If validation fails, return immediately:
Reto Da Forno's avatar
Reto Da Forno committed
224
        cmd = [flocklab.config.get('dispatcher','validationscript'), '--testid=%d'%testid]
Reto Da Forno's avatar
Reto Da Forno committed
225
226
227
228
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        out, err = p.communicate()
        rs = p.returncode
        if rs != 0:
229
230
            logger.error("Error %s returned from %s" % (str(rs), flocklab.config.get('dispatcher','validationscript')))
            logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
231
232
233
234
235
236
237
238
239
240
            errors.append("Validation of XML failed. Output of script was: %s %s" % (str(out), str(err)))
        
        if len(errors) == 0:
            # Update DB status ---
            # Update the status of the test in the db:
            flocklab.set_test_status(cur, cn, testid, 'preparing')
            
            # Get start/stop time ---
            cur.execute("SELECT `time_start_wish`, `time_end_wish`, `owner_fk` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" %testid)
            # Times are going to be of datetime type:
241
            ret = cur.fetchone()
Reto Da Forno's avatar
Reto Da Forno committed
242
243
244
245
246
247
248
249
250
            starttime = ret[0]
            stoptime  = ret[1]
            owner_fk = ret[2]
            logger.debug("Got start time wish for test from database: %s" %starttime)
            logger.debug("Got end time wish for test from database: %s" %stoptime)
            
            # Image processing ---
            # Get all images from the database:
            imagedict_key = {}
251
            sql_image =     """    SELECT `t`.`binary`, `m`.`observer_fk`, `m`.`node_id`, LOWER(`a`.`architecture`), `t`.`serv_targetimages_key`, LOWER(`p`.`name`) AS `platname`, `a`.`core` AS `core`
Reto Da Forno's avatar
Reto Da Forno committed
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
                                FROM `tbl_serv_targetimages` AS `t` 
                                LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `m` 
                                    ON `t`.`serv_targetimages_key` = `m`.`targetimage_fk` 
                                LEFT JOIN `tbl_serv_platforms` AS `p`
                                    ON `t`.`platforms_fk` = `p`.`serv_platforms_key`
                                LEFT JOIN `tbl_serv_operatingsystems` AS `o`
                                    ON `t`.`operatingsystems_fk` = `o`.`serv_operatingsystems_key`
                                LEFT JOIN `tbl_serv_architectures` AS `a`
                                    ON `t`.`core` = `a`.`core` AND `p`.`serv_platforms_key` = `a`.`platforms_fk`
                                WHERE `m`.`test_fk` = %d
                            """    
            cur.execute(sql_image%testid)
            ret = cur.fetchall()
            for r in ret:
                binary      = r[0]
                obs_fk      = r[1]
268
                obs_id      = obsdict_key[obs_fk][1]
Reto Da Forno's avatar
Reto Da Forno committed
269
270
                node_id     = r[2]
                arch        = r[3]
271
272
273
                tgimage_key = r[4]
                platname    = r[5]
                core        = r[6]
Reto Da Forno's avatar
Reto Da Forno committed
274
275
276
277
278
279
280
281
                
                # Prepare image ---
                (fd, imagepath) = tempfile.mkstemp()
                binpath = "%s" %(os.path.splitext(imagepath)[0]) 
                imagefile = os.fdopen(fd, 'w+b')
                imagefile.write(binary)
                imagefile.close()
                removeimage = True
282
                logger.debug("Got target image ID %s for observer ID %s with node ID %s from database and wrote it to temp file %s (hash %s)" % (str(tgimage_key), str(obs_id), str(node_id), imagepath, hashlib.sha1(binary).hexdigest()))
Reto Da Forno's avatar
Reto Da Forno committed
283
284
                
                # Convert image to binary format and, depending on operating system and platform architecture, write the node ID (if specified) to the image:
285
                logger.debug("Found %s target architecture on platform %s for observer ID %s (node ID to be used: %s)." % (arch, platname, str(obs_id), str(node_id)))
Reto Da Forno's avatar
Reto Da Forno committed
286
                set_symbols_tool = flocklab.config.get('targetimage', 'setsymbolsscript')
287
288
289
290
291
292
293
294
295
296
297
298
299
300
                symbol_node_id = None
                if (node_id != None):
                    # for backwards compatiblity: check whether symbol TOS_NODE_ID exists in the binary
                    p = subprocess.Popen(['objdump', '-t', imagepath], stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
                    (out, err) = p.communicate()
                    if p.returncode == 0:
                        if "TOS_NODE_ID" in out:
                            logger.debug("Found symbol TOS_NODE_ID in binary file '%s'." % (imagepath))
                            symbol_node_id = "TOS_NODE_ID"
                        elif "FLOCKLAB_NODE_ID" in out:
                            logger.debug("Found symbol FLOCKLAB_NODE_ID in binary file '%s'." % (imagepath))
                            symbol_node_id = "FLOCKLAB_NODE_ID"
                    else:
                        logger.warn("Failed to search for TOS_NODE_ID in binary file '%s'." % (imagepath))
Reto Da Forno's avatar
Reto Da Forno committed
301
                    symbol_node_id = "TOS_NODE_ID"
302
                # Convert ELF file to ihex and set node ID if necessary
Reto Da Forno's avatar
Reto Da Forno committed
303
                if (arch == 'msp430'):
Reto Da Forno's avatar
Reto Da Forno committed
304
                    binutils_path = flocklab.config.get('targetimage', 'binutils_msp430')
Reto Da Forno's avatar
Reto Da Forno committed
305
306
                    binpath = "%s.ihex"%binpath
                    if symbol_node_id:
Reto Da Forno's avatar
Reto Da Forno committed
307
                        cmd = ['%s' % (set_symbols_tool), '--objcopy', '%s/msp430-objcopy' % (binutils_path), '--objdump', '%s/msp430-objdump' % (binutils_path), '--target', 'ihex', imagepath, binpath, '%s=%s' % (symbol_node_id, node_id), 'ActiveMessageAddressC$addr=%s' % (node_id), 'ActiveMessageAddressC__addr=%s' % (node_id)]
Reto Da Forno's avatar
Reto Da Forno committed
308
309
310
311
312
                        try:
                            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                            rs = p.wait()
                            if rs != 0:
                                logger.error("Error %d returned from %s" % (rs, set_symbols_tool))
313
                                logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
314
315
316
317
318
                                errors.append("Could not set node ID %s for target image %s" %(str(node_id), str(tgimage_key)))
                            else:
                                logger.debug("Set symbols and converted file to ihex.")
                                # Remove the temporary exe file
                                os.remove("%s.exe"%imagepath)
319
                                #logger.debug("Removed intermediate image %s.exe" % (str(imagepath)))
Reto Da Forno's avatar
Reto Da Forno committed
320
                        except OSError as err:
321
                            msg = "Error in subprocess: tried calling %s. Error was: %s" % (str(cmd), str(err))
Reto Da Forno's avatar
Reto Da Forno committed
322
323
324
325
                            logger.error(msg)
                            errors.append(msg)
                            removeimage = False
                    else:
Reto Da Forno's avatar
Reto Da Forno committed
326
                        cmd = ['%s/msp430-objcopy' % (binutils_path), '--output-target', 'ihex', imagepath, binpath]
Reto Da Forno's avatar
Reto Da Forno committed
327
328
329
330
331
                        try:
                            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                            rs = p.wait()
                            if rs != 0:
                                logger.error("Error %d returned from msp430-objcopy" %rs)
332
                                logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
333
334
335
336
                                errors.append("Could not convert target image %s to ihex" %str(tgimage_key))
                            else:
                                logger.debug("Converted file to ihex.")
                        except OSError as err:
337
                            msg = "Error in subprocess: tried calling %s. Error was: %s" % (str(cmd), str(err))
Reto Da Forno's avatar
Reto Da Forno committed
338
339
340
341
342
343
344
345
346
347
348
                            logger.error(msg)
                            errors.append(msg)
                            removeimage = False
                elif (arch == 'arm'):
                    if (platname == 'dpp'):
                        imgformat = 'ihex'
                        binpath = "%s.ihex"%binpath
                    else:
                        imgformat = 'binary'
                        binpath = "%s.bin"%binpath
                    # Set library path for arm-binutils:
Reto Da Forno's avatar
Reto Da Forno committed
349
                    arm_binutils_path = flocklab.config.get('targetimage', 'binutils_arm')
Reto Da Forno's avatar
Reto Da Forno committed
350
351
352
                    arm_env = os.environ
                    if 'LD_LIBRARY_PATH' not in arm_env:
                        arm_env['LD_LIBRARY_PATH'] = ''
Reto Da Forno's avatar
Reto Da Forno committed
353
                    arm_env['LD_LIBRARY_PATH'] += ':%s/%s' % (arm_binutils_path, "usr/x86_64-linux-gnu/arm-linux-gnu/lib")
Reto Da Forno's avatar
Reto Da Forno committed
354
                    if symbol_node_id:
Reto Da Forno's avatar
Reto Da Forno committed
355
                        cmd = ['%s' % (set_symbols_tool), '--objcopy', '%s/%s' % (arm_binutils_path, "usr/bin/arm-linux-gnu-objcopy"), '--objdump', '%s/%s' % (arm_binutils_path, "usr/bin/arm-linux-gnu-objdump"), '--target', imgformat, imagepath, binpath, '%s=%s' % (symbol_node_id, node_id), 'ActiveMessageAddressC$addr=%s' % (node_id), 'ActiveMessageAddressC__addr=%s' % (node_id)]
Reto Da Forno's avatar
Reto Da Forno committed
356
357
358
359
360
                        try:
                            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=arm_env)
                            rs = p.wait()
                            if rs != 0:
                                logger.error("Error %d returned from %s" % (rs, set_symbols_tool))
361
                                logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
362
363
364
365
                                errors.append("Could not set node ID %s for target image %s" %(str(node_id), str(tgimage_key)))
                            else:
                                logger.debug("Set symbols and converted file to bin.")
                        except OSError as err:
366
                            msg = "Error in subprocess: tried calling %s. Error was: %s" % (str(cmd), str(err))
Reto Da Forno's avatar
Reto Da Forno committed
367
368
369
370
                            logger.error(msg)
                            errors.append(msg)
                            removeimage = False
                    else:
Reto Da Forno's avatar
Reto Da Forno committed
371
                        cmd = ['%s/%s' % (arm_binutils_path, "usr/bin/arm-linux-gnu-objcopy"), '--output-target', imgformat, imagepath, binpath]
Reto Da Forno's avatar
Reto Da Forno committed
372
373
374
375
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=arm_env)
                        rs = p.wait()
                        if rs != 0:
                            logger.error("Error %d returned from arm-linux-gnu-objcopy" %rs)
376
                            logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
                            errors.append("Could not convert target image %s to bin" %str(tgimage_key))
                        else:
                            logger.debug("Converted file to bin.")
                else:
                    msg = "Unknown architecture %s found. The original target image (ID %s) file will be used without modification." %(arch, str(tgimage_key))
                    errors.append(msg)
                    logger.error(msg)
                    orig = open(imagepath, "r+b")
                    binfile = open(binpath, "w+b")
                    binfile.write(orig.read())
                    orig.close()
                    binfile.close()
                    logger.debug("Copied image to binary file without modification.")
                
                # Remove the original file which is not used anymore:
                if removeimage:
                    os.remove(imagepath)
394
                    #logger.debug("Removed image %s" % (str(imagepath)))
Reto Da Forno's avatar
Reto Da Forno committed
395
                else:
396
                    logger.warn("Image %s has not been removed." % (str(imagepath)))
Reto Da Forno's avatar
Reto Da Forno committed
397
398
399
400
401
402
403
404
                
                
                # Slot detection ---
                # Find out which slot number to use on the observer.
                #logger.debug("Detecting adapter for %s on observer ID %s" %(platname, obs_id))
                ret = flocklab.get_slot(cur, int(obs_fk), platname)
                if ret in range(1,5):
                    slot = ret
405
                    logger.debug("Found adapter for %s on observer ID %s in slot %d" % (platname, obs_id, slot))
Reto Da Forno's avatar
Reto Da Forno committed
406
407
408
409
410
411
412
413
414
415
416
417
418
419
                elif ret == 0:
                    slot = None
                    msg = "Could not find an adapter for %s on observer ID %s" %(platname, obs_id)
                    errors.append(msg)
                    logger.error(msg)
                else:
                    slot = None
                    msg = "Error when detecting adapter for %s on observer ID %s: function returned %d" %(platname, obs_id, ret)
                    errors.append(msg)
                    logger.error(msg)
                        
                # Write the dictionary for the image:
                if not obs_fk in imagedict_key:
                    imagedict_key[obs_fk] = []
420
                imagedict_key[obs_fk].append((binpath, slot, platname, 0.0, core))
Reto Da Forno's avatar
Reto Da Forno committed
421
422
423
424
425
426
427
428
429
430
431
432
                
            logger.info("Processed all target images from database.")
                
            # XML processing ---
            # Get the XML config from the database and generate a separate file for every observer used:
            cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
            ret = cur.fetchone()
            if not ret:
                msg = "No XML found in database for testid %d." %testid
                errors.append(msg)
                logger.error(msg)
            else:
433
434
                parser = lxml.etree.XMLParser(remove_comments=True)
                tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
Reto Da Forno's avatar
Reto Da Forno committed
435
                ns = {'d': flocklab.config.get('xml', 'namespace')}
Reto Da Forno's avatar
Reto Da Forno committed
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
                logger.debug("Got XML from database.")
                # Create XML files ---
                # Create an empty XML config file for every observer used and organize them in a dictionary:
                xmldict_key = {}
                for obs_key, obs_id, obs_ether in obsdict_key.values():
                    (fd, xmlpath) = tempfile.mkstemp()
                    xmlfhand = os.fdopen(fd, 'w+')
                    xmldict_key[obs_key] = (xmlpath, xmlfhand)
                    xmlfhand.write('<?xml version="1.0" encoding="UTF-8"?>\n\n<obsConf>\n\n')
                # Go through the blocks of the XML file and write the configs to the affected observer XML configs:
                # targetConf ---
                targetconfs = tree.xpath('//d:targetConf', namespaces=ns)
                if not targetconfs:
                    msg = "no <targetConf> element found in XML config (wrong namespace?)"
                    errors.append(msg)
                    logger.error(msg)
                for targetconf in targetconfs:
                    obsids = targetconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                    ret = targetconf.xpath('d:voltage', namespaces=ns)
                    if ret:
                        voltage = ret[0].text.strip()
                    else:
Reto Da Forno's avatar
Reto Da Forno committed
458
                        voltage = str(flocklab.config.get("dispatcher", "default_tg_voltage"))
Reto Da Forno's avatar
Reto Da Forno committed
459
460
461
462
463
464
465
466
467
468
469
470
                    ret = targetconf.xpath('d:noImage', namespaces=ns)
                    if ret:
                        noImageSlot = ret[0].text.strip()
                    else:
                        noImageSlot = None
                    for obsid in obsids:
                        obsid = int(obsid)
                        obskey = obsdict_id[obsid][0]
                        xmldict_key[obskey][1].write("<obsTargetConf>\n")
                        xmldict_key[obskey][1].write("\t<voltage>%s</voltage>\n"%voltage)
                        if noImageSlot:
                            slot = noImageSlot
471
                            xmldict_key[obskey][1].write("\t<slotnr>%s</slotnr>\n" % (slot))
Reto Da Forno's avatar
Reto Da Forno committed
472
                        else:
473
                            xmldict_key[obskey][1].write("\t<firmware>%s</firmware>\n" % (imagedict_key[obskey][0][3]))
Reto Da Forno's avatar
Reto Da Forno committed
474
                            for coreimage in imagedict_key[obskey]:
475
                                xmldict_key[obskey][1].write("\t<image core=\"%d\">%s/%d/%s</image>\n" % (coreimage[4], flocklab.config.get("observer", "testconfigfolder"),testid, os.path.basename(coreimage[0])))
476
477
                            xmldict_key[obskey][1].write("\t<slotnr>%s</slotnr>\n" % (imagedict_key[obskey][0][1]))
                            xmldict_key[obskey][1].write("\t<platform>%s</platform>\n" % (imagedict_key[obskey][0][2]))
478
                            xmldict_key[obskey][1].write("\t<os>%s</os>\n" % (imagedict_key[obskey][0][2]))
Reto Da Forno's avatar
Reto Da Forno committed
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
                            slot = imagedict_key[obskey][0][1]
                        xmldict_key[obskey][1].write("</obsTargetConf>\n\n")
                        #logger.debug("Wrote obsTargetConf XML for observer ID %s" %obsid)
                        # update test_image mapping with slot information
                        cur.execute("UPDATE `tbl_serv_map_test_observer_targetimages` SET `slot` = %s WHERE `observer_fk` = %d AND `test_fk`=%d" % (slot, obskey, testid))
                        cn.commit()
                
                # serialConf ---
                srconfs = tree.xpath('//d:serialConf', namespaces=ns)
                serialProxyUsed = False
                if srconfs:
                    # only use serialproxy if remote IP specified in xml
                    if tree.xpath('//d:serialConf/d:remoteIp', namespaces=ns):
                        serialProxyUsed = True
                    for srconf in srconfs:
                        obsids = srconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        xmlblock = "<obsSerialConf>\n"
                        port = srconf.xpath('d:port', namespaces=ns)
                        if port:
                            port = srconf.xpath('d:port', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<port>%s</port>\n" %port
                        baudrate = srconf.xpath('d:baudrate', namespaces=ns)
                        if baudrate:
                            baudrate = srconf.xpath('d:baudrate', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<baudrate>%s</baudrate>\n" %baudrate
                        mode = srconf.xpath('d:mode', namespaces=ns)
                        if mode:
                            mode = srconf.xpath('d:mode', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<mode>%s</mode>\n" %mode
                        xmlblock += "</obsSerialConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                            #logger.debug("Wrote obsSerialConf XML for observer ID %s" %obsid)
                else:
515
                    logger.debug("No <serialConf> found, not using serial service.")
Reto Da Forno's avatar
Reto Da Forno committed
516
517
518
                
                # gpioTracingConf ---
                gmconfs = tree.xpath('//d:gpioTracingConf', namespaces=ns)
519
520
521
522
                if gmconfs:
                    for gmconf in gmconfs:
                        obsids = gmconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        pinconfs = gmconf.xpath('d:pinConf', namespaces=ns)
523
                        pinlist = gmconf.xpath('d:pins', namespaces=ns)
524
                        xmlblock = "<obsGpioMonitorConf>\n"
525
526
                        if pinlist:
                            xmlblock += "\t<pins>" + pinlist[0].text.strip() + "</pins>\n"
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
                        for pinconf in pinconfs:
                            pin  = pinconf.xpath('d:pin', namespaces=ns)[0].text.strip()
                            edge = pinconf.xpath('d:edge', namespaces=ns)[0].text.strip()
                            mode = pinconf.xpath('d:mode', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<pinConf>\n\t\t<pin>%s</pin>\n\t\t<edge>%s</edge>\n\t\t<mode>%s</mode>\n" %(pin, edge, mode)
                            cb_gs_add = pinconf.xpath('d:callbackGpioActAdd', namespaces=ns)
                            if cb_gs_add:
                                pin = cb_gs_add[0].xpath('d:pin', namespaces=ns)[0].text.strip()
                                level = cb_gs_add[0].xpath('d:level', namespaces=ns)[0].text.strip()
                                offsets = cb_gs_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
                                offsetms = cb_gs_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
                                xmlblock += "\t\t<callbackGpioSetAdd>\n\t\t\t<pin>%s</pin>\n\t\t\t<level>%s</level>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackGpioSetAdd>\n" %(pin, level, offsets, offsetms)
                            cb_pp_add = pinconf.xpath('d:callbackPowerProfAdd', namespaces=ns)
                            if cb_pp_add:
                                duration = cb_pp_add[0].xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()
                                offsets = cb_pp_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
                                offsetms = cb_pp_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
                                xmlblock += "\t\t<callbackPowerprofAdd>\n\t\t\t<duration>%s</duration>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackPowerprofAdd>\n" %(duration, offsets, offsetms)
                            xmlblock += "\t</pinConf>\n"
                        xmlblock += "</obsGpioMonitorConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                          #logger.debug("Wrote obsGpioMonitorConf XML for observer ID %s" %obsid)
                else:
                    logger.debug("No <gpioTracingConf> found, not using GPIO tracing service.")
Reto Da Forno's avatar
Reto Da Forno committed
554
555
556
557
558
559
                        
                # gpioActuationConf ---
                # Create 2 pin settings for every observer used in the test: 
                #        1) Pull reset pin of target low when test is to start
                #        2) Pull reset pin of target high when test is to stop
                xmlblock = "<obsGpioSettingConf>\n"
Reto Da Forno's avatar
Reto Da Forno committed
560
                startdatetime = starttime.strftime(flocklab.config.get("observer", "timeformat"))
Reto Da Forno's avatar
Reto Da Forno committed
561
562
                startmicrosecs = starttime.microsecond
                xmlblock += "\t<pinConf>\n\t\t<pin>RST</pin>\n\t\t<level>low</level>\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%d</absoluteMicrosecs>\n\t\t</absoluteTime>\n\t\t<intervalMicrosecs>0</intervalMicrosecs>\n\t\t<count>1</count>\n\t</pinConf>\n" %(startdatetime, startmicrosecs)
Reto Da Forno's avatar
Reto Da Forno committed
563
                stopdatetime = stoptime.strftime(flocklab.config.get("observer", "timeformat"))
Reto Da Forno's avatar
Reto Da Forno committed
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
                stopmicrosecs = stoptime.microsecond
                xmlblock += "\t<pinConf>\n\t\t<pin>RST</pin>\n\t\t<level>high</level>\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%d</absoluteMicrosecs>\n\t\t</absoluteTime>\n\t\t<intervalMicrosecs>0</intervalMicrosecs>\n\t\t<count>1</count>\n\t</pinConf>\n" %(stopdatetime, stopmicrosecs)
                for obskey in obsdict_key.keys():
                    xmldict_key[obskey][1].write(xmlblock)
                # Now write the per-observer config:
                gsconfs = tree.xpath('//d:gpioActuationConf', namespaces=ns)
                for gsconf in gsconfs:
                    xmlblock = ""
                    obsids = gsconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                    pinconfs = gsconf.xpath('d:pinConf', namespaces=ns)
                    for pinconf in pinconfs:
                        pin  = pinconf.xpath('d:pin', namespaces=ns)[0].text.strip()
                        level = pinconf.xpath('d:level', namespaces=ns)[0].text.strip()
                        abs_tim = pinconf.xpath('d:absoluteTime', namespaces=ns)
                        if abs_tim:
                            absdatetime = absolute2absoluteUTC_time(abs_tim[0].xpath('d:absoluteDateTime', namespaces=ns)[0].text.strip())
                            ret = abs_tim[0].xpath('d:absoluteMicrosecs', namespaces=ns)
                            if ret:
                                absmicrosec = int(ret[0].text.strip())
                            else:
                                absmicrosec = 0
                        rel_tim = pinconf.xpath('d:relativeTime', namespaces=ns)
                        if rel_tim:
                            relsec = int(rel_tim[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip())
                            ret = rel_tim[0].xpath('d:offsetMicrosecs', namespaces=ns)
                            if ret:
                                relmicrosec = int(ret[0].text.strip())
                            else:
                                relmicrosec = 0
                            # Relative times need to be converted into absolute times:
                            absmicrosec, absdatetime = relative2absolute_time(starttime, relsec, relmicrosec)    
                        periodic = pinconf.xpath('d:periodic', namespaces=ns)
                        if periodic:
                            interval = int(periodic[0].xpath('d:intervalMicrosecs', namespaces=ns)[0].text.strip())
                            count = int(periodic[0].xpath('d:count', namespaces=ns)[0].text.strip())
                        else:
                            interval = 0
                            count = 1
                        xmlblock += "\t<pinConf>\n\t\t<pin>%s</pin>\n\t\t<level>%s</level>\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%s</absoluteMicrosecs>\n\t\t</absoluteTime>\n\t\t<intervalMicrosecs>%i</intervalMicrosecs>\n\t\t<count>%i</count>\n\t</pinConf>\n" %(pin, level, absdatetime, absmicrosec, interval, count)
                    for obsid in obsids:
                        obsid = int(obsid)
                        obskey = obsdict_id[obsid][0]
                        xmldict_key[obskey][1].write(xmlblock)
                        #logger.debug("Wrote obsGpioSettingConf XML for observer ID %s" %obsid)
                xmlblock = "</obsGpioSettingConf>\n\n"
                for obskey in obsdict_key.keys():
                    xmldict_key[obskey][1].write(xmlblock)
611
                
Reto Da Forno's avatar
Reto Da Forno committed
612
613
                # powerProfilingConf ---
                ppconfs = tree.xpath('//d:powerProfilingConf', namespaces=ns)
614
615
616
617
618
619
                if ppconfs:
                    for ppconf in ppconfs:
                        obsids = ppconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        profconfs = ppconf.xpath('d:profConf', namespaces=ns)
                        xmlblock = "<obsPowerprofConf>\n"
                        for profconf in profconfs:
Reto Da Forno's avatar
Reto Da Forno committed
620
621
622
623
                            duration = profconf.xpath('d:duration', namespaces=ns)
                            if duration:
                                duration = duration[0].text.strip()
                            else:
Reto Da Forno's avatar
Reto Da Forno committed
624
625
626
627
628
629
630
631
                                try:
                                    duration = int(profconf.xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()) / 1000
                                except:
                                    duration = 0
                            #xmlblock += "\t<profConf>\n"
                            xmlblock += "\t<duration>%s</duration>" % duration
                            # calculate the sampling start
                            offset  = profconf.xpath('d:offset', namespaces=ns)
632
                            rel_tim = profconf.xpath('d:relativeTime', namespaces=ns)
Reto Da Forno's avatar
Reto Da Forno committed
633
634
635
636
637
638
639
                            abs_tim = profconf.xpath('d:absoluteTime', namespaces=ns)
                            if offset:
                                offset = int(offset[0].text.strip())
                                tstart = datetime.datetime.timestamp(starttime + datetime.timedelta(seconds=offset))
                            elif abs_tim:
                                tstart = datetime.datetime.timestamp(flocklab.get_xml_timestamp(abs_tim[0].xpath('d:absoluteDateTime', namespaces=ns)[0].text.strip()))
                            elif rel_tim:
640
                                relsec = int(rel_tim[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip())
Reto Da Forno's avatar
Reto Da Forno committed
641
642
643
644
                                tstart = datetime.datetime.timestamp(starttime + datetime.timedelta(seconds=relsec))
                            xmlblock += "\n\t<starttime>%s</starttime>" % (tstart)
                            # check if config contains samplingRate:
                            samplingrate    = profconf.xpath('d:samplingRate', namespaces=ns)
645
                            samplingdivider = profconf.xpath('d:samplingDivider', namespaces=ns)
Reto Da Forno's avatar
Reto Da Forno committed
646
647
648
649
                            if samplingrate:
                                samplingrate = samplingrate[0].text.strip()
                                xmlblock += "\n\t<samplingRate>%s</samplingRate>" % samplingrate
                            elif samplingdivider:
650
                                samplingdivider = samplingdivider[0].text.strip()
Reto Da Forno's avatar
Reto Da Forno committed
651
                                xmlblock += "\n\t<samplingDivider>%s</samplingDivider>" % samplingdivider
Reto Da Forno's avatar
Reto Da Forno committed
652
                            else:
Reto Da Forno's avatar
Reto Da Forno committed
653
654
655
656
657
                                samplingdivider = flocklab.config.get('dispatcher', 'default_sampling_divider')
                                xmlblock += "\n\t<samplingDivider>%s</samplingDivider>" % samplingdivider
                            #xmlblock += "\n\t</profConf>\n"
                            break    # for now, only parse the first block
                        xmlblock += "\n</obsPowerprofConf>\n\n"
658
659
660
661
662
663
664
665
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                            #logger.debug("Wrote obsPowerprofConf XML for observer ID %s" %obsid)
                else:
                    logger.debug("No <powerProfilingConf> found, not using power profiling service.")
                 
Reto Da Forno's avatar
Reto Da Forno committed
666
                logger.debug("Wrote all observer XML configs.")
667
                
Reto Da Forno's avatar
Reto Da Forno committed
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
                # Close XML files ---
                for xmlpath, xmlfhand in xmldict_key.values():
                    xmlfhand.write("</obsConf>\n")
                    xmlfhand.close()
                    #logger.debug("Closed observer XML config %s"%xmlpath)
                #logger.debug("Closed all observer XML configs.")
                
        # Upload configs to observers and start test ---
        if len(errors) == 0:
            if not db_register_activity(testid, cur, cn, 'start', iter(obsdict_key.keys())):
                msg = "Could not access all needed observers for testid %d." %testid
                errors.append(msg)
                logger.error(msg)
        if len(errors) == 0:
            # -- START OF CRITICAL SECTION where dispatcher accesses used observers
            # Start a thread for each observer which uploads the config and calls the test start script on the observer
            thread_list = []
            errors_queue = queue.Queue()
            for obskey in obsdict_key.keys():
Reto Da Forno's avatar
Reto Da Forno committed
687
                thread = StartTestThread(obskey, obsdict_key, xmldict_key, imagedict_key, errors_queue, testid)
Reto Da Forno's avatar
Reto Da Forno committed
688
689
690
691
692
693
                thread_list.append((thread, obskey))
                thread.start()
                #DEBUG logger.debug("Started thread for test start on observer ID %s" %(str(obsdict_key[obskey][1])))
            # Wait for all threads to finish:
            for (thread, obskey) in thread_list:
                # Wait max 75% of the setuptime:
Reto Da Forno's avatar
Reto Da Forno committed
694
                thread.join(timeout=(flocklab.config.getint('tests','setuptime')*0.75*60))
Reto Da Forno's avatar
Reto Da Forno committed
695
696
                if thread.isAlive():
                    # Timeout occurred. Signal the thread to abort:
Reto Da Forno's avatar
Reto Da Forno committed
697
                    logger.error("Telling thread for test start on observer ID %s to abort..." % (str(obsdict_key[obskey][1])))
Reto Da Forno's avatar
Reto Da Forno committed
698
699
700
701
702
                    thread.abort()
            # Wait again for the aborted threads:
            for (thread, obskey) in thread_list:    
                thread.join(timeout=10)
                if thread.isAlive():
Reto Da Forno's avatar
Reto Da Forno committed
703
                    msg = "Thread for test start on observer ID %s timed out and will be aborted now." % (str(obsdict_key[obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
704
705
706
707
708
709
710
711
                    errors.append(msg)
                    logger.error(msg)
            # -- END OF CRITICAL SECTION where dispatcher accesses used observers
            db_unregister_activity(testid, cur, cn, 'start')
                
            # Get all errors (if any). Observers which return errors are not regarded as a general error. In this
            # case, the test is just started without the faulty observers if there is at least 1 observer that succeeded:
            obs_error = []
712
713
            #if not errors_queue.empty():
                #logger.error("Queue with errors from test start thread is not empty. Getting errors...")
Reto Da Forno's avatar
Reto Da Forno committed
714
715
716
            while not errors_queue.empty():
                errs = errors_queue.get()
                for err in errs[1]:
717
                    #logger.error("Error from test start thread for observer %s: %s" %(str(err[2]), str(err[0])))
Reto Da Forno's avatar
Reto Da Forno committed
718
719
720
                    obs_error.append(err[2])
                    warnings.append(err[0])
            if len(obs_error) > 0:
Reto Da Forno's avatar
Reto Da Forno committed
721
722
723
724
725
726
727
                # Abort or continue?
                if not flocklab.config.get("dispatcher", "continue_on_error"):
                    msg = "At least one observer failed to start the test, going to abort..."
                    errors.append(msg)
                    logger.error(msg)
                # Check if there is at least 1 observer which succeeded:
                elif (len(obsdict_id) == len(set(obs_error))):
Reto Da Forno's avatar
Reto Da Forno committed
728
729
730
731
732
733
734
735
                    msg = "None of the requested observers could successfully start the test."
                    errors.append(msg)
                    logger.error(msg)
        
        # Start proxy for serial service ---
        if len(errors) == 0:
            if serialProxyUsed:
                # Start serial proxy:
Reto Da Forno's avatar
Reto Da Forno committed
736
                logger.debug("Starting serial proxy...")
Reto Da Forno's avatar
Reto Da Forno committed
737
                cmd = [flocklab.config.get("dispatcher", "serialproxyscript"), "--notify"]
Reto Da Forno's avatar
Reto Da Forno committed
738
739
740
741
742
                if debug: 
                    cmd.append("--debug")
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                rs = p.wait()
                if (rs != 0):
743
                    msg = "Serial proxy for test ID %d could not be started (error code %d)." % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
744
745
                    errors.append(msg)
                    logger.error(msg)
746
                    logger.debug("Executed command was: %s" % (str(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
747
748
749
                else:
                    logger.debug("Started serial proxy.")
    
Reto Da Forno's avatar
Reto Da Forno committed
750
        # Start fetcher ---
Reto Da Forno's avatar
Reto Da Forno committed
751
        if len(errors) == 0:
Reto Da Forno's avatar
Reto Da Forno committed
752
753
            logger.debug("Starting fetcher...")
            cmd = [flocklab.config.get("dispatcher", "fetcherscript"), "--testid=%d" % testid]
Reto Da Forno's avatar
Reto Da Forno committed
754
755
756
757
758
            if debug:
                cmd.append("--debug")
            p = subprocess.Popen(cmd)
            rs = p.wait()
            if rs != 0:
Reto Da Forno's avatar
Reto Da Forno committed
759
                msg = "Could not start fetcher for test ID %d. Fetcher returned error %d" % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
760
761
                errors.append(msg)
                logger.error(msg)
762
                logger.error("Tried to execute: %s" % (" ".join(cmd)))
763

Reto Da Forno's avatar
Reto Da Forno committed
764
765
766
767
768
769
770
771
        # check if we're still in time ---
        if len(errors) == 0:
            now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
            cur.execute("SELECT `serv_tests_key` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d AND `time_start_wish` <= '%s'" % (testid, now))
            if cur.fetchone() is not None:
                msg = "Setup for test ID %d took too much time." % (testid)
                errors.append(msg)
                logger.error(msg)
772

Reto Da Forno's avatar
Reto Da Forno committed
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
        # Update DB status, set start time ---
        if len(errors) == 0:
            logger.debug("Setting test status in DB to running...")
            flocklab.set_test_status(cur, cn, testid, 'running')
            cur.execute("UPDATE `tbl_serv_tests` SET `time_start_act` = `time_start_wish` WHERE `serv_tests_key` = %d" %testid)
            cn.commit()
        else:
            logger.debug("Setting test status in DB to aborting...")
            flocklab.set_test_status(cur, cn, testid, 'aborting')
            cur.execute("UPDATE `tbl_serv_tests` SET `time_start_act` = `time_start_wish`, `time_end_act` = UTC_TIMESTAMP() WHERE `serv_tests_key` = %d" %testid)
            cn.commit()
        logger.debug("At end of start_test(). Returning...")
        
        # Set a time for the scheduler to check for the test to stop ---
        # This is done using the 'at' command:
        if len(errors) == 0:
            lag = 5
            # avoid scheduling a scheduler around full minute +/- 5s
            if (stoptime.second+lag) % 60 < 5:
                lag = lag + 5 - ((stoptime.second+lag) % 60)
            elif (stoptime.second+lag) % 60 > 55:
                lag = lag + 60 - ((stoptime.second+lag) % 60) + 5
            # Only schedule scheduler if it's the only one at that time
            cmd = ['atq']
            p = subprocess.Popen(cmd,  stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            out, err = p.communicate()
            rs = p.returncode
            if rs == 0:
801
                #logger.debug("Output of atq is: %s" % (out))
Reto Da Forno's avatar
Reto Da Forno committed
802
803
                stopTimeString = str(stoptime).split()[1]
                if not out or stopTimeString not in out:
804
                    logger.debug("Scheduling scheduler for %s +%ds using at command..." % (stoptime, lag))
Reto Da Forno's avatar
Reto Da Forno committed
805
806
807
                    (fd, tmppath) = tempfile.mkstemp()
                    tmpfile = os.fdopen(fd, 'w')
                    # The at command can only schedule with a minute resolution. Thus let the script sleep for the time required and add some slack:
808
809
                    tmpfile.write("sleep %d;\n" % (stoptime.second+lag))
                    tmpfile.write("%s " % (flocklab.config.get("dispatcher", "schedulerscript")))
Reto Da Forno's avatar
Reto Da Forno committed
810
811
812
813
814
815
816
817
818
819
820
                    if debug:
                        tmpfile.write("--debug ")
                    tmpfile.write(">> /dev/null 2>&1\n")
                    tmpfile.close()
                    # Register the command:
                    cmd = ['at', '-M', '-t', stoptime.strftime('%Y%m%d%H%M'), '-f', tmppath]
                    p = subprocess.Popen(cmd, stderr=subprocess.PIPE)
                    rs = p.wait()
                    # Delete the temp script:
                    os.unlink(tmppath)
                    if rs != 0:
821
                        msg = "Could not schedule scheduler for test ID %d. at command returned error %d" % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
822
823
                        warnings.append(msg)
                        logger.error(msg)
824
                        logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
825
826
827
828
                else:
                    logger.debug("Already scheduler scheduled for %s"%stoptime)
            else:
                logger.debug("Could not execute atq, continue")
829

Reto Da Forno's avatar
Reto Da Forno committed
830
831
        return (errors, warnings)
    except Exception:
832
        msg = "Unexpected error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
Reto Da Forno's avatar
Reto Da Forno committed
833
        logger.error(msg)
Reto Da Forno's avatar
Reto Da Forno committed
834
        raise
### END start_test()



##############################################################################
#
# stop_test
#
##############################################################################
def stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort=False):
    """Stop a test: serial proxy, observers, resource allocation and fetcher.

    The steps are executed in this order:
      1. Set the test status in the DB ('aborting' if abort is set,
         'cleaning up' otherwise).
      2. Stop the serial proxy if the test's XML config contains a
         <serialConf>/<remoteIp> element.
      3. Start one StopTestThread per observer and wait for them; threads
         which do not finish in time are told to abort.
      4. Delete expired resource allocations and the ones of this test.
      5. Stop the fetcher (done regardless of previous errors).
      6. Write the actual end time of the test into the DB.

    Args:
        testid:      Numeric ID of the test to stop.
        cur:         Database cursor used for all queries.
        cn:          Database connection used to commit changes.
        obsdict_key: Dict indexed by observer key. Element [1] of each value
                     is used as the observer ID in log messages.
        obsdict_id:  Not used by this function; kept for interface
                     compatibility.
        abort:       If True, the test status is set to 'aborting' instead of
                     'cleaning up'.

    Returns:
        Tuple (errors, warnings) of two lists of message strings. Errors
        reported by the observer threads are returned as warnings.

    Raises:
        Any unexpected exception is logged and re-raised.
    """
    errors = []
    warnings = []

    try:
        logger.info("Stopping test %d..."%testid)

        # Update DB status ---
        status = 'aborting' if abort else 'cleaning up'
        logger.debug("Setting test status in DB to %s..." %status)
        flocklab.set_test_status(cur, cn, testid, status)

        # Stop serial proxy ---
        # Get the XML config from the database and check if the serial service was used in the test:
        cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
        ret = cur.fetchone()
        if not ret:
            msg = "No XML found in database for testid %d." %testid
            errors.append(msg)
            logger.error(msg)
        else:
            parser = lxml.etree.XMLParser(remove_comments=True)
            tree = lxml.etree.fromstring(ret[0].encode('utf-8'), parser)
            ns = {'d': flocklab.config.get('xml', 'namespace')}
            logger.debug("Got XML from database.")
            # only stop serialproxy if remote IP specified in xml
            if tree.xpath('//d:serialConf/d:remoteIp', namespaces=ns):
                # Serial service was used. Thus stop the serial proxy:
                logger.debug("Usage of serial service detected. Stopping serial proxy...")
                cmd = [flocklab.config.get("dispatcher", "serialproxyscript"), "--notify"]
                if debug:
                    cmd.append("--debug")
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                # Use communicate() instead of wait(): with both streams piped, wait()
                # can deadlock as soon as the child fills one of the pipe buffers.
                p.communicate()
                rs = p.returncode
                if rs != 0:
                    msg = "Serial proxy for test ID %d could not be stopped. Serial proxy returned %d." % (testid, rs)
                    errors.append(msg)
                    logger.error(msg)
                    logger.debug("Executed command was: %s" % (str(cmd)))
                else:
                    logger.debug("Stopped serial proxy.")

        # Stop test on observers ---
        if not db_register_activity(testid, cur, cn, 'stop', iter(obsdict_key)):
            msg = "Some observers were occupied while stopping test."
            logger.warning(msg)
            warnings.append(msg)
        # Start a thread for each observer which calls the test stop script on the observer
        logger.info("Stopping test on observers...")
        thread_list = []
        errors_queue = queue.Queue()
        for obskey in obsdict_key:
            thread = StopTestThread(obskey, obsdict_key, errors_queue, testid)
            thread_list.append((thread, obskey))
            thread.start()
            logger.debug("Started thread for test stop on observer ID %s" %(str(obsdict_key[obskey][1])))
        # Wait for all threads to finish. The timeout applies to every join individually.
        # NOTE(review): 'cleanuptime' is presumably configured in minutes (hence *60) - confirm.
        join_timeout = flocklab.config.getint('tests','cleanuptime')*0.75*60
        for (thread, obskey) in thread_list:
            thread.join(timeout=join_timeout)
            # Thread.isAlive() was removed in Python 3.9, is_alive() is the supported name.
            if thread.is_alive():
                # Timeout occurred. Signal the thread to abort:
                msg = "Telling thread for test stop on observer ID %s to abort..." %(str(obsdict_key[obskey][1]))
                logger.error(msg)
                warnings.append(msg)
                thread.abort()
        # Wait again for the aborted threads:
        for (thread, obskey) in thread_list:
            thread.join(timeout=10)
            if thread.is_alive():
                msg = "Thread for test stop on observer ID %s is still alive but should be aborted now." %(str(obsdict_key[obskey][1]))
                errors.append(msg)
                logger.error(msg)
        db_unregister_activity(testid, cur, cn, 'stop')
        # cleanup resource allocation
        now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
        cur.execute("DELETE FROM tbl_serv_resource_allocation where `time_end` < '%s' OR `test_fk` = %d" % (now, testid))
        cn.commit()
        # Stop fetcher ---
        # This has to be done regardless of previous errors.
        logger.info("Stopping fetcher...")
        cmd = [flocklab.config.get("dispatcher", "fetcherscript"),"--testid=%d"%testid, "--stop"]
        if debug:
            cmd.append("--debug")
        p = subprocess.Popen(cmd)
        rs = p.wait()
        if rs not in (flocklab.SUCCESS, errno.ENOPKG): # flocklab.SUCCESS (0) is successful stop, ENOPKG (65) means the service was not running.
            msg = "Could not stop fetcher for test ID %d. Fetcher returned error %d" % (testid, rs)
            errors.append(msg)
            logger.error(msg)
            logger.error("Tried to execute: %s" % (" ".join(cmd)))

        # Get all errors (if any). Observers which return errors are not regarded as a general error,
        # thus their messages are added to the warnings.
        # NOTE(review): queue items are indexed as item[1] = list of entries, entry[0] = message - verify against StopTestThread.
        while not errors_queue.empty():
            errs = errors_queue.get()
            for err in errs[1]:
                warnings.append(err[0])

        # Set stop time in DB ---
        cur.execute("UPDATE `tbl_serv_tests` SET `time_end_act` = UTC_TIMESTAMP() WHERE `serv_tests_key` = %d" %testid)
        cn.commit()

        return (errors, warnings)
    except Exception:
        msg = "Unexpected error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
        logger.error(msg)
        raise
956
957
958
959
960
961
962
963
964
### END stop_test()


##############################################################################
#
# prepare_testresults
#
##############################################################################
def prepare_testresults(testid, cur):
Reto Da Forno's avatar
Reto Da Forno committed
965
966
967
968
969
970
    """    This function prepares testresults for the user. It calls the archiver.
        If several instances of the archiver
        are running, it may take a long time for this function to finish as it will wait
        for these functions to succeed.
    """
    errors = []
971
    tree = None
Reto Da Forno's avatar
Reto Da Forno committed
972
973
974
975
    
    # Check if results directory exists
    testresultsdir = "%s/%d" % (flocklab.config.get('fetcher', 'testresults_dir'), testid)
    if not os.path.isdir(testresultsdir):
Reto Da Forno's avatar
Reto Da Forno committed
976
        logger.warn("Test results directory does not exist.")
Reto Da Forno's avatar
Reto Da Forno committed
977
978
        return errors
    
Reto Da Forno's avatar
Reto Da Forno committed
979
980
981
982
983
984
    logger.debug("Preparing testresults...")
    
    # Check if user wants test results as email ---
    logger.debug("Check if user wants testresults as email...")
    emailResults = False
    # Get the XML config from the database:
985
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" % testid)
Reto Da Forno's avatar
Reto Da Forno committed
986
987
    ret = cur.fetchone()
    if ret:
988
989
        parser = lxml.etree.XMLParser(remove_comments=True)
        tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
Reto Da Forno's avatar
Reto Da Forno committed
990
        ns = {'d': flocklab.config.get('xml', 'namespace')}
Reto Da Forno's avatar
Reto Da Forno committed
991
992
993
994
995
996
997
998
999
1000
        logger.debug("Got XML from database.")
        # Check if user wants results as email
        ret = tree.xpath('//d:generalConf/d:emailResults', namespaces=ns)
        if not ret:
            logger.debug("Could not get relevant XML value <emailResults>, thus not emailing results to user.")
        else:
            if (ret[0].text.lower() == 'yes'):
                emailResults = True
    if not emailResults:
        logger.debug("User does not want test results as email.")
For faster browsing, not all history is shown. View entire blame