# -*- coding: utf-8 -*-
"""
Copyright (c) 2021, ETH Zurich, Computer Engineering Group (TEC)
"""

# Data manager for interfacing with GSN
#
# Author: abiri, tkuonen, matthmey, tgsell
# Date:   03.11.21

import logging
import os
import sys
import configparser
import urllib.request
import datetime as dt
import numpy    as np
import pandas   as pd
import xml.etree.ElementTree as ET
from math   import ceil, floor
from base64 import b64encode
from ast    import literal_eval
from random import shuffle, randrange
from urllib import error
from csv    import DictReader

# ----------------------------------------------------------------------------------------------------------------------
# General consts and variables
# ----------------------------------------------------------------------------------------------------------------------

# Default params for command line arguments that are mandatory
FILE_DIR          = os.path.dirname(__file__)
DEFAULT_LOG_LEVEL = 'ERROR'

FL_XML_TEMPLATE  = FILE_DIR + '/flocklab_template.xml'
FL_XML_TEST      = FILE_DIR + '/flocklab_test.xml'
FL_TEST_LOG      = FILE_DIR + '/flocklab_test.log'
FL_TIME_FORMAT   = "%Y-%m-%d %H:%M:%S"
FL_NAMESPACE     = 'http://www.flocklab.ethz.ch'
FL_BAUDRATE      = 460800
FL_TRACING_PIN   = 'INT2'
FL_ACTUATION_PIN = 'SIG1'

TEST_FILE   = '/comboard_'  # To be extended with project name
SERIAL_FILE = '/serial.csv'
GPIO_FILE   = '/gpiotracing.csv'
POWER_FILE  = '/powerprofiling.csv'
CACHE_FILE  = '/cache'

TRACE_FILE  = '/trace'

# Global constants
S_TO_MS  = 1000
MS_TO_US = 1000
S_TO_US  = 1000 * 1000

MA_TO_UA = 1000
MW_TO_UW = 1000
MJ_TO_UJ = 1000

MAX_NR_OF_NODES_PER_DETECTION = 30

EVT_TIME_MAX_MS = 30 * S_TO_MS  # 1s staggered wake-up + 15s max event duration + 12s for report and ACK + 1s for time sync = 29s
TEST_BUFFER_MS  = 60 * S_TO_MS  # 8 nodes require 1.5s each for NodeID + 3s initial delay = 15s -> 2/3 of buffer is used to offset first trigger

MAX_FL_EVT_PROPAGATION_MS = 2 * S_TO_MS
MAX_FL_PRINT_DELAY_S      = 1

MAX_EVT_DURATION_MS    = 15 * S_TO_MS
MAX_EVT_PROPAGATION_MS = 100

# 0.5 mm/h results in reasonable events if weather data from Gruengarten is included as well; for simulations, all rain data is excluded (0.1 mm/h)
VS_PRECIPITATION   = '_vaisalawxt520prec'
MAX_RAIN_INTENSITY = 0.5  # [mm/h]
MAX_HAIL_INTENSITY = 0.5  # [hits/cm²/h]

# ----------------------------------------------------------------------------------------------------------------------
# Classes and functions
# ----------------------------------------------------------------------------------------------------------------------


class DataManager:

    def __init__(self, deployment, config_file, project_name, start_time=None, end_time=None):
        self._logger = logging.getLogger(self.__class__.__name__)

        if isinstance(deployment, str):
            self._deployment = deployment  # type: str
        else:
            raise TypeError("Deployment must be defined")
        if isinstance(project_name, str):
            self._project_name = project_name  # type: str
        else:
            raise TypeError("Project name must be defined")
        if isinstance(start_time, dt.datetime):
            self._start_time = start_time  # type: dt.datetime
        else:
            self._start_time = dt.datetime.strptime("01/01/2000", "%d/%m/%Y")
        if isinstance(end_time, dt.datetime):
            self._end_time   = end_time    # type: dt.datetime
        else:
            self._end_time   = dt.datetime.strptime("01/01/2025", "%d/%m/%Y")

        self._server_url = None

        log_file_name    = None
        log_file_level   = None
        log_stream_level = DEFAULT_LOG_LEVEL
        log_format       = None
        log_date_format  = None

        # Check configuration file
        if not os.path.isfile(config_file):
            raise TypeError('Config file (%s) not found' % (config_file,))

        config_file = os.path.abspath(config_file)

        # Read config file for other options
        config = configparser.ConfigParser()  # SafeConfigParser is deprecated and removed in recent Python versions
        config.optionxform = str  # Case sensitive
        config.read(config_file)

        section_common  = '%s' % project_name
        section_special = '%s-manager' % project_name
        try:
            # Read options from config
            for name, value in (config.items(section_common) + config.items(section_special)):
                value = value.strip()
                if value != '':
                    if name == 'server':
                        if not isinstance(value, str):
                            raise TypeError('Server URL must be of type str: %s' % (value,))
                        else:
                            self._server_url = value
                    elif name == 'log_file_name':
                        if not isinstance(value, str):
                            raise TypeError('Log file name must be of type str: %s' % (value,))
                        else:
                            log_file_name = value
                    elif name == 'log_file_level':
                        if not isinstance(value, str):
                            raise TypeError('Log file level must be of type str: %s' % (value,))
                        else:
                            log_file_level = value
                    elif name == 'log_stream_level':
                        if not isinstance(value, str):
                            raise TypeError('Log stream level must be of type str: %s' % (value,))
                        else:
                            log_stream_level = value
                    elif name == 'log_format':
                        if not isinstance(value, str):
                            raise TypeError('Log format must be of type str: %s' % (value,))
                        else:
                            log_format = value
                    elif name == 'log_date_format':
                        if not isinstance(value, str):
                            raise TypeError('Log date format must be of type str: %s' % (value,))
                        else:
                            log_date_format = value
                    else:
                        self._logger.warning('Unknown config option in section [%s]: %s' % (section_common + "/" + section_special, name,))
        except configparser.NoSectionError:
            raise TypeError('No [%s] section specified in %s' % (section_common + "/" + section_special, config_file,))

        # Make sure necessary settings are set
        if self._server_url is None:
            raise ValueError("Server URL must be set in the configs")

        # Initialize logging - afterwards, can use "self._logger.*" instead of root logger ("logging.*")
        self._init_logging(log_file_name, log_file_level, log_stream_level, log_format, log_date_format)

    def _init_logging(self, log_file_name=None, log_file_level=None, log_stream_level=None, format=None, date_format=None):
        self._logger.setLevel(logging.DEBUG)
        self._logger.propagate = False  # Avoid propagation to root logger

        # Setup file handler for logging
        if log_file_name is not None and log_file_level is not None:
            fh = logging.FileHandler(filename=log_file_name)
            fh.setLevel(log_file_level)
        else:
            fh = None

        # Setup console output
        if log_stream_level is not None:
            ch = logging.StreamHandler()
            ch.setLevel(log_stream_level)
        else:
            ch = None

        # Create formatter and add it to the handlers
        if format is not None:
            if date_format is None:
                formatter = logging.Formatter(format)
            else:
                formatter = logging.Formatter(fmt=format, datefmt=date_format)

            if fh is not None:
                fh.setFormatter(formatter)
            if ch is not None:
                ch.setFormatter(formatter)

        # Add handlers to logger
        if fh is not None:
            self._logger.addHandler(fh)
        if ch is not None:
            self._logger.addHandler(ch)

    @staticmethod
    def get_test_xml():
        return FL_XML_TEST

    def assemble_gsn_url(self, virtual_sensor, fields='All', conditions=None, max_nr_points=None, server_url=None, deployment=None):
        """ Create URL to retrieve GSN data according to a variable number of conditions

        @param virtual_sensor: str  # Name of virtual sensor
        @param fields: str          # Fields that should be fetched from the specified virtual sensor
        @param conditions:     list # List of conditions in the form:
                                      [{'join':<'and' or 'or'>,
                                        'field':<name of field>,
                                        'min':<min value>,
                                        'max':<max value>}]
        @param max_nr_points: int   # Maximal number of data points
        @param server_url: str      # Optional: Server URL if default of data manager should be overwritten
        @param deployment: str      # Optional: Deployment String if default of data manager should be overwritten

        @returns: Assembled URL
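
        Example (cf. fetch_precipitation_data below):
            conditions=[{'join': 'and', 'field': 'position', 'min': 12, 'max': 13}]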
        """
        # Documentation of URL format:
        # https://github.com/LSIR/gsn/wiki/Web-Interface
        # https://doi.org/10.3929/ethz-b-000323342

        # Fetch default parameters if no parameters were overwritten
        if server_url is None:
            server_url = self._server_url
        if deployment is None:
            deployment = self._deployment

        # Base URL
        vs  = deployment + virtual_sensor
        url = server_url + "/multidata?field[0]={0:s}&vs[0]={1:s}".format(fields, vs)

        # Time settings
        url += "&time_format=iso&timeline=generation_time&from={0}&to={1}".format(self._start_time.strftime("%d/%m/%Y+%H:%M:%S"), self._end_time.strftime("%d/%m/%Y+%H:%M:%S"))

        # Add conditions
        if conditions is None:
            conditions = []
        for i in range(0, len(conditions)):
            condition = conditions[i]
            condition["i"]  = i
            condition["vs"] = deployment + virtual_sensor

            condition_str = ("&c_join[{i:d}]={join:s}"
                            + "&c_vs[{i:d}]={vs:s}&c_field[{i:d}]={field:s}"
                            + "&c_min[{i:d}]={min:d}&c_max[{i:d}]={max:d}").format(**condition)

            # Bug fix: Ignore OR conditions, as they mistakenly also OR c_min and c_max (resulting in c_min=X or c_max=Y, making conditions unusable)
            if condition['join'] == 'or':
                self._logger.warning('Ignoring OR condition: ' + condition_str)
            else:
                url += condition_str

        # Add limit on number of samples if desired
        if max_nr_points is not None and isinstance(max_nr_points, int):
            url += "&nb=SPECIFIED&nb_value={0}".format(max_nr_points)

        self._logger.debug("GSN http-query is:\n%s" % (url,))

        return url

    def fetch_binary_data(self, path):
        """ Download binary samples from GSN and return them

        @param path: str # Path of the file on the server (html escaped)

        @returns: Data retrieved from that URL

        @raises ValueError if URL does not return 200
        """
        # Make HTTP request
        request = urllib.request.urlopen(self._server_url + path)

        # If return code not ok: Raise exception
        if request.getcode() != 200:
            raise ValueError("Error %d while fetching data from %s" % (request.getcode(), self._server_url,))
        else:
            # if no error: return retrieved data
            return request.read()

    def fetch_csv_data(self, path, description="", abort=True, cache=True):
        """ Convert CSV to Pandas data frame

        @param path:        str  # Path can both be local and remote (can also pass URL directly)
        @param description: str  # Data description to be printed for error handling
        @param abort:       bool # Whether program execution should be aborted upon failure
        @param cache:       bool # Whether to use a cached file version if available
        @returns: Pandas data frame
        """

        if not isinstance(path, str):
            self._logger.warning("Given path is not a string, abort fetching data")
            return None

        # If caching is enabled, check whether data is already stored
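        # The cache file name is derived from the virtual sensor, time range and conditions parsed out of the request URL below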
        cached_path  = None
        cache_exists = False
        if cache:
            arguments   = path.split('&')
            start_date  = None
            end_date    = None
            vsensor     = None
            condition   = ''
            max_defined = False

            for argument in arguments:
                if 'vs' in argument and 'c_vs' not in argument:
                    vsensor = argument[len('vs[0]='):]
                elif 'from' in argument:
                    start_date = dt.datetime.strptime(argument[len('from='):], '%d/%m/%Y+%H:%M:%S')
                elif 'to' in argument:
                    end_date = dt.datetime.strptime(argument[len('to='):], '%d/%m/%Y+%H:%M:%S')
                elif 'nb=SPECIFIED' in argument:
                    max_defined = True
                elif 'c_join' in argument:
                    condition_join = argument[len('c_join[0]='):]
                elif 'c_field' in argument:
                    condition += '_' + argument[len('c_field[0]='):]
                elif 'c_min' in argument:
                    condition += '=' + argument[len('c_min[0]='):]
                elif 'c_max' in argument:
                    condition += '-' + argument[len('c_max[0]='):]

            if vsensor is None:
                self._logger.warning('Could not determine virtual sensor from path %s' % (path,))
            elif start_date is None or end_date is None:
                self._logger.warning('Could not find start and/or end date for fetching CSV data')
            elif not max_defined:
                # Search for cached file
                cached_path  = './' + CACHE_FILE + '_' + vsensor + '_' + start_date.strftime('%Y%m%d') + '_' + end_date.strftime('%Y%m%d') + condition + '.csv'
                cache_exists = os.path.isfile(cached_path)

                if cache_exists:
                    self._logger.debug('Using cached file %s for retrieving data' % (cached_path,))
                    path = cached_path

        # Create data frame
        try:

            # Create cached file if it did not exist before
            if cache and cached_path is not None and not cache_exists:
                df = pd.read_csv(path, skiprows=2, float_precision='round_trip')  # skip header lines (first 2) for import
                with open(cached_path, 'w') as f:
                    f.write('Caching tool\nCache created at %s\n' % (dt.datetime.now().strftime(FL_TIME_FORMAT)))
                df.to_csv(cached_path, index=False, mode='a')  # Store file
                self._logger.info('Created new cache in file %s' % (cached_path,))
            else:
                df = pd.read_csv(path, skiprows=2)  # skip header lines (first 2) for import

            # Process file
            df['time'] = pd.to_datetime(df['generation_time'], utc=True)

            df = df.sort_values(by='time')
        except pd.errors.EmptyDataError:
            self._logger.info('No ' + description + ' data found for CSV from path:\n{:s}'.format(path))
            return None
        except error.HTTPError as http_error:
            self._logger.warning('Fetching test results threw HTTP Error {:3d}: {:s}'.format(http_error.code, http_error.reason))

            if http_error.code == 403:
                self._logger.info('Make sure that you are connected to ee-tik, as the server is not accessible from outside ETH')
            return None
        except Exception as ex_mgr:
            self._logger.error('Could not fetch ' + description + ' CSV data from path:\n{:s}\n\n{:s}'.format(path, str(ex_mgr)))

            if abort:
                sys.exit(1)
            else:
                return None

        # Remove columns with only 'null'
        df = df.replace(r'null', np.nan, regex=True)
        isnull = df.isnull().all()
        df.drop(columns=df.columns[isnull].tolist(), inplace=True)

        # Remove '#' from first column name
        first_column = df.columns[0]
        df.rename(columns={first_column: first_column.replace('#', '')}, inplace=True)

        # Drop 'generation_time' and order according to time
        df.drop(columns={'generation_time'}, inplace=True)
        df.set_index('time', inplace=True)

        self._logger.debug('Received data from URL:\n{:s}'.format(path))

        return df

    def fetch_rain_dates(self, include_gruengarten=False, include_breithorn=False, include_grabengufer=False):

        prec_dates = []

        # Fetch corresponding data
        weather_station_gruengarten_pos = 13  # Dirruhorn Gruengarten - still existing
        weather_station_grabengufer_pos = 42  # Grabengufer           - still existing
        weather_station_breithorn_pos   = 68  # Breithorn             - still existing
        weather_station_glacier_pos     = 69  # Dirruhorn Blockgletscher - existed until June 2020

        df = self.fetch_precipitation_data(weather_station_glacier_pos)

        if include_gruengarten:
            # Also include data from Dirruhorn Gruengarten
            df = pd.concat([df, self.fetch_precipitation_data(weather_station_gruengarten_pos)], ignore_index=False, verify_integrity=False)  # DataFrame.append was removed in pandas 2.x
        if include_breithorn:
            # Also include data from Breithorn
            df = pd.concat([df, self.fetch_precipitation_data(weather_station_breithorn_pos)], ignore_index=False, verify_integrity=False)
        if include_grabengufer:
            # Also include data from Grabengufer
            df = pd.concat([df, self.fetch_precipitation_data(weather_station_grabengufer_pos)], ignore_index=False, verify_integrity=False)

        # Drop rows with null due to faulty data from Vaisala sensor
        df.dropna(inplace=True)

        # Order again (as multiple dataframes got appended)
        df.sort_index(inplace=True)

        # Go through data and detect days above threshold
        start_date = df.index.min().replace(hour=0, minute=0, second=0, microsecond=0)
        end_date   = df.index.max().replace(hour=0, minute=0, second=0, microsecond=0)

        curr_date = start_date

        while curr_date < end_date:
            next_date = curr_date + dt.timedelta(days=1)
            curr_data = df.loc[curr_date:next_date]

            if (curr_data['rain_intensity'] > MAX_RAIN_INTENSITY).any():
                self._logger.debug("Detected rain on %s" % (curr_date.strftime("%Y-%m-%d"),))
                prec_dates.append([curr_date.strftime("%d/%m/%Y")])
            elif (curr_data['hail_intensity'] > MAX_HAIL_INTENSITY).any():
                self._logger.debug("Detected hail on %s" % (curr_date.strftime("%Y-%m-%d"),))
                prec_dates.append([curr_date.strftime("%d/%m/%Y")])

            curr_date = next_date

        return prec_dates

    def fetch_precipitation_data(self, position):
        station_cond = [{'join': 'and',
                         'field': 'position',
                         'min': position - 1,
                         'max': position}]
        station_fields = 'generation_time,rain_intensity,hail_intensity,position'

        precipitation_url = self.assemble_gsn_url(VS_PRECIPITATION, fields=station_fields, conditions=station_cond)

        return self.fetch_csv_data(precipitation_url)

    def generate_codetection_trace(self, nr_codets, nr_evts, max_prop_delay_ms=MAX_EVT_PROPAGATION_MS, max_duration_ms=MAX_EVT_DURATION_MS, inter_arrival_s=(MAX_EVT_DURATION_MS / S_TO_MS + 5), interval_variation_percent=0):
        trace          = []
        time_offset_us = 0

        for codet in range(nr_codets):

            # Add the first event which has an absolute Unix timestamp
            curr_codet = [{'id': 0, 'time': time_offset_us, 'duration': randrange(1 * S_TO_MS, max_duration_ms) / S_TO_MS}]

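            # Subsequent events only store a random propagation delay (in us) relative to the first event, plus their own duration in seconds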
            for event in range(1, nr_evts):
                curr_evt = {'id': event, 'time': randrange(0, max_prop_delay_ms * MS_TO_US), 'duration': randrange(1 * S_TO_MS, max_duration_ms) / S_TO_MS}
                curr_codet.append(curr_evt)

            trace.append(curr_codet)
            effective_inter_arrival_s = randrange(floor((1 - interval_variation_percent / 100) * inter_arrival_s), ceil((1 + interval_variation_percent / 100) * inter_arrival_s) + 1)
            time_offset_us += effective_inter_arrival_s * S_TO_US

        self._logger.info('Generated trace with %i co-detections consisting of %i events each' % (nr_codets, nr_evts,))
        return trace

    def store_codetection_trace(self, trace, trace_path, relative_path=True, overwrite=True):

        # Complete and check path
        nr_events = self.verify_trace(trace)
        if nr_events == 0:
            self._logger.warning('Received empty trace to log')
            return

        if relative_path:
            trace_path = './' + TRACE_FILE + trace_path + '.log'

        if TRACE_FILE not in trace_path or os.path.splitext(trace_path)[1] != '.log':
            raise ValueError('Attempted to store trace to invalid trace file %s' % (trace_path,))

        # Check if trace file already exists
        if os.path.isfile(trace_path):
            if not overwrite:
                self._logger.info('Trace file (%s) already exists, skipping generation' % (trace_path,))
                return
            else:
                self._logger.info('Trace file (%s) already exists, overwriting file with new trace' % (trace_path,))
                os.remove(trace_path)

        # Write to file
        with open(trace_path, 'w') as test_file:
            test_file.write(str(trace))
        self._logger.debug('Finished co-detection trace storing with %i co-detections consisting of %i events' % (len(trace), nr_events,))
        return trace_path

    def load_codetection_trace(self, trace_path, relative_path=True, id_offset=0):

        # Complete and check path
        if relative_path:
            trace_path = './' + TRACE_FILE + '_' + trace_path + '.log'

        if TRACE_FILE not in trace_path or os.path.splitext(trace_path)[1] != '.log':
            raise ValueError('Attempted to load trace from invalid trace file %s' % (trace_path,))

        if not os.path.isfile(trace_path):
            self._logger.warning('Trace file (%s) not found' % (trace_path,))
            return None

        # Read from file
        with open(trace_path, 'r') as trace_file:
            trace_str = trace_file.read()

        # Convert trace from string to object
        trace     = literal_eval(trace_str)
        nr_events = self.verify_trace(trace)

        if id_offset > 0:
            for codet in trace:
                for event in codet:
                    event['id'] += id_offset

            self._logger.debug('Offset IDs from trace by offset %i' % (id_offset,))

        self._logger.debug('Finished co-detection trace loading with %i co-detections consisting of %i events' % (len(trace), nr_events,))
        return trace

    @staticmethod
    def generate_trace_postfix(codets, evts, prop_delay_ms=None, evt_interval_s=None):
        return '_c_%i_e_%i%s%s' % (codets, evts, '_p_%i' % (prop_delay_ms,) if prop_delay_ms is not None else '', '_i_%i' % (evt_interval_s,) if evt_interval_s is not None else '',)

    @staticmethod
    def verify_trace(trace):
        nr_events = 0

        # Verify that trace is of correct type
        if not isinstance(trace, list):
            raise TypeError('Expected to load trace as list, but encountered %s' % (type(trace),))

        for codet in trace:
            if not isinstance(codet, list):
                raise TypeError('Expected to load co-detection in trace as list, but encountered %s' % (type(codet),))
            else:
                nr_events += len(codet)

            for event in codet:
                if not isinstance(event, dict):
                    raise TypeError('Expected to load co-detection event in trace as dictionary, but encountered %s' % (type(event),))

        return nr_events

    @staticmethod
    def reduce_to_time_trace(trace):
        time_trace = [[] for _ in range(MAX_NR_OF_NODES_PER_DETECTION)]
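        # time_trace[n] collects the event times of all co-detections with exactly n events (first event kept first, remaining delays sorted)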

        # Add to list
        for codet in trace:
            curr_codet = []

            for event in codet:
                curr_codet.append(event['time'])

            # Add sorted list to trace
            sorted_codet = [curr_codet[0]] + sorted(curr_codet[1:])
            time_trace[len(sorted_codet)].append(sorted_codet)

        return time_trace

    @staticmethod
    def xml_indent(elem, level=0):
        i = "\n" + level * "\t"
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = i + "\t"
            if not elem.tail or not elem.tail.strip():
                elem.tail = "\n" + i if (level == 1) else i
            for elem in elem:
                DataManager.xml_indent(elem, level + 1)
            if not elem.tail or not elem.tail.strip():
                elem.tail = i
        else:
            if level and (not elem.tail or not elem.tail.strip()):
                elem.tail = i

    def create_FL_xml(self, codet_list, eligible_ids, bs_id, evt_gap_t, path, bs_img=None, include_codetections=True, randomize_order=False, pretty_print=True, powertracing_enabled=True):

        # Register namespace
        ET.register_namespace('', FL_NAMESPACE)

        # Import template
        tree = ET.parse(FL_XML_TEMPLATE)
        root = tree.getroot()

        # Gather necessary information for XML
        if codet_list is None or len(codet_list) == 0:
            raise ValueError("Did not receive any co-detections to schedule")
        elif eligible_ids is None or len(eligible_ids) == 0:
            raise ValueError("Did not receive any eligible Flocklab nodes")
        elif bs_id == 0 or bs_id not in eligible_ids:
            raise ValueError("Invalid BaseStation ID %d; must be in %s" % (bs_id, str(eligible_ids),))

        # Choose test time
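        # Each co-detection requires at most EVT_TIME_MAX_MS, separated by evt_gap_t, plus TEST_BUFFER_MS as start-up margin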
        test_time_s = ceil((len(codet_list) * EVT_TIME_MAX_MS + (len(codet_list) - 1) * evt_gap_t + TEST_BUFFER_MS)/1000)
        self._logger.debug("Scheduling test of length %ds for %d co-detections with a gap time of %dms" % (test_time_s, len(codet_list), evt_gap_t))

        # Choose nodes
        max_nr_nodes = max(len(x) for x in codet_list)
        eligible_ids.remove(bs_id)  # Make sure BaseStation is not eligible
        if len(eligible_ids) < max_nr_nodes:
            self._logger.warning("Insufficient number of eligible FL nodes (%d) for largest co-detection of size %d, truncating co-detections" % (len(eligible_ids), max_nr_nodes))
            ids = eligible_ids
        else:
            ids = eligible_ids[:max_nr_nodes]

        # If desired, shuffle list to vary topology
        if randomize_order:
            shuffle(ids)

        # Import binary data
        binary_src_path = path + TEST_FILE + self._project_name + '.elf'
        if not os.path.isfile(binary_src_path):
            raise TypeError('Source file (%s) not found' % (binary_src_path,))
        else:
            with open(binary_src_path, 'rb') as f:
                binary_src = b64encode(f.read()).decode("utf-8")

        if bs_img is None:
            binary_bs_path = path + TEST_FILE + self._project_name + '_bs.elf'
            if not os.path.isfile(binary_bs_path):
                raise TypeError('BaseStation file (%s) not found' % (binary_bs_path,))
            else:
                with open(binary_bs_path, 'rb') as f:
                    binary_bs = b64encode(f.read()).decode("utf-8")
        else:
            binary_bs = 'dbImgId %s used' % (bs_img,)

        # Edit XML elements
        target_elem = None
        image_elem  = None
        power_elem  = None

        for elem in root:
            if   elem.tag == '{%s}%s' % (FL_NAMESPACE, 'generalConf',):
                for subelem in elem:
                    if subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'schedule',):
                        subelem[0].text = str(test_time_s)
                    elif subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'custom',) and include_codetections:
                        subelem.text = str(codet_list)
            elif elem.tag == '{%s}%s' % (FL_NAMESPACE, 'targetConf',):
                for subelem in elem:
                    if subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'obsIds',):
                        if   any(e.text == 'Image_src' for e in elem):
                            subelem.text = ' '.join(map(str, ids))
                        elif any(e.text == 'Image_bs' for e in elem):
                            subelem.text = str(bs_id)
                            target_elem  = elem  # Used to change element if image is provided by dbImageId
                        else:
                            self._logger.error('Found invalid target configuration with unknown embeddedImageId')
            elif elem.tag == '{%s}%s' % (FL_NAMESPACE, 'serialConf',):
                for subelem in elem:
                    if   subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'obsIds',):
                        subelem.text = ' '.join(map(str, ids + [bs_id]))
                    elif subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'baudrate',):
                        subelem.text = str(FL_BAUDRATE)
            elif elem.tag == '{%s}%s' % (FL_NAMESPACE, 'gpioTracingConf',):
                for subelem in elem:
                    if subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'obsIds',):
                        subelem.text = ' '.join(map(str, ids + [bs_id]))
            elif elem.tag == '{%s}%s' % (FL_NAMESPACE, 'gpioActuationConf',):
                self._logger.warning('Found existing \'gpioActuationConf\'')
            elif elem.tag == '{%s}%s' % (FL_NAMESPACE, 'powerProfilingConf',):
                power_elem = elem
                for subelem in elem:
                    if   subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'obsIds',):
                        subelem.text = ' '.join(map(str, ids + [bs_id]))
                    elif subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'duration',):
                        subelem.text = str(test_time_s)
            elif elem.tag == '{%s}%s' % (FL_NAMESPACE, 'embeddedImageConf',):
                for subelem in elem:
                    if subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'data',):
                        if   any(e.text == 'Image_src' for e in elem):
                            subelem.text = str(binary_src)
                        elif any(e.text == 'Image_bs' for e in elem):
                            subelem.text = str(binary_bs)
                            image_elem = elem
                        else:
                            self._logger.error('Found invalid embedded image configuration with embeddedImageId')
            else:
                self._logger.warning('Found unknown XML element %s in \'%s\'' % (elem.tag, FL_XML_TEMPLATE,))

        # Edit XML for BaseStation if necessary - maintains correct order so that <targetConf> is defined before <embeddedImageConf>
        if bs_img is not None:
            # Remove 'embeddedImageId' element
            if target_elem is not None:
                for subelem in target_elem:
                    if subelem.tag == '{%s}%s' % (FL_NAMESPACE, 'embeddedImageId',):
                        target_elem.remove(subelem)
                        break
            else:
                self._logger.warning('Could not remove \'embeddedImageId\' as <targetConf> was not set')

            # Remove <embeddedImageConf> for 'Image_bs'
            if image_elem is not None:
                root.remove(image_elem)
            else:
                self._logger.warning('Could not remove <embeddedImageConf> as it was not set')

            # Add 'dbImageId' as sub-element
            ET.SubElement(target_elem, 'dbImageId').text = str(bs_img)

        # Remove PowerTracing if desired
        if not powertracing_enabled:
            root.remove(power_elem)

        # Add XML items for actuation
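        # The first trigger is offset by 2/3 of TEST_BUFFER_MS so that all nodes have completed their start-up before the first actuation (cf. TEST_BUFFER_MS above)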
        offset_ms     = TEST_BUFFER_MS * 2 / 3
        codet_offsets = [[] for _ in ids]

        # First find offsets for each ID
        for codet in codet_list:
            for j, node_offset in enumerate(codet):
                if j < len(ids):
                    codet_offsets[j].append((offset_ms + node_offset / 1000) if (j > 0) else offset_ms)
                else:
                    self._logger.warning('Skipping trigger %d due to insufficient number of available FL nodes (%d)' % (j + 1, len(ids),))

            # Add offset for next co-detection
            offset_ms += EVT_TIME_MAX_MS + evt_gap_t

        # Add configuration for each node
        for node in ids:
            actuation_elem = ET.SubElement(root,           'gpioActuationConf')
            ET.SubElement(actuation_elem, 'obsIds').text = str(node)

            for offset_abs in codet_offsets[ids.index(node)]:
                pinConf = ET.SubElement(actuation_elem, 'pinConf')
                ET.SubElement(pinConf, 'pin').text    = FL_ACTUATION_PIN
                ET.SubElement(pinConf, 'level').text  = 'high'
                ET.SubElement(pinConf, 'offset').text = str(offset_abs / 1000)
                ET.SubElement(pinConf, 'period').text = str(1)
                ET.SubElement(pinConf, 'count').text  = str(1)

            self._logger.debug('Added gpioActuationConf for node %d with %d actuation(s)' % (node, len(codet_offsets[ids.index(node)])))

        # Enable pretty printing of XML for better readability
        if pretty_print:
            self.xml_indent(root)

        # Write file
        xml_string = ET.tostring(root, encoding='unicode', method='xml')
        with open(FL_XML_TEST, 'w') as test_file:
            test_file.write(xml_string)

        self._logger.debug("Updated XML test configuration")

    def log_test(self, log_str):

        if not isinstance(log_str, str):
            self._logger.warning("Given item to log is not a string: %s" % (str(log_str),))
            return False

        str_to_log = dt.datetime.now().strftime(FL_TIME_FORMAT) + " - " + log_str + "\n"
        with open(FL_TEST_LOG, 'a+') as log_file:
            log_file.write(str_to_log)

        return True

    def get_logged_tests(self, path=None):

        if path is None:
            path = FL_TEST_LOG

        if not os.path.isfile(path):
            self._logger.warning('Log file (%s) not found' % (path,))
            return None

        # Get logs
        with open(path, 'r') as log_file:
            data = log_file.read()

        # Parse lines
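        # Expected line format: '<time_log> - <test_id>,<time_scheduled>,<codetections>' (cf. log_test() above)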
        lines     = data.split('\n')
        rows_list = []

        for line in lines:

            # Make sure line is not empty or not commented out
            if len(line) == 0 or line[0] == '#' or line[:2] == '//':
                continue

            # Extract time of creation and logged data
            time_log, data = line.split(' - ', maxsplit=1)

            test_id, time_scheduled, codetections = data.split(',', maxsplit=2)

            row = {'time_log':       dt.datetime.strptime(time_log, FL_TIME_FORMAT),
                   'time_scheduled': dt.datetime.strptime(time_scheduled, FL_TIME_FORMAT),
                   'test_id':        int(test_id),
                   'codetections':   literal_eval(codetections)}
            rows_list.append(row)

            self._logger.debug('Read test %s from log, scheduled at %s' % (test_id, time_scheduled,))

        return pd.DataFrame(rows_list)

    def extract_test_metrics(self, test_ids, path=None):

        if path is None:
            path = './'

        if not os.path.isdir(path):
            self._logger.warning('Invalid path to test files: %s' % (path,))
            return None

        rows_list = [[], [], []]

        for test_id in test_ids:

            self._logger.debug("Starting to process %i" % (test_id,))

            base_path   = path + '/' + str(test_id)
            serial_path = base_path + SERIAL_FILE
            gpio_path   = base_path + GPIO_FILE
            power_path  = base_path + POWER_FILE

            # Check if file exists
            if not os.path.isfile(serial_path):
                self._logger.warning('Invalid path to serial test files of test %d: %s' % (test_id, serial_path,))
            elif not os.path.isfile(gpio_path):
                self._logger.warning('Invalid path to GPIO test files of test %d: %s' % (test_id, gpio_path,))
            else:
                gpio_df    = pd.read_csv(gpio_path,   names=['timestamp', 'observer_id', 'node_id', 'pin_name', 'value'], skiprows=1)
                serial_df  = pd.read_csv(serial_path, names=['timestamp', 'observer_id', 'node_id', 'direction', 'output'], skiprows=1)

                if not os.path.isfile(power_path):
                    self._logger.debug('Invalid path to power test files of test %d: %s (power tracing might be disabled)' % (test_id, power_path,))
                    power_df = None
                else:
                    power_df = pd.read_csv(power_path,  names=['timestamp', 'observer_id', 'node_id', 'current_mA', 'voltage_V'], skiprows=1)


                # Node health reporting and BaseStation Information for event timestamping - must be filtered before event analysis
                df = serial_df[['timestamp', 'node_id', 'output']].copy()
                df.set_index('timestamp', drop=False, inplace=True)
                df.sort_index(inplace=True)

                # Drop lines where timestamp does not exist (is nan)
                if df.index.isnull().any():
                    self._logger.warning("Had to drop %d serial lines due to missing timestamp" % np.count_nonzero(df.index.isnull()))
                    df = df[df.index.notnull()]

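                # Serial markers used later for event timestamping: BS time reference, locally requested timestamps, GSN generation times, and node info/health message markers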
                bs_reference     = df.loc[df['output'].str.contains('base station current time:', na=False)]
                local_timestamps = df.loc[df['output'].str.contains('requested timestamp sent', na=False)]
                gsn_timestamps   = df.loc[df['output'].str.contains('message.c: node ', na=False) | df['output'].str.contains('message generation time updated to', na=False)]

                nodeinfo_start = df.loc[df['output'].str.contains('node info message generated', na=False)]
                health_start   = df.loc[df['output'].str.contains('health message generated',    na=False)]

                # Eliminate corresponding Node health and BaseStation traces
                df = gpio_df[['timestamp', 'node_id', 'pin_name', 'value']].copy()
                df.set_index('timestamp', drop=False, inplace=True)
                df.sort_index(inplace=True)

                # Filter initial values where each node is set at t=0
                superfluous_traces = [df.first_valid_index()]
                df.drop(superfluous_traces, inplace=True)

                # Find actuations from BaseStation
                bs_id    = bs_reference['node_id'].unique()[0]
                mask_bs  = (df['node_id'] == bs_id) & (df['pin_name'] == FL_TRACING_PIN)
                mask_ref = mask_bs & (df['value'] == 1)
                bs_traces     = df[mask_bs].index.tolist()
                bs_ref_traces = df[mask_ref]

                # Find actuations which are due to node health messages
                traces = df[df['pin_name'] == FL_TRACING_PIN]
                health_traces_start = []
                health_traces_end   = []
                health_node_id      = []
                nodehealth_max_start_delay =  100 / 1000
                nodehealth_max_end_delay   = 1200 / 1000
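                # These delays bound the search for the rising/falling GPIO edge that belongs to each health message below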

                for j in health_start.index:

                    t    = health_start.loc[j].at['timestamp']
                    mask = (traces['value'] == 1) & (traces['node_id'] == health_start.loc[j].at['node_id'])
                    start_trace = traces[mask].loc[t:(t + nodehealth_max_start_delay)].index.tolist()

                    if len(start_trace) == 1:
                        # Unique actuation, find matching end
                        t    = start_trace[0]
                        mask = (traces['value'] == 0) & (traces['node_id'] == health_start.loc[j].at['node_id'])
                        end_trace = traces[mask].loc[t:(t + nodehealth_max_end_delay)].index.tolist()

                        if len(end_trace) >= 1:
                            health_traces_start.append(start_trace[0])
                            health_traces_end.append(end_trace[0])
                            health_node_id.append(health_start.loc[j].at['node_id'])
                        else:
                            self._logger.warning("Could not find matching end of trace for health message of node %i at timestamp %f in test %d" % (health_start.loc[j].at['node_id'], health_start.loc[j].at['timestamp'], test_id))
                    else:
                        self._logger.warning("Could not find unique start of trace for health message of node %i at timestamp %f in test %d" % (health_start.loc[j].at['node_id'], health_start.loc[j].at['timestamp'], test_id))


                # GPIO
                df = gpio_df[['timestamp', 'node_id', 'pin_name', 'value']].copy()

                # Sanitize data

                # Filter unused pins
                used_pins     = [FL_TRACING_PIN, FL_ACTUATION_PIN]
                unused_traces = df[~df['pin_name'].isin(used_pins)].index
                df.drop(unused_traces.tolist(), inplace=True)

                # Filter actuations from BaseStation and node health messages
                df.set_index('timestamp', drop=False, inplace=True)
                df.drop(bs_traces + health_traces_start + health_traces_end, inplace=True)
                df.sort_index(inplace=True)

                # Filter initial values where each node is set at t=0
                superfluous_traces = [df.first_valid_index()]
                df.drop(superfluous_traces, inplace=True)

                # Only look at defined node
                node_ids = sorted(df['node_id'].unique())

                evt_timing = []

                for node_id in node_ids:
                    data_cropped = df[df['node_id'] == node_id]

                    # Find actuations
                    mask       = (data_cropped['pin_name'] == FL_ACTUATION_PIN) & (data_cropped['value'] == 1)
                    actuations = data_cropped[mask].index.tolist()
                    nr_evts    = len(actuations)

                    for idx in range(0, nr_evts):
                        # Crop all traces between two actuations or after last actuation
                        t_start = actuations[idx]
                        t_end   = actuations[idx+1] if (idx < (nr_evts-1)) else data_cropped.index.max()
                        mask    = (data_cropped['pin_name'] == FL_TRACING_PIN)
                        curr_traces = data_cropped[mask].loc[t_start:t_end].index.tolist()

                        nr_traces = len(curr_traces)
                        if (nr_traces % 2) != 0 or nr_traces < 2*4 or nr_traces > 2*5:
                            self._logger.warning("Incorrect number of traces (%d) for stages of node %d at event %d in test %d" % (nr_traces, node_id, idx, test_id,))
                        else:

                            # Map current index to event
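                            # An actuation within MAX_FL_EVT_PROPAGATION_MS of an already known event is assigned to it; otherwise a new event is created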
                            min_delta = MAX_FL_EVT_PROPAGATION_MS / S_TO_MS
                            min_evt   = 0
                            for i in range(0, len(evt_timing)):
                                temp_evt   = evt_timing[i]
                                curr_delta = abs(temp_evt['start'] - actuations[idx])
                                if curr_delta < min_delta:
                                    min_delta = curr_delta
                                    min_evt   = i

                            if min_delta >= (MAX_FL_EVT_PROPAGATION_MS / S_TO_MS):
                                evt = len(evt_timing)  # Create new event
                            else:
                                evt = min_evt

                            # Find corresponding timestamped event start
                            curr_start = curr_traces[0]
                            curr_end   = (curr_traces[9] if (nr_traces == 10) else curr_traces[7]) + MAX_FL_PRINT_DELAY_S  # Add extra time as printing can be done after task is done
                            local_df   = local_timestamps[local_timestamps['node_id'] == node_id].loc[curr_start:curr_end, 'output']
                            gsn_msg    = gsn_timestamps[gsn_timestamps['output'].str.contains('message generation time updated to', na=False)].loc[curr_start:curr_end, 'output']
                            gsn_offset = gsn_timestamps[gsn_timestamps['output'].str.contains('message.c: node %i' % node_id, na=False)].loc[curr_start:curr_end, 'output']
                            bs_df      = bs_reference.loc[curr_start:curr_end,  'output']
                            if (local_df.shape[0] > 0) and (gsn_msg.shape[0] > 0) and (gsn_offset.shape[0] > 0) and (bs_df.shape[0] > 0):
                                curr_local_string  = local_df.item()
                                evt_local_logged   = float(curr_local_string.split()[5][1:-1])
                                curr_gsn_msg       = gsn_msg.iloc[0]
                                curr_gsn_offset    = gsn_offset.iloc[0]
                                evt_gsn_logged     = float(curr_gsn_msg.split()[-1]) - float(curr_gsn_offset.split()[5]) * MS_TO_US
                                evt_ts_groundtruth = t_start * S_TO_US

                                # Correct timestamp by BaseStation reference - while we might have multiple ones, we choose the first occurrence
                                curr_bs_string     = bs_df.iloc[0]
                                curr_bs_ref        = bs_ref_traces.loc[curr_start:curr_end, 'timestamp'].iloc[0] * S_TO_US
                                curr_bs_correction = curr_bs_ref - float(curr_bs_string.split()[6])
                                evt_local_logged   = evt_local_logged + curr_bs_correction
                                evt_gsn_logged     = evt_gsn_logged   + curr_bs_correction

                                store_gsn = True  # Evaluate with reference to GSN timestamp or local timestamp on Geophone
                                if store_gsn:
                                    evt_ts_logged = evt_gsn_logged
                                else:
                                    evt_ts_logged = evt_local_logged

                            else:
                                if local_df.shape[0] == 0:
                                    self._logger.warning('Could not find local timestamp for node %2i at event %2i in test %4i between %f - %f' % (node_id, evt, test_id, curr_start, curr_end,))
                                if gsn_msg.shape[0] == 0 or gsn_offset.shape[0] == 0:
                                    self._logger.warning('Could not find GSN timestamp for node %2i at event %2i in test %4i between %f - %f'   % (node_id, evt, test_id, curr_start, curr_end,))
                                if bs_df.shape[0] == 0:
                                    self._logger.warning('Could not find BS reference for event %2i in test %4i between %f - %f' % (evt, test_id, curr_start, curr_end,))

                                evt_ts_logged      = np.nan
                                evt_ts_groundtruth = np.nan

                            # Store timings
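                            # curr_traces holds rising/falling edge pairs per stage: staggered wake-up, leader election, data aggregation, report (only with 10 traces) and distribution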
                            row_start = {'test_id': test_id, 'evt': evt, 'node_id': node_id, 'start': 1, 'stag_wkup': curr_traces[0], 'leader_elec': curr_traces[2], 'data_aggr': curr_traces[4], 'report': curr_traces[6] if (nr_traces == 10) else 0, 'dist': curr_traces[8] if (nr_traces == 10) else curr_traces[6], 'evt_ts': evt_ts_logged}
                            row_end   = {'test_id': test_id, 'evt': evt, 'node_id': node_id, 'start': 0, 'stag_wkup': curr_traces[1], 'leader_elec': curr_traces[3], 'data_aggr': curr_traces[5], 'report': curr_traces[7] if (nr_traces == 10) else 0, 'dist': curr_traces[9] if (nr_traces == 10) else curr_traces[7], 'evt_ts': evt_ts_groundtruth}

                            rows_list[0].append(row_start)
                            rows_list[0].append(row_end)

                            # Update event characteristic if necessary
                            if len(evt_timing) <= evt:
                                # Add new event
                                curr_evt = {'start': row_start['stag_wkup'], 'end': row_end['dist']}
                                evt_timing.append(curr_evt)
                            else:
                                curr_evt        = evt_timing[evt]
                                evt_timing[evt] = {'start': min(curr_evt['start'], row_start['stag_wkup']), 'end': max(curr_evt['end'], row_end['dist'])}


                # Serial
                df = serial_df[['timestamp', 'node_id', 'output']].copy()
                df.set_index('timestamp', drop=False, inplace=True)
                df.sort_index(inplace=True)

                # Drop lines where timestamp does not exist (is nan)
                if df.index.isnull().any():
                    self._logger.warning("Had to drop %d serial lines due to missing timestamp" % np.count_nonzero(df.index.isnull()))
                    df = df[df.index.notnull()]

                # Identify BaseStation
                bs_id = 0
                basestations = df.loc[df['output'].str.contains('base station started', na=False), 'node_id'].unique()
                if len(basestations) == 0:
                    self._logger.warning("Could not identify base station in test %d, default to %d" % (test_id, bs_id,))
                elif len(basestations) > 1:
                    self._logger.warning("Expected a single base station in test %d, but found %d: %s" % (test_id, len(basestations), str(basestations),))
                else:
                    bs_id = basestations[0]

                # Only look at defined nodes
                serial_ids = sorted(df['node_id'].unique())
                if sorted(node_ids + [bs_id]) != serial_ids:
                    self._logger.warning("Expected node IDs %s in serial data of test %d, but observed %s" % (str(sorted(node_ids + [bs_id])), test_id, str(serial_ids),))
                node_ids = serial_ids

                # Find node health statistics
                nack = 0
                for j in range(0, len(health_traces_start)):

                    # Search for ACK or NACK
                    mask = (df['node_id'] == health_node_id[j]) & df['output'].str.contains('TX to base failed', na=False)
                    nack_health = df[mask].loc[health_traces_start[j]:health_traces_end[j]].index.tolist()

                    if len(nack_health) == 1:
                        self._logger.debug("No ACK for node health of node %i at timestamp %s in test %d" % (health_node_id[j], health_traces_end[j], test_id,))
                        nack = nack + 1
                    elif len(nack_health) > 1:
                        self._logger.warning("Multiple NACK for node health of node %i at timestamp %s in test %d" % (health_node_id[j], health_traces_end[j], test_id,))

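                # ACK rate of node health reports in percent (a trace containing a single 'TX to base failed' line counts as a NACK)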
                health_ack = (1 - nack/len(health_traces_start)) * 100 if (len(health_traces_start) > 0) else np.nan  # Same report for all nodes of this test

                # Analyse events
                nr_evts = len(evt_timing)
                for evt in range(0, nr_evts):
                    curr_serial     = df.loc[evt_timing[evt]['start']:evt_timing[evt]['end']]
                    curr_serial_ids = sorted(curr_serial['node_id'].unique())

                    # Find leaders, reporters and acks
                    leaders       = [int(line.split()[4])  for line in curr_serial.loc[curr_serial['output'].str.contains('leader is',                 na=False), 'output']]
                    assigned_rep  = [int(line.split()[13][:-1]) for line in curr_serial.loc[curr_serial['output'].str.contains('assigned reporter is', na=False), 'output']]
                    actual_rep    = curr_serial.loc[curr_serial['output'].str.contains('is reporter',  na=False), 'node_id']
                    ack_received_rep = curr_serial.loc[curr_serial['output'].str.contains('ACK received \\(timestamp',   na=False), 'node_id']
                    ack_received_src = curr_serial.loc[curr_serial['output'].str.contains('ACK received with timestamp', na=False), 'node_id']
                    ack_received  = pd.concat([ack_received_rep, ack_received_src])  # Includes both reporters and nodes that received the information in the distribution phase

                    # Make lists unique and filter non-existing leader IDs
                    leaders      = list(set(leaders).intersection(set(curr_serial_ids)))
                    assigned_rep = list(set(assigned_rep))
                    actual_rep   = list(set(actual_rep))
                    sensor_ids   = list(set(curr_serial_ids) - {bs_id})

                    # Find established networks
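                    # A network consists of the leader plus all nodes that logged a reception from that leader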
                    networks = []
                    for leader in leaders:
                        members = curr_serial.loc[curr_serial['output'].str.contains('received from leader {:d}'.format(leader), na=False), 'node_id']
                        networks.append([leader] + members.tolist())

                    row = {'test_id': test_id, 'evt': evt, 'sensor_ids': sensor_ids, 'bs': bs_id, 'leaders': leaders, 'networks': networks, 'assigned_reporters': assigned_rep, 'actual_reporters': actual_rep, 'ack': len(ack_received), 'health_ack': health_ack}
                    rows_list[1].append(row)


                # Power
                if power_df is not None:
                    df = power_df[['timestamp', 'node_id', 'current_mA', 'voltage_V']].copy()
                    df.set_index('timestamp', drop=False, inplace=True)
                    df.sort_index(inplace=True)

                    # Only look at defined nodes
                    power_ids = sorted(df['node_id'].unique())
                    if node_ids != power_ids:
                        self._logger.warning("Expected node IDs %s in power data of test %d, but observed %s" % (str(node_ids), test_id, str(power_ids),))
                        # Only use IDs where we also have serial output; due to FL sync errors, we can obtain power data but no actuations / serial

                    # Compute power
                    df['power_mW'] = df['current_mA'].mul(df['voltage_V'])

                    # Compute the median sampling interval as a fallback spacing in case individual samples are missing
                    df['duration_s']  = df['timestamp'].diff()
                    median_duration_s = df['duration_s'].median()

                    for node_id in sorted(node_ids):
                        data_cropped = df[df['node_id'] == node_id]

                        for evt in range(0, len(evt_timing)):
                            curr_power = data_cropped.loc[evt_timing[evt]['start']:evt_timing[evt]['end']]

                            # Compute mean / maximum current and power as well as total energy
                            current_avg = curr_power['current_mA'].mean() * MA_TO_UA
                            current_max = curr_power['current_mA'].max()  * MA_TO_UA
                            power_avg   = curr_power['power_mW'].mean()   * MW_TO_UW
                            power_max   = curr_power['power_mW'].max()    * MW_TO_UW
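                            # Total energy [mJ]: trapezoidal integration of power [mW] over the timestamps [s];
                            # dx only serves as a fallback spacing and is ignored when x is provided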
                            energy_tot  = np.trapz(y=curr_power['power_mW'].values, x=curr_power['timestamp'].values, dx=median_duration_s)

                            row = {'test_id': test_id, 'evt': evt, 'node_id': node_id, 'current_avg': current_avg, 'current_max': current_max, 'power_avg': power_avg, 'power_max': power_max, 'energy_tot': energy_tot}
                            rows_list[2].append(row)


        # Convert lists to dataframes
        results_tracing = pd.DataFrame(rows_list[0])
        results_serial  = pd.DataFrame(rows_list[1])
        results_power   = pd.DataFrame(rows_list[2]) if (len(rows_list[2]) > 0) else None

        return results_tracing, results_serial, results_power
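        # NOTE: these dataframes map onto the `test_output` indices expected by store_metrics() below
        #       (0 = tracing, 1 = serial, 2 = power; power may be None)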

    def store_metrics(self, test_input, test_output, path):
        if len(test_output) >= 3 and test_output[2] is not None:
            results_tracing = test_output[0]
            results_serial  = test_output[1]
            results_power   = test_output[2]
        else:
            self._logger.warning("No power data for analysis, ignoring metrics")
            return False

        # Set 'test_id' as index to allow quick check whether all tests are available
        test_input.set_index('test_id', drop=False, inplace=True)
        test_input.sort_index(inplace=True)
        results_tracing.set_index('test_id', drop=False, inplace=True)
        results_serial.set_index('test_id',  drop=False, inplace=True)
        results_power.set_index('test_id',   drop=False, inplace=True)

        # Verify we have in- and output for each test
        if not (set(test_input['test_id']) == set(results_tracing['test_id']) and
                set(test_input['test_id']) == set(results_serial['test_id']) and
                set(test_input['test_id']) == set(results_power['test_id'])):
            self._logger.warning('Missing test IDs for comparison')

        # Compare each test input with its test output
        rows_list = []

        for test_in in test_input.itertuples():

            if (test_in.test_id not in results_tracing.index) or (test_in.test_id not in results_serial.index) or (test_in.test_id not in results_power.index):
                self._logger.debug('Skipping test %d due to missing output' % (test_in.test_id,))
            else:

                # Prepare input
                offsets = [codetection[1:]  for codetection in test_in.codetections]

                for evt in range(0, len(offsets)):
                    # Prepare output
                    curr_traces = results_tracing[(results_tracing['test_id'] == test_in.test_id) & (results_tracing['evt'] == evt)]
                    curr_serial = results_serial[ (results_serial['test_id']  == test_in.test_id) & (results_serial['evt']  == evt)]

                    # In case of serious errors where no node had sufficient traces, reporting is skipped
                    if (curr_traces.shape[0] == 0) or (curr_serial.shape[0] == 0):
                        self._logger.warning('Skipping event %d of test %d due to missing output, as only %d traces and %d lines of serial output' % (evt, test_in.test_id, curr_traces.shape[0], curr_serial.shape[0],))
                        continue

                    curr_sensor_ids = curr_serial['sensor_ids'].item()
                    curr_reporters  = curr_serial['actual_reporters'].item()

                    # Check whether someone reported the event
                    reports = curr_traces.loc[(curr_traces['node_id'].isin(curr_reporters) & (curr_traces['start'] == 0)), 'report']
                    if len(reports) == 0:
                        self._logger.warning('Skipping event %2i for all nodes in test %4i because report is missing' % (evt, test_in.test_id,))
                        continue
                    elif len(reports) > 1:
                        self._logger.info('Observed multiple reports (%i) for event %2i in test %4i, choosing earliest' % (len(reports), evt, test_in.test_id,))
                        curr_evt_reported = reports.min()
                    else:
                        curr_evt_reported = reports.item()

                    curr_power = results_power[(results_power['test_id'] == test_in.test_id) & (results_power['evt'] == evt)]

                    for node_id in curr_sensor_ids:

                        # Find timestamp of trigger
                        trigger = curr_traces.loc[(curr_traces['node_id'] == node_id) & (curr_traces['start'] == 1), 'stag_wkup']
                        if len(trigger) == 0:
                            self._logger.warning('Skipping event %2i for node %2i in test %4i because trigger is missing' % (evt, node_id, test_in.test_id,))
                            continue

                        # Timing
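                        # Report latency: time from this node's trigger (stagger wake-up start) until the (earliest) report of the event, in microseconds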
                        curr_evt_start = trigger.item()
                        latency_rep    = int((curr_evt_reported - curr_evt_start) * S_TO_US)

                        # Power
                        current_source = int(curr_power.loc[curr_power['node_id'] == node_id, 'current_avg'].mean())
                        energy_source  = int(curr_power.loc[curr_power['node_id'] == node_id, 'energy_tot'].mean())

                        row = {'test_id': test_in.test_id, 'node_id': node_id, 'latency_us': latency_rep, 'current_mA': current_source, 'energy_mJ': energy_source}
                        rows_list.append(row)

        # Convert to metrics per node
        results = pd.DataFrame(rows_list)

        for test_in in test_input.itertuples():
            rows_list     = []
            nr_codets     = 0
            nr_codets_min = None

            # Get unique node IDs
            node_ids = results_tracing.loc[(results_tracing['test_id'] == test_in.test_id), 'node_id'].unique()

            for node_id in node_ids:
                data_cropped = results[(results['test_id'] == test_in.test_id) & (results['node_id'] == node_id)]

                # Make sure that all nodes aggregate over the same number of events
                if data_cropped.shape[0] != nr_codets and nr_codets > 0:
                    self._logger.warning('Node %2i has data from %i co-detections as opposed to %i co-detections for other nodes in test %4i' % (node_id, data_cropped.shape[0], nr_codets, test_in.test_id,))
                    nr_codets_min = min(nr_codets_min, data_cropped.shape[0])
                elif nr_codets == 0:
                    nr_codets_min = data_cropped.shape[0]
                nr_codets = max(nr_codets, data_cropped.shape[0])

                # Store metrics over all events
                # NOTE: We already average here, as some experiments do not contain all co-detections for all nodes; this does not distort the measurements, because events with missing data are filtered out and ignored during processing anyway
                row = {'test_id': test_in.test_id, 'node_id': node_id, 'latency_us': data_cropped['latency_us'].mean(), 'current_mA': data_cropped['current_mA'].mean(), 'energy_mJ': data_cropped['energy_mJ'].mean()}
                rows_list.append(row)

            # Convert to dataframe and generate path
            df        = pd.DataFrame(rows_list)
            file_path = path + 'metrics_%s_c_%i_e_%i_test_%i%s.pkl' % (self._project_name, nr_codets, len(node_ids), test_in.test_id, '_min_%i' % (nr_codets_min,) if (nr_codets_min is not None and nr_codets_min < nr_codets) else '',)

            # Verify that the file doesn't exist yet; if it does, overwrite it
            if os.path.isfile(file_path):
                os.remove(file_path)
                self._logger.info('Removed old metrics file at path %s' % (file_path,))

            # Store dataframe
            df.to_pickle(path=file_path)
            self._logger.info('Saved metrics to path %s' % (file_path,))

    def extract_geophone_data(self, path):
        if path is None:
            raise ValueError('Path to files must not be None')

        # Get node IDs in directory
        if not os.path.isdir(path):
            raise ValueError('Path \'%s\' must point to a directory' % (path,))
        else:
            device_IDs = [int(f.name) for f in os.scandir(path) if f.is_dir()]
            self._logger.info('Extracting Geophone data from %s' % (device_IDs,))

        # Generate lists for storing data
        acquisition_list = []
        com_data_list    = []
        event_list       = []
        health_list      = []

        for device_ID in device_IDs:
            subdir_path = path + '/%i' % (device_ID,)

            try:
                # Get number of acquisitions
                with open(subdir_path + '/ACQID.TXT') as f:
                    acq_ID = int(f.read())

                # Get Config
                with open(subdir_path + '/CONFIG.TXT') as f:
                    lines = f.readlines()
                    config_str = ''

                    for line in lines:
                        if 'DEVICE_ID' in line:
                            config_str += line[:-1]

                # Get Info
                with open(subdir_path + '/INFO.TXT') as f:
                    lines = f.readlines()
                    info_str = ''

                    for line in lines:
                        if   'COMPILE_DATE' in line:
                            info_str += line[:-1]
                        elif 'COMPILE_TIME' in line:
                            info_str += ', ' + line[:-1]
                        elif 'FIRMWARE_VER' in line:
                            info_str += ', ' + line[:-1]

                # Get Resets
                with open(subdir_path + '/RESETS.TXT') as f:
                    nr_resets = int(f.read())

                # Read acq data per date
                acq_dates = [f.path for f in os.scandir(subdir_path) if f.is_dir()]
                nr_acquisitions = 0
                nr_acq_days     = 0

                for acq_date in acq_dates:
                    # Get acquisitions
                    acq_path = acq_date + '/_ACQ.TXT'
                    if os.path.isfile(acq_path):
                        nr_acq_days += 1

                        with open(acq_path) as f:
                            acquisitions = DictReader(f)

                            for row in acquisitions:
                                row['device_id'] = str(device_ID)
                                acquisition_list.append(row)
                                nr_acquisitions += 1

                    # Get communication data
                    com_path = acq_date + '/_COMDATA.TXT'
                    if os.path.isfile(com_path):

                        with open(com_path) as f:
                            com_data = DictReader(f)

                            for row in com_data:
                                row['device_id'] = str(device_ID)
                                com_data_list.append(row)
                    else:
                        self._logger.warning('Node %i did not generate communication data on %s' % (device_ID, acq_date[-10:]))

                    # Get events
                    evt_path = acq_date + '/_EVENTS.TXT'
                    if os.path.isfile(evt_path):

                        with open(evt_path) as f:
                            events = DictReader(f)

                            for row in events:
                                row['device_id'] = str(device_ID)
                                event_list.append(row)

                    # Get health data
                    health_path = acq_date + '/_HEALTH.TXT'
                    if os.path.isfile(health_path):

                        with open(health_path) as f:
                            health_data = DictReader(f)

                            for row in health_data:
                                row['device_id'] = str(device_ID)
                                health_list.append(row)
                    else:
                        self._logger.warning('Node %i did not generate health data on %s' % (device_ID, acq_date[-10:]))

                # Print statistics
                self._logger.info('Successfully extracted Geophone data from %i; %5i acquisitions on %3i dates; Config: %s; Info: %s; Resets: %2i' % (device_ID, nr_acquisitions, nr_acq_days, config_str, info_str, nr_resets,))

            except Exception as ex:
                self._logger.warning('Experienced problem for node %i: %s' % (device_ID, str(ex),))

        self._logger.info('Successfully extracted all Geophone data from %i nodes in %s' % (len(device_IDs), path,))

        return [pd.DataFrame(acquisition_list), pd.DataFrame(com_data_list), pd.DataFrame(event_list), pd.DataFrame(health_list)]

    @staticmethod
    def convert_geophone_data(acquisition_list):

        df = acquisition_list.copy()

        # Required fields
        columns = ['device_id', 'start_time', 'end_time', 'generation_time', 'trg_duration']

        # Convert fields used for computations to ints / floats
        df['device_id']  = df['device_id'].astype(int)
        df['start_time'] = df['start_time'].astype(int)

        # Generate 'trg_duration' [s]
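        # Trigger duration [us] = (first_time - start_time) + offset of the last triggering sample converted via the ADC sampling rate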
        trigger_samples     = np.maximum(df['trg_last_pos_sample'].astype(int), df['trg_last_neg_sample'].astype(int))
        trigger_duration_us = (df['first_time'].astype(int) - df['start_time']) + (trigger_samples / df['adc_sps'].astype(int) * S_TO_US)
        df['trg_duration']  = trigger_duration_us.div(S_TO_US)

        # Generate 'end_time'
        df['end_time'] = df['start_time'] + trigger_duration_us

        # Derive 'generation_time' from 'start_time' and use it as the index
        df['generation_time'] = pd.to_datetime(df['start_time'], utc=True, unit='us')

        df.set_index('generation_time', drop=False, inplace=True)
        df.sort_index(inplace=True)

        return df[columns]
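        # Illustrative usage (assuming `acquisitions` is the first dataframe returned by
        # extract_geophone_data() and contains the raw columns referenced above):
        #   acq_df = DataManager.convert_geophone_data(acquisitions)
        #   acq_df[['device_id', 'start_time', 'end_time', 'trg_duration']]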

    @staticmethod
    def convert_adc_value(value, adc_pga, bytes_per_sample):
        """ Converts a raw integer value from the ADC output to mV

        @param value:            int # Integer value to convert
        @param adc_pga:          int # Pre amplification factor of the ADC
        @param bytes_per_sample: int # Number of bytes each sample uses

        @returns: The converted value
        """
        # Inspired by: R-Script: permasense_geophone_exporter.R (author: jbeutel)
        return (value * 2500 / (2 ** (bytes_per_sample * 8) - 1) - 1250) / adc_pga
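        # Worked example (3-byte samples): the 24-bit input range [0, 2**24 - 1] maps linearly to
        # [-1250 mV, +1250 mV] before division by the PGA, e.g.
        #   DataManager.convert_adc_value(0x800000, adc_pga=1, bytes_per_sample=3) -> ~0 mV (mid-scale)
        #   DataManager.convert_adc_value(0xFFFFFF, adc_pga=2, bytes_per_sample=3) -> 625 mV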

    @staticmethod
    def convert_binary_data(binary_data, n_samples, adc_pga, endian="big"):
        """ Converts the binary output of the ADC to a list of samples in mV

        @param binary_data: bytes # Binary input to convert
        @param n_samples:   int   # Number of samples
        @param adc_pga:     int   # Pre amplification factor of the ADC
        @param endian:      str   # Byte order: 'big' or 'little'; Default: 'big'

        @returns: List of samples in mV
        """
        # Inspired by script adcdata_to_csv.py (author: rdaforno)
        if n_samples == 0:
            return []
        if not isinstance(binary_data, (bytes, bytearray)):
            raise TypeError('binary_data must be a bytes object')
        if (len(binary_data) % n_samples) > 0:
            raise ValueError('Cannot convert binary data of length %s to %d samples' % (len(binary_data), n_samples,))

        bytes_per_sample = int(len(binary_data) / n_samples)
        result           = np.zeros(n_samples)

        for i in range(0, n_samples):
            result[i] = DataManager.convert_adc_value(
                int.from_bytes(
                    binary_data[
                        (bytes_per_sample * i):(bytes_per_sample * (i + 1))
                    ],
                    byteorder=endian,
                    signed=False,
                ),
                adc_pga,
                bytes_per_sample,
            )
        return result
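        # Illustrative usage: six bytes interpreted as two unsigned big-endian 3-byte samples around mid-scale
        #   DataManager.convert_binary_data(b'\x80\x00\x00\x7f\xff\xff', n_samples=2, adc_pga=1)
        #   -> array([ 7.45e-05, -7.45e-05])   # both samples are ~0 mV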

    @staticmethod
    def convert_metadata(metadata):
        """ Converts the metadata information

        @param metadata: dict # Metadata information on ADC samples

        @returns: The converted metadata
        """
        # Convert times into seconds
        for element in [
            ["generation_time",             1000],
            ["timestamp",                   1000],
            ["start_time",               1000000],
            ["first_time",               1000000],
            ["end_time",                 1000000],
            ["generation_time_microsec", 1000000],
        ]:
            if element[0] in metadata and metadata[element[0]] is not None:
                metadata[element[0]] = dt.datetime.fromtimestamp(metadata[element[0]] / element[1])

        # Add end time of event to the metadata
        metadata["end_time"] = metadata["first_time"] + dt.timedelta(seconds=(metadata["samples"] - 1) / metadata["sampling_rate"])

        # Correct invalid PGA settings
        metadata["adc_pga"]  = max(1, metadata["adc_pga"])

        # Convert the values of the max and min peak
        metadata["peak_pos_val"] = DataManager.convert_adc_value(metadata["peak_pos_val"], metadata["adc_pga"], 3)
        metadata["peak_neg_val"] = DataManager.convert_adc_value(metadata["peak_neg_val"], metadata["adc_pga"], 3)

        return metadata
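        # Illustrative example (values are hypothetical): for
        #   {'first_time': 1600000000000000, 'start_time': 1600000000000000, 'samples': 1000,
        #    'sampling_rate': 1000, 'adc_pga': 0, 'peak_pos_val': 16777215, 'peak_neg_val': 0}
        # the *_time fields become datetime objects, 'end_time' = first_time + 0.999 s,
        # 'adc_pga' is clamped to 1, and the peaks convert to +1250 mV and -1250 mV respectively.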