Commit 7ec1d41f authored by Roman Trüb

add support to process multiple sync packet epochs in datatrace processing

raise error if residuals in time correction of datatrace processing are too large
parent 3a1f0c7e
@@ -494,19 +494,19 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, resultfi
         # append datatrace elements from observer to datatrace log file
         resultfile_lock.acquire()
         with open(resultfile_path, "a") as outfile:
-            dfData.to_csv(
-                outfile,
-                columns=['global_ts', 'obsid', 'nodeid', 'varname', 'data', 'operation', 'PC', 'local_ts_tc'],
-                index=False,
-                header=False
-            )
+            dfData.to_csv(
+                outfile,
+                columns=['global_ts', 'obsid', 'nodeid', 'varname', 'data', 'operation', 'PC', 'local_ts_tc'],
+                index=False,
+                header=False
+            )
         resultfile_lock.release()
         # append overflow events to errorlog
         for idx, row in dfOverflow.iterrows():
             write_to_error_log(row['global_ts_uncorrected'], obsid, nodeid, 'Datatrace: event rate too high (overflow occurred)!')
         # append info about delayed timestamps to errorlog
         for idx, row in dfLocalTs.iterrows():
-            if row['tc'] != 0:
+            if row['tc'] != 0:
                 write_to_error_log(row['global_ts'], obsid, nodeid, 'Datatrace: timestamp has been delayed (tc={})!'.format(row['tc']))
     except:
......
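A side note on the unchanged locking pattern in this hunk: `resultfile_lock.acquire()` / `release()` around the CSV append works, but the lock is never released if `to_csv()` raises. A minimal runnable sketch of the context-manager alternative (the file name and dataframe contents here are made up for illustration):

```python
import threading
import pandas as pd

resultfile_lock = threading.Lock()   # stands in for the worker's shared lock
dfData = pd.DataFrame({'global_ts': [1.0], 'obsid': [7], 'data': [42]})  # dummy data

# the with-statement acquires the lock and guarantees release even if to_csv() raises
with resultfile_lock:
    with open('datatrace_result.csv', 'a') as outfile:
        dfData.to_csv(outfile, index=False, header=False)
```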
@@ -50,17 +50,22 @@ FULL_TIMESTAMP = 1999999 # timestamp in LocalTimestampPkt when overflow h
 PRESCALER = 16              # prescaler configured in Trace Control Register (ITM_TCR)
                             # NOTE: needs to match the settings on the observer!
-DT_FIXED_OFFSET = -7.450e-3 # time offset between datatrace and GPIO service
-                            # (ts_datatrace + offset = ts_gpio)
+# time offset between datatrace and GPIO service (ts_datatrace + offset = ts_gpio)
+DT_FIXED_OFFSET = -0.007448270618915558     # no offset correction
+# DT_FIXED_OFFSET = 0.0008908960223197937   # shift by 2*std(residual)
+# DT_FIXED_OFFSET = 0.0008908960223197937   # shift min(residual)
+FILTER_THRESHOLD = 0.15                     # threshold for the percentage of filtered messages that produces an error
+RESIDUAL_UNFILTERED_THRESHOLD = 0.150       # threshold for the residual magnitude that produces an error (in seconds)

 ################################################################################
 # SwoParser Class
 ################################################################################
 class SwoParser():
     def __init__(self):
-        self._streamStarted = False
-        self._currentPkt = []
         self._ignoreBytes = 0
+        self._processingSyncPkt = True    # we assume that SWO stream starts with sync packet
+        self._currentPkt = []

     class SwoPkt():
         def __init__(self, header, globalTs=None):
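For context on the `_processingSyncPkt` flag introduced above: an SWO synchronization packet is a run of zero bytes terminated by a byte with a single set bit (0x80), and since the dpp2lora bytestream lacks that terminator (see the NOTE in `addSwoByte` below), the parser treats the first non-zero byte directly as the next packet header. A small self-contained sketch of that scan (not the project's code, just an illustration of the rule):

```python
def skip_sync_packet(stream):
    """Return the index of the first byte after the leading sync packet.

    Assumes the stream begins with a sync packet (zero bytes); the first
    non-zero byte is interpreted as the next packet header, mirroring the
    parser's behavior for streams without the 0x80 terminator byte.
    """
    for i, b in enumerate(stream):
        if b != 0x00:
            return i
    return len(stream)  # stream consisted only of sync bytes

assert skip_sync_packet(bytes([0x00, 0x00, 0x00, 0xC0, 0x99])) == 3
```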
@@ -224,33 +229,37 @@ class SwoParser():
         Args:
             swoByte: single SWO byte (header or payload) which shall be parsed.
                      NOTE: SWO bytes need to be inserted in the correct sequence (as output by the SWO port)
-        Returns:
-            Parsed packet object if the provided swoByte leads to the completion of a packet, None otherwise
+        Returns: (packet, newSyncEpoch)
+            packet: parsed packet object if the provided swoByte leads to the completion of a packet, None otherwise
+            newSyncEpoch: True if a new sync epoch started (i.e. sync packet received and first non-zero byte added), False otherwise
         """
+        retPkt = None
+        newSyncEpoch = False

+        # ignore payload bytes of unrecognized packet
+        if self._ignoreBytes:
+            self._ignoreBytes -= 1
+            return retPkt, newSyncEpoch

-        # sync to packet in byte stream
-        if not self._streamStarted:
+        # process sync packet (to sync to packet in byte stream)
+        if self._processingSyncPkt:
             # read all zeros until we get a 0x80, then we are in sync (Synchronization packet)
             # NOTE: dpp2lora bytestream does not contain the required single-bit in the Synchronization packet
             if swoByte == 0:
-                return None
-            elif swoByte == 0x08:
-                return None
+                return retPkt, newSyncEpoch
+            # elif swoByte == 0x08:
+            #     return retPkt, newSyncEpoch
             else:
-                self._streamStarted = True
-        # ignore paylaod bytes of nrecognized packet
-        if self._ignoreBytes:
-            self._ignoreBytes -= 1
-            return None
+                # got a non-0 byte -> sync pkt finished, current byte is processed and interpreted as header
+                self._processingSyncPkt = False
+                newSyncEpoch = True

         # parse packets with content
         if len(self._currentPkt) == 0:
-            # HEADER: we do not currently have a begun packet -> start new one
-            # ignore all zero bytes from sync packets (in the end)
+            # HEADER: we do not currently have a begun packet -> start new sync packet
             if swoByte == 0b0:
-                return None
+                # we expect a new header, got 0 byte -> interpreted as start of sync packet
+                self._processingSyncPkt = True
             elif swoByte & 0b11001111 == 0b11000000:
                 # Local timestamp packet header
                 self._currentPkt.append(type(self).LocalTimestampPkt(header=swoByte, globalTs=globalTs))
@@ -280,10 +289,10 @@
                 self._currentPkt[0].addByte(swoByte)

         # check whether current packet is complete
-        if self._currentPkt[0].isComplete():
-            return self._currentPkt.pop()
-        else:
-            return None
+        if len(self._currentPkt) != 0 and self._currentPkt[0].isComplete():
+            retPkt = self._currentPkt.pop()
+
+        return retPkt, newSyncEpoch

 ################################################################################
 # METHODS
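The header classification in `addSwoByte` relies on bit masks; for the Local Timestamp packet handled above, bits [7:6] must be 0b11 and bits [3:0] must be 0b0000, while bits [5:4] carry the TC field that later shows up as `row['tc']`. A quick standalone check of that mask logic:

```python
def is_local_ts_header(b):
    # Local timestamp packet header: 0b11TC0000, where TC occupies bits [5:4]
    return (b & 0b11001111) == 0b11000000

# all four possible TC values yield a valid local timestamp header
assert all(is_local_ts_header(0b11000000 | (tc << 4)) for tc in range(4))
assert not is_local_ts_header(0x00)   # a zero byte starts a sync packet instead
```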
@@ -298,33 +307,53 @@ def processDatatraceOutput(input_file):
     Returns:
         df: dataframe containing the processed data
     """
     # # DEBUG
     # import matplotlib.pyplot as plt
     # plt.close('all')

     # read raw file into list
     dataTsList = readRaw(input_file)

-    # parse data/globalTs stream from list
-    pktList = parseDataTs(dataTsList)
+    # parse data/globalTs stream from list (returns packet list split into different sync packet epochs)
+    syncEpochList = parseDataTs(dataTsList)

-    # # DEBUG
-    # with open('pktList.txt', 'w') as f:
-    #     for i, pkt in enumerate(pktList):
-    #         f.write('{}\n{}\n'.format(i, pkt))
+    # process packet list of each sync epoch separately
+    dfDataCorrList = []
+    dfLocalTsCorrList = []
+    dfOverflowList = []
+    for i, pktList in enumerate(syncEpochList):
+        # # DEBUG
+        print('INFO: Sync Epoch {}'.format(i))
+        # with open('pktList_{}.txt'.format(i), 'w') as f:
+        #     for i, pkt in enumerate(pktList):
+        #         f.write('{}\n{}\n'.format(i, pkt))

-    # split localTs epochs
-    batchList = splitEpochs(pktList)
+        # split localTs epochs
+        batchList = splitEpochs(pktList)

-    # # DEBUG
-    # for batch in batchList:
-    #     print([type(e).__name__ for e in batch])
+        # # DEBUG
+        # for batch in batchList:
+        #     print([type(e).__name__ for e in batch])

-    # combine data packets and add localTs
-    dfData, dfLocalTs, dfOverflow = combinePkts(batchList)
+        # combine data packets and add localTs
+        dfData, dfLocalTs, dfOverflow = combinePkts(batchList)

-    if len(dfLocalTs) == 0:
-        raise Exception('ERROR: dfLocalTs is empty -> unable to apply time correction!')
+        if len(dfLocalTs) < 2:
+            raise Exception('ERROR: dfLocalTs is empty or does not contain enough pkts -> unable to apply time correction!')

-    dfDataCorr, dfLocalTsCorr = timeCorrection(dfData, dfLocalTs)
+        dfDataCorr, dfLocalTsCorr = timeCorrection(dfData, dfLocalTs)

-    return dfDataCorr, dfLocalTsCorr, dfOverflow
+        dfDataCorrList.append(dfDataCorr)
+        dfLocalTsCorrList.append(dfLocalTsCorr)
+        dfOverflowList.append(dfOverflow)

+    # concat results from different sync epochs
+    dfData = pd.concat(dfDataCorrList)
+    dfLocalTs = pd.concat(dfLocalTsCorrList)
+    dfOverflow = pd.concat(dfOverflowList)

+    return dfData, dfLocalTs, dfOverflow

 def readRaw(input_file):
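The restructured `processDatatraceOutput()` is a map-then-concat pattern: every sync epoch gets its own time correction (each epoch has an independent local timestamp reference), and the per-epoch dataframes are concatenated at the end. A condensed sketch of the pattern, with `correct_epoch` as a hypothetical stand-in for the splitEpochs/combinePkts/timeCorrection chain:

```python
import pandas as pd

def process_all_epochs(sync_epoch_list, correct_epoch):
    """Apply per-epoch correction, then merge the results into one dataframe."""
    parts = [correct_epoch(pkt_list) for pkt_list in sync_epoch_list]
    return pd.concat(parts)

# usage sketch: df = process_all_epochs(parseDataTs(readRaw(f)), correct_epoch)
```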
@@ -364,25 +393,40 @@ def readRaw(input_file):

 def parseDataTs(inList):
     """
-    Parses data/globalTs stream from queue.
+    Parses data/globalTs stream from list and batches obtained pkts into sync packet epochs.
     """
+    syncEpochList = []
     completedPkts = []
+    firstSyncEpoch = True    # we assume that the SWO byte stream starts with a sync pkt => the first sync epoch is expected to be empty
     swoParser = SwoParser()

     while inList:
         data, globalTs = inList.pop(0)
         for swoByte in data:
-            ret = swoParser.addSwoByte(swoByte, globalTs)
+            ret, newSyncEpoch = swoParser.addSwoByte(swoByte, globalTs)
+            if newSyncEpoch:
+                # print('INFO: new sync epoch started!')
+                if firstSyncEpoch:
+                    # ignore packets from first sync epoch (we assume that SWO stream starts with sync pkt -> completedPkts should be empty anyway)
+                    firstSyncEpoch = False
+                else:
+                    syncEpochList.append(completedPkts)
+                    completedPkts = []    # NOTE: do not use completedPkts.clear() since this would also clear the list just appended to syncEpochList (as both variables point to the same list object)
             if ret:
                 completedPkts.append(ret)

+    # append last sync epoch to syncEpochList
+    syncEpochList.append(completedPkts)

     # # DEBUG
-    # for i, pkt in enumerate(completedPkts):
-    #     print(i)
-    #     print(pkt)
+    # for i, syncEpoch in enumerate(syncEpochList):
+    #     print('=== Sync Epoch {} ==='.format(i))
+    #     for i, pkt in enumerate(completedPkts):
+    #         print(i)
+    #         print(pkt)

-    return completedPkts
+    return syncEpochList

 def splitEpochs(pktList):
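The NOTE in `parseDataTs()` about not using `completedPkts.clear()` deserves a standalone illustration: `clear()` mutates the single list object that both names reference, while rebinding with `= []` leaves the list already stored in `syncEpochList` untouched.

```python
# clear() empties the same object that was just appended -> data loss
epochs = []
completed = [1, 2, 3]
epochs.append(completed)
completed.clear()
assert epochs == [[]]

# rebinding creates a fresh list and preserves the appended one
epochs = []
completed = [1, 2, 3]
epochs.append(completed)
completed = []
assert epochs == [[1, 2, 3]]
```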
@@ -589,6 +633,10 @@ def timeCorrection(dfData, dfLocalTs):
     residualsUnfiltered = y - (slopeUnfiltered*x + interceptUnfiltered)

+    # produce error if residuals are unusually large
+    if np.max(np.abs(residualsUnfiltered)) > RESIDUAL_UNFILTERED_THRESHOLD:
+        raise Exception('ERROR: time correction: residuals are unusually large ({:.6f}s)! Most likely there is a gap/jump in the datatrace data.'.format(np.max(np.abs(residualsUnfiltered))))
+
     ## filter outliers (since they have a negative impact on slope of reconstructed globalTs)
     # determine mask of time sync points to keep
     maskFiltered = np.abs(residualsUnfiltered) < 2*np.std(residualsUnfiltered)
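The new residual check fails fast when the linear fit between local and global timestamps has a large worst-case error, which typically indicates a gap or jump in the trace data. A self-contained sketch of the same computation (in the real code the slope/intercept come from earlier in `timeCorrection()`; here `scipy.stats.linregress` stands in for that fit):

```python
import numpy as np
from scipy import stats

RESIDUAL_UNFILTERED_THRESHOLD = 0.150  # seconds, mirroring the constant above

def check_time_correction(x, y):
    fit = stats.linregress(x, y)
    residuals = y - (fit.slope * x + fit.intercept)
    worst = np.max(np.abs(residuals))
    if worst > RESIDUAL_UNFILTERED_THRESHOLD:
        raise Exception('ERROR: residuals are unusually large ({:.6f}s)!'.format(worst))
    return residuals

# well-behaved synthetic data passes the check
x = np.linspace(0.0, 10.0, 100)
check_time_correction(x, 2.0 * x + 1.0)
```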
@@ -604,8 +652,8 @@ def timeCorrection(dfData, dfLocalTs):
     print('INFO: Outlier filtering removed {:0.2f}%'.format(ratioFiltered*100.))
     # print('INFO: Regression before filtering: slope={:0.20f}, intercept={:0.7f}'.format(slopeUnfiltered, interceptUnfiltered))
     # print('INFO: Regression after filtering: slope={:0.20f}, intercept={:0.7f}'.format(slopeFiltered, interceptFiltered))
-    if ratioFiltered > 0.15:
-        raise Exception('ERROR: Outlier filter filtered away more than 10% of all time sync points: filtered {:0.2f}%'.format(ratioFiltered*100.))
+    if ratioFiltered > FILTER_THRESHOLD:
+        raise Exception('ERROR: Outlier filter filtered away more than {:.0f}% of all time sync points: filtered {:0.2f}%'.format(FILTER_THRESHOLD*100., ratioFiltered*100.))

     # shift regression line to compensate offset (since global timestamps can only be positive)
     offset = 0
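The threshold change above also fixes the old error message, which hard-coded 0.15 but reported "10%". A sketch of the 2*std outlier filter together with the new check (array and function names are illustrative, not the project's API):

```python
import numpy as np

FILTER_THRESHOLD = 0.15  # max tolerated fraction of filtered sync points, as above

def filter_sync_points(residuals):
    # keep points whose residual lies within 2 standard deviations
    mask = np.abs(residuals) < 2 * np.std(residuals)
    ratio_filtered = 1.0 - np.count_nonzero(mask) / len(residuals)
    if ratio_filtered > FILTER_THRESHOLD:
        raise Exception('ERROR: filtered away {:0.2f}% of all time sync points!'.format(ratio_filtered * 100.0))
    return mask
```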
@@ -617,7 +665,7 @@ def timeCorrection(dfData, dfLocalTs):
     # # DEBUG visualize
     # import matplotlib.pyplot as plt
-    # plt.close('all')
+    # # plt.close('all')
     # ## regression
     # fig, ax = plt.subplots()
     # ax.scatter(x, y, marker='.', label='Data (uncorrected)', c='r')
......