Commit 6c07ad3b authored by Reto Da Forno

feature added: power measurement aggregation on server

parent f8efe257
@@ -351,7 +351,7 @@ def worker_gpiotracing(queueitem=None, nodeid=None, resultfile_path=None, result
# whole observer DB files.
#
##############################################################################
def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, logqueue=None, arg=None):
def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, resultfile_lock=None, logqueue=None, args=None):
try:
_errors = []
cur_p = multiprocessing.current_process()
@@ -359,7 +359,7 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, resultfi
inputfilename = "%s/%s" % (fdir, f)
loggername = "(%s.%d) " % (cur_p.name, obsid)
if arg and arg == 'rld':
if args and args['format'] == 'rld':
# RLD file format
# simply move the file into the results directory
try:
@@ -381,7 +381,20 @@ def worker_powerprof(queueitem=None, nodeid=None, resultfile_path=None, resultfi
voltage_ch = rld_data.get_data('V2') - rld_data.get_data('V1') # voltage difference for old file versions (channel order swapped)
else:
voltage_ch = rld_data.get_data('V1') - rld_data.get_data('V2') # voltage difference for new file versions (channel order correct)
rld_dataframe = pd.DataFrame(np.hstack((current_ch, voltage_ch)), index=timeidxunix, columns=['I', 'V'])
# aggregate the data if necessary
if args and args['aggregate'] > 0:
current_aggr = []
voltage_aggr = []
timeidx_aggr = []
aggr_cnt = args['aggregate']
for idx in range(0, len(current_ch), aggr_cnt):
# calculate average values (the last block may contain fewer than aggr_cnt samples)
blocklen = len(current_ch[idx:idx + aggr_cnt])
current_aggr.append(sum(current_ch[idx:idx + aggr_cnt]) / blocklen)
voltage_aggr.append(sum(voltage_ch[idx:idx + aggr_cnt]) / blocklen)
timeidx_aggr.append(sum(timeidxunix[idx:idx + aggr_cnt]) / blocklen)
rld_dataframe = pd.DataFrame(np.hstack((current_aggr, voltage_aggr)), index=timeidx_aggr, columns=['I', 'V'])
else:
rld_dataframe = pd.DataFrame(np.hstack((current_ch, voltage_ch)), index=timeidxunix, columns=['I', 'V'])
rld_dataframe.insert(0, 'observer_id', obsid)
rld_dataframe.insert(1, 'node_id', nodeid)
resultfile_lock.acquire()
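The new block above averages every aggr_cnt consecutive current, voltage and timestamp samples before building the dataframe. As a standalone reference, the same block averaging can be sketched in vectorized form as below (a minimal sketch assuming plain numpy arrays as input; the worker itself operates on the 2-D channel arrays returned by rld_data.get_data() and the unix time index, and the function name block_average is only illustrative):

    import numpy as np

    def block_average(samples, aggr_cnt):
        # average each block of aggr_cnt consecutive samples;
        # a shorter trailing block is averaged over its actual length
        samples = np.asarray(samples, dtype=float).ravel()
        full = (len(samples) // aggr_cnt) * aggr_cnt
        means = samples[:full].reshape(-1, aggr_cnt).mean(axis=1)
        if full < len(samples):
            means = np.append(means, samples[full:].mean())
        return means

    # e.g. an aggregation count of 10 reduces a 1 kHz trace to 100 Hz
    print(block_average(np.arange(25), 10))   # -> [ 4.5 14.5 22. ]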
@@ -680,7 +693,7 @@ class FetchObsThread(threading.Thread):
self._logger.debug(self._loggerprefix + "Could not download all results files from observer. Dataloss occurred for this observer.")
self._logger.debug(self._loggerprefix + "Tried to execute %s, result was %d, stdout: %s, error: %s" % (str(cmd), rs, out, err))
else:
self._logger.debug("Downloaded results files from observer.")
#self._logger.debug("Downloaded results files from observer.")
# put a copy to the debug directory
if self._obsfiledebugdir:
for f in copyfilelist:
@@ -1060,7 +1073,7 @@ def main(argv):
ret = cur.fetchone()
teststarttime = ret[0]
teststoptime = ret[1]
ppFileFormat = None
powerprofconf = {} # config dict for power profiling (file format and aggregation rate)
# Find out which services are used to allocate working threads later on ---
# Get the XML config from the database and check which services are used in the test.
@@ -1085,14 +1098,25 @@ def main(argv):
logger.debug("Usage of %s detected." % service)
else:
servicesUsed_dict[service] = False
# check which file format the user wants for the power profiling
# NOTE: This implementation assumes that the same file format is configured for all observers. In case multiple powerProfilingConf blocks with differing file formats are present, the file format found first will be used.
# create a dict of file formats and aggregation rates
if servicesUsed_dict['powerprofiling']:
if tree.xpath('//d:powerProfilingConf/d:fileFormat', namespaces=ns):
ppFileFormat = tree.xpath('//d:powerProfilingConf/d:fileFormat', namespaces=ns)[0].text
logger.debug("User wants file format %s for power profiling." % (ppFileFormat))
else:
logger.debug("Element <fileFormat> not detected.")
for powerProfConf in tree.xpath('//d:powerProfilingConf', namespaces=ns):
obsList = [int(obsIdStr) for obsIdStr in powerProfConf.xpath('.//d:obsIds', namespaces=ns)[0].text.split()]
# set default config
for obs in obsList:
if obs not in powerprofconf:
powerprofconf[obs] = { 'format': 'csv', 'aggregate': 0 }
samplingRate = powerProfConf.xpath('.//d:samplingRate', namespaces=ns)
samplingRate = int(samplingRate[0].text) if samplingRate else 1000
aggregate = powerProfConf.xpath('.//d:aggregate', namespaces=ns)
fileFormat = powerProfConf.xpath('.//d:fileFormat', namespaces=ns)
if aggregate and int(aggregate[0].text) > 0:
aggregate = int(aggregate[0].text)
for obs in obsList:
powerprofconf[obs]['aggregate'] = int(samplingRate / aggregate)
if fileFormat:
for obs in obsList:
powerprofconf[obs]['format'] = fileFormat[0].text
# extract cpuSpeed for datatracing (for all observers configured for datatracing)
dtCpuSpeed = {}
if servicesUsed_dict['datatrace']:
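To make the resulting per-observer dictionary concrete, the sketch below parses a simplified powerProfilingConf block and derives the aggregation count the same way as the loop above (illustration only: it drops the FlockLab XML namespace and the d:-prefixed xpath queries used in the real code, and assumes lxml is available):

    from lxml import etree

    xml = """<powerProfilingConf>
                 <obsIds>3 7</obsIds>
                 <offset>10</offset>
                 <samplingRate>1000</samplingRate>
                 <aggregate>100</aggregate>
             </powerProfilingConf>"""
    conf = etree.fromstring(xml)
    obsList = [int(o) for o in conf.findtext('obsIds').split()]
    samplingRate = int(conf.findtext('samplingRate', default='1000'))
    aggregate = int(conf.findtext('aggregate', default='0'))
    powerprofconf = {}
    for obs in obsList:
        powerprofconf[obs] = {'format': 'csv', 'aggregate': 0}
        if aggregate > 0:
            # number of consecutive raw samples averaged per output value
            powerprofconf[obs]['aggregate'] = samplingRate // aggregate
    print(powerprofconf)
    # -> {3: {'format': 'csv', 'aggregate': 10}, 7: {'format': 'csv', 'aggregate': 10}}

In other words, <aggregate> appears to specify the desired output rate in Hz, and the stored 'aggregate' value is the number of raw samples that the server-side worker averages per output sample.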
@@ -1155,8 +1179,6 @@ def main(argv):
elif service == 'gpiotracing':
header = 'timestamp,observer_id,node_id,pin_name,value\n'
elif service == 'powerprofiling':
if ppFileFormat == 'rld':
continue # don't open a csv file
header = 'timestamp,observer_id,node_id,current_mA,voltage_V\n'
elif service == 'serial':
header = 'timestamp,observer_id,node_id,direction,output\n'
@@ -1282,7 +1304,8 @@ def main(argv):
worker_f = worker_gpiotracing
elif (re.search("^powerprofiling_[0-9]{14}\.rld$", f) != None):
pool = service_pools_dict['powerprofiling']
worker_args = [nextitem, nodeid, testresultsfile_dict['powerprofiling'][0], testresultsfile_dict['powerprofiling'][1], logqueue, ppFileFormat]
ppargs = powerprofconf[obsid] if obsid in powerprofconf else None
worker_args = [nextitem, nodeid, testresultsfile_dict['powerprofiling'][0], testresultsfile_dict['powerprofiling'][1], logqueue, ppargs]
worker_f = worker_powerprof
elif (re.search("^serial_[0-9]{14}\.db$", f) != None):
pool = service_pools_dict['serial']
......
@@ -739,31 +739,43 @@ def main(argv):
print("Element powerProfilingConf: Some observer IDs have been used but do not have a targetConf element associated with them.")
print("List: %s, used IDs: %s" % (str(obsidlist), str(ids)))
errcnt = errcnt + 1
# Check offset tag (mandatory element)
rs = tree.xpath('//d:powerProfilingConf/d:offset', namespaces=ns)
total_samples = 0
for elem in rs:
ppOffset = int(elem.text)
elem2 = elem.getparent().find('d:duration', namespaces=ns)
if elem2 != None:
ppDuration = int(elem2.text.strip())
for powerProfConf in tree.xpath('//d:powerProfilingConf', namespaces=ns):
obsList = [int(obsIdStr) for obsIdStr in powerProfConf.xpath('.//d:obsIds', namespaces=ns)[0].text.split()]
samplingRate = get_sampling_rate(powerProfConf, ns)
offset = int(powerProfConf.xpath('.//d:offset', namespaces=ns)[0].text) # mandatory field
duration = powerProfConf.xpath('.//d:duration', namespaces=ns)
if duration:
ppDuration = int(duration[0].text.strip())
else:
# assume sampling duration = test duration
ppDuration = testDuration - ppOffset
total_samples = total_samples + ppDuration * get_sampling_rate(elem.getparent(), ns)
if (ppOffset + ppDuration) > testDuration:
ppDuration = testDuration - offset # sampling duration = test duration minus offset
# make sure the sampling window is within the test runtime
if (offset + ppDuration) > testDuration:
if not quiet:
print(("Line %d: element duration/offset: Profiling lasts longer than test." % (elem2.sourceline)))
print(("Line %d: element duration/offset: Profiling lasts longer than test." % (duration[0].sourceline)))
errcnt = errcnt + 1
# sum up the total number of samples that will be collected
total_samples = total_samples + ppDuration * samplingRate * len(obsList)
aggregate = powerProfConf.xpath('.//d:aggregate', namespaces=ns)
fileFormat = powerProfConf.xpath('.//d:fileFormat', namespaces=ns)
if aggregate:
# make sure 1 <= aggregate < samplingRate
ppAggregate = int(aggregate[0].text)
if ppAggregate >= samplingRate or ppAggregate < 1:
if not quiet:
print(("Line %d: element aggregate: Must be smaller than the sampling rate." % (aggregate[0].sourceline)))
errcnt = errcnt + 1
if fileFormat:
if fileFormat[0].text == "rld" and aggregate:
if not quiet:
print(("Line %d: element fileFormat: Cannot use format 'rld' with data aggregation." % (fileFormat[0].sourceline)))
errcnt = errcnt + 1
# check total number of samples (for now, just multiply the total by the number of observers)
if ids:
total_samples = total_samples * len(ids)
if total_samples > flocklab.config.getint('tests', 'powerprofilinglimit'):
if not quiet:
print(("Invalid combination of power profiling duration and sampling rate: the total amount of data to collect is too large (%d samples requested, limit is %d)." % (total_samples, flocklab.config.getint('tests', 'powerprofilinglimit'))))
errcnt = errcnt + 1
if total_samples and total_samples > flocklab.config.getint('tests', 'powerprofilinglimit'):
if not quiet:
print(("Invalid combination of power profiling duration and sampling rate: the total amount of data to collect is too large (%d samples requested, limit is %d)." % (total_samples, flocklab.config.getint('tests', 'powerprofilinglimit'))))
errcnt = errcnt + 1
#===========================================================================
# All additional tests finished. Clean up and exit.
#===========================================================================
......
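To put the sample-count check in perspective, here is a worked example with hypothetical numbers (the actual limit is read from the 'powerprofilinglimit' option of the FlockLab configuration):

    # hypothetical values, for illustration only
    testDuration = 3600                       # s
    offset       = 10                         # s, mandatory <offset> element
    samplingRate = 1000                       # Hz (default when <samplingRate> is absent)
    numObservers = 3                          # number of IDs listed in <obsIds>
    ppDuration   = testDuration - offset      # no <duration> element given
    total_samples = ppDuration * samplingRate * numObservers
    print(total_samples)                      # 10770000, compared against 'powerprofilinglimit'

Note that <aggregate> does not reduce this count: the limit is checked against the number of raw samples acquired on the observers, while aggregation only shrinks the result file produced on the server.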
@@ -300,10 +300,11 @@
<xs:sequence>
<xs:element name="obsIds" type="obsIdListRestType"/>
<xs:element name="offset" type="offsetSecsType" minOccurs="1" maxOccurs="1"/>
<xs:choice minOccurs="0" maxOccurs="3">
<xs:choice minOccurs="0" maxOccurs="4">
<xs:element name="fileFormat" type="profConfFileFormat" minOccurs="0" maxOccurs="1"/>
<xs:element name="duration" type="powerProfDurationType" minOccurs="0" maxOccurs="1"/>
<xs:element name="samplingRate" type="powerProfSamplingRateType" minOccurs="0" maxOccurs="1"/>
<xs:element name="aggregate" type="powerProfSamplingRateType" minOccurs="0" maxOccurs="1"/>
</xs:choice>
</xs:sequence>
</xs:complexType>
......