To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit ae7ab410 authored by Roman Trüb
Browse files

fixed handling of OverflowPkts in different sequences

parent a52b784e
......@@ -442,8 +442,8 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue
(itemtype, obsid, fdir, f, workerstate) = queueitem
input_filename = "%s/%s" % (fdir, f)
loggername = "(%s.%d) " % (cur_p.name, obsid)
# DEBUG
shutil.copyfile(input_filename, "%s_raw" % resultfile_path)
# # DEBUG
# shutil.copyfile(input_filename, "%s_raw" % resultfile_path)
# parse the file
# first line of the log file contains the variable names
varnames = ""
......@@ -464,13 +464,17 @@ def worker_datatrace(queueitem=None, nodeid=None, resultfile_path=None, logqueue
with open(resultfile_path, "a") as outfile:
dfData.to_csv(
outfile,
columns=['global_ts', 'obsid', 'nodeid', 'varname', 'data', 'operation', 'PC'],
columns=['global_ts', 'obsid', 'nodeid', 'varname', 'data', 'operation', 'PC', 'local_ts_tc'],
index=False,
header=False
)
# append overflow events to errorlog
for idx, row in dfOverflow.iterrows():
write_to_error_log(row['global_ts_uncorrected'], obsid, nodeid, 'Datatrace: event rate too high (overflow occurred)!')
# append info about delayed timestamps to errorlog
for idx, row in dfLocalTs.iterrows():
if row['tc'] != 0:
write_to_error_log(row['global_ts'], obsid, nodeid, 'Datatrace: timestamp has been delayed (tc={})!'.format(row['tc']))
except:
msg = "Error in datatrace worker process: %s: %s\n%s" % (str(sys.exc_info()[0]), str(sys.exc_info()[1]), traceback.format_exc())
......@@ -1085,7 +1089,7 @@ def main(argv):
elif service == 'serial':
header = 'timestamp,observer_id,node_id,direction,output\n'
elif service == 'datatrace':
header = 'timestamp,observer_id,node_id,variable,value,access,pc\n'
header = 'timestamp,observer_id,node_id,variable,value,access,pc,timestamp_delayed\n'
lock.acquire()
f = open(path, 'w')
f.write(header)
......
......@@ -302,6 +302,11 @@ def processDatatraceOutput(input_file):
# parse data/globalTs stream from list
pktList = parseDataTs(dataTsList)
# # DEBUG
# with open('pktList.txt', 'w') as f:
# for i, pkt in enumerate(pktList):
# f.write('{}\n{}\n'.format(i, pkt))
# split localTs epochs
batchList = splitEpochs(pktList)
......@@ -387,8 +392,11 @@ def splitEpochs(pktList):
if type(pktList[startIdx]) == SwoParser.LocalTimestampPkt and pktList[startIdx].ts == FULL_TIMESTAMP:
# next packet is local timestamp overflow packet -> put it into its own batch
stopIdx += 1
elif type(pktList[startIdx]) == SwoParser.OverflowPkt:
# next packet is overflow packet -> put it into its own batch
stopIdx += 1
else:
# next packet is NOT local timestamp overflow packet
# next packet is NOT local timestamp overflow packet and NOT OverflowPkt
## search start of following localTs epoch
......@@ -417,6 +425,8 @@ def splitEpochs(pktList):
# following reference packet found
if type(pktList[followingRefpktIdx]) == SwoParser.LocalTimestampPkt and pktList[followingRefpktIdx].ts == FULL_TIMESTAMP:
stopIdx = followingRefpktIdx
elif type(pktList[followingRefpktIdx]) == SwoParser.OverflowPkt:
stopIdx = followingRefpktIdx
else:
## go back to data packet that caused the reference packet
## based on sample traces, up to 2 datatrace packet can precede a LocalTsPkt (PC and data)
......@@ -425,7 +435,7 @@ def splitEpochs(pktList):
data2Idx = followingRefpktIdx
while type(pktList[data2Idx]) != SwoParser.DatatracePkt:
data2Idx -= 1
assert data2Idx >= currentRefpktIdx # at least packets up to the found refernce packet should be in the in the current epoch
assert data2Idx >= currentRefpktIdx # at least packets up to the found reference packet should be in the in the current epoch
# find data packet preceding the data2 data pkt
data1Idx = data2Idx - 1
while True:
......@@ -447,8 +457,7 @@ def splitEpochs(pktList):
stopIdx = data2Idx
# # DEBUG
# numLocalTsPkts = np.sum([type(pkt)==SwoParser.LocalTimestampPkt for pkt in pktList[startIdx:stopIdx]])
# print(numLocalTsPkts)
# print('({},{})'.format(startIdx, stopIdx))
# print('[')
# for pkt in pktList[startIdx:stopIdx]:
# print(pkt)
......@@ -489,7 +498,7 @@ def combinePkts(batchList):
else:
raise Exception('ERROR: Unknown packet type {}'.format(type(pkt)))
if not (len(localTsPkts) == 1 or len(overflowPkts) == 1) :
if not ( (len(localTsPkts) == 1 and len(overflowPkts) == 0) or (len(localTsPkts) == 0 and len(overflowPkts) == 1)) :
raise Exception('ERROR: batch does not contain exactly 1 reference packet (contains {} LocalTimestampPkt and {} OverflowPkt)!'.format(len(localTsPkts), len(overflowPkts)))
if localTsPkts:
......@@ -587,10 +596,10 @@ def timeCorrection(dfData, dfLocalTs):
print('INFO: Outlier filtering removed {:0.2f}%'.format(ratioFiltered*100.))
# print('INFO: Regression before filtering: slope={:0.20f}, intercept={:0.7f}'.format(slopeUnfiltered, interceptUnfiltered))
# print('INFO: Regression after filtering: slope={:0.20f}, intercept={:0.7f}'.format(slopeFiltered, interceptFiltered))
if ratioFiltered > 0.1:
if ratioFiltered > 0.15:
raise Exception('ERROR: Outlier filter filtered away more than 10% of all time sync points: filtered {:0.2f}%'.format(ratioFiltered*100.))
## DEBUG visualize
# # DEBUG visualize
# import matplotlib.pyplot as plt
# plt.close('all')
# # regression
......@@ -601,7 +610,7 @@ def timeCorrection(dfData, dfLocalTs):
# ax.set_xlabel('LocalTs')
# ax.set_ylabel('GlobalTs')
# ax.legend()
# residuals (before outlier filtering)
# # residuals (before outlier filtering)
# fig, ax = plt.subplots()
# ax.plot(x, residualsUnfiltered, label='Residual', c='b', marker='.')
# # ax.plot(x, pd.DataFrame(residualsUnfiltered).rolling(100, center=True, min_periods=1).mean().to_numpy(), label='Residual (moving avg)', c='orange', marker='.')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment