To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

test_to_linkmap.py 11.4 KB
Newer Older
1
#! /usr/bin/env python3
2 3 4 5 6 7

__author__		= "Christoph Walser <walserc@tik.ee.ethz.ch>, Adnan Mlika"
__copyright__	= "Copyright 2010, ETH Zurich, Switzerland"
__license__		= "GPL"


8
import sys, os, getopt, tempfile, shutil, re, time, errno, io, logging, traceback, __main__, csv, tarfile
9
from datetime import datetime
Reto Da Forno's avatar
Reto Da Forno committed
10
from struct import *
11 12 13
# Import local libraries
from lib.flocklab import SUCCESS
import lib.flocklab as flocklab
14 15 16 17 18


### Global variables ###
###
scriptname = os.path.basename(__main__.__file__)
19
scriptpath = os.path.dirname(os.path.abspath(sys.argv[0]))
20 21 22
name = "test_to_linkmap"
###

23 24
logger = None
config = None
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56


##############################################################################
#
# Error classes
#
##############################################################################
class Error(Exception):
	""" Base class for exception. """
	pass
### END Error classes


##############################################################################
#
# Node class
#
##############################################################################
class Node():
	def __init__(self, obsid, id):
		self.nodeid = id
		self.obsid = obsid
		self.stat = {}
		self.rssi = {}
		
	def addStats(self, sender_id, num_messages, num_received):
		if not sender_id in self.stat:
			self.stat[sender_id] = []
		self.stat[sender_id].append((sender_id, num_messages, num_received))
	
	def getPRR(self):
		prr = []
57
		for sender,statlist in self.stat.items():
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
			prr_rec = 0
			prr_tot = 0
			for (sender_id, num_messages, num_received) in statlist:
				prr_rec = prr_rec + num_received
				prr_tot = prr_tot + num_messages
			prr.append((self.obsid, sender, float(prr_rec) / prr_tot, prr_tot))
		return prr
		
	def addRssi(self, channel, level, ouccurences):
		if not channel in self.rssi:
			self.rssi[channel] = {}
		self.rssi[channel][level] = ouccurences
		
	def getRssi(self):
		return self.rssi
### END Node class


##############################################################################
#
# TestToLinkmap
#
##############################################################################
def TestToLinkmap(testid=None, cn=None, cur=None):
	
	errors = []
	_serial_service_file = None
	nodes = {}
	starttime = None
	stoptime = None
	channels = []
	
	logger.debug("Starting to create linkmap for test ID %s..."%testid)
	
	# Get test results from archive --- 
	archive_path = "%s/%s%s"%(config.get('archiver','archive_dir'), testid, config.get('archiver','archive_ext'))
	if not os.path.exists(archive_path):
		msg = "Archive path %s does not exist, removing link measurement." % archive_path
		cur.execute("DELETE FROM `tbl_serv_web_link_measurements` WHERE `test_fk` = %s" % testid)
		logger.error(msg)
		errors.append(msg)
		return errors
	
	# Extract serial service results file ---
	logger.debug("Extracting serial service file from archive...")
	tempdir = tempfile.mkdtemp()
	archive = tarfile.open(archive_path, 'r:gz')
	for f in archive.getmembers():
		if re.search("serial[_]?", f.name) is not None:
			archive.extract(f, tempdir)
			_serial_service_file = "%s/%s" % (tempdir, f.name)
			logger.debug("Found serial service file in test archive.")
			break
	archive.close()
	if _serial_service_file is None:
		msg =  "Serial service file could not be found in archive %s."%(archive_path)
		logger.error(msg)
		errors.append(msg)
		return errors
	
	# Process CSV file ---
	logger.debug("Processing CSV file...")
Reto Da Forno's avatar
Reto Da Forno committed
120
	packetreader = csv.reader(open(_serial_service_file, 'r'), delimiter=',')
121 122 123 124 125 126
	for packetinfo in packetreader:
		if re.search("^observer_id", packetinfo[1]):
			continue
		# nx_uint16_t num_messages;
		# nx_uint16_t sender_id;
		# nx_uint16_t num_received;
Reto Da Forno's avatar
Reto Da Forno committed
127
		packet = bytes.fromhex(packetinfo[4])
128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
		data = unpack(">7xB%dx" % (len(packet) - 8), packet)
		if data[0] == 7:
			# link measurement
			data = unpack(">8xHHH",packet)
			#print "%s: src:%d dst:%s %d/%d" % (packetinfo[1], data[1], packetinfo[2], data[2], data[0])
			if not int(packetinfo[2]) in nodes:
				nodes[int(packetinfo[2])] = Node(int(packetinfo[1]), int(packetinfo[2]))
			nodes[int(packetinfo[2])].addStats(data[1], data[0], data[2])
			if starttime is None or starttime > float(packetinfo[0]):
				starttime = float(packetinfo[0])
			if stoptime is None or stoptime < float(packetinfo[0]):
				stoptime = float(packetinfo[0])
		elif data[0] == 8:
			# RSSI scan
			data = unpack(">8xBHH",packet)
			# print "RSSI scan: %d %d %d" % (data[0], data[1] - 127 - 45, data[2])
			if not int(packetinfo[2]) in nodes:
				nodes[int(packetinfo[2])] = Node(int(packetinfo[1]), int(packetinfo[2]))
			nodes[int(packetinfo[2])].addRssi(data[0], data[1], data[2])
			if not data[0] in channels:
				channels.append(data[0])
	logger.debug("Processed CSV file.")
	
	# Determine start/stop time ---
	if (starttime is None) or (stoptime is None):
153 154 155 156 157 158 159 160 161
		msg = "Could not determine start or stop time of link test ID %s." % testid
		filesize = os.path.getsize(_serial_service_file)
		if filesize < 100:
			# file size is less than 100 bytes (empty file) -> test failed
			ret = flocklab.set_test_status(cur, cn, testid, 'failed')
			if ret != 0:
				msg += " Could not set test status to failed.\n"
			else:
				msg += " File size is %u bytes, test status set to failed.\n" % filesize
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
		logger.error(msg)
		errors.append(msg)
		return errors
	# structure: list, [[sum(received packets from node i at j)],[],[]...]
	# structure: list, [[sum(stat packets node j about i)],[],[]...]
	
	# Get platform info ---
	logger.debug("Getting platform info...")
	sql = """	SELECT `c`.`platforms_fk`, `d`.`name`, `a`.`description`
			FROM 
				`tbl_serv_tests` as `a`
				LEFT JOIN `tbl_serv_map_test_observer_targetimages` as `b` ON (`a`.serv_tests_key = `b`.test_fk) 
				LEFT JOIN `tbl_serv_targetimages` AS `c` ON (`b`.`targetimage_fk` = `c`.`serv_targetimages_key`)
				LEFT JOIN `tbl_serv_platforms` AS `d` ON (`c`.`platforms_fk` = `d`.`serv_platforms_key`)
			WHERE `a`.serv_tests_key = %s
			LIMIT 1
		"""
Reto Da Forno's avatar
Reto Da Forno committed
179
	cur.execute(sql % str(testid))
180 181 182 183 184 185 186 187 188 189
	ret = cur.fetchall()
	platform_fk = ret[0][0]
	platform_name = ret[0][1]
	# search for structure (Radio:*) in description
	platform_radio = re.search('\(Radio:([^)]*)\)', ret[0][2])
	if platform_radio is not None:
		platform_radio = platform_radio.group(1)
		
	# Write XML file ---
	logger.debug("Writing XML file...")
190
	linkmap = io.StringIO()
191 192 193 194
	linkmap.write('<?xml version="1.0" encoding="UTF-8" ?>\n<network platform="%s"' % platform_name)
	if platform_radio is not None:
		linkmap.write(' radio="%s"' % platform_radio)
	linkmap.write('>')
195
	for receiver, node in nodes.items():
196 197 198 199 200 201 202 203 204 205
		nodeprr = node.getPRR()
		for (obsid, sender, prr, numpkt) in nodeprr:
			if prr > 0:
				if sender in nodes:
					sender_obs_id = str(nodes[sender].obsid)
				else:
					sender_obs_id = "?"
				linkmap.write('<link src="%s" dest="%d" prr="%0.4f" numpackets="%d" />' % (sender_obs_id, obsid, prr, numpkt))
	for ch in channels:
		linkmap.write('<rssiscan channel="%d">' % ch)
206
		for receiver, node in nodes.items():
207 208 209
			rssi = node.getRssi()
			if ch in rssi:
				linkmap.write('<rssi nodeid="%s" frq="' % (node.obsid))
210
				linkmap.write(','.join(map(str, iter(rssi[ch].values()))))
211 212 213 214 215 216
				linkmap.write('" />')	
		linkmap.write('</rssiscan>')
	linkmap.write('</network>')
	
	# Store XML file in  DB ---
	logger.debug("Storing XML file in DB...")
Reto Da Forno's avatar
Reto Da Forno committed
217
	cur.execute("DELETE FROM `tbl_serv_web_link_measurements` WHERE `test_fk`=%s" % str(testid))
218
	if platform_radio is None:
Reto Da Forno's avatar
Reto Da Forno committed
219
		cur.execute("INSERT INTO `tbl_serv_web_link_measurements` (`test_fk`, `platform_fk`, `links`, `begin`, `end`) VALUES (%s,%s,'%s','%s','%s')" % ((str(testid), platform_fk, linkmap.getvalue(), datetime.fromtimestamp(starttime), datetime.fromtimestamp(stoptime))))
220
	else:
Reto Da Forno's avatar
Reto Da Forno committed
221
		cur.execute("INSERT INTO `tbl_serv_web_link_measurements` (`test_fk`, `platform_fk`, `links`, `begin`, `end`, `radio`) VALUES (%s,%s,'%s','%s','%s',%s)" % (str(testid), platform_fk, linkmap.getvalue(), datetime.fromtimestamp(starttime), datetime.fromtimestamp(stoptime), platform_radio))
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
	cn.commit()	

	# Remove temp dir ---
	logger.debug("Removing %s..."%tempdir)
	shutil.rmtree(tempdir)
	
	logger.debug("Created linkmap for test ID %s"%testid)
	return errors
### END TestToLinkmap()


##############################################################################
#
# Usage
#
##############################################################################
def usage():
239 240 241 242
	print("Usage: %s [--debug] [--help]" %scriptname)
	print("Options:")
	print("  --debug\t\t\tOptional. Print debug messages to log.")
	print("  --help\t\t\tOptional. Print this help.")
243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
### END usage()


##############################################################################
#
# Main
#
##############################################################################
def main(argv):
	global logger
	global config
	global name
	
	errors = []
	_serial_service_file = None
	
	# Set timezone to UTC:
	os.environ['TZ'] = 'UTC'
	time.tzset()
	
	# Get logger:
	logger = flocklab.get_logger(loggername=scriptname, loggerpath=scriptpath)
	
	# Get the config file:
	config = flocklab.get_config(configpath=scriptpath)
	if not config:
		msg = "Could not read configuration file. Exiting..."
		flocklab.error_logandexit(msg, errno.EAGAIN, name, logger, config)
	#logger.debug("Read configuration file.")
	
	# Get the arguments:
	try:
275 276
		opts, args = getopt.getopt(argv, "hd", ["help", "debug"])
	except getopt.GetoptError as err:
277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
		logger.warn(str(err))
		usage()
		sys.exit(errno.EINVAL)
	except:
		msg = "Error when getting arguments: %s: %s" %(str(sys.exc_info()[0]), str(sys.exc_info()[1]))
		flocklab.error_logandexit(msg, errno.EAGAIN, name, logger, config)

	for opt, arg in opts:
		if opt in ("-h", "--help"):
			usage()
			sys.exit(SUCCESS)
		elif opt in ("-d", "--debug"):
			logger.setLevel(logging.DEBUG)
		else:
			logger.warn("Wrong API usage")
			sys.exit(errno.EINVAL)
	
	# Connect to the DB ---
	try:
		(cn, cur) = flocklab.connect_to_db(config, logger)
	except:
		msg = "Could not connect to database"
		flocklab.error_logandexit(msg, errno.EAGAIN, name, logger, config)
	#logger.debug("Connected to database")
	
	# Query database for pending link measurements ---
	testids = []
	cur.execute("SELECT `test_fk` FROM `tbl_serv_web_link_measurements` LEFT JOIN `tbl_serv_tests` ON (`test_fk` = `serv_tests_key`) WHERE `links` is NULL AND `test_status` IN ('synced', 'finished')")
	ret = cur.fetchall()
	for row in ret:
307
		testids.append(int(row[0]))
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323
	if (len(testids) == 0):
		logger.debug("No pending test for evaluation")
	else:
		logger.debug("Test IDs to process: %s\n"%str(testids))
		for testid in testids:
			try:
				ret = TestToLinkmap(testid=testid, cn=cn, cur=cur)
				if len(ret) == 0:
					# No errors occurred, thus mark the test for deletion:
					logger.debug("Mark test %s for deletion."%str(testid))
					flocklab.set_test_status(cur, cn, testid, 'todelete')
				else:
					logger.debug("Errors detected while processing test %s."%str(testid))
					for err in ret:
						errors.append(err)
			except:
324
				msg = "Encountered error for test ID %d: %s: %s" % (testid, str(sys.exc_info()[0]), str(sys.exc_info()[1]))
325 326 327 328 329 330 331 332 333 334 335 336
				errors.append(msg)
				logger.error(msg)
				continue
	if (len(errors)):
		msg = ""
		for err in errors:
			msg += err	
		flocklab.error_logandexit(msg, errno.EAGAIN, name, logger, config)
		
	logger.debug("Finished. Exit program.")
	cn.close()
	sys.exit(SUCCESS)
337 338 339
### END main()


340 341 342
if __name__ == "__main__":
	try:
		main(sys.argv[1:])
343 344
	except Exception:
		msg = "Encountered error: %s: %s\n%s\nCommand line was: %s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc(), str(sys.argv))
345
		flocklab.error_logandexit(msg, errno.EAGAIN, name, logger, config)
346