Commit 60ae37eb authored by Reto Da Forno
Merge branch 'master' of gitlab.ethz.ch:tec/public/flocklab/server

parents 72bd8ed1 d20711fc
@@ -494,19 +494,19 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, resultfi
# append datatrace elements from observer to datatrace log file
resultfile_lock.acquire()
with open(resultfile_path, "a") as outfile:
dfData.to_csv(
    outfile,
    columns=['global_ts', 'obsid', 'nodeid', 'varname', 'data', 'operation', 'PC', 'local_ts_tc'],
    index=False,
    header=False
)
resultfile_lock.release()
# append overflow events to errorlog
for idx, row in dfOverflow.iterrows():
write_to_error_log(row['global_ts_uncorrected'], obsid, nodeid, 'Datatrace: event rate too high (overflow occurred)!')
# append info about delayed timestamps to errorlog
for idx, row in dfLocalTs.iterrows():
if row['tc'] != 0:
write_to_error_log(row['global_ts'], obsid, nodeid, 'Datatrace: timestamp has been delayed (tc={})!'.format(row['tc']))
except:
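The hunk above appends per-observer rows to a shared result file; the lock around the write keeps rows from concurrent workers from interleaving mid-file. A self-contained sketch of the same pattern, with a hypothetical file name and columns (not part of the commit):

import threading
import pandas as pd

resultfile_lock = threading.Lock()

def append_rows(df, path):
    # append dataframe rows to a shared CSV file under the lock;
    # 'with lock' is equivalent to acquire()/release(), with release on error
    with resultfile_lock:
        with open(path, 'a') as outfile:
            df.to_csv(outfile, index=False, header=False)

df = pd.DataFrame({'global_ts': [1612345678.0], 'data': [42]})
append_rows(df, 'datatrace_results.csv')  # hypothetical output file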
@@ -50,17 +50,22 @@ FULL_TIMESTAMP = 1999999 # timestamp in LocalTimestampPkt when overflow h
PRESCALER = 16 # prescaler configured in Trace Control Register (ITM_TCR)
# NOTE: needs to match the settings on the observer!
DT_FIXED_OFFSET = -7.450e-3 # time offset between datatrace and GPIO service
# (ts_datatrace + offset = ts_gpio)
# time offset between datatrace and GPIO service (ts_datatrace + offset = ts_gpio)
DT_FIXED_OFFSET = -0.007448270618915558 # no offset correction
# DT_FIXED_OFFSET = 0.0008908960223197937 # shift by 2*std(residual)
# DT_FIXED_OFFSET = 0.0008908960223197937 # shift min(residual)
FILTER_THRESHOLD = 0.15 # Threshold for percentage of filtered messages to produce an error
RESIDUAL_UNFILTERED_THRESHOLD = 0.150 # Threshold on residual magnitude for producing an error (in seconds)
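Taken together, these constants parameterize the post-processing: DT_FIXED_OFFSET shifts corrected datatrace timestamps onto the GPIO service timebase, while the two thresholds abort processing when the outlier filter removes too many sync points or the regression residuals are implausibly large. A minimal sketch of how such checks compose, assuming the constants defined above (function and variable names are illustrative, not from this file):

import numpy as np

def sanity_check(residuals, num_filtered, num_total):
    # mirrors the two error conditions used further down in this file
    if np.max(np.abs(residuals)) > RESIDUAL_UNFILTERED_THRESHOLD:
        raise Exception('residuals too large -> probable gap/jump in datatrace data')
    if num_filtered / num_total > FILTER_THRESHOLD:
        raise Exception('outlier filter removed too many time sync points')

sanity_check(np.array([0.001, -0.002]), num_filtered=3, num_total=100)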
################################################################################
# SwoParser Class
################################################################################
class SwoParser():
def __init__(self):
self._streamStarted = False
self._currentPkt = []
self._ignoreBytes = 0
self._processingSyncPkt = True # we assume that SWO stream starts with sync packet
self._currentPkt = []
class SwoPkt():
def __init__(self, header, globalTs=None):
@@ -98,7 +103,7 @@ class SwoParser():
def addByte(self, byteVal):
if len(self._plBytes) >= 4:
raise Exception('ERROR: Payload of LocalTimestampPkt cannot be longer than 4 bytes! MCU probably not properly intialized...')
raise Exception('ERROR: Payload of LocalTimestampPkt cannot be longer than 4 bytes! MCU probably not properly initialized...')
self._plBytes.append(byteVal)
def isComplete(self):
@@ -224,33 +229,37 @@ class SwoParser():
Args:
swoByte: single SWO byte (header or payload) which shall be parsed.
NOTE: SWO bytes need to be inserted in the correct sequence (as outputted by the SWO port)
Returns:
Parsed packet object if provided swoByte leads to the completion of a packet, None otherwise
Returns: (packet, newSyncEpoch)
packet: Parsed packet object if provided swoByte leads to the completion of a packet, None otherwise
newSyncEpoch: True if a new sync epoch started (i.e. sync packet received and first non-zero byte added), False otherwise
"""
retPkt = None
newSyncEpoch = False
# ignore payload bytes of unrecognized packet
if self._ignoreBytes:
self._ignoreBytes -= 1
return retPkt, newSyncEpoch
# sync to packet in byte stream
if not self._streamStarted:
# process sync packet (to sync to packet in byte stream)
if self._processingSyncPkt:
# read all zeros until we get a 0x80, then we are in sync (Synchronization packet)
# NOTE: dpp2lora bytestream does not contain required single-bit in Synchronization packet
if swoByte == 0:
return None
elif swoByte == 0x08:
return None
return retPkt, newSyncEpoch
# elif swoByte == 0x08:
# return retPkt, newSyncEpoch
else:
self._streamStarted = True
# ignore paylaod bytes of nrecognized packet
if self._ignoreBytes:
self._ignoreBytes -= 1
return None
# got a non-0 byte -> sync pkt finished, current byte is processed and interpreted as header
self._processingSyncPkt = False
newSyncEpoch = True
# parse packets with content
if len(self._currentPkt) == 0:
# HEADER: we do not currently have a begun packet -> start new one
# ignore all zero bytes from sync packets (in the end)
# HEADER: we do not currently have a begun packet -> start new sync packet
if swoByte == 0b0:
return None
# we expect a new header, got 0 byte -> interpreted as start of sync packet
self._processingSyncPkt = True
elif swoByte & 0b11001111 == 0b11000000:
# Local timestamp packet header
self._currentPkt.append(type(self).LocalTimestampPkt(header=swoByte, globalTs=globalTs))
@@ -280,10 +289,10 @@ class SwoParser():
self._currentPkt[0].addByte(swoByte)
# check whether current packet is complete
if self._currentPkt[0].isComplete():
return self._currentPkt.pop()
else:
return None
if len(self._currentPkt) != 0 and self._currentPkt[0].isComplete():
retPkt = self._currentPkt.pop()
return retPkt, newSyncEpoch
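For reference on the synchronization handling above: an ITM synchronization packet is a run of zero bytes normally terminated by 0x80, and the NOTE in this function records that the dpp2lora byte stream can lack the terminator, so the first non-zero byte is treated as the next header. A standalone sketch of stripping such a leading sync run, independent of the SwoParser state machine:

def strip_leading_sync(data):
    # skip a leading run of 0x00 bytes; consume the 0x80 terminator if
    # present, otherwise leave the first non-zero byte as the next header
    i = 0
    while i < len(data) and data[i] == 0x00:
        i += 1
    if i < len(data) and data[i] == 0x80:
        i += 1
    return data[i:]

print(strip_leading_sync(bytes([0, 0, 0, 0, 0, 0x80, 0xC0])).hex())  # 'c0'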
################################################################################
# METHODS
@@ -298,30 +307,53 @@ def processDatatraceOutput(input_file):
Returns:
df: dataframe containing the processed data
"""
# # DEBUG
# import matplotlib.pyplot as plt
# plt.close('all')
# read raw file into list
dataTsList = readRaw(input_file)
# parse data/globalTs stream from list
pktList = parseDataTs(dataTsList)
# parse data/globalTs stream from list (returns packet list split into different sync packet epochs)
syncEpochList = parseDataTs(dataTsList)
# # DEBUG
# with open('pktList.txt', 'w') as f:
# for i, pkt in enumerate(pktList):
# f.write('{}\n{}\n'.format(i, pkt))
# process packet list of each sync epoch separately
dfDataCorrList = []
dfLocalTsCorrList = []
dfOverflowList = []
for i, pktList in enumerate(syncEpochList):
# # DEBUG
print('INFO: Sync Epoch {}'.format(i))
# with open('pktList_{}.txt'.format(i), 'w') as f:
# for i, pkt in enumerate(pktList):
# f.write('{}\n{}\n'.format(i, pkt))
# split localTs epochs
batchList = splitEpochs(pktList)
# # DEBUG
# for batch in batchList:
#     print([type(e).__name__ for e in batch])
# combine data packets and add localTs
dfData, dfLocalTs, dfOverflow = combinePkts(batchList)
if len(dfLocalTs) < 2:
    raise Exception('ERROR: dfLocalTs is empty or does not contain enough pkts -> unable to apply time correction!')
dfDataCorr, dfLocalTsCorr = timeCorrection(dfData, dfLocalTs)
dfDataCorrList.append(dfDataCorr)
dfLocalTsCorrList.append(dfLocalTsCorr)
dfOverflowList.append(dfOverflow)
return dfDataCorr, dfLocalTsCorr, dfOverflow
# concat results from different sync epochs
dfData = pd.concat(dfDataCorrList)
dfLocalTs = pd.concat(dfLocalTsCorrList)
dfOverflow = pd.concat(dfOverflowList)
return dfData, dfLocalTs, dfOverflow
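Since each sync epoch is processed independently, the per-epoch dataframes keep their own row indices, and pd.concat preserves those by default. A small illustration of the resulting index behaviour (the frames are made up; whether to reset the index is a downstream choice, not something this commit does):

import pandas as pd

df_epoch0 = pd.DataFrame({'local_ts': [1, 2], 'data': [10, 20]})
df_epoch1 = pd.DataFrame({'local_ts': [3], 'data': [30]})

df_all = pd.concat([df_epoch0, df_epoch1])                        # index: 0, 1, 0
df_unique = pd.concat([df_epoch0, df_epoch1], ignore_index=True)  # index: 0, 1, 2
print(df_all.index.tolist(), df_unique.index.tolist())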
def readRaw(input_file):
@@ -361,25 +393,40 @@ def readRaw(input_file):
def parseDataTs(inList):
"""
Parses data/globalTs stream from queue.
Parses data/globalTs stream from list and batches obtained pkts into sync packet epochs.
"""
syncEpochList = []
completedPkts = []
firstSyncEpoch = True # we assume that SWO byte stream starts with sync pkt => first sync epoch is expected to be empty
swoParser = SwoParser()
while inList:
data, globalTs = inList.pop(0)
for swoByte in data:
ret = swoParser.addSwoByte(swoByte, globalTs)
ret, newSyncEpoch = swoParser.addSwoByte(swoByte, globalTs)
if newSyncEpoch:
# print('INFO: new sync epoch started!')
if firstSyncEpoch:
# ignore packets from first sync epoch (we assume that SWO stream starts with sync pkt -> completedPkts should be empty anyway)
firstSyncEpoch = False
else:
syncEpochList.append(completedPkts)
completedPkts = [] # NOTE: do not use completedPkts.clear() since this would also clear the list just appended to syncEpochList (as both variables point to the same list object)
if ret:
completedPkts.append(ret)
# append last sync epoch to syncEpochList
syncEpochList.append(completedPkts)
# # DEBUG
# for i, pkt in enumerate(completedPkts):
# print(i)
# print(pkt)
# for i, syncEpoch in enumerate(syncEpochList):
# print('=== Sync Epoch {} ==='.format(i))
# for i, pkt in enumerate(completedPkts):
# print(i)
# print(pkt)
return completedPkts
return syncEpochList
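The NOTE about completedPkts = [] deserves emphasis, since clear() would silently corrupt the result: both names would refer to the list object already appended to syncEpochList. A minimal demonstration of the difference:

epochs = []
pkts = ['pkt0', 'pkt1']
epochs.append(pkts)
pkts.clear()       # wrong: also empties the list stored inside epochs
print(epochs)      # [[]]

epochs = []
pkts = ['pkt0', 'pkt1']
epochs.append(pkts)
pkts = []          # right: rebinds the name, epochs keeps its contents
print(epochs)      # [['pkt0', 'pkt1']]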
def splitEpochs(pktList):
@@ -582,8 +629,13 @@ def timeCorrection(dfData, dfLocalTs):
# calculate initial linear regression
# FIXME: try out more elaborate regressions (piecewise linear, regression splines), would mainly be useful for high-ppm-clock sources
slopeUnfiltered, interceptUnfiltered, r_valueUnfiltered, p_valueUnfiltered, std_errUnfiltered = stats.linregress(x, y)
# print('interceptUnfiltered: {}'.format(interceptUnfiltered))
residualsUnfiltered = y - (slopeUnfiltered*x + interceptUnfiltered)
residualsUnfiltered = (slopeUnfiltered*x + interceptUnfiltered) - y
# produce error if residuals are unusually large
if np.max(np.abs(residualsUnfiltered)) > RESIDUAL_UNFILTERED_THRESHOLD:
raise Exception('ERROR: time correction: residuals are unusually large ({:.6f}s)! Most likely there is a gap/jump in the datatrace data.'.format(np.max(np.abs(residualsUnfiltered))))
## filter outliers (since they have a negative impact on slope of reconstructed globalTs)
# determine mask of time sync points to keep
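The correction recipe here is a two-pass fit: regress globalTs on localTs with scipy's linregress, drop sync points whose residuals mark them as outliers, and refit on the remainder. A compact standalone sketch of that scheme (the 3-sigma cut-off is an illustrative choice; the actual masking criterion lives in the elided lines):

import numpy as np
from scipy import stats

def two_pass_fit(x, y, k=3.0):
    # first pass: fit y ~ slope*x + intercept on all points
    slope, intercept, r_value, p_value, std_err = stats.linregress(x, y)
    residuals = y - (slope * x + intercept)
    # keep points whose residual is within k standard deviations
    mask = np.abs(residuals) < k * np.std(residuals)
    # second pass: refit on the filtered points only
    slope, intercept, r_value, p_value, std_err = stats.linregress(x[mask], y[mask])
    return slope, intercept, mask

x = np.linspace(0.0, 100.0, 201)
y = 1.000001 * x + 0.005      # synthetic 1 ppm drift plus 5 ms offset
y[50] += 0.5                  # one gross outlier
slope, intercept, mask = two_pass_fit(x, y)
print(slope, intercept, int((~mask).sum()))  # removes exactly the 1 outlier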
@@ -593,45 +645,82 @@ def timeCorrection(dfData, dfLocalTs):
ratioFiltered = (len(maskFiltered) - np.sum(maskFiltered))/len(maskFiltered)
# calculate new regression
slopeFiltered, interceptFiltered, r_valueFiltered, p_valueFiltered, std_errFiltered = stats.linregress(xFiltered, yFiltered)
residualsFiltered = (slopeFiltered*xFiltered + interceptFiltered) - yFiltered
residualsFiltered = yFiltered - (slopeFiltered*xFiltered + interceptFiltered)
# print('interceptFiltered: {}'.format(interceptFiltered))
print('INFO: Outlier filtering removed {:0.2f}%'.format(ratioFiltered*100.))
# print('INFO: Regression before filtering: slope={:0.20f}, intercept={:0.7f}'.format(slopeUnfiltered, interceptUnfiltered))
# print('INFO: Regression after filtering: slope={:0.20f}, intercept={:0.7f}'.format(slopeFiltered, interceptFiltered))
if ratioFiltered > 0.15:
raise Exception('ERROR: Outlier filter filtered away more than 10% of all time sync points: filtered {:0.2f}%'.format(ratioFiltered*100.))
if ratioFiltered > FILTER_THRESHOLD:
raise Exception('ERROR: Outlier filter filtered away more than {:.0f}% of all time sync points: filtered {:0.2f}%'.format(FILTER_THRESHOLD*100., ratioFiltered*100.))
# shift regression line to compensate offset (since global timestamps can only be positive)
offset = 0
# offset = -2*np.std(residualsFiltered)
# offset = np.min(residualsFiltered)
slopeFinal = slopeFiltered
interceptFinal = interceptFiltered + offset
residualsFinal = yFiltered - (slopeFinal*xFiltered + interceptFinal)
# # DEBUG visualize
# import matplotlib.pyplot as plt
# plt.close('all')
# # regression
# # plt.close('all')
# ## regression
# fig, ax = plt.subplots()
# ax.scatter(x, y, marker='.', label='Data (uncorrected)', c='r')
# ax.plot(x, slopeUnfiltered*x + interceptUnfiltered, label='Regression (x->y)', c='b', marker='.')
# ax.set_title('Regression')
# ax.set_title('Regression (unfiltered)')
# ax.set_xlabel('LocalTs')
# ax.set_ylabel('GlobalTs')
# ax.legend()
# # residuals (before outlier filtering)
# ## residuals (before outlier filtering)
# fig, ax = plt.subplots()
# ax.plot(x, residualsUnfiltered, label='Residual', c='b', marker='.')
# ax.axhline(y=0, xmin=0, xmax=x[-1], linestyle='--', c='k')
# # ax.plot(x, pd.DataFrame(residualsUnfiltered).rolling(100, center=True, min_periods=1).mean().to_numpy(), label='Residual (moving avg)', c='orange', marker='.')
# ax.set_title('Residuals (before outlier filtering)')
# ax.set_xlabel('LocalTs')
# ax.set_ylabel('Diff')
# ax.set_ylabel('Diff [s]')
# ax.legend()
# # residuals (after outlier filtering)
# ## residuals (after outlier filtering)
# fig, ax = plt.subplots()
# ax.plot(xFiltered, residualsFiltered, label='Residual', c='b', marker='.')
# ax.axhline(y=0, xmin=0, xmax=x[-1], linestyle='--', c='k')
# # ax.plot(x, pd.DataFrame(residualsFiltered).rolling(100, center=True, min_periods=1).mean().to_numpy(), label='Residual (moving avg)', c='orange', marker='.')
# ax.set_title('Residuals (after outlier filtering)')
# ax.set_xlabel('LocalTs')
# ax.set_ylabel('Diff [s]')
# ax.legend()
# ## residuals (after offset correction)
# fig, ax = plt.subplots()
# ax.plot(xFiltered, residualsFinal, label='Residual', c='b', marker='.')
# ax.axhline(y=0, xmin=0, xmax=x[-1], linestyle='--', c='k')
# # ax.plot(x, pd.DataFrame(residualsFiltered).rolling(100, center=True, min_periods=1).mean().to_numpy(), label='Residual (moving avg)', c='orange', marker='.')
# ax.set_title('Residuals (after offset correction)')
# ax.set_xlabel('LocalTs')
# ax.set_ylabel('Diff')
# ax.legend()
# # residuals hist (before outlier filtering)
# fig, ax = plt.subplots()
# ax.hist(residualsUnfiltered, 50)
# # ax.axvline(y=0, xmin=0, xmax=x[-1], linestyle='--', c='k')
# # ax.plot(x, pd.DataFrame(residualsFiltered).rolling(100, center=True, min_periods=1).mean().to_numpy(), label='Residual (moving avg)', c='orange', marker='.')
# ax.set_title('Residuals Histogram (before outlier filtering)')
# ax.set_xlabel('Diff [s]')
# ax.set_ylabel('Count')
# # residuals hist (after offset correction)
# fig, ax = plt.subplots()
# ax.hist(residualsFinal, 50)
# # ax.axvline(y=0, xmin=0, xmax=x[-1], linestyle='--', c='k')
# # ax.plot(x, pd.DataFrame(residualsFiltered).rolling(100, center=True, min_periods=1).mean().to_numpy(), label='Residual (moving avg)', c='orange', marker='.')
# ax.set_title('Residuals Histogram (after offset correction)')
# ax.set_xlabel('Diff [s]')
# ax.set_ylabel('Count')
# add corrected timestamps to dataframe
dfDataCorr['global_ts'] = dfDataCorr.local_ts * slopeFiltered + interceptFiltered + DT_FIXED_OFFSET
dfLocalTsCorr['global_ts'] = dfLocalTsCorr.local_ts * slopeFiltered + interceptFiltered + DT_FIXED_OFFSET
dfDataCorr['global_ts'] = dfDataCorr.local_ts * slopeFinal + interceptFinal + DT_FIXED_OFFSET
dfLocalTsCorr['global_ts'] = dfLocalTsCorr.local_ts * slopeFinal + interceptFinal + DT_FIXED_OFFSET
return dfDataCorr, dfLocalTsCorr
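The returned frames thus carry global timestamps produced by one affine map per sync epoch, plus the fixed inter-service offset. A toy illustration of that final mapping on a made-up frame (the slope and intercept values are hypothetical fit results, not from any real test):

import pandas as pd

DT_FIXED_OFFSET = -0.007448270618915558  # constant from the top of this file
slope, intercept = 1.25e-8, 1.6e9        # hypothetical regression output

df = pd.DataFrame({'local_ts': [0, 80000000, 160000000]})
df['global_ts'] = df.local_ts * slope + intercept + DT_FIXED_OFFSET
print(df)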