
Commit d80db24e authored by matthmey

modified lttb downsampling

parent 623f576b
@@ -531,14 +531,15 @@ class SeismicSource(DataSource):
                 filename = "/".join(filenames[channel])
                 st = obspy.read(io.BytesIO(store[str(filename)]))
             else:
-                if not Path(path).isdir():
+                # TODO: rewrite to only use store
+                datadir = Path(path)
+                if not datadir.is_dir():
                     # TODO: should this be an error or only a warning? In a periodic execution this could stop the whole script
                     raise IOError(
                         "Cannot find the path {}. Please provide a correct path to the permasense geophone directory".format(
                             datadir
                         )
                     )
-                datadir = Path(path)
                 if not datadir.joinpath(*filenames[channel]).exists():
                     non_existing_files_ts.append(timerange[i])
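The hunk above replaces Path(path).isdir(), which does not exist on pathlib.Path (it is the os.path-style name), with the actual Path.is_dir() method, and binds datadir before the error message that interpolates it (previously a NameError in the failure path). A minimal sketch of the corrected pattern (the directory path here is a hypothetical example):

from pathlib import Path

datadir = Path("/data/permasense/geophones")  # hypothetical example path
if not datadir.is_dir():
    # pathlib spells the check is_dir(); os.path.isdir() is the functional form
    raise IOError("Cannot find the path {}.".format(datadir))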
@@ -74,9 +74,6 @@ class LTTBDownsampling(StuettNode):
         dim = self.dim
         if dim is None:
             dim = data.dims[-1]
-        n_out = data.sizes[dim] // self.rate
         """ The following tries to re-implement the LTTB for xarray
         There are several issues:
         1. We cannot use the LTTB package
@@ -88,76 +85,134 @@
         2. For multi-dimensional arrays, we run into issues since the dimension
            coordinate we want to reduce on applies to multiple axes and on each axis
            LTTB chooses a different pair of (index, value). Solution would be to choose
-           the label for each window statically but choose the value depending on LTTB.
+           the index for each window statically but choose the value depending on LTTB.
         """
-        # rolling = data.rolling({dim: self.rate},stride=self.rate)
-        # print(rolling)
-        # # print(data.sizes['time']//self.rate)
-        # time_bins = data[dim].rolling({dim: self.rate},stride=self.rate).construct("buckets")
-        # print('time',time_bins)
-        # data_bins = rolling.construct("buckets")
-        # print(data_bins)
-        # # return
-        # # rolling_dim = utils.get_temp_dimname(self.obj.dims, "_rolling_dim")
-        # # windows = self.construct(rolling_dim, stride=self.stride)
-        # # result = windows.reduce(func, dim=rolling_dim, **kwargs)
-        # # # Find valid windows based on count.
-        # # counts = self._counts()
-        # # return result.where(counts >= self._min_periods)
-        # # mean = rolling.mean()
-        # mean = data_bins.mean("buckets")
-        # mean_time = time_bins.mean("buckets")
-        # print(mean_time)
-        # out = []
-        # prev = None
-        # for i in range(data_bins.sizes[dim]-1):
-        #     print(i)
-        #     item = data_bins.isel({dim:i})
-        #     time = time_bins.isel({dim:i})
-        #     if i == 0:
-        #         prev_data = item.isel(buckets=-1)
-        #         prev_time = time.isel(buckets=-1)
-        #         out.append(prev)
-        #         continue
-        #     a = prev
-        #     a_time = prev_time
-        #     bs = item
-        #     bs_time = time
-        #     if i < data_bins.sizes[dim]-1:
-        #         c = mean.isel({dim:i+1})
-        #         c_time = mean_time.isel({dim:i+1})
-        #     else:
-        #         c = data_bins.isel({dim:-1})
-        #         c_time = time_bins.isel({dim:-1})
-        #     # calculate areas of triangle
-        #     bs_minus_a = bs - a
-        #     a_minus_bs = a - bs
-        #     areas = 0.5 * abs((a[0] - c[0]) * (bs_minus_a[:, 1])
-        #                       - (a_minus_bs[:, 0]) * (c[1] - a[1]))
-        # x_min = rolling.min().dropna("time")
-        # This is quite hacky and works only if the the selected dimension is a datetime
-        if len(data.squeeze().dims) > 1:
-            raise RuntimeError("Can only work with arrays of one dimension")
-        d = np.array([data["time"].values.astype(np.int64), data.squeeze().values]).T
-        small_data = lttb.downsample(d, n_out=n_out)
-        array = xr.DataArray(
-            small_data[:, 1],
-            dims=[dim],
-            coords={dim: (dim, pd.to_datetime(small_data[:, 0]))},
-        )
-        array.attrs = data.attrs
""" Note: The lttb implementation differs from our implementation
due to different bucket generation. lttb uses numpy.array_split
which subdivides into buckets of varying length ('almost' equal) and does not drop
any values
we use a rolling window which subdivides into equal lengths
but drops values at the end
form numpy docs for array_split:
For an array of length l that should be split into n sections, it returns l % n sub-arrays of size l//n + 1 and the rest of size l//n.
"""
+        # adapt the downsampling rate so that the first and last sample can be
+        # kept in addition to one sample per bucket
+        n_out = data.sizes[dim] / self.rate
+        rate = np.ceil(data.sizes[dim] / (n_out - 2)).astype(int)
+        # keep the last sample separately; it is always part of the output
+        end = data.isel({dim: -1})
+        data = data.isel({dim: slice(0, -1)})
+        # split the values and the dimension coordinate into equal-length,
+        # strided buckets along a new "buckets" axis
+        rolling = data.rolling({dim: rate}, stride=rate)
+        index_bins = data[dim].rolling({dim: rate}, stride=rate).construct("buckets")
+        value_bins = rolling.construct("buckets")
+        # per-bucket means serve as the provisional "next" point of each triangle
+        value_mean = value_bins.mean("buckets")
+        index_mean = index_bins.mean("buckets")
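+        # e.g. with 13 remaining samples and an adapted rate of 3, construct()
+        # yields ceil(13 / 3) = 5 strided windows of length 3 along "buckets",
+        # the first of which is NaN-padded down to a single real entry
+        # (illustrative numbers only)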
+        argmaxes = []
+        for i in range(value_bins.sizes[dim]):
+            current_value = value_bins.isel({dim: i})
+            current_index = index_bins.isel({dim: i})
+            if i == 0:
+                # the first strided bucket consists of NaN padding and one
+                # entry; we choose that one entry and continue
+                prev_value = current_value.isel(buckets=-1)
+                prev_index = current_index.isel(buckets=-1)
+                argmaxes.append(prev_value)
+                continue
+            # prev_value/prev_index contain a single entry along the `dim`
+            # axis; current_value/current_index contain a whole bucket
+            if i < value_bins.sizes[dim] - 1:
+                # provisional "next" point: the mean of the following bucket
+                next_value = value_mean.isel({dim: i + 1})
+                next_index = index_mean.isel({dim: i + 1})
+            else:
+                # for the last bucket the "next" point is the final sample
+                next_value = end
+                next_index = end[dim]
+            # triangle corners P (previous), C (current bucket), N (next);
+            # note that unlike canonical LTTB, P is not updated to the point
+            # selected in the previous bucket after the first iteration
+            P_i, P_v = prev_index.astype(float), prev_value
+            C_i, C_v = current_index.astype(float), current_value
+            N_i, N_v = next_index.astype(float), next_value
+            # calculate the triangle areas (shoelace formula), vectorized over
+            # the bucket axis
+            areas = 0.5 * abs(P_i * (C_v - N_v) + C_i * (N_v - P_v) + N_i * (P_v - C_v))
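+            # worked example: for P=(0,0), C=(1,2), N=(2,0) this gives
+            # 0.5 * |0*(2-0) + 1*(0-0) + 2*(0-2)| = 2, i.e. half the absolute
+            # cross product of (C - P) and (N - P)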
+            # per channel, keep the bucket entry with the largest triangle area
+            arg = areas.argmax("buckets")
+            c = current_value.isel({"buckets": arg})
+            argmaxes.append(c)
+        # the last sample is always part of the output
+        argmaxes.append(end)
+        array = xr.concat(argmaxes, dim)
+
+        use_pkg = False
+        if use_pkg:
+            # This is quite hacky and works only if the selected dimension is a datetime
+            if len(data.squeeze().dims) > 1:
+                raise RuntimeError("Can only work with arrays of one dimension")
+            d = np.array([data["time"].values.astype(np.int64), data.squeeze().values]).T
+            small_data = lttb.downsample(d, n_out=int(n_out))
+            array = xr.DataArray(
+                small_data[:, 1],
+                dims=[dim],
+                coords={dim: (dim, pd.to_datetime(small_data[:, 0]))},
+            )
+            array.attrs = data.attrs
         return array
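For orientation, the selection rule re-implemented above can be written compactly for plain 1-D numpy arrays. This is a sketch of the canonical algorithm, not part of the commit (the function name and bucket handling are illustrative); it keeps both endpoints and, per bucket, picks the point forming the largest triangle with the previous pick and the mean of the next bucket:

import numpy as np

def lttb_1d(x, y, n_out):
    # interior points split into n_out - 2 near-equal buckets (array_split style)
    buckets = np.array_split(np.arange(1, len(x) - 1), n_out - 2)
    keep = [0]  # always keep the first point
    for k, idx in enumerate(buckets):
        # provisional next point: mean of the following bucket, or the last point
        nxt = buckets[k + 1] if k + 1 < len(buckets) else np.array([len(x) - 1])
        nx, ny = x[nxt].mean(), y[nxt].mean()
        # previous point: the sample selected in the preceding round
        px, py = x[keep[-1]], y[keep[-1]]
        # shoelace triangle areas, vectorized over the bucket
        areas = 0.5 * np.abs(px * (y[idx] - ny) + x[idx] * (ny - py) + nx * (py - y[idx]))
        keep.append(idx[np.argmax(areas)])
    keep.append(len(x) - 1)  # always keep the last point
    return np.array(keep)

Unlike the xarray version above, this updates the "previous" point to the sample actually selected in the preceding bucket, which, together with the bucket generation, is why the two produce different picks.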
@@ -88,17 +88,48 @@ def test_spectrogram():
 def test_lttbdownsampling():
-    lttb_node = stuett.data.LTTBDownsampling(rate=2, dim="time")
+    import lttb
+
+    periods = 14
+    rate = 2
+    channels = ["EHE", "EHZ"]
+    # channels = ["EHE"]
+    lttb_node = stuett.data.LTTBDownsampling(rate=rate, dim="time")
     np.random.seed(123)
     da = xr.DataArray(
-        np.random.rand(9, 1),
-        [("time", pd.date_range("2000-01-01", periods=9)), ("channel", ["EHE"]),],
+        np.arange(periods * len(channels)).reshape(periods, len(channels)),
+        # np.random.rand(periods, len(channels)),
+        [("time", pd.date_range("2000-01-01", periods=periods)), ("channel", channels),],
     )
+
+    # generate comparison data with the lttb package, channel by channel
+    n_out = da.sizes["time"] // rate
+    values = np.zeros((len(da["time"]), len(da["channel"]) + 1))
+    values[:, :-1] = da.values
+    values[:, -1] = da["time"].values
+    result_list = []
+    for i, c in enumerate(da["channel"]):
+        # select the timestamp column and one channel column
+        current = values[:, [-1, i]]
+        current_lttb = lttb.downsample(current, n_out)
+        result_list.append(current_lttb)
+
     x = lttb_node(da)
     print(x)
+
+    # compare channel-wise whether the values are correct
+    for i, c in enumerate(da["channel"]):
+        # we can only compare values, not timestamps
+        print(x.values[:, i])
+        print(result_list[i][:, 1])
+        # TODO: currently we cannot compare to the lttb implementation
+        # due to different bucket generation
+        # assert x.values[:, i] == result_list[i][:, 1]
+    # TODO: assert that the timestamps are correct
+    # print(x)


 test_lttbdownsampling()
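Until the bucket generation matches the lttb package, the test can still assert structural properties of the result. A possible interim check to place inside the test body (a sketch; the exact output length depends on the adapted window size, so only a bound and the ordering are asserted):

    # possible interim assertions (sketch, not part of the commit)
    assert x.sizes["channel"] == len(channels)
    # at most one sample per bucket plus the appended endpoint
    assert x.sizes["time"] <= n_out + 1
    # LTTB picks irregular samples, but timestamps must stay sorted
    assert x["time"].to_series().is_monotonic_increasing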