Commit 3816abf3 authored by matthmey

added GSN data source and one-channel LTTB downsampling

parent b91059c5
......@@ -875,12 +875,16 @@ description = "N-D labeled arrays and datasets in Python"
name = "xarray"
optional = false
python-versions = ">=3.6"
version = "0.14.0"
version = "0.14.0+56.g684b3627"
[package.dependencies]
numpy = ">=1.14"
pandas = ">=0.24"
[package.source]
reference = "684b3627475e6120a649b9fc2d16615aaba9db61"
type = "git"
url = "https://github.com/niowniow/xarray.git"
[[package]]
category = "main"
description = "Advanced / experimental algorithms for xarray"
......@@ -1599,10 +1603,7 @@ urllib3 = [
{file = "urllib3-1.25.7-py2.py3-none-any.whl", hash = "sha256:a8a318824cc77d1fd4b2bec2ded92646630d7fe8619497b142c84a9e6f5a7293"},
{file = "urllib3-1.25.7.tar.gz", hash = "sha256:f3c5fd51747d450d4dcf6f923c81f78f811aab8205fda64b0aba34a4e48b0745"},
]
xarray = [
{file = "xarray-0.14.0-py3-none-any.whl", hash = "sha256:9a4f97c6a7fdf9a6dd873ac679a86abfa1910263a85774d69bc3c0fa1e7967f5"},
{file = "xarray-0.14.0.tar.gz", hash = "sha256:a8b93e1b0af27fa7de199a2d36933f1f5acc9854783646b0f1b37fed9b4da091"},
]
xarray = []
xarray-extras = [
{file = "xarray_extras-0.4.2.tar.gz", hash = "sha256:f458174d2adc66a947dbee6929242f56c1afa5a1c507b982d5ec4f4ee7e31e69"},
]
......
......@@ -65,6 +65,140 @@ class DataSource(StuettNode):
return requests
class GSNDataSource(DataSource):
def __init__(
self, deployment=None, vsensor=None, position=None, start_time=None, end_time=None, **kwargs
):
super().__init__(deployment=deployment, position=position, vsensor=vsensor, start_time=start_time, end_time=end_time, kwargs=kwargs)
def forward(self, request):
#### 1 - DEFINE VSENSOR-DEPENDENT COLUMNS ####
colnames = pd.read_csv(
Path(get_setting("metadata_directory")).joinpath("vsensor_metadata/{:s}_{:s}.csv".format(
request["deployment"], request["vsensor"])
),
skiprows=0,
)
print(colnames)
columns_old = colnames["colname_old"].values
columns_new = colnames["colname_new"].values
columns_unit = colnames["unit"].values
if len(columns_old) != len(columns_new):
warnings.warn(
"WARNING: Length of 'columns_old' ({:d}) is not equal length of 'columns_new' ({:d})".format(
len(columns_old), len(columns_new)
)
)
if len(columns_old) != len(columns_unit):
warnings.warn(
"WARNING: Length of 'columns_old' ({:d}) is not equal length of 'columns_new' ({:d})".format(
len(columns_old), len(columns_unit)
)
)
unit = dict(zip(columns_new, columns_unit))
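# Note: the 'unit' strings from the metadata CSV are expected to carry the unit in
# square brackets (e.g. "temperature [degC]" -- value shown here is only illustrative);
# the bracketed part is extracted further below and attached to the output
# DataArray as a per-name 'unit' coordinate.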
#### 2 - DEFINE CONDITIONS AND CREATE HTTP QUERY ####
# Set server
server = get_setting("permasense_server")
# Create virtual_sensor
virtual_sensor = request["deployment"] + "_" + request["vsensor"]
# Create query and add time as well as position selection
query = (
"vs[1]={:s}"
"&time_format=iso"
"&timeline=generation_time"
"&field[1]=All"
"&from={:s}"
"&to={:s}"
"&c_vs[1]={:s}"
"&c_join[1]=and"
"&c_field[1]=position"
"&c_min[1]={:02d}"
"&c_max[1]={:02d}"
).format(
virtual_sensor,
pd.to_datetime(request['start_time'],utc=True).strftime("%d/%m/%Y+%H:%M:%S"),
pd.to_datetime(request['end_time'],utc=True).strftime("%d/%m/%Y+%H:%M:%S"),
virtual_sensor,
int(request["position"]) - 1,
request["position"],
)
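# Illustrative example (values borrowed from the test case, not hard-coded here):
# for deployment="matterhorn", vsensor="temperature_rock", position=30 and the
# time span 2017-07-01 to 2017-07-02 the formatted query reads roughly
# vs[1]=matterhorn_temperature_rock&time_format=iso&timeline=generation_time
# &field[1]=All&from=01/07/2017+00:00:00&to=02/07/2017+00:00:00
# &c_vs[1]=matterhorn_temperature_rock&c_join[1]=and&c_field[1]=position
# &c_min[1]=29&c_max[1]=30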
# query extension for images
if request["vsensor"] == "binary__mapped":
query = (
query
+ "&vs[2]={:s}&field[2]=relative_file&c_join[2]=and&c_vs[2]={:s}&c_field[2]=file_complete&c_min[2]=0&c_max[2]=1&vs[3]={:s}&field[3]=generation_time&c_join[3]=and&c_vs[3]={:s}&c_field[3]=file_size&c_min[3]=2000000&c_max[3]=%2Binf&download_format=csv".format(
virtual_sensor, virtual_sensor, virtual_sensor, virtual_sensor
)
)
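# For binary (image) sensors the query additionally requests the relative file
# path, restricts file_complete to the range 0..1 and file_size to at least
# 2,000,000 bytes, and asks for CSV as the download format.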
# Construct url:
url = server + "multidata?" + query
# if self.verbose:
# print('The GSN http-query is:\n{:s}'.format(url))
#### 3 - ACCESS DATA AND CREATE PANDAS DATAFRAME ####
d = []
d = pd.read_csv(
url, skiprows=2
) # skip header lines (first 2) for import: skiprows=2
df = pd.DataFrame(columns=columns_new)
df.time = pd.to_datetime(d.generation_time,utc=True)
# if depo in ['mh25', 'mh27old', 'mh27new', 'mh30', 'jj22', 'jj25', 'mh-nodehealth']:
# d = d.convert_objects(convert_numeric=True) # TODO: rewrite to remove warning
for k in list(df):
df[k]=pd.to_numeric(df[k], errors='ignore')
# df = pd.DataFrame(columns=columns_new)
# df.time = timestamp2datetime(d['generation_time']/1000)
for i in range(len(columns_old)):
if columns_new[i] != "time":
setattr(df, columns_new[i], getattr(d, columns_old[i]))
df = df.sort_values(by="time")
# Remove columns with names 'delete'
try:
df.drop(["delete"], axis=1, inplace=True)
except KeyError:
pass
# Remove columns with only 'null'
df = df.replace(r"null", np.nan, regex=True)
isnull = df.isnull().all()
[df.drop([col_name], axis=1, inplace=True) for col_name in df.columns[isnull]]
df = df.set_index("time")
df = df.sort_index(axis=1)
print(df)
x = xr.DataArray(df, dims=["time", "name"], name="CSV")
try:
unit_coords = []
for name in x.coords["name"].values:
# name = re.sub(r"\[.*\]", "", name).lstrip().rstrip()
u = unit[str(name)]
u = re.findall(r"\[(.*)\]", u)[0]
# name_coords.append(name)
unit_coords.append(u)
x = x.assign_coords({"unit": ("name", unit_coords)})
except (KeyError, IndexError):
# TODO: add a warning or test explicitly if units exist
pass
return x
class SeismicSource(DataSource):
def __init__(
......
......@@ -10,62 +10,131 @@ import lttb
class MinMaxDownsampling(StuettNode):
def __init__(self, rate=1):
def __init__(self, rate=1, dim=None):
# dim is by default the last dimension
# since we always choose two values (min and max) per bucket,
# the internal downsampling rate must be a factor of two larger than
# the effective (and desired) downsampling rate
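# e.g. rate=2 -> internal bucket size of 4; keeping the min and the max of each
# bucket yields 2 output values per 4 input samples, i.e. an effective rate of 2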
self.rate = rate * 2
self.dim = dim
def forward(self, data=None, request=None):
rolling = data.rolling(time=self.rate, stride=self.rate)
dim = self.dim
if dim is None:
dim = data.dims[-1]
# x_min = rolling.construct("buckets", stride=self.rate).min("time").dropna('time')
# x_max = rolling.construct("buckets", stride=self.rate).max("time").dropna('time')
# rolling = data.rolling({dim:self.rate})
# x_min = rolling.construct("buckets", stride=self.rate).min("time").rename({"buckets":"time"}).dropna('time')
# x_max = rolling.construct("buckets", stride=self.rate).max("time").rename({"buckets":"time"}).dropna('time')
x_min = rolling.min().dropna("time")
x_max = rolling.max().dropna("time")
rolling = data.rolling({dim: self.rate}, stride=self.rate)
x_min = rolling.min().dropna(dim)
x_max = rolling.max().dropna(dim)
x_ds = xr.concat(
[x_min, x_max], "time"
[x_min, x_max], dim
) # TODO: better interleave instead of concat
x_ds = x_ds.sortby("time") # TODO: try to avoid this by using interleaving
x_ds = x_ds.sortby(dim) # TODO: try to avoid this by using interleaving
return x_ds
class LTTBDownsampling(StuettNode):
def __init__(self, rate=1):
def __init__(self, rate=1, dim=None):
# Based on and many thanks to https://github.com/javiljoen/lttb.py
self.rate = rate
self.dim = dim
def forward(self, data=None, request=None):
rolling = data.rolling(time=self.rate, stride=self.rate) #TODO: make this not dependent on the time dimension
print(rolling)
n_out = data.sizes['time']//self.rate
# print(data.sizes['time']//self.rate)
data_bins = rolling.construct("buckets")
return
dim = self.dim
if dim is None:
dim = data.dims[-1]
n_out = data.sizes[dim] // self.rate
""" The following tries to re-implement the LTTB for xarray
There are several issues:
1. We cannot use the LTTB package
it expects the index and the data in one numpy array. Since our index is
usually in an xarray coordinate, we would need to disassemble the xarray
and then reassemble it (as done in the current implementation).
We would potentially lose some information from the original data structure
and we would need to assume certain datatypes for certain dimensions.
2. For multi-dimensional arrays, we run into issues since the dimension
coordinate we want to reduce on applies to multiple axes, and on each axis
LTTB chooses a different pair of (index, value). A solution would be to choose
the label for each window statically but choose the value depending on LTTB.
"""
# rolling = data.rolling({dim: self.rate},stride=self.rate)
# print(rolling)
# # print(data.sizes['time']//self.rate)
# time_bins = data[dim].rolling({dim: self.rate},stride=self.rate).construct("buckets")
# print('time',time_bins)
# data_bins = rolling.construct("buckets")
# print(data_bins)
# # return
# # rolling_dim = utils.get_temp_dimname(self.obj.dims, "_rolling_dim")
# # windows = self.construct(rolling_dim, stride=self.stride)
# # result = windows.reduce(func, dim=rolling_dim, **kwargs)
# # # Find valid windows based on count.
# # counts = self._counts()
# # return result.where(counts >= self._min_periods)
# # mean = rolling.mean()
# mean = data_bins.mean("buckets")
# mean_time = time_bins.mean("buckets")
# print(mean_time)
# out = []
# prev = None
# for i in range(data_bins.sizes[dim]-1):
# print(i)
# item = data_bins.isel({dim:i})
# time = time_bins.isel({dim:i})
# if i == 0:
# prev_data = item.isel(buckets=-1)
# prev_time = time.isel(buckets=-1)
# out.append(prev)
# continue
# a = prev
# a_time = prev_time
# bs = item
# bs_time = time
# if i < data_bins.sizes[dim]-1:
# c = mean.isel({dim:i+1})
# c_time = mean_time.isel({dim:i+1})
# else:
# c = data_bins.isel({dim:-1})
# c_time = time_bins.isel({dim:-1})
# # calculate areas of triangle
# bs_minus_a = bs - a
# a_minus_bs = a - bs
# areas = 0.5 * abs((a[0] - c[0]) * (bs_minus_a[:, 1])
# - (a_minus_bs[:, 0]) * (c[1] - a[1]))
mean = rolling.mean()
print(mean)
out = []
for i, (index, roll) in enumerate(rolling):
print(i)
if i == 0:
prev = data[0]
print(prev)
print(mean.sel(time=index))
print(roll)
# x_min = rolling.min().dropna("time")
prev = roll
# This is quite hacky and works only if the selected dimension is a datetime
if len(data.squeeze().dims) > 1:
raise RuntimeError("Can only work with arrays of one dimension")
d = np.array([data["time"].values.astype(np.int64), data.squeeze().values]).T
small_data = lttb.downsample(d, n_out=n_out)
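# note: for a datetime64[ns] coordinate the int64 cast yields nanoseconds since
# the epoch, which pd.to_datetime() below interprets as nanoseconds again when
# the DataArray is rebuilt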
# x_min = rolling.min().dropna("time")
small_data = lttb.downsample(data, n_out=20)
array = xr.DataArray(
small_data[:, 1],
dims=[dim],
coords={dim: (dim, pd.to_datetime(small_data[:, 0]))},
)
return x_ds
array.attrs = data.attrs
return array
class Downsampling(StuettNode):
......@@ -110,11 +179,13 @@ class Spectrogram(StuettNode):
config["dim"] = data.dims[-1]
axis = data.get_axis_num(config["dim"])
if config["sampling_rate"] is None:
if 'sampling_rate' not in data.attrs:
raise RuntimeError("Please provide a sampling_rate attribute "
"to your config or your input data")
if "sampling_rate" not in data.attrs:
raise RuntimeError(
"Please provide a sampling_rate attribute "
"to your config or your input data"
)
else:
config["sampling_rate"] = data.attrs["sampling_rate"]
......@@ -143,13 +214,13 @@ class Spectrogram(StuettNode):
None
) # TODO: change when xarray #3291 is fixed
# Create a new DataArray for the spectrogram
dims = data.dims[:axis] + ("frequency",) + data.dims[axis+1:] + (config["dim"],)
dims = (
data.dims[:axis] + ("frequency",) + data.dims[axis + 1 :] + (config["dim"],)
)
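# the reduced dimension is moved to the last axis; "frequency" takes its original place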
coords = dict(data.coords)
coords.update({"frequency": ("frequency", freqs), config["dim"]: ds_coords})
coords.update({"frequency": ("frequency", freqs), config["dim"]: ds_coords})
dataarray = xr.DataArray(spectrum, dims=dims, coords=coords)
return dataarray
......@@ -47,6 +47,16 @@ class TestSeismicSource(object):
# TODO: make test to compare start_times
def test_gsn():
# test_data = pd.read_csv(test_data_dir + 'matterhorn_27_temperature_rock.csv',index_col='time')
gsn_node = stuett.data.GSNDataSource(deployment="matterhorn", position=30, vsensor="temperature_rock")
x = gsn_node({"start_time":"2017-07-01","end_time":"2017-07-02"})
print(x)
test_gsn()
def test_delayed():
filename = Path(test_data_dir).joinpath(
"timeseries", "MH30_temperature_rock_2017.csv"
......@@ -219,4 +229,3 @@ def test_annotations():
# batch_dims={"time": pd.to_timedelta(10, "s")},
# )
# test_datasets()
......@@ -11,8 +11,8 @@ import pytest
class TestMinMaxDownsampling(object):
def test_minmax(self):
minmax_default = stuett.data.MinMaxDownsampling()
minmax_rate2 = stuett.data.MinMaxDownsampling(2)
minmax_default = stuett.data.MinMaxDownsampling(dim="time")
minmax_rate2 = stuett.data.MinMaxDownsampling(rate=2, dim="time")
np.random.seed(123)
da = xr.DataArray(
......@@ -25,15 +25,14 @@ class TestMinMaxDownsampling(object):
x = minmax_rate2(da)
x = x.compute()
# TODO: proper test
assert x.shape == (4, 2)
assert x.mean() == 0.46291252327532006
@pytest.mark.slow
def test_seismic(self):
seismic_source = stuett.data.SeismicSource(use_arclink=True)
minmax_rate2 = stuett.data.MinMaxDownsampling(2)
minmax_rate2 = stuett.data.MinMaxDownsampling(2, dim="time")
x = seismic_source(config)
x = minmax_rate2(x)
......@@ -47,7 +46,7 @@ class TestMinMaxDownsampling(object):
def test_spectrogram():
spectrogram = stuett.data.Spectrogram(nfft=2, stride=1, dim='time')
spectrogram = stuett.data.Spectrogram(nfft=2, stride=1, dim="time")
np.random.seed(123)
da = xr.DataArray(
......@@ -56,28 +55,22 @@ def test_spectrogram():
("time", pd.date_range("2000-01-01", periods=9)),
("channel", ["EHE", "EHZ", "EHN"]),
],
attrs={"sampling_rate":0.000011574}
attrs={"sampling_rate": 0.000011574},
)
x = spectrogram(da)
print(x)
test_spectrogram()
# test_spectrogram()
def test_lttbdownsampling():
lttb_node = stuett.data.LTTBDownsampling(rate=2)
lttb_node = stuett.data.LTTBDownsampling(rate=2, dim="time")
np.random.seed(123)
da = xr.DataArray(
np.random.rand(9, 2),
[
("time", pd.date_range("2000-01-01", periods=9)),
("channel", ["EHE", "EHZ"]),
],
np.random.rand(9, 1),
[("time", pd.date_range("2000-01-01", periods=9)), ("channel", ["EHE"]),],
)
x = lttb_node(da)
......@@ -85,4 +78,4 @@ def test_lttbdownsampling():
print(x)
# test_lttbdownsampling()
\ No newline at end of file
test_lttbdownsampling()