'''MIT License Copyright (c) 2019, Swiss Federal Institute of Technology (ETH Zurich), Matthias Meyer Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.''' from ..global_config import get_setting from ..core.graph import StuettNode import dask import numpy as np import xarray as xr import scipy.signal import pandas as pd import lttb class MinMaxDownsampling(StuettNode): def __init__(self, rate=1, dim=None): # dim is by default the last dimension # since we always choose two values (min and max) per bucket the # the internal downsampling rate must be of factor two larger than # the effective (and desired) downsampling rate self.rate = rate * 2 self.dim = dim def forward(self, data=None, request=None): dim = self.dim if dim is None: dim = data.dims[-1] # rolling = data.rolling({dim:self.rate}) # x_min = rolling.construct("buckets", stride=self.rate).min("time").rename({"buckets":"time"}).dropna('time') # x_max = rolling.construct("buckets", stride=self.rate).max("time").rename({"buckets":"time"}).dropna('time') rolling = data.rolling({dim: self.rate}, stride=self.rate) x_min = rolling.min().dropna(dim) x_max = rolling.max().dropna(dim) x_ds = xr.concat( [x_min, x_max], dim ) # TODO: better interleave instead of concat x_ds = x_ds.sortby(dim) # TODO: try to avoid this by using interleaving return x_ds class LTTBDownsampling(StuettNode): def __init__(self, rate=1, dim=None): # Based on and many thanks to https://github.com/javiljoen/lttb.py self.rate = rate self.dim = dim def forward(self, data=None, request=None): dim = self.dim if dim is None: dim = data.dims[-1] n_out = data.sizes[dim] // self.rate """ The following tries to re-implement the LTTB for xarray There are several issues: 1. We cannot use the LTTB package it expects the index and the data in one numpy array. Since our index is usually in a xarray coordinate, we would need to disassemble the xarray and then reassemble it. (As done in the current implementation). We would potentially loose some information from the original datastructure and we would need to assume certain datatypes for certain dimensions. 2. For multi-dimensional arrays, we run into issues since since the dimension coordinate we want to reduce on applies to multiple axes and on each axis LTTB chooses a different pair of (index, value). Solution would be to choose the label for each window statically but choose the value depending on LTTB. """ # rolling = data.rolling({dim: self.rate},stride=self.rate) # print(rolling) # # print(data.sizes['time']//self.rate) # time_bins = data[dim].rolling({dim: self.rate},stride=self.rate).construct("buckets") # print('time',time_bins) # data_bins = rolling.construct("buckets") # print(data_bins) # # return # # rolling_dim = utils.get_temp_dimname(self.obj.dims, "_rolling_dim") # # windows = self.construct(rolling_dim, stride=self.stride) # # result = windows.reduce(func, dim=rolling_dim, **kwargs) # # # Find valid windows based on count. # # counts = self._counts() # # return result.where(counts >= self._min_periods) # # mean = rolling.mean() # mean = data_bins.mean("buckets") # mean_time = time_bins.mean("buckets") # print(mean_time) # out = [] # prev = None # for i in range(data_bins.sizes[dim]-1): # print(i) # item = data_bins.isel({dim:i}) # time = time_bins.isel({dim:i}) # if i == 0: # prev_data = item.isel(buckets=-1) # prev_time = time.isel(buckets=-1) # out.append(prev) # continue # a = prev # a_time = prev_time # bs = item # bs_time = time # if i < data_bins.sizes[dim]-1: # c = mean.isel({dim:i+1}) # c_time = mean_time.isel({dim:i+1}) # else: # c = data_bins.isel({dim:-1}) # c_time = time_bins.isel({dim:-1}) # # calculate areas of triangle # bs_minus_a = bs - a # a_minus_bs = a - bs # areas = 0.5 * abs((a[0] - c[0]) * (bs_minus_a[:, 1]) # - (a_minus_bs[:, 0]) * (c[1] - a[1])) # x_min = rolling.min().dropna("time") # This is quite hacky and works only if the the selected dimension is a datetime if len(data.squeeze().dims) > 1: raise RuntimeError("Can only work with arrays of one dimension") d = np.array([data["time"].values.astype(np.int64), data.squeeze().values]).T small_data = lttb.downsample(d, n_out=n_out) array = xr.DataArray( small_data[:, 1], dims=[dim], coords={dim: (dim, pd.to_datetime(small_data[:, 0]))}, ) array.attrs = data.attrs return array class Downsampling(StuettNode): def __init__(self): raise NotImplementedError() # TODO: high level downsampling node which uses one of the other downsampling # classes depending on the user request pass class Spectrogram(StuettNode): def __init__( self, nfft=2048, stride=1024, dim=None, sampling_rate=None, window=("tukey", 0.25), detrend="constant", return_onesided=True, scaling="density", mode="psd", ): super().__init__( nfft=nfft, stride=stride, dim=dim, sampling_rate=sampling_rate, window=window, detrend=detrend, return_onesided=return_onesided, scaling=scaling, mode=mode, ) def forward(self, data=None, request=None): config = self.config.copy() # TODO: do we need a deep copy? if request is not None: config.update(request) if config["dim"] is None: config["dim"] = data.dims[-1] axis = data.get_axis_num(config["dim"]) if config["sampling_rate"] is None: if "sampling_rate" not in data.attrs: raise RuntimeError( "Please provide a sampling_rate attribute " "to your config or your input data" ) else: config["sampling_rate"] = data.attrs["sampling_rate"] samples = data.values noverlap = config["nfft"] - config["stride"] freqs, spectrum_time, spectrum = scipy.signal.spectrogram( samples, nfft=config["nfft"], nperseg=config["nfft"], noverlap=noverlap, fs=config["sampling_rate"], axis=axis, detrend=config["detrend"], scaling=config["scaling"], return_onesided=config["return_onesided"], mode=config["mode"], window=config["window"], ) # TODO: check if this is what we want. it's: the earliest timestamp of input + the delta computed by scipy ds_coords = pd.to_datetime( pd.to_datetime(data[config["dim"]][0].values) + pd.to_timedelta(spectrum_time, "s") ).tz_localize( None ) # TODO: change when xarray #3291 is fixed # Create a new DataArray for the spectogram dims = ( data.dims[:axis] + ("frequency",) + data.dims[axis + 1 :] + (config["dim"],) ) coords = dict(data.coords) coords.update({"frequency": ("frequency", freqs), config["dim"]: ds_coords}) dataarray = xr.DataArray(spectrum, dims=dims, coords=coords) return dataarray