flocklab_dispatcher.py 74.9 KB
Newer Older
1
2
#! /usr/bin/env python3

3
import sys, os, getopt, errno, threading, shutil, time, datetime, subprocess, tempfile, queue, re, logging, traceback, __main__, types, hashlib, lxml.etree, MySQLdb
4
5
6
import lib.flocklab as flocklab


7
8
logger = None
debug  = False
9
10
11
12
13
14
15
16


##############################################################################
#
# StopTestThread
#
##############################################################################
class StopTestThread(threading.Thread):
Reto Da Forno's avatar
Reto Da Forno committed
17
18
19
20
21
    """    Thread which calls the test stop script on an observer. 
    """ 
    def __init__(self, obskey, obsdict_key, errors_queue, testid):
        threading.Thread.__init__(self) 
        self._obskey        = obskey
22
23
        self._obsdict_key   = obsdict_key
        self._errors_queue  = errors_queue
Reto Da Forno's avatar
Reto Da Forno committed
24
25
26
27
28
        self._abortEvent    = threading.Event()
        self._testid        = testid
        
    def run(self):
        try:
29
            logger.debug("Start StopTestThread for observer ID %d" % (self._obsdict_key[self._obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
30
31
            errors = []
            # First test if the observer is online and if the SD card is mounted: 
Reto Da Forno's avatar
Reto Da Forno committed
32
            cmd = ['ssh', '%s' % (self._obsdict_key[self._obskey][2]), "mount | grep /dev/mmcblk0p1"]
Reto Da Forno's avatar
Reto Da Forno committed
33
34
35
36
37
38
39
40
41
42
43
44
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            while p.returncode == None:
                self._abortEvent.wait(1.0)
                p.poll()
            if self._abortEvent.is_set():
                p.kill()
            else:
                out, err = p.communicate()
            rs = p.returncode
            if (rs != 0):
                if (rs == 1):
                    if ("No such file or directory" in err):
45
                        msg = "SD card on observer ID %s is not mounted, observer will thus be omitted for this test." % (self._obsdict_key[self._obskey][1])
Reto Da Forno's avatar
Reto Da Forno committed
46
                    else:
Reto Da Forno's avatar
Reto Da Forno committed
47
                        msg = "Observer ID %s is not reachable (returned %d: %s, %s)." % (self._obsdict_key[self._obskey][1], rs, out, err)
Reto Da Forno's avatar
Reto Da Forno committed
48
                else:
49
                    msg = "Observer ID %s is not responsive (SSH returned %d)." % (self._obsdict_key[self._obskey][1], rs)
Reto Da Forno's avatar
Reto Da Forno committed
50
51
52
53
                errors.append((msg, errno.EHOSTUNREACH, self._obsdict_key[self._obskey][1]))
                logger.error(msg)
            else:
                # Call the script on the observer which stops the test:
Reto Da Forno's avatar
Reto Da Forno committed
54
                remote_cmd = flocklab.config.get("observer", "stoptestscript") + " --testid=%d" % self._testid
Reto Da Forno's avatar
Reto Da Forno committed
55
56
                if debug:
                    remote_cmd += " --debug"
Reto Da Forno's avatar
Reto Da Forno committed
57
                cmd = ['ssh' ,'%s' % (self._obsdict_key[self._obskey][2]), remote_cmd]
Reto Da Forno's avatar
Reto Da Forno committed
58
59
60
61
62
63
64
65
66
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
                while p.returncode == None:
                    self._abortEvent.wait(1.0)
                    p.poll()
                if self._abortEvent.is_set():
                    p.kill()
                else:
                    out, err = p.communicate()
                rs = p.returncode
67
                if (rs == flocklab.SUCCESS):
68
                    logger.debug("Test stop script on observer ID %s succeeded." %(self._obsdict_key[self._obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
69
                elif (rs == 255):
70
                    msg = "Observer ID %s is not reachable, thus not able to stop test. Dataloss occurred possibly for this observer." % (self._obsdict_key[self._obskey][1])
Reto Da Forno's avatar
Reto Da Forno committed
71
72
73
                    errors.append((msg, errno.EHOSTUNREACH, self._obsdict_key[self._obskey][1]))
                    logger.error(msg)
                else:
Reto Da Forno's avatar
Reto Da Forno committed
74
75
                    errors.append(("Test stop script on observer ID %s failed with error code %d." % (str(self._obsdict_key[self._obskey][1]), rs), rs, self._obsdict_key[self._obskey][1]))
                    logger.error("Test stop script on observer ID %s failed with error code %d and message:\n%s" % (str(self._obsdict_key[self._obskey][1]), rs, str(out)))
76
                    logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
77
78
        except:
            logger.debug("Exception: %s, %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
Reto Da Forno's avatar
Reto Da Forno committed
79
80
81
82
            # Main thread requested abort.
            # Close a possibly still running subprocess:
            if (p is not None) and (p.poll() is not None):
                p.kill()
83
            msg = "StopTestThread for observer ID %d aborted." % (self._obsdict_key[self._obskey][1])
Reto Da Forno's avatar
Reto Da Forno committed
84
85
86
87
88
89
90
91
            errors.append((msg, errno.ECOMM, self._obsdict_key[self._obskey][1]))
            logger.error(msg)
        finally:
            if (len(errors) > 0):
                self._errors_queue.put((self._obskey, errors))
            
    def abort(self):
        self._abortEvent.set()
92
93
94
95
96
97
98
99
100
101
### END StopTestThread



##############################################################################
#
# StartTestThread
#
##############################################################################
class StartTestThread(threading.Thread):
Reto Da Forno's avatar
Reto Da Forno committed
102
103
104
    """    Thread which uploads all config files to an observer and
        starts the test on the observer. 
    """ 
Reto Da Forno's avatar
Reto Da Forno committed
105
    def __init__(self, obskey, obsdict_key, xmldict_key, imagedict_key, errors_queue, testid):
Reto Da Forno's avatar
Reto Da Forno committed
106
107
        threading.Thread.__init__(self) 
        self._obskey        = obskey
Reto Da Forno's avatar
Reto Da Forno committed
108
109
110
111
        self._obsdict_key   = obsdict_key
        self._xmldict_key   = xmldict_key
        self._imagedict_key = imagedict_key
        self._errors_queue  = errors_queue
Reto Da Forno's avatar
Reto Da Forno committed
112
113
114
115
116
        self._abortEvent    = threading.Event()
        self._testid        = testid
        
    def run(self):
        errors = []
117
118
        testconfigfolder = "%s/%d" % (flocklab.config.get("observer", "testconfigfolder"), self._testid)
        obsdataport      = flocklab.config.getint('serialproxy', 'obsdataport')
Reto Da Forno's avatar
Reto Da Forno committed
119
        try:
Reto Da Forno's avatar
Reto Da Forno committed
120
            logger.debug("Start StartTestThread for observer ID %d" % (self._obsdict_key[self._obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
121
            # First test if the observer is online and if the SD card is mounted: 
122
            cmd = ['ssh', '%s' % (self._obsdict_key[self._obskey][2]), "ls ~/data/ && mkdir %s" % testconfigfolder]
Reto Da Forno's avatar
Reto Da Forno committed
123
124
125
126
127
128
129
130
131
132
133
134
            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            while p.returncode == None:
                self._abortEvent.wait(1.0)
                p.poll()
            if self._abortEvent.is_set():
                p.kill()
            else:
                out, err = p.communicate()
            rs = p.returncode
            if (rs != 0):
                if (rs == 1):
                    if ("No such file or directory" in err):
135
                        msg = "SD card on observer ID %s is not mounted, observer will thus be omitted for this test." % (self._obsdict_key[self._obskey][1])
Reto Da Forno's avatar
Reto Da Forno committed
136
                    else:
137
                        msg = "Observer ID %s is not reachable, it will thus be omitted for this test (returned: %d: %s, %s)." % (self._obsdict_key[self._obskey][1], rs, out, err)
Reto Da Forno's avatar
Reto Da Forno committed
138
                else:
139
                    msg = "Observer ID %s is not responsive, it will thus be omitted for this test (SSH returned %d). Command: %s" % (self._obsdict_key[self._obskey][1], rs, " ".join(cmd))
Reto Da Forno's avatar
Reto Da Forno committed
140
141
142
143
144
145
146
147
148
149
                errors.append((msg, errno.EHOSTUNREACH, self._obsdict_key[self._obskey][1]))
                logger.error(msg)
            else:
                fileuploadlist = [self._xmldict_key[self._obskey][0]]
                if self._obskey in list(self._imagedict_key.keys()):
                    for image in self._imagedict_key[self._obskey]:
                        fileuploadlist.append(image[0])
                # Now upload the image and XML config file:
                cmd = ['scp', '-q']
                cmd.extend(fileuploadlist)
Reto Da Forno's avatar
Reto Da Forno committed
150
                cmd.append('%s:%s/.' % (self._obsdict_key[self._obskey][2], testconfigfolder))
Reto Da Forno's avatar
Reto Da Forno committed
151
152
153
154
155
156
157
                p = subprocess.Popen(cmd)
                while p.returncode == None:
                    self._abortEvent.wait(1.0)
                    p.poll()
                if self._abortEvent.is_set():
                    p.kill()
                rs = p.returncode
158
                if (rs != flocklab.SUCCESS):
159
                    msg = "Upload of target image and config XML to observer ID %s failed with error number %d\nTried to execute: %s" % (self._obsdict_key[self._obskey][1], rs, (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
160
161
162
                    errors.append((msg, rs, self._obsdict_key[self._obskey][1]))
                    logger.error(msg)
                else:
163
                    logger.debug("Upload of target image and config XML to observer ID %s succeeded." % (self._obsdict_key[self._obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
164
                    # Run the script on the observer which starts the test:
Reto Da Forno's avatar
Reto Da Forno committed
165
                    remote_cmd = flocklab.config.get("observer", "starttestscript") + " --testid=%d --xml=%s/%s --serialport=%d" % (self._testid, testconfigfolder, os.path.basename(self._xmldict_key[self._obskey][0]), obsdataport)
Reto Da Forno's avatar
Reto Da Forno committed
166
167
                    if debug:
                        remote_cmd += " --debug"
Reto Da Forno's avatar
Reto Da Forno committed
168
                    cmd = ['ssh', '%s' % (self._obsdict_key[self._obskey][2]), remote_cmd]
Reto Da Forno's avatar
Reto Da Forno committed
169
170
171
172
173
174
                    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)
                    while p.returncode == None:
                        self._abortEvent.wait(1.0)
                        p.poll()
                    if self._abortEvent.is_set():
                        p.kill()
Reto Da Forno's avatar
Reto Da Forno committed
175
                        logger.debug("Abort is set, start test process for observer %s killed." % (self._obsdict_key[self._obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
176
177
178
                    else:
                        out, err = p.communicate()
                    rs = p.wait()
179
                    if rs != flocklab.SUCCESS:
Reto Da Forno's avatar
Reto Da Forno committed
180
181
                        errors.append(("Test start script on observer ID %s failed with error code %d." % (self._obsdict_key[self._obskey][1], rs), rs, self._obsdict_key[self._obskey][1]))
                        logger.error("Test start script on observer ID %s failed with error code %d and message:\n%s" % (str(self._obsdict_key[self._obskey][1]), rs, str(out)))
Reto Da Forno's avatar
Reto Da Forno committed
182
                    else:
183
184
185
186
187
188
189
190
                        logger.debug("Test start script on observer ID %s succeeded." % (self._obsdict_key[self._obskey][1]))
                    # Remove image file and xml on server:
                    os.remove(self._xmldict_key[self._obskey][0])
                    logger.debug("Removed XML config %s for observer ID %s" % (self._xmldict_key[self._obskey][0], self._obsdict_key[self._obskey][1]))
                    if self._obskey in list(self._imagedict_key.keys()):
                        for image in self._imagedict_key[self._obskey]:
                            os.remove(image[0])
                            logger.debug("Removed target image %s for observer ID %s" % (self._imagedict_key[self._obskey][0], self._obsdict_key[self._obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
191
            
Reto Da Forno's avatar
Reto Da Forno committed
192
193
        except:
            logger.debug("Exception: %s, %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1])))
Reto Da Forno's avatar
Reto Da Forno committed
194
195
196
197
            # Main thread requested abort.
            # Close a possibly still running subprocess:
            if (p is not None) and (p.poll() is not None):
                p.kill()
198
            msg = "StartTestThread for observer ID %d aborted." % (self._obsdict_key[self._obskey][1])
Reto Da Forno's avatar
Reto Da Forno committed
199
200
201
202
203
204
205
206
207
            errors.append((msg, errno.ECOMM, self._obsdict_key[self._obskey][1]))
            logger.error(msg)
        finally:
            if (len(errors) > 0):
                self._errors_queue.put((self._obskey, errors))
        
    def abort(self):
        self._abortEvent.set()
    
208
209
210
211
212
213
214
215
216
217
### END StartTestThread



##############################################################################
#
# start_test
#
##############################################################################
def start_test(testid, cur, cn, obsdict_key, obsdict_id):
Reto Da Forno's avatar
Reto Da Forno committed
218
219
220
221
222
223
    errors = []
    warnings = []
    
    try:    
        logger.debug("Entering start_test() function...")
        # First, validate the XML file again. If validation fails, return immediately:
Reto Da Forno's avatar
Reto Da Forno committed
224
        cmd = [flocklab.config.get('dispatcher','validationscript'), '--testid=%d'%testid]
Reto Da Forno's avatar
Reto Da Forno committed
225
226
227
228
        p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
        out, err = p.communicate()
        rs = p.returncode
        if rs != 0:
229
230
            logger.error("Error %s returned from %s" % (str(rs), flocklab.config.get('dispatcher','validationscript')))
            logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
231
232
233
234
235
236
237
238
239
240
            errors.append("Validation of XML failed. Output of script was: %s %s" % (str(out), str(err)))
        
        if len(errors) == 0:
            # Update DB status ---
            # Update the status of the test in the db:
            flocklab.set_test_status(cur, cn, testid, 'preparing')
            
            # Get start/stop time ---
            cur.execute("SELECT `time_start_wish`, `time_end_wish`, `owner_fk` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d" %testid)
            # Times are going to be of datetime type:
241
            ret = cur.fetchone()
Reto Da Forno's avatar
Reto Da Forno committed
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
            starttime = ret[0]
            stoptime  = ret[1]
            owner_fk = ret[2]
            logger.debug("Got start time wish for test from database: %s" %starttime)
            logger.debug("Got end time wish for test from database: %s" %stoptime)
            
            # Image processing ---
            # Get all images from the database:
            imagedict_key = {}
            sql_image =     """    SELECT `t`.`binary`, `m`.`observer_fk`, `m`.`node_id`, LOWER(`a`.`architecture`), LOWER(`o`.`name`) AS `osname`, `t`.`serv_targetimages_key`, LOWER(`p`.`name`) AS `platname`, `a`.`core` AS `core`
                                FROM `tbl_serv_targetimages` AS `t` 
                                LEFT JOIN `tbl_serv_map_test_observer_targetimages` AS `m` 
                                    ON `t`.`serv_targetimages_key` = `m`.`targetimage_fk` 
                                LEFT JOIN `tbl_serv_platforms` AS `p`
                                    ON `t`.`platforms_fk` = `p`.`serv_platforms_key`
                                LEFT JOIN `tbl_serv_operatingsystems` AS `o`
                                    ON `t`.`operatingsystems_fk` = `o`.`serv_operatingsystems_key`
                                LEFT JOIN `tbl_serv_architectures` AS `a`
                                    ON `t`.`core` = `a`.`core` AND `p`.`serv_platforms_key` = `a`.`platforms_fk`
                                WHERE `m`.`test_fk` = %d
                            """    
            cur.execute(sql_image%testid)
            ret = cur.fetchall()
            for r in ret:
                binary      = r[0]
                obs_fk      = r[1]
                obs_id        = obsdict_key[obs_fk][1]
                node_id     = r[2]
                arch        = r[3]
                osname      = r[4].lower()
                tgimage_key = r[5]
                platname    = r[6]
                core        = r[7]
                
                # Prepare image ---
                (fd, imagepath) = tempfile.mkstemp()
                binpath = "%s" %(os.path.splitext(imagepath)[0]) 
                imagefile = os.fdopen(fd, 'w+b')
                imagefile.write(binary)
                imagefile.close()
                removeimage = True
                logger.debug("Got target image ID %s for observer ID %s with node ID %s from database and wrote it to temp file %s (hash %s)" %(str(tgimage_key), str(obs_id), str(node_id), imagepath, hashlib.sha1(binary).hexdigest()))
                
                # Convert image to binary format and, depending on operating system and platform architecture, write the node ID (if specified) to the image:
                logger.debug("Found %s target platform architecture with %s operating system on platform %s for observer ID %s (node ID to be used: %s)." %(arch, osname, platname, str(obs_id), str(node_id)))
Reto Da Forno's avatar
Reto Da Forno committed
287
                set_symbols_tool = flocklab.config.get('targetimage', 'setsymbolsscript')
Reto Da Forno's avatar
Reto Da Forno committed
288
289
290
291
292
293
294
                symbol_node_id = "FLOCKLAB_NODE_ID"
                # keep <os> tag for backwards compatibility
                if ((node_id != None) and (osname == 'tinyos')):
                    symbol_node_id = "TOS_NODE_ID"
                elif (osname == 'contiki'):
                    symbol_node_id = None   # don't set node ID for OS Contiki
                if (arch == 'msp430'):
Reto Da Forno's avatar
Reto Da Forno committed
295
                    binutils_path = flocklab.config.get('targetimage', 'binutils_msp430')
Reto Da Forno's avatar
Reto Da Forno committed
296
297
                    binpath = "%s.ihex"%binpath
                    if symbol_node_id:
Reto Da Forno's avatar
Reto Da Forno committed
298
                        cmd = ['%s' % (set_symbols_tool), '--objcopy', '%s/msp430-objcopy' % (binutils_path), '--objdump', '%s/msp430-objdump' % (binutils_path), '--target', 'ihex', imagepath, binpath, '%s=%s' % (symbol_node_id, node_id), 'ActiveMessageAddressC$addr=%s' % (node_id), 'ActiveMessageAddressC__addr=%s' % (node_id)]
Reto Da Forno's avatar
Reto Da Forno committed
299
300
301
302
303
                        try:
                            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                            rs = p.wait()
                            if rs != 0:
                                logger.error("Error %d returned from %s" % (rs, set_symbols_tool))
304
                                logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
305
306
307
308
309
                                errors.append("Could not set node ID %s for target image %s" %(str(node_id), str(tgimage_key)))
                            else:
                                logger.debug("Set symbols and converted file to ihex.")
                                # Remove the temporary exe file
                                os.remove("%s.exe"%imagepath)
310
                                #logger.debug("Removed intermediate image %s.exe" % (str(imagepath)))
Reto Da Forno's avatar
Reto Da Forno committed
311
                        except OSError as err:
312
                            msg = "Error in subprocess: tried calling %s. Error was: %s" % (str(cmd), str(err))
Reto Da Forno's avatar
Reto Da Forno committed
313
314
315
316
                            logger.error(msg)
                            errors.append(msg)
                            removeimage = False
                    else:
Reto Da Forno's avatar
Reto Da Forno committed
317
                        cmd = ['%s/msp430-objcopy' % (binutils_path), '--output-target', 'ihex', imagepath, binpath]
Reto Da Forno's avatar
Reto Da Forno committed
318
319
320
321
322
                        try:
                            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                            rs = p.wait()
                            if rs != 0:
                                logger.error("Error %d returned from msp430-objcopy" %rs)
323
                                logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
324
325
326
327
                                errors.append("Could not convert target image %s to ihex" %str(tgimage_key))
                            else:
                                logger.debug("Converted file to ihex.")
                        except OSError as err:
328
                            msg = "Error in subprocess: tried calling %s. Error was: %s" % (str(cmd), str(err))
Reto Da Forno's avatar
Reto Da Forno committed
329
330
331
332
333
334
335
336
337
338
339
                            logger.error(msg)
                            errors.append(msg)
                            removeimage = False
                elif (arch == 'arm'):
                    if (platname == 'dpp'):
                        imgformat = 'ihex'
                        binpath = "%s.ihex"%binpath
                    else:
                        imgformat = 'binary'
                        binpath = "%s.bin"%binpath
                    # Set library path for arm-binutils:
Reto Da Forno's avatar
Reto Da Forno committed
340
                    arm_binutils_path = flocklab.config.get('targetimage', 'binutils_arm')
Reto Da Forno's avatar
Reto Da Forno committed
341
342
343
                    arm_env = os.environ
                    if 'LD_LIBRARY_PATH' not in arm_env:
                        arm_env['LD_LIBRARY_PATH'] = ''
Reto Da Forno's avatar
Reto Da Forno committed
344
                    arm_env['LD_LIBRARY_PATH'] += ':%s/%s' % (arm_binutils_path, "usr/x86_64-linux-gnu/arm-linux-gnu/lib")
Reto Da Forno's avatar
Reto Da Forno committed
345
                    if symbol_node_id:
Reto Da Forno's avatar
Reto Da Forno committed
346
                        cmd = ['%s' % (set_symbols_tool), '--objcopy', '%s/%s' % (arm_binutils_path, "usr/bin/arm-linux-gnu-objcopy"), '--objdump', '%s/%s' % (arm_binutils_path, "usr/bin/arm-linux-gnu-objdump"), '--target', imgformat, imagepath, binpath, '%s=%s' % (symbol_node_id, node_id), 'ActiveMessageAddressC$addr=%s' % (node_id), 'ActiveMessageAddressC__addr=%s' % (node_id)]
Reto Da Forno's avatar
Reto Da Forno committed
347
348
349
350
351
                        try:
                            p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=arm_env)
                            rs = p.wait()
                            if rs != 0:
                                logger.error("Error %d returned from %s" % (rs, set_symbols_tool))
352
                                logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
353
354
355
356
                                errors.append("Could not set node ID %s for target image %s" %(str(node_id), str(tgimage_key)))
                            else:
                                logger.debug("Set symbols and converted file to bin.")
                        except OSError as err:
357
                            msg = "Error in subprocess: tried calling %s. Error was: %s" % (str(cmd), str(err))
Reto Da Forno's avatar
Reto Da Forno committed
358
359
360
361
                            logger.error(msg)
                            errors.append(msg)
                            removeimage = False
                    else:
Reto Da Forno's avatar
Reto Da Forno committed
362
                        cmd = ['%s/%s' % (arm_binutils_path, "usr/bin/arm-linux-gnu-objcopy"), '--output-target', imgformat, imagepath, binpath]
Reto Da Forno's avatar
Reto Da Forno committed
363
364
365
366
                        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=arm_env)
                        rs = p.wait()
                        if rs != 0:
                            logger.error("Error %d returned from arm-linux-gnu-objcopy" %rs)
367
                            logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
                            errors.append("Could not convert target image %s to bin" %str(tgimage_key))
                        else:
                            logger.debug("Converted file to bin.")
                else:
                    msg = "Unknown architecture %s found. The original target image (ID %s) file will be used without modification." %(arch, str(tgimage_key))
                    errors.append(msg)
                    logger.error(msg)
                    orig = open(imagepath, "r+b")
                    binfile = open(binpath, "w+b")
                    binfile.write(orig.read())
                    orig.close()
                    binfile.close()
                    logger.debug("Copied image to binary file without modification.")
                
                # Remove the original file which is not used anymore:
                if removeimage:
                    os.remove(imagepath)
385
                    #logger.debug("Removed image %s" % (str(imagepath)))
Reto Da Forno's avatar
Reto Da Forno committed
386
                else:
387
                    logger.warn("Image %s has not been removed." % (str(imagepath)))
Reto Da Forno's avatar
Reto Da Forno committed
388
389
390
391
392
393
394
395
                
                
                # Slot detection ---
                # Find out which slot number to use on the observer.
                #logger.debug("Detecting adapter for %s on observer ID %s" %(platname, obs_id))
                ret = flocklab.get_slot(cur, int(obs_fk), platname)
                if ret in range(1,5):
                    slot = ret
396
                    logger.debug("Found adapter for %s on observer ID %s in slot %d" % (platname, obs_id, slot))
Reto Da Forno's avatar
Reto Da Forno committed
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
                elif ret == 0:
                    slot = None
                    msg = "Could not find an adapter for %s on observer ID %s" %(platname, obs_id)
                    errors.append(msg)
                    logger.error(msg)
                else:
                    slot = None
                    msg = "Error when detecting adapter for %s on observer ID %s: function returned %d" %(platname, obs_id, ret)
                    errors.append(msg)
                    logger.error(msg)
                        
                # Write the dictionary for the image:
                if not obs_fk in imagedict_key:
                    imagedict_key[obs_fk] = []
                imagedict_key[obs_fk].append((binpath, slot, platname, osname, 0.0, core))
                
            logger.info("Processed all target images from database.")
                
            # XML processing ---
            # Get the XML config from the database and generate a separate file for every observer used:
            cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
            ret = cur.fetchone()
            if not ret:
                msg = "No XML found in database for testid %d." %testid
                errors.append(msg)
                logger.error(msg)
            else:
424
425
                parser = lxml.etree.XMLParser(remove_comments=True)
                tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
Reto Da Forno's avatar
Reto Da Forno committed
426
                ns = {'d': flocklab.config.get('xml', 'namespace')}
Reto Da Forno's avatar
Reto Da Forno committed
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
                logger.debug("Got XML from database.")
                # Create XML files ---
                # Create an empty XML config file for every observer used and organize them in a dictionary:
                xmldict_key = {}
                for obs_key, obs_id, obs_ether in obsdict_key.values():
                    (fd, xmlpath) = tempfile.mkstemp()
                    xmlfhand = os.fdopen(fd, 'w+')
                    xmldict_key[obs_key] = (xmlpath, xmlfhand)
                    xmlfhand.write('<?xml version="1.0" encoding="UTF-8"?>\n\n<obsConf>\n\n')
                # Go through the blocks of the XML file and write the configs to the affected observer XML configs:
                # targetConf ---
                targetconfs = tree.xpath('//d:targetConf', namespaces=ns)
                if not targetconfs:
                    msg = "no <targetConf> element found in XML config (wrong namespace?)"
                    errors.append(msg)
                    logger.error(msg)
                for targetconf in targetconfs:
                    obsids = targetconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                    ret = targetconf.xpath('d:voltage', namespaces=ns)
                    if ret:
                        voltage = ret[0].text.strip()
                    else:
Reto Da Forno's avatar
Reto Da Forno committed
449
                        voltage = str(flocklab.config.get("dispatcher", "default_tg_voltage"))
Reto Da Forno's avatar
Reto Da Forno committed
450
451
452
453
454
455
456
457
458
459
460
461
                    ret = targetconf.xpath('d:noImage', namespaces=ns)
                    if ret:
                        noImageSlot = ret[0].text.strip()
                    else:
                        noImageSlot = None
                    for obsid in obsids:
                        obsid = int(obsid)
                        obskey = obsdict_id[obsid][0]
                        xmldict_key[obskey][1].write("<obsTargetConf>\n")
                        xmldict_key[obskey][1].write("\t<voltage>%s</voltage>\n"%voltage)
                        if noImageSlot:
                            slot = noImageSlot
462
                            xmldict_key[obskey][1].write("\t<slotnr>%s</slotnr>\n" % (slot))
Reto Da Forno's avatar
Reto Da Forno committed
463
                        else:
464
                            xmldict_key[obskey][1].write("\t<firmware>%s</firmware>\n" % (imagedict_key[obskey][0][4]))
Reto Da Forno's avatar
Reto Da Forno committed
465
                            for coreimage in imagedict_key[obskey]:
466
                                xmldict_key[obskey][1].write("\t<image core=\"%d\">%s/%d/%s</image>\n" % (coreimage[5], flocklab.config.get("observer", "testconfigfolder"),testid, os.path.basename(coreimage[0])))
467
468
469
                            xmldict_key[obskey][1].write("\t<slotnr>%s</slotnr>\n" % (imagedict_key[obskey][0][1]))
                            xmldict_key[obskey][1].write("\t<platform>%s</platform>\n" % (imagedict_key[obskey][0][2]))
                            xmldict_key[obskey][1].write("\t<os>%s</os>\n" % (imagedict_key[obskey][0][3]))
Reto Da Forno's avatar
Reto Da Forno committed
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
                            slot = imagedict_key[obskey][0][1]
                        xmldict_key[obskey][1].write("</obsTargetConf>\n\n")
                        #logger.debug("Wrote obsTargetConf XML for observer ID %s" %obsid)
                        # update test_image mapping with slot information
                        cur.execute("UPDATE `tbl_serv_map_test_observer_targetimages` SET `slot` = %s WHERE `observer_fk` = %d AND `test_fk`=%d" % (slot, obskey, testid))
                        cn.commit()
                
                # serialConf ---
                srconfs = tree.xpath('//d:serialConf', namespaces=ns)
                serialProxyUsed = False
                if srconfs:
                    # only use serialproxy if remote IP specified in xml
                    if tree.xpath('//d:serialConf/d:remoteIp', namespaces=ns):
                        serialProxyUsed = True
                    for srconf in srconfs:
                        obsids = srconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        xmlblock = "<obsSerialConf>\n"
                        port = srconf.xpath('d:port', namespaces=ns)
                        if port:
                            port = srconf.xpath('d:port', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<port>%s</port>\n" %port
                        baudrate = srconf.xpath('d:baudrate', namespaces=ns)
                        if baudrate:
                            baudrate = srconf.xpath('d:baudrate', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<baudrate>%s</baudrate>\n" %baudrate
                        mode = srconf.xpath('d:mode', namespaces=ns)
                        if mode:
                            mode = srconf.xpath('d:mode', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<mode>%s</mode>\n" %mode
                        xmlblock += "</obsSerialConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                            #logger.debug("Wrote obsSerialConf XML for observer ID %s" %obsid)
                else:
506
                    logger.debug("No <serialConf> found, not using serial service.")
Reto Da Forno's avatar
Reto Da Forno committed
507
508
509
                
                # gpioTracingConf ---
                gmconfs = tree.xpath('//d:gpioTracingConf', namespaces=ns)
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
                if gmconfs:
                    for gmconf in gmconfs:
                        obsids = gmconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        pinconfs = gmconf.xpath('d:pinConf', namespaces=ns)
                        xmlblock = "<obsGpioMonitorConf>\n"
                        for pinconf in pinconfs:
                            pin  = pinconf.xpath('d:pin', namespaces=ns)[0].text.strip()
                            edge = pinconf.xpath('d:edge', namespaces=ns)[0].text.strip()
                            mode = pinconf.xpath('d:mode', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<pinConf>\n\t\t<pin>%s</pin>\n\t\t<edge>%s</edge>\n\t\t<mode>%s</mode>\n" %(pin, edge, mode)
                            cb_gs_add = pinconf.xpath('d:callbackGpioActAdd', namespaces=ns)
                            if cb_gs_add:
                                pin = cb_gs_add[0].xpath('d:pin', namespaces=ns)[0].text.strip()
                                level = cb_gs_add[0].xpath('d:level', namespaces=ns)[0].text.strip()
                                offsets = cb_gs_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
                                offsetms = cb_gs_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
                                xmlblock += "\t\t<callbackGpioSetAdd>\n\t\t\t<pin>%s</pin>\n\t\t\t<level>%s</level>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackGpioSetAdd>\n" %(pin, level, offsets, offsetms)
                            cb_pp_add = pinconf.xpath('d:callbackPowerProfAdd', namespaces=ns)
                            if cb_pp_add:
                                duration = cb_pp_add[0].xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()
                                offsets = cb_pp_add[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip()
                                offsetms = cb_pp_add[0].xpath('d:offsetMicrosecs', namespaces=ns)[0].text.strip()
                                xmlblock += "\t\t<callbackPowerprofAdd>\n\t\t\t<duration>%s</duration>\n\t\t\t<offsetSecs>%s</offsetSecs>\n\t\t\t<offsetMicrosecs>%s</offsetMicrosecs>\n\t\t</callbackPowerprofAdd>\n" %(duration, offsets, offsetms)
                            xmlblock += "\t</pinConf>\n"
                        xmlblock += "</obsGpioMonitorConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                          #logger.debug("Wrote obsGpioMonitorConf XML for observer ID %s" %obsid)
                else:
                    logger.debug("No <gpioTracingConf> found, not using GPIO tracing service.")
Reto Da Forno's avatar
Reto Da Forno committed
542
543
544
545
546
547
                        
                # gpioActuationConf ---
                # Create 2 pin settings for every observer used in the test: 
                #        1) Pull reset pin of target low when test is to start
                #        2) Pull reset pin of target high when test is to stop
                xmlblock = "<obsGpioSettingConf>\n"
Reto Da Forno's avatar
Reto Da Forno committed
548
                startdatetime = starttime.strftime(flocklab.config.get("observer", "timeformat"))
Reto Da Forno's avatar
Reto Da Forno committed
549
550
                startmicrosecs = starttime.microsecond
                xmlblock += "\t<pinConf>\n\t\t<pin>RST</pin>\n\t\t<level>low</level>\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%d</absoluteMicrosecs>\n\t\t</absoluteTime>\n\t\t<intervalMicrosecs>0</intervalMicrosecs>\n\t\t<count>1</count>\n\t</pinConf>\n" %(startdatetime, startmicrosecs)
Reto Da Forno's avatar
Reto Da Forno committed
551
                stopdatetime = stoptime.strftime(flocklab.config.get("observer", "timeformat"))
Reto Da Forno's avatar
Reto Da Forno committed
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
                stopmicrosecs = stoptime.microsecond
                xmlblock += "\t<pinConf>\n\t\t<pin>RST</pin>\n\t\t<level>high</level>\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%d</absoluteMicrosecs>\n\t\t</absoluteTime>\n\t\t<intervalMicrosecs>0</intervalMicrosecs>\n\t\t<count>1</count>\n\t</pinConf>\n" %(stopdatetime, stopmicrosecs)
                for obskey in obsdict_key.keys():
                    xmldict_key[obskey][1].write(xmlblock)
                # Now write the per-observer config:
                gsconfs = tree.xpath('//d:gpioActuationConf', namespaces=ns)
                for gsconf in gsconfs:
                    xmlblock = ""
                    obsids = gsconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                    pinconfs = gsconf.xpath('d:pinConf', namespaces=ns)
                    for pinconf in pinconfs:
                        pin  = pinconf.xpath('d:pin', namespaces=ns)[0].text.strip()
                        level = pinconf.xpath('d:level', namespaces=ns)[0].text.strip()
                        abs_tim = pinconf.xpath('d:absoluteTime', namespaces=ns)
                        if abs_tim:
                            absdatetime = absolute2absoluteUTC_time(abs_tim[0].xpath('d:absoluteDateTime', namespaces=ns)[0].text.strip())
                            ret = abs_tim[0].xpath('d:absoluteMicrosecs', namespaces=ns)
                            if ret:
                                absmicrosec = int(ret[0].text.strip())
                            else:
                                absmicrosec = 0
                        rel_tim = pinconf.xpath('d:relativeTime', namespaces=ns)
                        if rel_tim:
                            relsec = int(rel_tim[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip())
                            ret = rel_tim[0].xpath('d:offsetMicrosecs', namespaces=ns)
                            if ret:
                                relmicrosec = int(ret[0].text.strip())
                            else:
                                relmicrosec = 0
                            # Relative times need to be converted into absolute times:
                            absmicrosec, absdatetime = relative2absolute_time(starttime, relsec, relmicrosec)    
                        periodic = pinconf.xpath('d:periodic', namespaces=ns)
                        if periodic:
                            interval = int(periodic[0].xpath('d:intervalMicrosecs', namespaces=ns)[0].text.strip())
                            count = int(periodic[0].xpath('d:count', namespaces=ns)[0].text.strip())
                        else:
                            interval = 0
                            count = 1
                        xmlblock += "\t<pinConf>\n\t\t<pin>%s</pin>\n\t\t<level>%s</level>\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%s</absoluteMicrosecs>\n\t\t</absoluteTime>\n\t\t<intervalMicrosecs>%i</intervalMicrosecs>\n\t\t<count>%i</count>\n\t</pinConf>\n" %(pin, level, absdatetime, absmicrosec, interval, count)
                    for obsid in obsids:
                        obsid = int(obsid)
                        obskey = obsdict_id[obsid][0]
                        xmldict_key[obskey][1].write(xmlblock)
                        #logger.debug("Wrote obsGpioSettingConf XML for observer ID %s" %obsid)
                xmlblock = "</obsGpioSettingConf>\n\n"
                for obskey in obsdict_key.keys():
                    xmldict_key[obskey][1].write(xmlblock)
599
                
Reto Da Forno's avatar
Reto Da Forno committed
600
601
                # powerProfilingConf ---
                ppconfs = tree.xpath('//d:powerProfilingConf', namespaces=ns)
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
                if ppconfs:
                    for ppconf in ppconfs:
                        obsids = ppconf.xpath('d:obsIds', namespaces=ns)[0].text.strip().split()
                        profconfs = ppconf.xpath('d:profConf', namespaces=ns)
                        xmlblock = "<obsPowerprofConf>\n"
                        for profconf in profconfs:
                            duration  = profconf.xpath('d:durationMillisecs', namespaces=ns)[0].text.strip()
                            xmlblock += "\t<profConf>\n\t\t<duration>%s</duration>" %duration
                            abs_tim = profconf.xpath('d:absoluteTime', namespaces=ns)
                            if abs_tim:
                                absdatetime = absolute2absoluteUTC_time(abs_tim[0].xpath('d:absoluteDateTime', namespaces=ns)[0].text.strip()) # parse xml date
                                ret = abs_tim[0].xpath('d:absoluteMicrosecs', namespaces=ns)
                                if ret:
                                    absmicrosec = ret[0].text.strip()
                                else: 
                                    absmicrosec = 0
                            rel_tim = profconf.xpath('d:relativeTime', namespaces=ns)
                            if rel_tim:
                                relsec = int(rel_tim[0].xpath('d:offsetSecs', namespaces=ns)[0].text.strip())
                                ret = rel_tim[0].xpath('d:offsetMicrosecs', namespaces=ns)
                                if ret:
                                    relmicrosec = int(ret[0].text.strip())
                                else:
                                    relmicrosec = 0
                                # Relative times need to be converted into absolute times:
                                absmicrosec, absdatetime = relative2absolute_time(starttime, relsec, relmicrosec)
                            xmlblock += "\n\t\t<absoluteTime>\n\t\t\t<absoluteDateTime>%s</absoluteDateTime>\n\t\t\t<absoluteMicrosecs>%s</absoluteMicrosecs>\n\t\t</absoluteTime>" %(absdatetime, absmicrosec)
                            samplingdivider = profconf.xpath('d:samplingDivider', namespaces=ns)
                            if samplingdivider:
                                samplingdivider = samplingdivider[0].text.strip()
Reto Da Forno's avatar
Reto Da Forno committed
632
                            else:
633
634
635
636
637
638
639
640
641
642
643
644
                                samplingdivider = flocklab.config.get('dispatcher', 'default_sampling_divider') 
                            xmlblock += "\n\t\t<samplingDivider>%s</samplingDivider>"%samplingdivider
                            xmlblock += "\n\t</profConf>\n"
                        xmlblock += "</obsPowerprofConf>\n\n"
                        for obsid in obsids:
                            obsid = int(obsid)
                            obskey = obsdict_id[obsid][0]
                            xmldict_key[obskey][1].write(xmlblock)
                            #logger.debug("Wrote obsPowerprofConf XML for observer ID %s" %obsid)
                else:
                    logger.debug("No <powerProfilingConf> found, not using power profiling service.")
                 
Reto Da Forno's avatar
Reto Da Forno committed
645
                logger.debug("Wrote all observer XML configs.")
646
                
Reto Da Forno's avatar
Reto Da Forno committed
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
                # Close XML files ---
                for xmlpath, xmlfhand in xmldict_key.values():
                    xmlfhand.write("</obsConf>\n")
                    xmlfhand.close()
                    #logger.debug("Closed observer XML config %s"%xmlpath)
                #logger.debug("Closed all observer XML configs.")
                
        # Upload configs to observers and start test ---
        if len(errors) == 0:
            if not db_register_activity(testid, cur, cn, 'start', iter(obsdict_key.keys())):
                msg = "Could not access all needed observers for testid %d." %testid
                errors.append(msg)
                logger.error(msg)
        if len(errors) == 0:
            # -- START OF CRITICAL SECTION where dispatcher accesses used observers
            # Start a thread for each observer which uploads the config and calls the test start script on the observer
            thread_list = []
            errors_queue = queue.Queue()
            for obskey in obsdict_key.keys():
Reto Da Forno's avatar
Reto Da Forno committed
666
                thread = StartTestThread(obskey, obsdict_key, xmldict_key, imagedict_key, errors_queue, testid)
Reto Da Forno's avatar
Reto Da Forno committed
667
668
669
670
671
672
                thread_list.append((thread, obskey))
                thread.start()
                #DEBUG logger.debug("Started thread for test start on observer ID %s" %(str(obsdict_key[obskey][1])))
            # Wait for all threads to finish:
            for (thread, obskey) in thread_list:
                # Wait max 75% of the setuptime:
Reto Da Forno's avatar
Reto Da Forno committed
673
                thread.join(timeout=(flocklab.config.getint('tests','setuptime')*0.75*60))
Reto Da Forno's avatar
Reto Da Forno committed
674
675
                if thread.isAlive():
                    # Timeout occurred. Signal the thread to abort:
Reto Da Forno's avatar
Reto Da Forno committed
676
                    logger.error("Telling thread for test start on observer ID %s to abort..." % (str(obsdict_key[obskey][1])))
Reto Da Forno's avatar
Reto Da Forno committed
677
678
679
680
681
                    thread.abort()
            # Wait again for the aborted threads:
            for (thread, obskey) in thread_list:    
                thread.join(timeout=10)
                if thread.isAlive():
Reto Da Forno's avatar
Reto Da Forno committed
682
                    msg = "Thread for test start on observer ID %s timed out and will be aborted now." % (str(obsdict_key[obskey][1]))
Reto Da Forno's avatar
Reto Da Forno committed
683
684
685
686
687
688
689
690
                    errors.append(msg)
                    logger.error(msg)
            # -- END OF CRITICAL SECTION where dispatcher accesses used observers
            db_unregister_activity(testid, cur, cn, 'start')
                
            # Get all errors (if any). Observers which return errors are not regarded as a general error. In this
            # case, the test is just started without the faulty observers if there is at least 1 observer that succeeded:
            obs_error = []
691
692
            #if not errors_queue.empty():
                #logger.error("Queue with errors from test start thread is not empty. Getting errors...")
Reto Da Forno's avatar
Reto Da Forno committed
693
694
695
            while not errors_queue.empty():
                errs = errors_queue.get()
                for err in errs[1]:
696
                    #logger.error("Error from test start thread for observer %s: %s" %(str(err[2]), str(err[0])))
Reto Da Forno's avatar
Reto Da Forno committed
697
698
699
700
701
702
703
704
705
706
707
708
709
                    obs_error.append(err[2])
                    warnings.append(err[0])
            # Check if there is at least 1 observer which succeeded:
            if len(obs_error) > 0:
                if (len(obsdict_id) == len(set(obs_error))):
                    msg = "None of the requested observers could successfully start the test."
                    errors.append(msg)
                    logger.error(msg)
        
        # Start proxy for serial service ---
        if len(errors) == 0:
            if serialProxyUsed:
                # Start serial proxy:
Reto Da Forno's avatar
Reto Da Forno committed
710
                logger.debug("Starting serial proxy...")
Reto Da Forno's avatar
Reto Da Forno committed
711
                cmd = [flocklab.config.get("dispatcher", "serialproxyscript"), "--notify"]
Reto Da Forno's avatar
Reto Da Forno committed
712
713
714
715
716
                if debug: 
                    cmd.append("--debug")
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                rs = p.wait()
                if (rs != 0):
717
                    msg = "Serial proxy for test ID %d could not be started (error code %d)." % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
718
719
                    errors.append(msg)
                    logger.error(msg)
720
                    logger.debug("Executed command was: %s" % (str(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
721
722
723
724
725
726
                else:
                    logger.debug("Started serial proxy.")
    
        # Start obsdbfetcher ---
        if len(errors) == 0:
            logger.debug("Starting DB fetcher...")
Reto Da Forno's avatar
Reto Da Forno committed
727
            cmd = [flocklab.config.get("dispatcher", "fetcherscript"), "--testid=%d"%testid]
Reto Da Forno's avatar
Reto Da Forno committed
728
729
730
731
732
            if debug:
                cmd.append("--debug")
            p = subprocess.Popen(cmd)
            rs = p.wait()
            if rs != 0:
733
                msg = "Could not start database fetcher for test ID %d. Fetcher returned error %d" % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
734
735
                errors.append(msg)
                logger.error(msg)
736
                logger.error("Tried to execute: %s" % (" ".join(cmd)))
737

Reto Da Forno's avatar
Reto Da Forno committed
738
739
        # check if we're still in time
        # 
Reto Da Forno's avatar
Reto Da Forno committed
740
        now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
Reto Da Forno's avatar
Reto Da Forno committed
741
742
        cur.execute("SELECT `serv_tests_key` FROM `tbl_serv_tests` WHERE `serv_tests_key` = %d AND `time_start_wish` <= '%s'" % (testid, now))
        if cur.fetchone() is not None:
743
            msg = "Setup for test ID %d took too much time." % (testid)
Reto Da Forno's avatar
Reto Da Forno committed
744
745
            errors.append(msg)
            logger.error(msg)
746

Reto Da Forno's avatar
Reto Da Forno committed
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
        # Update DB status, set start time ---
        if len(errors) == 0:
            logger.debug("Setting test status in DB to running...")
            flocklab.set_test_status(cur, cn, testid, 'running')
            cur.execute("UPDATE `tbl_serv_tests` SET `time_start_act` = `time_start_wish` WHERE `serv_tests_key` = %d" %testid)
            cn.commit()
        else:
            logger.debug("Setting test status in DB to aborting...")
            flocklab.set_test_status(cur, cn, testid, 'aborting')
            cur.execute("UPDATE `tbl_serv_tests` SET `time_start_act` = `time_start_wish`, `time_end_act` = UTC_TIMESTAMP() WHERE `serv_tests_key` = %d" %testid)
            cn.commit()
        logger.debug("At end of start_test(). Returning...")
        
        # Set a time for the scheduler to check for the test to stop ---
        # This is done using the 'at' command:
        if len(errors) == 0:
            lag = 5
            # avoid scheduling a scheduler around full minute +/- 5s
            if (stoptime.second+lag) % 60 < 5:
                lag = lag + 5 - ((stoptime.second+lag) % 60)
            elif (stoptime.second+lag) % 60 > 55:
                lag = lag + 60 - ((stoptime.second+lag) % 60) + 5
            # Only schedule scheduler if it's the only one at that time
            cmd = ['atq']
            p = subprocess.Popen(cmd,  stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
            out, err = p.communicate()
            rs = p.returncode
            if rs == 0:
775
                #logger.debug("Output of atq is: %s" % (out))
Reto Da Forno's avatar
Reto Da Forno committed
776
777
                stopTimeString = str(stoptime).split()[1]
                if not out or stopTimeString not in out:
778
                    logger.debug("Scheduling scheduler for %s +%ds using at command..." % (stoptime, lag))
Reto Da Forno's avatar
Reto Da Forno committed
779
780
781
                    (fd, tmppath) = tempfile.mkstemp()
                    tmpfile = os.fdopen(fd, 'w')
                    # The at command can only schedule with a minute resolution. Thus let the script sleep for the time required and add some slack:
782
783
                    tmpfile.write("sleep %d;\n" % (stoptime.second+lag))
                    tmpfile.write("%s " % (flocklab.config.get("dispatcher", "schedulerscript")))
Reto Da Forno's avatar
Reto Da Forno committed
784
785
786
787
788
789
790
791
792
793
794
                    if debug:
                        tmpfile.write("--debug ")
                    tmpfile.write(">> /dev/null 2>&1\n")
                    tmpfile.close()
                    # Register the command:
                    cmd = ['at', '-M', '-t', stoptime.strftime('%Y%m%d%H%M'), '-f', tmppath]
                    p = subprocess.Popen(cmd, stderr=subprocess.PIPE)
                    rs = p.wait()
                    # Delete the temp script:
                    os.unlink(tmppath)
                    if rs != 0:
795
                        msg = "Could not schedule scheduler for test ID %d. at command returned error %d" % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
796
797
                        warnings.append(msg)
                        logger.error(msg)
798
                        logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
799
800
801
802
                else:
                    logger.debug("Already scheduler scheduled for %s"%stoptime)
            else:
                logger.debug("Could not execute atq, continue")
803

Reto Da Forno's avatar
Reto Da Forno committed
804
805
        return (errors, warnings)
    except Exception:
806
        msg = "Unexpected error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
Reto Da Forno's avatar
Reto Da Forno committed
807
808
809
        print(msg)
        logger.warn(msg)
        raise
810
811
812
813
814
815
816
817
818
819
### END start_test()



##############################################################################
#
# stop_test
#
##############################################################################
def stop_test(testid, cur, cn, obsdict_key, obsdict_id, abort=False):
Reto Da Forno's avatar
Reto Da Forno committed
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
    errors = []
    warnings = []
    
    try:
        logger.info("Stopping test %d..."%testid)
        
        # Update DB status --- 
        if abort:
            status = 'aborting'
        else:
            status = 'cleaning up'
        logger.debug("Setting test status in DB to %s..." %status)
        flocklab.set_test_status(cur, cn, testid, status)
        
        # Stop serial proxy ---
        # Get the XML config from the database and check if the serial service was used in the test:
        cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
        ret = cur.fetchone()
        if not ret:
            msg = "No XML found in database for testid %d." %testid
            errors.append(msg)
            logger.error(msg)
        else:
843
844
            parser = lxml.etree.XMLParser(remove_comments=True)
            tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
Reto Da Forno's avatar
Reto Da Forno committed
845
            ns = {'d': flocklab.config.get('xml', 'namespace')}
Reto Da Forno's avatar
Reto Da Forno committed
846
847
848
849
850
            logger.debug("Got XML from database.")
            # only stop serialproxy if remote IP specified in xml
            if tree.xpath('//d:serialConf/d:remoteIp', namespaces=ns):
                # Serial service was used. Thus stop the serial proxy:
                logger.debug("Usage of serial service detected. Stopping serial proxy...")
Reto Da Forno's avatar
Reto Da Forno committed
851
                cmd = [flocklab.config.get("dispatcher", "serialproxyscript"), "--notify"]
Reto Da Forno's avatar
Reto Da Forno committed
852
853
854
855
856
                if debug: 
                    cmd.append("--debug")
                p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                rs = p.wait()
                if (rs != 0):
857
                    msg = "Serial proxy for test ID %d could not be stopped. Serial proxy returned %d." % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
858
859
                    errors.append(msg)
                    logger.error(msg)
860
                    logger.debug("Executed command was: %s" % (str(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
                else:
                    logger.debug("Stopped serial proxy.")
        
        # Stop test on observers ---
        if not db_register_activity(testid, cur, cn, 'stop', iter(obsdict_key.keys())):
            msg = "Some observers were occupied while stopping test."
            logger.warn(msg)
            warnings.append(msg)
        # Start a thread for each observer which calls the test stop script on the observer
        logger.info("Stopping test on observers...")
        thread_list = []
        errors_queue = queue.Queue()
        for obskey in obsdict_key.keys():
            thread = StopTestThread(obskey, obsdict_key, errors_queue,testid)
            thread_list.append((thread, obskey))
            thread.start()
            logger.debug("Started thread for test stop on observer ID %s" %(str(obsdict_key[obskey][1])))
        # Wait for all threads to finish:
        for (thread, obskey) in thread_list:
Reto Da Forno's avatar
Reto Da Forno committed
880
            thread.join(timeout=(flocklab.config.getint('tests','cleanuptime')*0.75*60))
Reto Da Forno's avatar
Reto Da Forno committed
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
            if thread.isAlive():
                # Timeout occurred. Signal the thread to abort:
                msg = "Telling thread for test stop on observer ID %s to abort..." %(str(obsdict_key[obskey][1]))
                logger.error(msg)
                warnings.append(msg)
                thread.abort()
        # Wait again for the aborted threads:
        for (thread, obskey) in thread_list:    
            thread.join(timeout=10)
            if thread.isAlive():
                msg = "Thread for test stop on observer ID %s is still alive but should be aborted now." %(str(obsdict_key[obskey][1]))
                errors.append(msg)
                logger.error(msg)
        db_unregister_activity(testid, cur, cn, 'stop')
        # cleanup resource allocation
Reto Da Forno's avatar
Reto Da Forno committed
896
        now = time.strftime(flocklab.config.get("database", "timeformat"), time.gmtime())
Reto Da Forno's avatar
Reto Da Forno committed
897
898
899
900
901
        cur.execute("DELETE FROM tbl_serv_resource_allocation where `time_end` < '%s' OR `test_fk` = %d" % (now, testid))
        cn.commit()
        # Stop fetcher ---
        # This has to be done regardless of previous errors.
        logger.info("Stopping fetcher...")
Reto Da Forno's avatar
Reto Da Forno committed
902
        cmd = [flocklab.config.get("dispatcher", "fetcherscript"),"--testid=%d"%testid, "--stop"]
Reto Da Forno's avatar
Reto Da Forno committed
903
904
905
906
        if debug: 
            cmd.append("--debug")
        p = subprocess.Popen(cmd)
        rs = p.wait()
907
        if rs not in (flocklab.SUCCESS, errno.ENOPKG): # flocklab.SUCCESS (0) is successful stop, ENOPKG (65) means the service was not running. 
908
            msg = "Could not stop database fetcher for test ID %d. Fetcher returned error %d" % (testid, rs)
Reto Da Forno's avatar
Reto Da Forno committed
909
910
            errors.append(msg)
            logger.error(msg)
911
            logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
912
913
    
        # Get all errors (if any). Observers which return errors are not regarded as a general error.
914
915
        #if not errors_queue.empty():
            #logger.error("Queue with errors from test stop thread is not empty. Getting errors...")
Reto Da Forno's avatar
Reto Da Forno committed
916
917
918
        while not errors_queue.empty():
            errs = errors_queue.get()
            for err in errs[1]:
919
                #logger.error("Error from test stop thread: %s" %(str(err[0])))
Reto Da Forno's avatar
Reto Da Forno committed
920
921
922
923
924
925
926
927
                warnings.append(err[0])
        
        # Set stop time in DB ---
        cur.execute("UPDATE `tbl_serv_tests` SET `time_end_act` = UTC_TIMESTAMP() WHERE `serv_tests_key` = %d" %testid)
        cn.commit()
        
        return (errors, warnings)
    except Exception:
928
        msg = "Unexpected error: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
Reto Da Forno's avatar
Reto Da Forno committed
929
930
931
        print(msg)
        logger.warn(msg)
        raise
932
933
934
935
936
937
938
939
940
### END stop_test()


##############################################################################
#
# prepare_testresults
#
##############################################################################
def prepare_testresults(testid, cur):
Reto Da Forno's avatar
Reto Da Forno committed
941
942
943
944
945
    """    This function prepares testresults for the user. It calls the archiver.
        If several instances of the archiver
        are running, it may take a long time for this function to finish as it will wait
        for these functions to succeed.
    """
946

Reto Da Forno's avatar
Reto Da Forno committed
947
948
949
950
951
952
953
954
955
956
957
    errors = []
        
    logger.debug("Preparing testresults...")
    
    # Check if user wants test results as email ---
    logger.debug("Check if user wants testresults as email...")
    emailResults = False
    # Get the XML config from the database:
    cur.execute("SELECT `testconfig_xml` FROM `tbl_serv_tests` WHERE (`serv_tests_key` = %s)" %testid)
    ret = cur.fetchone()
    if ret:
958
959
        parser = lxml.etree.XMLParser(remove_comments=True)
        tree = lxml.etree.fromstring(bytes(bytearray(ret[0], encoding = 'utf-8')), parser)
Reto Da Forno's avatar
Reto Da Forno committed
960
        ns = {'d': flocklab.config.get('xml', 'namespace')}
Reto Da Forno's avatar
Reto Da Forno committed
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
        logger.debug("Got XML from database.")
        # Check if user wants results as email
        ret = tree.xpath('//d:generalConf/d:emailResults', namespaces=ns)
        if not ret:
            logger.debug("Could not get relevant XML value <emailResults>, thus not emailing results to user.")
        else:
            if (ret[0].text.lower() == 'yes'):
                emailResults = True
    if not emailResults:
        logger.debug("User does not want test results as email.")
    else:
        logger.debug("User wants test results as email. Will trigger the email.")
    
    
    # Archive test results ---
Reto Da Forno's avatar
Reto Da Forno committed
976
    cmd = [flocklab.config.get('dispatcher', 'archiverscript'),"--testid=%d"%testid]
Reto Da Forno's avatar
Reto Da Forno committed
977
978
979
980
981
    if emailResults:
        cmd.append("--email")
    if debug: 
        cmd.append("--debug")
    # Call the script until it succeeds:
Reto Da Forno's avatar
Reto Da Forno committed
982
    waittime = flocklab.config.getint('dispatcher', 'archiver_waittime')
Reto Da Forno's avatar
Reto Da Forno committed
983
984
985
986
    rs = errno.EUSERS
    while rs == errno.EUSERS:
        p = subprocess.Popen(cmd)
        rs = p.wait()
987
        if rs not in (flocklab.SUCCESS, errno.EUSERS): # flocklab.SUCCESS (0) is successful stop, EUSERS (87) means the maximum number of allowed instances is reached. 
988
            msg = "Could not trigger archiver. Archiver returned error %d" % (rs)
Reto Da Forno's avatar
Reto Da Forno committed
989
            logger.error(msg)
990
            logger.error("Tried to execute: %s" % (" ".join(cmd)))
Reto Da Forno's avatar
Reto Da Forno committed
991
992
993
994
995
996
997
998
999
1000
            errors.append(msg)
            return errors
        if rs == errno.EUSERS:
            # Maximum number of instances is reached. Wait some time before calling again.
            logger.info("Archiver returned EUSERS. Wait for %d s before trying again..."%waittime)
            time.sleep(waittime)
    logger.debug("Call to archiver successful.")
    
    logger.debug("Prepared testresults.")
    return errors
For faster browsing, not all history is shown. View entire blame