# STeC: Exploiting Spatial and Temporal Correlation for Event-based Communication in WSNs
This repository contains scripts for the analysis and simulation of STeC.

| Folder | Description |
|----------------------------------------------------------|---------------------------|
| [*analysis/bin*](./analysis/bin) | Contains scripts to schedule, download, analyse and evaluate protocol runs and historic data from the deployments |
| [*analysis/data_management*](./analysis/data_management) | Contains scripts to interface with the ETHZ GSN and pre-process the data for other scripts and notebooks |
| [*analysis/notebooks*](./analysis/notebooks) | Contains notebooks to visualize data from real protocol runs and historic data from the deployments |
| [*system_model/des*](./system_model/des) | Contains the discrete event simulation for all protocols treated in the evaluation as well as the synthetic traces |
| [*system_model/notebooks*](./system_model/notebooks) | Contains notebooks to visualize data from simulated protocol runs |

*Notice:* Occasionally, STeC is still referred to by its working name _EBC_ in some variable names in these scripts. To remain compatible with previously generated files and to prevent accidental incompatibilities, these variables have not been renamed.

For more information, please visit the [STeC wiki](https://gitlab.ethz.ch/tec/public/stec/stec-wiki).

# STeC Testing & Data Analysis Scripts
Scripts for automated testing of STeC on FlockLab, as well as for automatically analysing FlockLab tests and statistical data from GSN.
## Interface
```
usage: stec-flocklab.py [-h] [-a INT] [-d INT] [-np] [-n INT] [-e INT] [-g INT] [-bs INT] [-max INT] [-bsimg INT] [-p STRING] [-t] [-tp STRING]
usage: stec-flocklab.py [options]
optional arguments:
-h, --help Show this help message and exit
-a INT, --abort-test INT Aborts the currently running test with the given ID.
-d INT, --delete-test INT Deletes the test with the given ID.
-np, --no-powertracing Disables power tracing for this test.
-n INT, --nodes INT Number of nodes, excluding the BaseStation (default: 2).
-e INT, --events INT Number of scheduled events (default: 1).
-g INT, --gap INT Gap between events [ms] (default: 5000).
-bs INT, --basestation INT ID of node that will be the BaseStation (default: 1).
-max INT, --maximum-points INT Limit number of data points to be fetched from GSN (default: unlimited).
-bsimg INT, --basestation-image INT FlockLab image for BaseStation (default: None).
-p STRING, --path STRING Path to .elf files to be uploaded (default: './').
-t, --trace Use a trace file to simulate events.
-tp STRING, --trace-path STRING Path to .log file of trace.
```
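For example, a test with 5 nodes (plus the BaseStation) and 10 events spaced 1 s apart, using images from `./binaries/`, could be scheduled as follows (hypothetical values and paths):
```
python3 stec-flocklab.py -n 5 -e 10 -g 1000 -p ./binaries/
```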
```
usage: stec-analysis.py [-h] [-v] [-p STRING] [-s] [-l INT] [-r] [-a] [-g]
usage: stec-analysis.py [options]
optional arguments:
-h, --help Show this help message and exit
-v, --verify-tests Verify locally scheduled tests.
-p STRING, --path STRING Path to .log file of tests.
-s, --statistics Display statistics on co-detections.
-l INT, --codet-length INT Length of a co-detection [ms].
-r, --include-rain Include rainy days in statistics.
-a, --auto-extend Automatically extend the event duration if it is still running.
-g, --geophone-data Extract Geophone features from log files.
```
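For example, to verify locally scheduled tests from a log file, and then to print co-detection statistics with a 500 ms co-detection window while including rainy days (hypothetical path and values):
```
python3 stec-analysis.py -v -p ./tests.log
python3 stec-analysis.py -s -l 500 -r
```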
```
usage: stec-utils.py [-h] [-g] [-p STRING] [-c INT] [-e INT] [-d INT] [-i INT] [-s]
usage: stec-utils.py [options]
optional arguments:
-h, --help Show this help message and exit
-g, --generate-trace Generate new trace.
-p STRING, --path STRING Path to files (default: './').
-c INT, --nr-codets INT Number of generated co-detections.
-e INT, --nr-evts INT Number of generated events per co-detection.
-d INT, --prop-delay INT Maximum propagation delay of co-detections [ms] (default: 100).
-i INT, --event-interval INT Average event interval [s] (default: 20).
-s, --show-metrics Displays the information of a stored metric.
```
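For example, to generate a synthetic trace of 100 co-detections with 3 events each, a maximum propagation delay of 100 ms, and an average event interval of 20 s (hypothetical values):
```
python3 stec-utils.py -g -c 100 -e 3 -d 100 -i 20
```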
## Further resources
### GSN web interface
- [A decade of detailed observations in steep bedrock permafrost at Matterhorn Hörnligrat (Zermatt, CH) - Weber et al. 2019](https://doi.org/10.3929/ethz-b-000323342)
- [GSN Web-interface @ GitHub](https://github.com/LSIR/gsn/wiki/Web-Interface#multidata)
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Copyright (c) 2021, ETH Zurich, Computer Engineering Group (TEC)
"""
# Analysis script for the STeC project
#
# Author: abiri
# Date: 03.11.21
import logging
import configparser
import sys
import datetime as dt
import flocklab
from ast import literal_eval
from argparse import ArgumentParser
from os.path import isfile, isdir, abspath
sys.path.append("../")  # FIXME: workaround if not built as a package
from data_management.data_manager import DataManager
from data_management.data_processing import DataProcessor
# ----------------------------------------------------------------------------------------------------------------------
# General consts and variables
# ----------------------------------------------------------------------------------------------------------------------
# Default params for command line arguments that are mandatory
DEFAULT_CONFIG_FILE = '../stec.conf'
DEFAULT_LOG_LEVEL = 'INFO'
DEPLOYMENT = 'dirruhorn'
VS_ACOUSTIC_METADATA = '_dpp_geophone_acq__conv'
VS_ACOUSTIC_DATA = '_dpp_geophone_adcData__conv'
# ----------------------------------------------------------------------------------------------------------------------
# Classes and Functions
# ----------------------------------------------------------------------------------------------------------------------
class STECAnalyser:

    def __init__(self):
        self.LOG_FORMAT = None
        self.LOG_DATE_FORMAT = None
        self.LOG_FILE_NAME = None
        self.LOG_FILE_LEVEL = None
        self.LOG_STREAM_LEVEL = None
        self.DATA_START_TIME = dt.datetime.strptime("01/01/2017", "%d/%m/%Y")  # type: dt.datetime
        self.DATA_END_TIME = dt.datetime.strptime("01/01/2020", "%d/%m/%Y")  # type: dt.datetime
        self.DATES_EXCLUDED = None

        # Load config
        self.load_config(DEFAULT_CONFIG_FILE)

        self._DataMgr = DataManager(deployment=DEPLOYMENT, config_file=DEFAULT_CONFIG_FILE, project_name='stec', start_time=self.DATA_START_TIME, end_time=self.DATA_END_TIME)
        self._DataProc = DataProcessor(config_file=DEFAULT_CONFIG_FILE)
    def load_config(self, config_file=None):
        if config_file is None:
            config_file = DEFAULT_CONFIG_FILE
            logging.info('Using default configuration file: %s' % (config_file,))
        if not isfile(config_file):
            logging.warning('Config file (%s) not found' % (config_file,))
        else:
            config_file = abspath(config_file)

        # Read config file for other options
        config = configparser.ConfigParser()
        config.optionxform = str  # Case sensitive
        config.read(config_file)

        section_common = 'stec'
        section_special = 'stec-bins'
        try:
            # Read options from config
            for name, value in (config.items(section_common) + config.items(section_special)):
                value = value.strip()
                if value != '':
                    if name == 'log_format':
                        self.LOG_FORMAT = str(value)
                    elif name == 'log_date_format':
                        self.LOG_DATE_FORMAT = str(value)
                    elif name == 'log_file_name':
                        self.LOG_FILE_NAME = str(value)
                    elif name == 'log_file_level':
                        self.LOG_FILE_LEVEL = getattr(logging, value.upper())
                    elif name == 'log_stream_level':
                        self.LOG_STREAM_LEVEL = getattr(logging, value.upper())
                    elif name == 'data_start_time':
                        self.DATA_START_TIME = dt.datetime.strptime(value, "%d/%m/%Y")
                    elif name == 'data_end_time':
                        self.DATA_END_TIME = dt.datetime.strptime(value, "%d/%m/%Y")
                    elif name == 'dates_excluded':
                        self.DATES_EXCLUDED = literal_eval(value)
                    else:
                        logging.warning('Unknown config option in section [%s]: %s' % (section_common + "/" + section_special, name,))
        except configparser.NoSectionError:
            raise TypeError('No [%s] section specified in %s' % (section_common + "/" + section_special, config_file,))
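    # A minimal sketch of the expected 'stec.conf' layout, with hypothetical
    # values; the exact structure of 'dates_excluded' is an assumption based on
    # the literal_eval() parsing above, and the '[stec-bins]' options shown are
    # the ones read by stec-flocklab.py:
    #
    #   [stec]
    #   log_format = %(asctime)s - %(name)s - %(levelname)s - %(message)s
    #   log_file_name = stec.log
    #   log_file_level = INFO
    #   log_stream_level = DEBUG
    #   data_start_time = 01/01/2017
    #   data_end_time = 01/01/2020
    #   dates_excluded = ['01/05/2018', '02/05/2018']
    #
    #   [stec-bins]
    #   default_nr_nodes = 2
    #   default_evt_gap = 5000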
    def verify_tests(self, log_path):
        # Get all locally scheduled tests
        logs = self._DataMgr.get_logged_tests(log_path)

        # Fetch corresponding tests
        fl = flocklab.Flocklab()
        test_ids = logs['test_id']
        download_directory = './'
        for test_id in test_ids:
            if isdir(download_directory + str(test_id)):
                logging.info('Skipping test %d as already downloaded' % (test_id,))
            else:
                # Fetch test from FlockLab - catch FlockLab errors directly to not affect other tests
                try:
                    if 'Successfully downloaded & extracted:' in fl.getResults(test_id):
                        logging.info('Successfully downloaded test %d' % (test_id,))
                    else:
                        logging.warning('Failed to download test %d' % (test_id,))
                except flocklab.flocklab.FlocklabError as exception:
                    logging.warning('Experienced FlockLab error for test %d: %s' % (test_id, exception))

        # Extract metrics
        metrics_tracing, metrics_serial, metrics_power = self._DataMgr.extract_test_metrics(test_ids, download_directory)

        # Compare metrics
        self._DataProc.compare_metrics(test_input=logs, test_output=[metrics_tracing, metrics_serial, metrics_power])

        # Extract metrics for comparison with simulation
        if metrics_power is not None:
            self._DataMgr.store_metrics(test_input=logs, test_output=[metrics_tracing, metrics_serial, metrics_power], path=download_directory)
        return True
    def fetch_statistics(self, codet_length_ms=0, include_rain=False, auto_extend=False):
        # Create URL including conditions
        fields = 'device_id,start_time,end_time,generation_time,trg_duration'
        url = self._DataMgr.assemble_gsn_url(VS_ACOUSTIC_METADATA, fields)

        # Get dates to be excluded (usually due to maintenance)
        excluded_dates = self.DATES_EXCLUDED

        # Fetch data
        try:
            df = self._DataMgr.fetch_csv_data(url)

            # Mark and filter days with precipitation
            prec_dates = self._DataMgr.fetch_rain_dates(include_gruengarten=True, include_breithorn=False, include_grabengufer=False)
            self._DataProc.mark_rain_dates(df, prec_dates)
            if not include_rain:
                if len(prec_dates):
                    # excluded_dates may still be None if the config does not set 'dates_excluded'
                    excluded_dates = (excluded_dates or []) + prec_dates
                    logging.info("Added %d days to list of excepted dates due to precipitation" % (len(prec_dates)))
        except Exception as ex_acoustic:
            logging.error("Could not fetch CSV data from path:\n%s\n\n%s" % (url, ex_acoustic,))
            return
        logging.debug("Received data from URL:\n%s" % (url,))

        if codet_length_ms > 0:
            events = self._DataProc.compute_event_stats(df, dates_excluded=excluded_dates, propagation_ms=codet_length_ms, auto_extend_evt=auto_extend)
            codets = self._DataProc.find_codetections(df, dates_excluded=excluded_dates, propagation_ms=codet_length_ms, auto_extend_evt=auto_extend)
        else:
            events = self._DataProc.compute_event_stats(df, dates_excluded=excluded_dates, auto_extend_evt=auto_extend)
            codets = self._DataProc.find_codetections(df, dates_excluded=excluded_dates, auto_extend_evt=auto_extend)

        # Print statistics
        self._DataProc.print_event_stats(events)
        self._DataProc.print_codet_stats(codets, force_print=True)
        self._DataProc.print_meas_stats(codets, force_print=True)
    def extract_geophone_features(self, path):
        if path is None:
            raise ValueError('Path to files must not be None')
        elif not isdir(path):
            raise ValueError('Path \'%s\' must point to a directory' % (path,))

        # Get data
        [df_acq, df_com, df_evts, df_health] = self._DataMgr.extract_geophone_data(path)

        # Store dataframes
        file_path = path + '/geophone_acq.pkl'
        df_acq.to_pickle(file_path)
        file_path = path + '/geophone_com.pkl'
        df_com.to_pickle(file_path)
        file_path = path + '/geophone_events.pkl'
        df_evts.to_pickle(file_path)
        file_path = path + '/geophone_health.pkl'
        df_health.to_pickle(file_path)

        # Store acquisitions as CSV
        df_acq_converted = self._DataMgr.convert_geophone_data(df_acq)
        file_path = path + '/cache_etz_dpp_geophone_acq_logs__conv_20201019_20210517.csv'
        df_acq_converted.to_csv(file_path)
# ----------------------------------------------------------------------------------------------------------------------
# Main
# ----------------------------------------------------------------------------------------------------------------------
if __name__ == "__main__":

    # Initialization
    try:
        # Create STECAnalyser to store parameters
        analyser = STECAnalyser()

        # Parse arguments
        parser = ArgumentParser(description='usage: %(prog)s [options]')
        parser.add_argument('-v', '--verify-tests', dest='verify', action='store_true',
                            help='Verify locally scheduled tests.')
        parser.add_argument('-p', '--path', type=str, dest='path',
                            help='Path to .log file of tests.', metavar='STRING')
        parser.add_argument('-s', '--statistics', dest='stats', action='store_true',
                            help='Display statistics on co-detections.')
        parser.add_argument('-l', '--codet-length', type=int, dest='codet_length_ms', default=0,
                            help='Length of a co-detection [ms].', metavar='INT')
        parser.add_argument('-r', '--include-rain', dest='include_rain', action='store_true',
                            help='Include rainy days in statistics.')
        parser.add_argument('-a', '--auto-extend', dest='auto_extend_evt', action='store_true',
                            help='Automatically extend the event duration if it is still running.')
        parser.add_argument('-g', '--geophone-data', dest='extract_geophone_features', action='store_true',
                            help='Extract Geophone features from log files.')
        args = parser.parse_args()

        # Check arguments
        if args.path is not None and not isinstance(args.path, str):
            raise TypeError('Invalid path to .log files: %s' % (args.path,))
    except Exception as ex:
        logging.error('Experienced an error: %s' % (ex,))
        logging.shutdown()

        # Exit program
        sys.exit(1)

    # Initialize logger
    if analyser.LOG_FORMAT is None:
        analyser.LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    if analyser.LOG_DATE_FORMAT is None:
        analyser.LOG_DATE_FORMAT = '%H:%M:%S'

    # Setup handlers for root logger
    root = logging.getLogger()
    root.setLevel(DEFAULT_LOG_LEVEL)
    formatter = logging.Formatter(fmt=analyser.LOG_FORMAT, datefmt=analyser.LOG_DATE_FORMAT)

    # Clear old handlers and setup new ones
    if root.hasHandlers():
        root.handlers.clear()
    if analyser.LOG_FILE_LEVEL is not None:
        # Log root logger to file (FileHandler)
        if analyser.LOG_FILE_NAME is not None:
            handler = logging.FileHandler(filename=analyser.LOG_FILE_NAME)
            handler.setLevel(analyser.LOG_FILE_LEVEL)
            handler.setFormatter(formatter)
            root.addHandler(handler)
        else:
            logging.warning('Log file name is not defined, skipping logging...')
    if analyser.LOG_STREAM_LEVEL is not None:
        # Pipe root logger to stdout (StreamHandler)
        handler = logging.StreamHandler()
        handler.setLevel(analyser.LOG_STREAM_LEVEL)
        handler.setFormatter(formatter)
        root.addHandler(handler)

    # Print debug options
    logging.debug('STeC Analysis CLI options:')
    for arg in vars(args):
        logging.debug(' {}:\t {}'.format(arg, getattr(args, arg) or ''))
    logging.info("STeC Analysis script is running...")

    try:
        # Check arguments
        if args.verify:
            # Fetch all scheduled tests and verify them
            analyser.verify_tests(args.path)
        if args.stats:
            analyser.fetch_statistics(codet_length_ms=args.codet_length_ms, include_rain=args.include_rain, auto_extend=args.auto_extend_evt)
        if args.extract_geophone_features:
            analyser.extract_geophone_features(args.path)
    except Exception as ex:
        logging.error('Experienced an error: %s' % (ex,))
    logging.shutdown()
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
Copyright (c) 2021, ETH Zurich, Computer Engineering Group (TEC)
"""
# Flocklab script for the STeC project
#
# Author: abiri
# Date: 03.11.21
import logging
import configparser
import sys
import datetime as dt
import flocklab
from ast import literal_eval
from argparse import ArgumentParser
from os.path import isfile, abspath
sys.path.append("../")  # FIXME: workaround if not built as a package
from data_management.data_manager import DataManager
from data_management.data_processing import DataProcessor
# ----------------------------------------------------------------------------------------------------------------------
# General consts and variables
# ----------------------------------------------------------------------------------------------------------------------
# Default params for command line arguments that are mandatory
DEFAULT_CONFIG_FILE = '../stec.conf'
DEFAULT_LOG_LEVEL = 'INFO'
# Nodes 15, 17 and 25 are available but are external nodes outside FSK range; node 30 is on the ETZ roof
FL_NODE_IDS = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 16, 19, 20, 21, 22, 23, 24, 26, 27, 28, 29, 30, 31, 32]
FL_NODE_REMOTE_IDS = [15, 17, 25, 30]
FL_NODE_EXCLUDED_IDS = [6, 9, 10, 13]
DEPLOYMENT = 'dirruhorn'
VS_ACOUSTIC_METADATA = '_dpp_geophone_acq__conv'
VS_ACOUSTIC_DATA = '_dpp_geophone_adcData__conv'
# ----------------------------------------------------------------------------------------------------------------------
# Classes and Functions
# ----------------------------------------------------------------------------------------------------------------------
class STECFlocklabManager:

    def __init__(self):
        self.DEFAULT_NR_NODES = 2  # type: int
        self.DEFAULT_NR_EVTS = 1  # type: int
        self.DEFAULT_EVT_GAP_MS = 5*1000  # type: int
        self.DEFAULT_BS_ID = FL_NODE_IDS[0]  # type: int
        self.EVT_GAP_MS_MIN = 0  # type: int
        self.EVT_GAP_MS_MAX = 3600*1000  # type: int
        self.LOG_FORMAT = None
        self.LOG_DATE_FORMAT = None
        self.LOG_FILE_NAME = None
        self.LOG_FILE_LEVEL = None
        self.LOG_STREAM_LEVEL = None
        self.DATA_START_TIME = dt.datetime.strptime("01/01/2017", "%d/%m/%Y")  # type: dt.datetime
        self.DATA_END_TIME = dt.datetime.strptime("01/10/2020", "%d/%m/%Y")  # type: dt.datetime
        self.DATES_EXCLUDED = None
        self.nr_nodes = None
        self.nr_evts = None
        self.evt_gap_ms = None
        self.path = './'  # type: str
        self.basestation_id = None
        self.basestation_image = None
        self.max_nr_points = None

        # Load config
        self.load_config(DEFAULT_CONFIG_FILE)

        self._DataMgr = DataManager(deployment=DEPLOYMENT, config_file=DEFAULT_CONFIG_FILE, project_name='stec', start_time=self.DATA_START_TIME, end_time=self.DATA_END_TIME)
        self._DataProc = DataProcessor(config_file=DEFAULT_CONFIG_FILE)
        self._codet_sample = None
    def load_config(self, config_file=None):
        if config_file is None:
            config_file = DEFAULT_CONFIG_FILE
            logging.info('Using default configuration file: %s' % (config_file,))
        if not isfile(config_file):
            logging.warning('Config file (%s) not found' % (config_file,))
        else:
            config_file = abspath(config_file)

        # Read config file for other options
        config = configparser.ConfigParser()
        config.optionxform = str  # Case sensitive
        config.read(config_file)

        section_common = 'stec'
        section_special = 'stec-bins'
        try:
            # Read options from config
            for name, value in (config.items(section_common) + config.items(section_special)):
                value = value.strip()
                if value != '':
                    if name == 'default_nr_nodes':
                        self.DEFAULT_NR_NODES = int(value)
                    elif name == 'default_nr_events':
                        self.DEFAULT_NR_EVTS = int(value)
                    elif name == 'default_evt_gap':
                        self.DEFAULT_EVT_GAP_MS = int(value)
                    elif name == 'default_bs_id':
                        self.DEFAULT_BS_ID = int(value)
                    elif name == 'evt_gap_ms_min':
                        self.EVT_GAP_MS_MIN = int(value)
                    elif name == 'evt_gap_ms_max':
                        self.EVT_GAP_MS_MAX = int(value)
                    elif name == 'log_format':
                        self.LOG_FORMAT = str(value)
                    elif name == 'log_date_format':
                        self.LOG_DATE_FORMAT = str(value)
                    elif name == 'log_file_name':
                        self.LOG_FILE_NAME = str(value)
                    elif name == 'log_file_level':
                        self.LOG_FILE_LEVEL = getattr(logging, value.upper())
                    elif name == 'log_stream_level':
                        self.LOG_STREAM_LEVEL = getattr(logging, value.upper())
                    elif name == 'data_start_time':
                        self.DATA_START_TIME = dt.datetime.strptime(value, "%d/%m/%Y")
                    elif name == 'data_end_time':
                        self.DATA_END_TIME = dt.datetime.strptime(value, "%d/%m/%Y")
                    elif name == 'dates_excluded':
                        self.DATES_EXCLUDED = literal_eval(value)
                    else:
                        logging.warning('Unknown config option in section [%s]: %s' % (section_common + "/" + section_special, name,))
        except configparser.NoSectionError:
            raise TypeError('No [%s] section specified in %s' % (section_common + "/" + section_special, config_file,))
    @staticmethod
    def is_valid_FL_id(node_id):
        return node_id in FL_NODE_IDS

    def load_codetections(self, path):
        # Load full trace from file
        trace = self._DataMgr.load_codetection_trace(path)

        # Reduce to ordered time trace
        trace = self._DataMgr.reduce_to_time_trace(trace)
        return trace
    def fetch_acoustic_data(self):
        # Create URL including conditions
        fields = 'device_id,start_time,end_time,generation_time,trg_duration'
        url = self._DataMgr.assemble_gsn_url(VS_ACOUSTIC_METADATA, fields, max_nr_points=self.max_nr_points)

        # Fetch data
        try:
            df = self._DataMgr.fetch_csv_data(url)
        except Exception as ex_acoustic:
            logging.error("Could not fetch CSV data from path:\n%s\n\n%s" % (url, ex_acoustic,))
            return
        logging.info("Received data from URL:\n%s" % (url,))

        # Print statistics
        self._DataProc.print_df_stats(df)
        return df
    def fetch_codetections(self, df, include_rain=True):
        # Filter days with precipitation
        excluded_dates = self.DATES_EXCLUDED
        if not include_rain:
            prec_dates = self._DataMgr.fetch_rain_dates()
            if len(prec_dates):
                # DATES_EXCLUDED may still be None if the config does not set 'dates_excluded'
                excluded_dates = (self.DATES_EXCLUDED or []) + prec_dates
                logging.info("Added %d days to list of excepted dates due to precipitation" % (len(prec_dates)))
        return self._DataProc.find_codetections(df, dates_excluded=excluded_dates)
    def generate_xml(self, codet_list, powertracing_enabled=True, random_sampling=True):
        # Fetch subset
        self._codet_sample = DataProcessor.gather_codetection_sample(codet_list=codet_list, max_nr_events=self.nr_evts, min_nr_of_nodes=self.nr_nodes, max_nr_of_nodes=(len(FL_NODE_IDS)-1), random=random_sampling)
        if len(self._codet_sample) < self.nr_evts:
            logging.warning("Tried to schedule %u events, but only %u co-detections with sufficient nodes (>= %u) were available" % (self.nr_evts, len(self._codet_sample), self.nr_nodes,))

        # Verify that FlockLab IDs are up-to-date and available
        fl = flocklab.Flocklab()
        available_ids = set(fl.getObsIds('DPP2LoRa')) - set(FL_NODE_EXCLUDED_IDS)
        if not set(FL_NODE_IDS).issubset(available_ids):
            logging.warning("Not all given FlockLab IDs are available; expected %s, but available IDs are %s" % (str(sorted(FL_NODE_IDS)), str(sorted(available_ids)),))
            unavailable_ids = []
            for fl_id in FL_NODE_IDS:
                if fl_id not in available_ids:
                    unavailable_ids.append(fl_id)
            for fl_id in unavailable_ids:
                FL_NODE_IDS.remove(fl_id)  # Requires a separate loop, as removing while iterating would make the iterator skip the ID following each removed one
            logging.warning("Removed IDs %s from the set of available nodes" % (str(unavailable_ids),))

        # Create XML
        self._DataMgr.create_FL_xml(codet_list=self._codet_sample, eligible_ids=FL_NODE_IDS, bs_id=self.basestation_id, evt_gap_t=self.evt_gap_ms, path=self.path, bs_img=self.basestation_image, powertracing_enabled=powertracing_enabled)
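    # A typical scheduling flow for this manager might look as follows (a
    # sketch only; the values are hypothetical, and nr_nodes, nr_evts, etc.
    # must be set beforehand, e.g. from the parsed CLI arguments):
    #
    #   mgr = STECFlocklabManager()
    #   df = mgr.fetch_acoustic_data()
    #   codets = mgr.fetch_codetections(df, include_rain=True)
    #   mgr.generate_xml(codets, powertracing_enabled=True)
    #   mgr.upload_xml()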
    def upload_xml(self):
        # Access fl-tools functions
        fl = flocklab.Flocklab()

        # Validate XML
        ret = fl.xmlValidate(DataManager.get_test_xml())
        if 'validated correctly' not in ret:
            raise ValueError('Invalid FlockLab XML file: %s' % (ret,))
        else:
            logging.debug("XML validated correctly")

        # Upload XML
        ret = fl.createTestWithInfo(DataManager.get_test_xml())
        if 'was successfully added' not in ret:
            raise ValueError('Could not upload XML to FlockLab: %s' % (ret,))
        else:
            logging.info(ret.replace('was successfully added and is scheduled to start', 'starts'))

        # Log test
        ret = ret.split(' ')
        test_nr = ret