To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

dwt_parse.py 28 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#! /usr/bin/env python3

"""
Copyright (c) 2020, ETH Zurich, Computer Engineering Group
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its
  contributors may be used to endorse or promote products derived from
  this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.

Author:  Lukas Daschinger
Adapted: Roman Trub
"""

import sys
import time
import collections

import numpy as np
import pandas as pd
from scipy import stats


################################################################################
# Constants
################################################################################
FULL_TIMESTAMP = 1999999        # timestamp value in LocalTimestampPkt when overflow happened
                                # see ARM CoreSight Components Technical Reference Manual

PRESCALER = 16                  # prescaler configured in Trace Control Register (ITM_TCR)
                                # NOTE: needs to match the settings on the observer!

DT_FIXED_OFFSET = -7.450e-3     # time offset between datatrace and GPIO service
                                # (ts_datatrace + offset = ts_gpio)

################################################################################
# SwoParser Class
################################################################################
class SwoParser():
    """Stateful parser that turns a stream of SWO bytes into SWO packet objects.

    Bytes are fed one at a time via addSwoByte(); whenever a byte completes a
    packet, the parsed packet object is returned.
    """

    def __init__(self):
        self._streamStarted = False  # True once the synchronization packet has been seen
        self._currentPkt = []        # packet currently being assembled (at most 1 element)
        self._ignoreBytes = 0        # payload bytes left to skip (unsupported hardware source packets)

    class SwoPkt():
        """Base class for all SWO packet types."""

        def __init__(self, header, globalTs=None):
            self._header = header    # header byte of the packet
            self._plBytes = []       # payload bytes collected so far
            self._globalTs = globalTs  # global (host-side) timestamp associated with this packet

        @property
        def globalTs(self):
            return self._globalTs

        @globalTs.setter
        def globalTs(self, globalTs):
            self._globalTs = globalTs

        def addByte(self, byteVal):
            raise Exception('ERROR: This function is a prototype and should not directly be called!')

        def isComplete(self):
            raise Exception('ERROR: This function is a prototype and should not directly be called!')

        def __str__(self):
            raise Exception('ERROR: This function is a prototype and should not directly be called!')

        def __repr__(self):
            return str(self)

    class LocalTimestampPkt(SwoPkt):
        """Local timestamp packet (format 1 with payload bytes, or single-byte format 2)."""

        def __init__(self, header, globalTs=None):
            super().__init__(header, globalTs)
            self._complete = False
            # format 2 (single-byte packet): bit 7 and bits 0-3 of the header are zero
            self._format2 = (self._header & 0b10001111 == 0)
            # format 2 can only occur if timestamp is synchronous (i.e. tc=0b00)
            self._tc = (header >> 4) & 0b11 if not self._format2 else 0b00

        def addByte(self, byteVal):
            if len(self._plBytes) >= 4:
                raise Exception('ERROR: Payload of LocalTimestampPkt cannot be longer than 4 bytes! MCU probably not properly intialized...')
            self._plBytes.append(byteVal)

        def isComplete(self):
            if self._format2:
                # format 2 (single-byte packet) -> the header is the whole packet
                return True
            # format 1 (at least one payload byte); a cleared continuation bit
            # (bit 7) indicates the last byte of the local timestamp packet
            if not self._plBytes:
                return False
            return (self._plBytes[-1] & 0x80) == 0

        @property
        def ts(self):
            """Local timestamp counter value encoded in this packet."""
            if not self.isComplete():
                raise Exception('ERROR: Cannot get timestamp from incomplete LocalTimestampPkt')
            if self._format2:
                return (self._header & 0b01110000) >> 4
            # concatenate the 7 data bits of each payload byte (LSB first):
            # remove the continuation bit and shift by 0,7,14,21 depending on position
            ret = 0
            for i, byte in enumerate(self._plBytes):
                ret |= (byte & 0x7f) << i * 7
            return ret

        @property
        def tc(self):
            """Timestamp control bits (0b00 means synchronous to ITM/DWT data)."""
            return self._tc

        def __str__(self):
            ret = "LocalTimestampPkt {} {:#010b}{}:".format(self._header, self._header, "" if self.isComplete() else " (incomplete)")
            ret += "\n  bytes: {}".format(self._plBytes)
            ret += "\n  globalTs: {}".format(self.globalTs)
            ret += "\n  format: {}".format(2 if self._format2 else 1)
            if self.isComplete():
                ret += "\n  ts: {}".format(self.ts)
                ret += "\n  tc: {}".format(self.tc)
            return ret

    class DatatracePkt(SwoPkt):
        """Data trace packet (PC value, address, or data value from a DWT comparator)."""

        def __init__(self, header, globalTs=None):
            super().__init__(header, globalTs)
            self._payloadSize = map_size(header & 0b11)
            self._pktType = (header >> 6) & 0b11     # 1: PC value or address; 2: data value; otherwise: reserved
            self._comparator = (header >> 4) & 0b11  # comparator that generated the data
            self._addressPkt = None   # True if data trace address pkt, False if data trace PC value pkt
            self._writeAccess = None  # True if write access, False if read access
            if self._pktType == 1:    # PC value or address
                self._addressPkt = (header >> 3 & 0b1)
            elif self._pktType == 2:  # data value
                self._writeAccess = (header >> 3 & 0b1)
            else:
                raise Exception('ERROR: Reserved data trace packet type encountered!')

        def addByte(self, byteVal):
            self._plBytes.append(byteVal)

        def isComplete(self):
            return len(self._plBytes) == self._payloadSize

        @property
        def pktType(self):
            return self._pktType

        @property
        def comparator(self):
            return self._comparator

        @property
        def addressPkt(self):
            return self._addressPkt

        @property
        def writeAccess(self):
            return self._writeAccess

        @property
        def value(self):
            # concatenate the payload bytes (LSB first)
            ret = 0
            for i, byte in enumerate(self._plBytes):
                ret |= byte << i * 8
            return ret

        def __str__(self):
            ret = "DatatracePkt {} {:#010b}{}:".format(self._header, self._header, "" if self.isComplete() else " (incomplete)")
            ret += "\n  bytes: {}".format(self._plBytes)
            ret += "\n  globalTs: {}".format(self.globalTs)
            ret += "\n  pktType: {} ({})".format(self.pktType, 'PC value or address' if self.pktType == 1 else 'data value')
            ret += "\n  comparator: {}".format(self.comparator)
            ret += "\n  payloadSize: {}".format(self._payloadSize)
            if self.pktType == 1:
                # PC value or address
                ret += "\n  addressPkt: {}".format(self.addressPkt)
            elif self.pktType == 2:
                # data value
                ret += "\n  writeAccess: {}".format(self.writeAccess)
            else:
                raise Exception("ERROR: DatatracePkt with reserved packetType!")
            if self.isComplete():
                ret += "\n  value: {}".format(self.value)
            return ret

    class OverflowPkt(SwoPkt):
        """Overflow packet (single header byte, no payload)."""

        def __init__(self, header, globalTs=None):
            super().__init__(header, globalTs)

        def isComplete(self):
            # overflow packet consists of a single header byte
            return True

        def __str__(self):
            ret = "OverflowPkt"
            ret += "\n  globalTs: {}".format(self.globalTs)
            return ret

    def addSwoByte(self, swoByte, globalTs=None):
        """
        Args:
            swoByte: single SWO byte (header or payload) which shall be parsed.
            NOTE: SWO bytes need to be inserted in the correct sequence (as outputted by the SWO port)
        Returns:
            Parsed packet object if provided swoByte leads to the completion of a packet, None otherwise
        """
        # sync to packet in byte stream
        if not self._streamStarted:
            # read all zeros until we get a non-zero byte, then we are in sync (Synchronization packet)
            # NOTE: dpp2lora bytestream does not contain required single-bit in Synchronization packet
            if swoByte == 0:
                return None
            elif swoByte == 0x08:
                return None
            else:
                self._streamStarted = True

        # ignore payload bytes of unrecognized packets
        if self._ignoreBytes:
            self._ignoreBytes -= 1
            return None

        # parse packets with content
        if len(self._currentPkt) == 0:
            # HEADER: we do not currently have a begun packet -> start new one
            if swoByte == 0b0:
                # ignore all zero bytes from sync packets (in the end)
                return None
            elif swoByte & 0b11001111 == 0b11000000:
                # Local timestamp packet header (format 1)
                self._currentPkt.append(type(self).LocalTimestampPkt(header=swoByte, globalTs=globalTs))
            elif (swoByte & 0b10001111 == 0b0) and not (swoByte == 0b01110000):
                # Local timestamp packet header (format 2, single-byte)
                self._currentPkt.append(type(self).LocalTimestampPkt(header=swoByte, globalTs=globalTs))
            elif swoByte == 0b01110000:
                # Overflow packet header
                self._currentPkt.append(type(self).OverflowPkt(header=swoByte, globalTs=globalTs))
            elif swoByte >> 2 & 0b1 == 0b1 and swoByte & 0b11 != 0b00:
                # Hardware source packet header
                discriminator_id = swoByte >> 3 & 0b11111
                plSize = map_size(swoByte & 0b11)
                if discriminator_id in [0, 1, 2]:
                    # 0 Event counter wrapping, 1 Exception tracing, 2 PC sampling -> not supported, skip payload
                    self._ignoreBytes = plSize
                    print('WARNING: Hardware source packet with discriminator_id={} ignored!'.format(discriminator_id))
                elif discriminator_id >= 8 and discriminator_id <= 23:
                    # Data tracing
                    self._currentPkt.append(type(self).DatatracePkt(header=swoByte, globalTs=globalTs))
                else:
                    # Other undefined header
                    raise Exception("ERROR: Unrecognized discriminator ID in hardware source packet header: {}".format(swoByte))
            else:
                print("ERROR: Unrecognized DWT packet header: {} {:#010b}".format(swoByte, swoByte))
        else:
            # PAYLOAD: we currently have a begun packet -> add data
            self._currentPkt[0].addByte(swoByte)

        # check whether current packet is complete
        # NOTE: guard against an empty _currentPkt -- an unrecognized header (or
        # an ignored hardware source packet) does not start a packet, and the
        # original unguarded access raised an IndexError in that case
        if self._currentPkt and self._currentPkt[0].isComplete():
            return self._currentPkt.pop()
        return None
################################################################################
# METHODS
################################################################################
def processDatatraceOutput(input_file):
    """
    Executes the read, parse, timestamp adding, and time correction functions to
    parse a raw datatrace file into dataframes of results.

    Parameters:
        input_file (str): path to the raw data trace file
    Returns:
        dfDataCorr: dataframe containing the processed data trace data
        dfLocalTsCorr: dataframe containing the processed local timestamps
        dfOverflow: dataframe containing the overflow events
    """
    # read raw file into a list of (data, globalTs) tuples
    dataTsList = readRaw(input_file)

    # parse the data/globalTs stream into SWO packets
    pktList = parseDataTs(dataTsList)

    # split the packet stream into local timestamp epochs
    batchList = splitEpochs(pktList)

    # combine data packets and add the accumulated local timestamp
    dfData, dfLocalTs, dfOverflow = combinePkts(batchList)

    # map local timestamps to corrected global timestamps
    dfDataCorr, dfLocalTsCorr = timeCorrection(dfData, dfLocalTs)

    return dfDataCorr, dfLocalTsCorr, dfOverflow
def readRaw(input_file):
    """
    Reads the raw data trace file and returns its content as a list of
    (data, globalTs) tuples.

    Parameters:
        input_file (str): path to the raw data trace file
    Returns:
        outList: list of ([int, ...], str) tuples, one per data/timestamp line pair
    """
    with open(input_file) as f:
        lines = f.readlines()

    # first line only contains the variable names -> skip it
    del lines[0]

    outList = []
    # lines alternate between data and global timestamp; the raw file is
    # expected to start with data (not with a global timestamp)
    for dataLine, tsLine in zip(lines[::2], lines[1::2]):
        dataLine = dataLine.strip()
        tsLine = tsLine.strip()

        # check and convert the words of the data line
        numbers = []
        for word in dataLine.split():
            if not word.isdigit():
                raise Exception('ERROR: element of line is not digits as expected for a line with data')
            numbers.append(int(word))

        # check that the globalTs line actually contains a float
        try:
            float(tsLine)
        except ValueError:
            raise Exception('ERROR: line is not float as expected for a line with global timestamp')

        # add data and timestamp as tuple (timestamp is kept as string)
        outList.append((numbers, tsLine))

    return outList
def parseDataTs(inList):
    """
    Parses the data/globalTs stream into a list of completed SWO packets.

    Parameters:
        inList: list of (data, globalTs) tuples (consumed by this function)
    Returns:
        completedPkts: list of parsed SWO packet objects (in stream order)
    """
    swoParser = SwoParser()
    completedPkts = []

    while inList:
        data, globalTs = inList.pop(0)
        for swoByte in data:
            pkt = swoParser.addSwoByte(swoByte, globalTs)
            if pkt:
                completedPkts.append(pkt)

    return completedPkts
def splitEpochs(pktList):
    """
    Splits the stream of packets into epochs. Each epoch contains exactly one
    reference packet (either a LocalTimestampPkt or an OverflowPkt).

    Parameters:
        pktList: list of parsed SWO packets (in stream order)
    Returns:
        batchList: list of packet lists (one list per epoch)
    """
    batchList = []
    startIdx = 0
    stopIdx = 0

    while startIdx < len(pktList):
        if type(pktList[startIdx]) == SwoParser.LocalTimestampPkt and pktList[startIdx].ts == FULL_TIMESTAMP:
            # next packet is local timestamp overflow packet -> put it into its own batch
            stopIdx += 1
        elif type(pktList[startIdx]) == SwoParser.OverflowPkt:
            # next packet is overflow packet -> put it into its own batch
            stopIdx += 1
        else:
            # next packet is NOT local timestamp overflow packet and NOT OverflowPkt

            ## search start of following localTs epoch

            # find current and next reference packet
            # NOTE: compare against None (not truthiness) since 0 is a valid packet index
            currentRefpktIdx = None
            followingRefpktIdx = None
            for i in range(startIdx, len(pktList)):
                if type(pktList[i]) in (SwoParser.LocalTimestampPkt, SwoParser.OverflowPkt):
                    if currentRefpktIdx is None:
                        currentRefpktIdx = i
                    else:
                        followingRefpktIdx = i
                        break

            # we expect that there is at least 1 ref packet
            assert currentRefpktIdx is not None
            # ref pkt should not be local timestamp overflow packet
            if type(pktList[currentRefpktIdx]) == SwoParser.LocalTimestampPkt:
                assert pktList[currentRefpktIdx].ts != FULL_TIMESTAMP

            # based on following reference packet, determine stopIdx
            if followingRefpktIdx is None:
                # no following reference packet found -> rest of list is a single epoch
                stopIdx = len(pktList)
            else:
                # following reference packet found
                if type(pktList[followingRefpktIdx]) == SwoParser.LocalTimestampPkt and pktList[followingRefpktIdx].ts == FULL_TIMESTAMP:
                    stopIdx = followingRefpktIdx
                elif type(pktList[followingRefpktIdx]) == SwoParser.OverflowPkt:
                    stopIdx = followingRefpktIdx
                else:
                    ## go back to the data packet that caused the reference packet
                    ## based on sample traces, up to 2 datatrace packets can precede a LocalTsPkt (PC and data)
                    ## it is not clear if other packets (e.g. overflow packet) could be between datatrace and localTsPkt
                    # find data packet preceding the reference pkt
                    data2Idx = followingRefpktIdx
                    while type(pktList[data2Idx]) != SwoParser.DatatracePkt:
                        data2Idx -= 1
                        assert data2Idx >= currentRefpktIdx  # at least packets up to the found reference packet should be in the current epoch
                    # find data packet preceding the data2 data pkt
                    data1Idx = data2Idx - 1
                    while True:
                        if type(pktList[data1Idx]) == SwoParser.DatatracePkt:
                            break
                        elif data1Idx <= currentRefpktIdx:
                            data1Idx = None
                            break
                        else:
                            data1Idx -= 1

                    if data1Idx is None:
                        stopIdx = data2Idx
                    else:
                        if ((pktList[data1Idx].comparator == pktList[data2Idx].comparator) and
                            (pktList[data1Idx].pktType != pktList[data2Idx].pktType)):
                            # data1 and data2 belong to the same variable access -> both go to the next epoch
                            stopIdx = data1Idx
                        else:
                            stopIdx = data2Idx

        # add found epoch
        batchList.append(pktList[startIdx:stopIdx])
        startIdx = stopIdx

    return batchList
def combinePkts(batchList):
    """
    Combines data packets and adds the accumulated local timestamp.

    Parameters:
        batchList: list of packet batches (epochs), as produced by splitEpochs()
    Returns:
        dfData: dataframe with one row per traced variable access
        dfLocalTs: dataframe with one row per local timestamp packet
        dfOverflow: dataframe with one row per overflow packet
    """
    dataColumns = ['global_ts_uncorrected', 'comparator', 'data', 'PC', 'operation', 'local_ts']
    localTsColumns = ['global_ts_uncorrected', 'local_ts', 'tc', 'full_ts']
    overflowColumns = ['global_ts_uncorrected']

    dataOut = []
    localTsOut = []
    overflowOut = []
    localTsCum = 0  # accumulated local timestamp (packets carry differences)

    for batch in batchList:
        # sort packets by type
        dataPkts = []
        localTsPkts = []
        overflowPkts = []
        for pkt in batch:
            if type(pkt) == SwoParser.DatatracePkt:
                dataPkts.append(pkt)
            elif type(pkt) == SwoParser.LocalTimestampPkt:
                localTsPkts.append(pkt)
            elif type(pkt) == SwoParser.OverflowPkt:
                overflowPkts.append(pkt)
            else:
                raise Exception('ERROR: Unknown packet type {}'.format(type(pkt)))

        # every epoch must contain exactly one reference packet
        if not ( (len(localTsPkts) == 1 and len(overflowPkts) == 0) or (len(localTsPkts) == 0 and len(overflowPkts) == 1)) :
            raise Exception('ERROR: batch does not contain exactly 1 reference packet (contains {} LocalTimestampPkt and {} OverflowPkt)!'.format(len(localTsPkts), len(overflowPkts)))

        if localTsPkts:
            # +1 cycle (scaled by prescaler) because transition from last sent value to 0
            # takes one clock cycle (see ARM CoreSight Components Technical Reference Manual, p. 302)
            localTsCum += localTsPkts[0].ts + 1/PRESCALER

        # process data pkts
        while dataPkts:
            # decide whether to process one (data only) or two (data and PC) dataPkts
            use2Pkts = (
                len(dataPkts) >= 2
                and dataPkts[0].comparator == dataPkts[1].comparator
                and dataPkts[0].pktType != dataPkts[1].pktType
            )

            # create row
            newRow = collections.OrderedDict(zip(dataColumns, [None]*len(dataColumns)))
            minGlobalTs = None
            for _ in range(2 if use2Pkts else 1):
                dataPkt = dataPkts.pop(0)
                minGlobalTs = dataPkt.globalTs if (minGlobalTs is None) else min(minGlobalTs, dataPkt.globalTs)
                if dataPkt.pktType == 1:    # data trace PC value pkt
                    if not dataPkt.addressPkt:  # we want the PC value
                        newRow['PC'] = hex(dataPkt.value)
                elif dataPkt.pktType == 2:  # data trace data value pkt
                    newRow['operation'] = 'w' if dataPkt.writeAccess else 'r'
                    newRow['data'] = dataPkt.value
            newRow['comparator'] = int(dataPkt.comparator)
            if localTsPkts:
                newRow['local_ts'] = localTsCum
                newRow['local_ts_tc'] = localTsPkts[0].tc
            newRow['global_ts_uncorrected'] = minGlobalTs
            dataOut.append(newRow)

        # process local ts packets (note: there should be 0 or 1 LocalTimestampPkt)
        if localTsPkts:
            newRow = collections.OrderedDict(zip(localTsColumns, [None]*len(localTsColumns)))
            newRow['local_ts'] = localTsCum
            newRow['global_ts_uncorrected'] = localTsPkts[0].globalTs
            newRow['tc'] = localTsPkts[0].tc
            # indicate whether the LocalTimestampPkt is a counter overflow pkt or not
            newRow['full_ts'] = int(localTsPkts[0].ts == FULL_TIMESTAMP)
            localTsOut.append(newRow)

        # process overflow packets (note: there should be 0 or 1 OverflowPkt)
        if overflowPkts:
            newRow = collections.OrderedDict(zip(overflowColumns, [None]*len(overflowColumns)))
            newRow['global_ts_uncorrected'] = overflowPkts[0].globalTs
            overflowOut.append(newRow)

    # prepare to return DataFrames (keep the column layout even if there are no rows)
    dfData = pd.DataFrame(dataOut) if dataOut else pd.DataFrame(columns=dataColumns)
    dfLocalTs = pd.DataFrame(localTsOut) if localTsOut else pd.DataFrame(columns=localTsColumns)
    dfOverflow = pd.DataFrame(overflowOut) if overflowOut else pd.DataFrame(columns=overflowColumns)

    return dfData, dfLocalTs, dfOverflow
def timeCorrection(dfData, dfLocalTs):
    """
    Calculates a regression based on the values in dfLocalTs and adds corrected
    global timestamps.

    Parameters:
        dfData: dataframe containing data trace data
        dfLocalTs: dataframe containing local timestamp data
    Returns:
        dfDataCorr: dataframe with added corrected global timestamps
        dfLocalTsCorr: dataframe with added corrected global timestamps
    """
    dfDataCorr = dfData.copy()
    dfLocalTsCorr = dfLocalTs.copy()

    # make sure only local timestamps which are synchronized (tc == 0) are used for the regression
    df = dfLocalTsCorr[dfLocalTsCorr.tc == 0]

    x = df['local_ts'].to_numpy(dtype=float)
    y = df['global_ts_uncorrected'].to_numpy(dtype=float)

    # calculate initial linear regression
    # FIXME: try out more elaborate regressions (piecewise linear, regression splines),
    # would mainly be useful for high-ppm-clock sources
    slopeUnfiltered, interceptUnfiltered, r_valueUnfiltered, p_valueUnfiltered, std_errUnfiltered = stats.linregress(x, y)

    residualsUnfiltered = (slopeUnfiltered*x + interceptUnfiltered) - y

    ## filter outliers (since they have a negative impact on slope of reconstructed globalTs)
    # determine mask of time sync points to keep (residual within 2 standard deviations)
    maskFiltered = np.abs(residualsUnfiltered) < 2*np.std(residualsUnfiltered)
    xFiltered = x[maskFiltered]
    yFiltered = y[maskFiltered]
    ratioFiltered = (len(maskFiltered) - np.sum(maskFiltered))/len(maskFiltered)
    # calculate new regression based on the filtered time sync points
    slopeFiltered, interceptFiltered, r_valueFiltered, p_valueFiltered, std_errFiltered = stats.linregress(xFiltered, yFiltered)

    print('INFO: Outlier filtering removed {:0.2f}%'.format(ratioFiltered*100.))
    if ratioFiltered > 0.15:
        # NOTE: threshold is 0.15; the previous error message incorrectly stated 10%
        raise Exception('ERROR: Outlier filter filtered away more than 15% of all time sync points: filtered {:0.2f}%'.format(ratioFiltered*100.))

    # add corrected timestamps to dataframe
    dfDataCorr['global_ts'] = dfDataCorr.local_ts * slopeFiltered + interceptFiltered + DT_FIXED_OFFSET
    dfLocalTsCorr['global_ts'] = dfLocalTsCorr.local_ts * slopeFiltered + interceptFiltered + DT_FIXED_OFFSET

    return dfDataCorr, dfLocalTsCorr
################################################################################
# UTILS
################################################################################
def map_size(ss):
    """Maps the 2-bit size field (ss) of an SWO packet header to the payload size in bytes."""
    sizeMap = {1: 1, 2: 2, 3: 4}
    try:
        return sizeMap[ss]
    except KeyError:
        raise Exception('ERROR: Invalid ss size: ss should not be ==0 or >3')
def is_float(value):
    """
    Returns True if value can be converted to a float, False otherwise.

    NOTE: parameter renamed from 'str' to avoid shadowing the builtin;
    TypeError is also caught so non-string inputs (e.g. None) return False
    instead of raising.
    """
    try:
        float(value)
        return True
    except (ValueError, TypeError):
        return False
################################################################################
# MAIN
################################################################################
if __name__ == '__main__':
    obsid = 7
    nodeid = 7

    if len(sys.argv) > 1:
        filename = sys.argv[1]

        # the first line of the log file contains the variable names
        with open(filename, "r") as f:
            varnames = f.readline().strip().split()

        # parse the file
        dfData, dfLocalTs, dfOverflow = processDatatraceOutput(filename)

        dfData['obsid'] = obsid
        dfData['nodeid'] = nodeid
        # convert comparator ID to variable name
        dfData['varname'] = dfData.comparator.apply(lambda x: (varnames[x] if x < len(varnames) else str(x)))
        # df_corrected.sort_values(by=['global_ts'], inplace=True)
        with open(filename + '.csv', "w") as outfile:
            dfData.to_csv(
                outfile,
                columns=['global_ts', 'obsid', 'nodeid', 'varname', 'data', 'operation', 'PC', 'local_ts_tc'],
                index=False,
                header=True,
            )