Commit 9497a622 authored by Roman Trüb's avatar Roman Trüb
Browse files

added piecewise regression with cubic splines

added byte rate based offset correction for old linux platform
parent a16ea08c
...@@ -38,26 +38,35 @@ import sys ...@@ -38,26 +38,35 @@ import sys
import time import time
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from scipy import stats from scipy import stats, interpolate
import collections import collections
################################################################################ ################################################################################
# Constants # Constants
################################################################################ ################################################################################
# data trace configs
FULL_TIMESTAMP = 1999999 # timestamp in LocalTimestampPkt when overflow happened FULL_TIMESTAMP = 1999999 # timestamp in LocalTimestampPkt when overflow happened
# see ARM CoreSight Components Technical Reference Manual # see ARM CoreSight Components Technical Reference Manual
PRESCALER = 16 # prescaler configured in Trace Control Register (ITM_TCR) PRESCALER = 16 # prescaler configured in Trace Control Register (ITM_TCR)
# NOTE: needs to match the settings on the observer! # NOTE: needs to match the settings on the observer!
# time offset between datatrace and GPIO service (ts_datatrace + offset = ts_gpio) # time offset between datatrace and GPIO service (ts_datatrace + offset = ts_gpio)
DT_FIXED_OFFSET = -5.0e-3 # shift half of loop delay DT_FIXED_OFFSET = -5.1e-3 # shift half of loop delay plus 0.1ms from other delays
# DT_FIXED_OFFSET_RESET = +11.65e-3 # time correction based on reset is disabled, see comment at end of timeCorrection() # DT_FIXED_OFFSET_RESET = +11.65e-3 # time correction based on reset is disabled, see comment at end of timeCorrection()
# outlier filter
FILTER_THRESHOLD = 0.15 # Threshold for percentage of filtered messages to produce an error FILTER_THRESHOLD = 0.15 # Threshold for percentage of filtered messages to produce an error
RESIDUAL_UNFILTERED_THRESHOLD = 0.300 # Threshold for residuals magnitude to producing error (in seconds) RESIDUAL_UNFILTERED_THRESHOLD = 0.300 # Threshold for residuals magnitude to producing error (in seconds)
PLATFORM_CORRECTION_OFFSET = -1.3e-3 # offset for platforms (Linux version) with larger overhead
# timestamp correction based on Linux platform
PLATFORM_SLEEP_OVERHEAD_THRESHOLD = 0.300e-3 # threshold of the measured sleepOverhead to identify platform (Linux version)
PLATFORM_WINDOW = 2 # window for calculating rolling byte rate
PLATFORM_BYTE_RATE_THRESHOLD = 500 # threshold on generated byte rate to apply offset correction (for old linux platform only)
PLATFORM_OFFSET = 1.25e-3 # offset for offset correction (for old linux platform only)
# regression
PIECEWISE_REGRESSION = True
PIECEWISE_REGRESSION_SEGMENT_LENGTH = 120 # in seconds
################################################################################ ################################################################################
# SwoParser Class # SwoParser Class
...@@ -537,6 +546,9 @@ def combinePkts(batchList): ...@@ -537,6 +546,9 @@ def combinePkts(batchList):
localTsCum = 0 localTsCum = 0
for batch in batchList: for batch in batchList:
# determine total number of bytes of this local timestamp epoch (this batch)
numBytes = np.sum([len(p._plBytes)+1 for p in batch])
# sort packets # sort packets
dataPkts = [] dataPkts = []
localTsPkts = [] localTsPkts = []
...@@ -592,6 +604,7 @@ def combinePkts(batchList): ...@@ -592,6 +604,7 @@ def combinePkts(batchList):
newRow['global_ts_uncorrected'] = localTsPkts[0].globalTs newRow['global_ts_uncorrected'] = localTsPkts[0].globalTs
newRow['tc'] = localTsPkts[0].tc newRow['tc'] = localTsPkts[0].tc
newRow['full_ts'] = int(localTsPkts[0].ts == FULL_TIMESTAMP) # indicate whether LocalTimestampPkt is overflow pkt or not newRow['full_ts'] = int(localTsPkts[0].ts == FULL_TIMESTAMP) # indicate whether LocalTimestampPkt is overflow pkt or not
newRow['epoch_num_bytes'] = numBytes
localTsOut += [newRow] localTsOut += [newRow]
# process overflow packets (note: there should be 0 or 1 OverflowPkt) # process overflow packets (note: there should be 0 or 1 OverflowPkt)
...@@ -630,12 +643,46 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch): ...@@ -630,12 +643,46 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch):
x = df['local_ts'].to_numpy(dtype=float) x = df['local_ts'].to_numpy(dtype=float)
y = df['global_ts_uncorrected'].to_numpy(dtype=float) + DT_FIXED_OFFSET y = df['global_ts_uncorrected'].to_numpy(dtype=float) + DT_FIXED_OFFSET
# calculate intial linear regression # offset correction for old linux platform
# FIXME: try out more elaborate regressions (piecewise linear, regression splines), would mainly be useful for high-ppm-clock sources # measured sleepOverhead is used to identify platform (linux version)
slopeUnfiltered, interceptUnfiltered, r_valueUnfiltered, p_valueUnfiltered, std_errUnfiltered = stats.linregress(x, y) if sleepOverhead > PLATFORM_SLEEP_OVERHEAD_THRESHOLD: # linux system with large overhead
# print('interceptUnfiltered: {}'.format(interceptUnfiltered)) df['global_ts_uncorrected'] = df['global_ts_uncorrected'].to_numpy(dtype=float)
ts = df.global_ts_uncorrected.to_numpy()
numBytes = df.epoch_num_bytes.to_numpy()
# eventRate = [np.sum((t - PLATFORM_WINDOW < ts) & (ts < t))/PLATFORM_WINDOW for t in ts]
# byteRate = [np.sum(numBytes[(t - PLATFORM_WINDOW < ts) & (ts < t)])/PLATFORM_WINDOW for t in ts] # approx 3x less efficient than rollingRate()
byteRate = rollingRate(ts, numBytes, window=PLATFORM_WINDOW)
residualsUnfiltered = y - (slopeUnfiltered*x + interceptUnfiltered) # # DEBUG
# import matplotlib.pyplot as plt
# plt.close('all')
# fig, ax = plt.subplots()
# # ax.plot(ts, eventRate, c='b', marker='.')
# # ax.set_ylabel('events/s')
# ax.plot(ts, byteRate, c='b', marker='.')
# ax.set_ylabel('bytes/s')
# ax.set_xlabel('GlobalTs (uncorrected)')
# y -= 1.1e-3*(1 - eventRate/eventRate.max())
# y -= 1.1e-3*(1 - byteRate/byteRate.max())
y -= PLATFORM_OFFSET*(byteRate < PLATFORM_BYTE_RATE_THRESHOLD)
## calculate initial regression
if PIECEWISE_REGRESSION:
# piecewise regression -> interpolate.splrep()
totalTime = x[-1] - x[0]
numSegments = max(1, np.floor(totalTime/(PIECEWISE_REGRESSION_SEGMENT_LENGTH*3e6))) # FIXME: hard-coded factor (3e6) to convert 1s into local timestamp counter cycles depends on clock speed (here 48MHz) and prescaler (here 16)
# TODO: determine number of segments based on duration of 1 full timestamp (timediff between local timestamps with ts=1999999)
internalKnots = np.linspace(x[0], x[-1], num=numSegments, endpoint=False)[1:]
tckUnfiltered = interpolate.splrep(x, y, k=3, s=0, t=internalKnots)
yUnfilteredReg = interpolate.splev(x, tckUnfiltered, der=0)
residualsUnfiltered = y - yUnfilteredReg
else:
# single linear regression -> stats.linregress()
slopeUnfiltered, interceptUnfiltered, r_valueUnfiltered, p_valueUnfiltered, std_errUnfiltered = stats.linregress(x, y)
# print('interceptUnfiltered: {}'.format(interceptUnfiltered))
yUnfilteredReg = slopeUnfiltered*x + interceptUnfiltered
residualsUnfiltered = y - yUnfilteredReg
# produce error if residuals are unusually large # produce error if residuals are unusually large
if np.max(np.abs(residualsUnfiltered)) > RESIDUAL_UNFILTERED_THRESHOLD: if np.max(np.abs(residualsUnfiltered)) > RESIDUAL_UNFILTERED_THRESHOLD:
...@@ -647,11 +694,19 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch): ...@@ -647,11 +694,19 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch):
xFiltered = x[maskFiltered] xFiltered = x[maskFiltered]
yFiltered = y[maskFiltered] yFiltered = y[maskFiltered]
ratioFiltered = (len(maskFiltered) - np.sum(maskFiltered))/len(maskFiltered) ratioFiltered = (len(maskFiltered) - np.sum(maskFiltered))/len(maskFiltered)
# calculate new regression
slopeFiltered, interceptFiltered, r_valueFiltered, p_valueFiltered, std_errFiltered = stats.linregress(xFiltered, yFiltered)
residualsFiltered = yFiltered - (slopeFiltered*xFiltered + interceptFiltered)
# print('interceptFiltered: {}'.format(interceptFiltered))
# calculate final regression
if PIECEWISE_REGRESSION:
# piecewise regression -> interpolate.splrep()
tckFiltered = interpolate.splrep(xFiltered, yFiltered, k=3, s=0, t=internalKnots)
yFilteredReg = interpolate.splev(xFiltered, tckFiltered, der=0)
residualsFiltered = yFiltered - yFilteredReg
else:
# single linear regression -> stats.linregress()
slopeFiltered, interceptFiltered, r_valueFiltered, p_valueFiltered, std_errFiltered = stats.linregress(xFiltered, yFiltered)
yFilteredReg = slopeFiltered*xFiltered + interceptFiltered
residualsFiltered = yFiltered - yFilteredReg
# print('interceptFiltered: {}'.format(interceptFiltered))
print('INFO: Outlier filtering removed {:0.2f}%'.format(ratioFiltered*100.)) print('INFO: Outlier filtering removed {:0.2f}%'.format(ratioFiltered*100.))
# print('INFO: Regression before filtering: slope={:0.20f}, intercept={:0.7f}'.format(slopeUnfiltered, interceptUnfiltered)) # print('INFO: Regression before filtering: slope={:0.20f}, intercept={:0.7f}'.format(slopeUnfiltered, interceptUnfiltered))
...@@ -659,13 +714,15 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch): ...@@ -659,13 +714,15 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch):
if ratioFiltered > FILTER_THRESHOLD: if ratioFiltered > FILTER_THRESHOLD:
raise Exception('ERROR: Outlier filter filtered away more than {:.0f}% of all time sync points: filtered {:0.2f}%'.format(FILTER_THRESHOLD*100., ratioFiltered*100.)) raise Exception('ERROR: Outlier filter filtered away more than {:.0f}% of all time sync points: filtered {:0.2f}%'.format(FILTER_THRESHOLD*100., ratioFiltered*100.))
# shift regression line to compensate offset (since global timestamps can only be positive) if PIECEWISE_REGRESSION:
offset = 0 tckFinal = tckFiltered
# offset = -2*np.std(residualsFiltered) yFinalReg = interpolate.splev(xFiltered, tckFinal, der=0)
# offset = np.min(residualsFiltered) residualsFinal = yFiltered - yFinalReg
slopeFinal = slopeFiltered else:
interceptFinal = interceptFiltered + offset slopeFinal = slopeFiltered
residualsFinal = yFiltered - (slopeFinal*xFiltered + interceptFinal) interceptFinal = interceptFiltered
yFinalReg = slopeFinal*xFiltered + interceptFinal
residualsFinal = yFiltered - yFinalReg
# # DEBUG visualize # # DEBUG visualize
# import matplotlib.pyplot as plt # import matplotlib.pyplot as plt
...@@ -673,7 +730,8 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch): ...@@ -673,7 +730,8 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch):
# ## regression # ## regression
# fig, ax = plt.subplots() # fig, ax = plt.subplots()
# ax.scatter(x, y, marker='.', label='Data (uncorrected)', c='r') # ax.scatter(x, y, marker='.', label='Data (uncorrected)', c='r')
# ax.plot(x, slopeUnfiltered*x + interceptUnfiltered, label='Regression (x->y)', c='b', marker='.') # ax.plot(x, yUnfilteredReg, label='Regression (x->y) (unfiltered)', c='b', marker='.')
# ax.plot(xFiltered, yFilteredReg, label='Regression (x->y) (filtered)', c='g', marker='.')
# ax.set_title('Regression (unfiltered)') # ax.set_title('Regression (unfiltered)')
# ax.set_xlabel('LocalTs') # ax.set_xlabel('LocalTs')
# ax.set_ylabel('GlobalTs') # ax.set_ylabel('GlobalTs')
...@@ -722,13 +780,13 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch): ...@@ -722,13 +780,13 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch):
# ax.set_xlabel('Diff [s]') # ax.set_xlabel('Diff [s]')
# ax.set_ylabel('Count') # ax.set_ylabel('Count')
# add platform (linux version) dependent correction offset
# measured sleepOverhead is used to identify platform
platformCorrection = PLATFORM_CORRECTION_OFFSET if sleepOverhead > 0.263e-3 else 0
# add corrected timestamps to dataframe # add corrected timestamps to dataframe
dfDataCorr['global_ts'] = dfDataCorr.local_ts * slopeFinal + interceptFinal + platformCorrection if PIECEWISE_REGRESSION:
dfLocalTsCorr['global_ts'] = dfLocalTsCorr.local_ts * slopeFinal + interceptFinal + platformCorrection dfDataCorr['global_ts'] = interpolate.splev(dfDataCorr.local_ts, tckFinal, der=0)
dfLocalTsCorr['global_ts'] = interpolate.splev(dfLocalTsCorr.local_ts, tckFinal, der=0)
else:
dfDataCorr['global_ts'] = dfDataCorr.local_ts * slopeFinal + interceptFinal
dfLocalTsCorr['global_ts'] = dfLocalTsCorr.local_ts * slopeFinal + interceptFinal
# NOTE: Disabled offset correction based on reset again as it seemed that duration between release of reset pin and start of local timestamp counter is application dependent, i.e. not constant # NOTE: Disabled offset correction based on reset again as it seemed that duration between release of reset pin and start of local timestamp counter is application dependent, i.e. not constant
# # correct offset of global timestamp using the synchronized release from reset at start of the test on FlockLab 2 # # correct offset of global timestamp using the synchronized release from reset at start of the test on FlockLab 2
...@@ -741,7 +799,7 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch): ...@@ -741,7 +799,7 @@ def timeCorrection(dfData, dfLocalTs, sleepOverhead, firstSyncEpoch):
# offsetReset = startCorr - startActual # offsetReset = startCorr - startActual
# # DEBUG # # DEBUG
# print('INFO: offset (based on initial reset): {}'.format(offsetReset)) # print('INFO: offset (based on initial reset): {}'.format(offsetReset))
#
# dfDataCorr['global_ts'] = dfDataCorr.global_ts - offsetReset + DT_FIXED_OFFSET_RESET # dfDataCorr['global_ts'] = dfDataCorr.global_ts - offsetReset + DT_FIXED_OFFSET_RESET
# dfLocalTsCorr['global_ts'] = dfLocalTsCorr.global_ts - offsetReset + DT_FIXED_OFFSET_RESET # dfLocalTsCorr['global_ts'] = dfLocalTsCorr.global_ts - offsetReset + DT_FIXED_OFFSET_RESET
...@@ -768,6 +826,25 @@ def is_float(str): ...@@ -768,6 +826,25 @@ def is_float(str):
except ValueError: except ValueError:
return False return False
def rollingRate(ts, inVec, window):
    '''
    More efficient way of calculating
        outVec = [np.sum(inVec[(t - window <= ts) & (ts <= t)])/window for t in ts]
    for data with ascending timestamps.

    Uses a sliding-window two-pointer sweep over the sorted timestamps, so the
    total number of window updates is O(n) instead of O(n^2) comparisons.

    Args:
        ts:     1D array of timestamps, sorted in ascending order
        inVec:  1D array of values (same length as ts) to accumulate per window
        window: window length (same unit as ts)

    Returns:
        1D float array: for each t in ts, the sum of inVec over samples with
        timestamps in [t - window, t], divided by window.
    '''
    # NOTE: output must be float; np.empty_like(inVec) would inherit an integer
    # dtype from inVec (e.g. byte counts) and silently truncate the rates
    outVec = np.empty(len(inVec), dtype=float)
    wStartIdx = 0
    wEndIdx = 0
    for tsIdx in range(len(ts)):
        # advance window start past samples strictly older than (t - window)
        while wStartIdx < len(ts) and ts[wStartIdx] < ts[tsIdx] - window:
            wStartIdx += 1
        # advance window end to cover all samples up to and including t
        while wEndIdx < len(ts) and ts[wEndIdx] <= ts[tsIdx]:
            wEndIdx += 1
        # rate = windowed sum normalized by the window length
        outVec[tsIdx] = np.sum(inVec[wStartIdx:wEndIdx])/window
    return outVec
################################################################################ ################################################################################
# MAIN # MAIN
################################################################################ ################################################################################
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment