Commit 4b496e65 authored by matthmey

Merge branch 'develop'

parents 7b7fd6ac 35079b2c
# stuett
Disclaimer: This project is still under heavy development.
## Quickstart
You can install the package with
```
pip install stuett
```
You can find examples on how to use the package [here](https://github.com/ETHZ-TEC/permafrostanalytics/).
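A rough usage sketch (illustrative only; `MinMaxDownsampling` and the `delayed` flag follow the code in this repository, but treat the exact API as subject to change):
```
import numpy as np
import pandas as pd
import xarray as xr
import stuett

# A small one-dimensional time series as an xarray DataArray
da = xr.DataArray(
    np.random.rand(1000),
    dims=["time"],
    coords={"time": pd.date_range("2017-08-01", periods=1000, freq="s")},
)

node = stuett.data.MinMaxDownsampling(rate=10)
downsampled = node(da)          # eager: returns the downsampled DataArray
lazy = node(da, delayed=True)   # lazy: returns a dask.delayed task instead
downsampled = lazy.compute()    # evaluate the dask graph
```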
## Install from sources
On Linux, you can install the package with
```
pip install poetry
poetry install
......@@ -11,7 +19,17 @@ poetry install
If you want to install it into an Anaconda environment, run
```
conda create -n stuett python==3.7 poetry
conda create -n stuett python==3.7
conda activate stuett
pip install --pre poetry
poetry install
```
Note that there are several known issues when installing on Windows:
* Make sure git is installed and can be found by Anaconda. You can install it via Anaconda with `conda install git curl`.
* Install a Microsoft C compiler; gcc seems to cause issues on Windows.
## Versions
The versioning scheme is based on [CalVer](https://calver.org/), using the YY.MM.MICRO format.
\ No newline at end of file
[tool.poetry]
name = "stuett"
version = "0.1.0"
version = "19.11.0"
description = "Data processing framework for PermaSense project"
license = "MIT"
......@@ -25,12 +25,15 @@ dask = {extras = ["complete"], version = "^2.6.0"}
pandas = "^0.25.3"
toolz = "^0.10.0"
obspy = "^1.1.1"
numpy = "1.16.5"
appdirs = "^1.4.3"
obsplus = "^0.0.2"
obsplus = {git = "https://github.com/niosh-mining/obsplus" }
zarr = "^2.3.2"
xarray = { git = "https://github.com/niowniow/xarray.git", branch = "strided_rolling" }
pillow = "^6.2.1"
xarray-extras = "^0.4.2"
lttb = "^0.2.0"
pyarrow = "^0.15.1"
# Optional dependencies (extras)
......
......@@ -2,23 +2,4 @@ from __future__ import absolute_import
from . import data
from . import global_config
import dask
# TODO: make it a proper decorator with arguments etc
def dat(x):
""" Helper function to tranform input callable into
dask.delayed object
From low german 'stütt dat!' which means "support it!"
Arguments:
x {callable} -- Any input callable which is supported
by dask.delayed
Returns:
dask.delayed object --
"""
return dask.delayed(x)
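# Illustrative usage of the helper above (not part of this commit):
#     lazy_sum = dat(sum)            # wrap the builtin `sum` into a dask.delayed object
#     task = lazy_sum([1, 2, 3])     # builds a task graph instead of computing directly
#     assert task.compute() == 6     # evaluation only happens on compute()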
from .convenience import *
from pandas import to_datetime, to_timedelta
......@@ -53,9 +53,14 @@ class Node(object):
)
return requests
@dask.delayed
def __call__(self, x, request=None):
raise NotImplementedError()
def __call__(self, data=None, request=None, delayed=False):
if delayed:
return dask.delayed(self.forward)(data, request)
else:
return self.forward(data=data, request=request)
def forward(self, data, request):
raise NotImplementedError
def get_config(self):
""" returns a dictionary of configurations to recreate the state
......@@ -66,8 +71,12 @@ class Node(object):
class StuettNode(Node): # TODO: define where this class should be (maybe not here)
def __init__(self, **kwargs):
self.config = locals().copy()
while "kwargs" in self.config and self.config["kwargs"]:
while "kwargs" in self.config:
if "kwargs" not in self.config["kwargs"]:
self.config.update(self.config["kwargs"])
break
self.config.update(self.config["kwargs"])
del self.config["kwargs"]
del self.config["self"]
......@@ -128,6 +137,8 @@ def configuration(delayed, request, keys=None, default_merge=None):
if not isinstance(delayed, list):
collections = [delayed]
else:
collections = delayed
# dsk = dask.base.collections_to_dsk(collections)
dsk, dsk_keys = dask.base._extract_graph_and_keys(collections)
......@@ -179,14 +190,20 @@ def configuration(delayed, request, keys=None, default_merge=None):
# print(f"configuring {k}",requests[k])
# set configuration for this node k
# If we create a delayed object from a class, `self` will be dsk[k][1]
if isinstance(dsk[k], tuple) and isinstance(
dsk[k][1], Node
): # Check if we get a node of type Node class
argument_is_node = None
if isinstance(dsk[k], tuple):
# check the first argument
# # TODO: make tests for all use cases and then remove for-loop
for ain in range(1):
if hasattr(dsk[k][ain], "__self__"):
if isinstance(dsk[k][ain].__self__, Node):
argument_is_node = ain
# Check if we get a node of type Node class
if argument_is_node is not None:
# current_requests = [r for r in requests[k] if r] # get all requests belonging to this node
current_requests = requests[k]
new_request = dsk[k][1].configure(
new_request = dsk[k][argument_is_node].__self__.configure(
current_requests
) # Call the class configuration function
if not isinstance(
......@@ -208,25 +225,24 @@ def configuration(delayed, request, keys=None, default_merge=None):
new_request = requests[k]
if (
"requires_request" in new_request[0]
new_request[0] is not None
and "requires_request" in new_request[0]
and new_request[0]["requires_request"] == True
):
del new_request[0]["requires_request"]
input_requests[k] = copy.deepcopy(
new_request[0]
) # TODO: check if we need a deepcopy here!
# TODO: check if we need a deepcopy here!
input_requests[k] = copy.deepcopy(new_request[0])
# update dependencies
current_deps = get_dependencies(dsk, k, as_list=True)
for i, d in enumerate(current_deps):
if d in requests:
requests[d] += new_request
remove[d] = remove[d] and (not new_request[0])
remove[d] = remove[d] and (new_request[0] is None)
else:
requests[d] = new_request
remove[d] = not new_request[
0
] # if we received an empty dictionary flag deps for removal
# if we received None
remove[d] = new_request[0] is None
# only configure each node once in a round!
if d not in new_work and d not in work: # TODO: verify this
......@@ -239,64 +255,40 @@ def configuration(delayed, request, keys=None, default_merge=None):
# Assembling the configured new graph
out = {k: dsk[k] for k in out_keys if not remove[k]}
# After we have acquired all requests we can feed the required requests as an input node to the requiring node
# we assume that the last argument is the request
for k in input_requests:
out[k] += (input_requests[k],)
# Here we assume that we always receive the same tuple of (bound method, data, request)
# If the interface changes this will break #TODO: check for all cases
if isinstance(out[k][2], tuple):
# TODO: find a better inversion of to_task_dask()
if out[k][2][0] == dict:
my_dict = {item[0]: item[1] for item in out[k][2][1]}
my_dict.update(input_requests[k])
out[k] = out[k][:2] + (my_dict,)
else:
# replace the last entry
out[k] = out[k][:2] + (input_requests[k],)
else:
# replace the last entry
out[k] = out[k][:2] + (input_requests[k],)
# convert to delayed object
from dask.delayed import Delayed
in_keys = list(flatten(keys))
# print(in_keys)
if len(in_keys) > 1:
collection = [Delayed(key=key, dsk=out) for key in in_keys]
else:
collection = Delayed(key=in_keys[0], dsk=out)
if isinstance(collection, list):
if isinstance(delayed, list):
collection = [collection]
return collection
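# Usage sketch (with hypothetical nodes): build a delayed graph and let `configuration`
# propagate a user request from the requested keys towards the data sources.
#     source = SomeDataSource()                          # hypothetical Node needing a request
#     node = MinMaxDownsampling(rate=10)
#     graph = node(source(delayed=True), delayed=True)   # delayed graph, no data loaded yet
#     configured = configuration(
#         graph, {"start_time": "2017-08-01", "end_time": "2017-08-02"}
#     )
#     result = configured.compute()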
class Freezer(Node):
def __init__(self, caching=True):
self.caching = caching
@dask.delayed
def __call__(self, x):
"""If caching is enabled load a cached result or stores the input data and returns it
Arguments:
x {xarray or dict} -- Either the xarray data to be passed through (and cached)
or a request dictionary containing information about the data
to be loaded
Returns:
xarray -- Data loaded from cache or input data passed through
"""
if isinstance(x, dict):
if self.is_cached(x) and self.caching:
# TODO: load from cache and return it
pass
elif not self.caching:
raise RuntimeError(f"If caching is disabled cannot perform request {x}")
else:
raise RuntimeError(
f"Result is not cached but cached result is requested with {x}"
)
if self.caching:
# TODO: store the input data
pass
return x
def configure(self, requests):
if self.caching:
return [{}]
return config_conflict(requests)
def optimize_freeze(dsk, keys, request_key="request"):
""" Return new dask with tasks removed which are unnecessary because a later stage
reads from cache
......
from .management import *
from .processing import *
# from .collection import *
from .collection import *
import numpy as np
import pandas as pd
import warnings
import datetime as dt
from .management import DataSource
from ..core import configuration
class DataCollector(DataSource):
def __init__(self, data_paths=[], granularities=[]):
"""Add and choose data path according to its granularity.
The data collector returns the data path given an index segment (index_end - index_start).
The index segment is compared against the given granularities and the mapped data path is
returned. For example, for a time series where the index is a datetime object, the timedelta
of (end_time - start_time) is compared against the given list of granularity timedeltas.
Keyword Arguments:
data_paths {list} -- a list of data paths, e.g. the leaves of a dask graph (default: {[]})
granularities {list} -- a list of sorted granularities (default: {[]})
"""
super().__init__()
self.data_paths = data_paths
self.granularities = granularities
if len(self.data_paths) != len(self.granularities):
raise ValueError(
"Each granularity is supposed to have its corresponding data manager"
)
if len(self.granularities) > 1 and not self.is_sorted(self.granularities):
raise ValueError("Granularities should be sorted")
def forward(self, data=None, request=None):
if len(self.data_paths) != len(self.granularities):
raise ValueError(
"Each granularity is supposed to have its corresponding data manager"
)
if len(self.granularities) > 1 and not self.is_sorted(self.granularities):
raise ValueError("Granularities should be sorted")
# TODO: change to generic indices or slices
granularity = request["end_time"] - request["start_time"]
data_path = None
for i in range(len(self.granularities)):
print(i, granularity, "<", self.granularities[i], self.data_paths[i])
if granularity < self.granularities[i]:
data_path = self.data_paths[i]
break
if data_path is None:
raise AttributeError("No data manager can be used for this timeframe")
return data_path
def is_sorted(self, l):
"""Check whether a list is sorted
Arguments:
l {list} -- the list to check for sortedness
Returns:
[bool] -- True if the list is sorted, False otherwise
"""
return all(a <= b for a, b in zip(l, l[1:]))
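# Usage sketch (illustrative values): choose between two hypothetical data paths
# depending on the requested time span.
#     collector = DataCollector(
#         data_paths=[raw_node, downsampled_node],   # hypothetical graph leaves
#         granularities=[pd.to_timedelta("1 hour"), pd.to_timedelta("365 days")],
#     )
#     request = {
#         "start_time": pd.Timestamp("2017-08-01 00:00"),
#         "end_time": pd.Timestamp("2017-08-01 00:30"),
#     }
#     collector.forward(request=request)   # -> raw_node, since 30 minutes < 1 hour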
......@@ -4,37 +4,223 @@ from ..core.graph import StuettNode
import dask
import numpy as np
import xarray as xr
import scipy.signal
import pandas as pd
import lttb
class MinMaxDownsampling(StuettNode):
def __init__(self, rate=1):
def __init__(self, rate=1, dim=None):
# dim is by default the last dimension
# since we always choose two values (min and max) per bucket,
# the internal downsampling rate must be a factor of two larger than
# the effective (and desired) downsampling rate
self.rate = rate * 2
self.dim = dim
@dask.delayed
def __call__(self, x):
rolling = x.rolling(time=self.rate, stride=self.rate)
def forward(self, data=None, request=None):
dim = self.dim
if dim is None:
dim = data.dims[-1]
# x_min = rolling.construct("time", stride=self.rate).min("time").dropna('time')
# x_max = rolling.construct("time", stride=self.rate).max("time").dropna('time')
# rolling = data.rolling({dim:self.rate})
# x_min = rolling.construct("buckets", stride=self.rate).min("time").rename({"buckets":"time"}).dropna('time')
# x_max = rolling.construct("buckets", stride=self.rate).max("time").rename({"buckets":"time"}).dropna('time')
x_min = rolling.min().dropna("time")
x_max = rolling.max().dropna("time")
rolling = data.rolling({dim: self.rate}, stride=self.rate)
x_min = rolling.min().dropna(dim)
x_max = rolling.max().dropna(dim)
x_ds = xr.concat(
[x_min, x_max], "time"
[x_min, x_max], dim
) # TODO: better interleave instead of concat
x_ds = x_ds.sortby("time") # TODO: try to avoid this by using interleaving
x_ds = x_ds.sortby(dim) # TODO: try to avoid this by using interleaving
return x_ds
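# Usage sketch: with rate=10 the internal bucket size is 20 samples, so a series of
# 1000 values is reduced to about 100 values (one min and one max per bucket).
# Note that rolling(..., stride=...) relies on the patched xarray branch in pyproject.toml.
#     da = xr.DataArray(
#         np.random.rand(1000),
#         dims=["time"],
#         coords={"time": pd.date_range("2017-08-01", periods=1000, freq="s")},
#     )
#     MinMaxDownsampling(rate=10).forward(data=da)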
class LTTBDownsampling(StuettNode):
def __init__(self, rate=1, dim=None):
# Based on and many thanks to https://github.com/javiljoen/lttb.py
self.rate = rate
self.dim = dim
def forward(self, data=None, request=None):
dim = self.dim
if dim is None:
dim = data.dims[-1]
n_out = data.sizes[dim] // self.rate
""" The following tries to re-implement the LTTB for xarray
There are several issues:
1. We cannot use the LTTB package
it expects the index and the data in one numpy array. Since our index is
usually in a xarray coordinate, we would need to disassemble the xarray
and then reassemble it. (As done in the current implementation).
We would potentially loose some information from the original datastructure
and we would need to assume certain datatypes for certain dimensions.
2. For multi-dimensional arrays, we run into issues since since the dimension
coordinate we want to reduce on applies to multiple axes and on each axis
LTTB chooses a different pair of (index, value). Solution would be to choose
the label for each window statically but choose the value depending on LTTB.
"""
# rolling = data.rolling({dim: self.rate},stride=self.rate)
# print(rolling)
# # print(data.sizes['time']//self.rate)
# time_bins = data[dim].rolling({dim: self.rate},stride=self.rate).construct("buckets")
# print('time',time_bins)
# data_bins = rolling.construct("buckets")
# print(data_bins)
# # return
# # rolling_dim = utils.get_temp_dimname(self.obj.dims, "_rolling_dim")
# # windows = self.construct(rolling_dim, stride=self.stride)
# # result = windows.reduce(func, dim=rolling_dim, **kwargs)
# # # Find valid windows based on count.
# # counts = self._counts()
# # return result.where(counts >= self._min_periods)
# # mean = rolling.mean()
# mean = data_bins.mean("buckets")
# mean_time = time_bins.mean("buckets")
# print(mean_time)
# out = []
# prev = None
# for i in range(data_bins.sizes[dim]-1):
# print(i)
# item = data_bins.isel({dim:i})
# time = time_bins.isel({dim:i})
# if i == 0:
# prev_data = item.isel(buckets=-1)
# prev_time = time.isel(buckets=-1)
# out.append(prev)
# continue
# a = prev
# a_time = prev_time
# bs = item
# bs_time = time
# if i < data_bins.sizes[dim]-1:
# c = mean.isel({dim:i+1})
# c_time = mean_time.isel({dim:i+1})
# else:
# c = data_bins.isel({dim:-1})
# c_time = time_bins.isel({dim:-1})
# # calculate areas of triangle
# bs_minus_a = bs - a
# a_minus_bs = a - bs
# areas = 0.5 * abs((a[0] - c[0]) * (bs_minus_a[:, 1])
# - (a_minus_bs[:, 0]) * (c[1] - a[1]))
# x_min = rolling.min().dropna("time")
# This is quite hacky and works only if the selected dimension is a datetime
if len(data.squeeze().dims) > 1:
raise RuntimeError("Can only work with arrays of one dimension")
d = np.array([data["time"].values.astype(np.int64), data.squeeze().values]).T
small_data = lttb.downsample(d, n_out=n_out)
array = xr.DataArray(
small_data[:, 1],
dims=[dim],
coords={dim: (dim, pd.to_datetime(small_data[:, 0]))},
)
array.attrs = data.attrs
return array
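# Usage sketch: keep visually representative points of a one-dimensional series;
# with rate=10 and 1000 samples, n_out is 100 points.
#     da = xr.DataArray(
#         np.random.rand(1000),
#         dims=["time"],
#         coords={"time": pd.date_range("2017-08-01", periods=1000, freq="s")},
#     )
#     LTTBDownsampling(rate=10).forward(data=da)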
class Downsampling(StuettNode):
def __init__(self):
raise NotImplementedError()
# TODO: high level downsampling node which uses one of the other downsampling
# classes depending on the user request
pass
class Spectrogram(StuettNode):
def __init__(
self,
nfft=2048,
stride=1024,
dim=None,
sampling_rate=None,
window=("tukey", 0.25),
detrend="constant",
return_onesided=True,
scaling="density",
mode="psd",
):
super().__init__(
nfft=nfft,
stride=stride,
dim=dim,
sampling_rate=sampling_rate,
window=window,
detrend=detrend,
return_onesided=return_onesided,
scaling=scaling,
mode=mode,
)
def forward(self, data=None, request=None):
config = self.config.copy() # TODO: do we need a deep copy?
if request is not None:
config.update(request)
if config["dim"] is None:
config["dim"] = data.dims[-1]
axis = data.get_axis_num(config["dim"])
if config["sampling_rate"] is None:
if "sampling_rate" not in data.attrs:
raise RuntimeError(
"Please provide a sampling_rate attribute "
"to your config or your input data"
)
else:
config["sampling_rate"] = data.attrs["sampling_rate"]
samples = data.values
noverlap = config["nfft"] - config["stride"]
freqs, spectrum_time, spectrum = scipy.signal.spectrogram(
samples,
nfft=config["nfft"],
nperseg=config["nfft"],
noverlap=noverlap,
fs=config["sampling_rate"],
axis=axis,
detrend=config["detrend"],
scaling=config["scaling"],
return_onesided=config["return_onesided"],
mode=config["mode"],
window=config["window"],
)
# TODO: check if this is what we want. It is the earliest timestamp of the input + the time offsets computed by scipy
ds_coords = pd.to_datetime(
pd.to_datetime(data[config["dim"]][0].values)
+ pd.to_timedelta(spectrum_time, "s")
).tz_localize(
None
) # TODO: change when xarray #3291 is fixed
# Create a new DataArray for the spectrogram
dims = (
data.dims[:axis] + ("frequency",) + data.dims[axis + 1 :] + (config["dim"],)
)
coords = dict(data.coords)
coords.update({"frequency": ("frequency", freqs), config["dim"]: ds_coords})
dataarray = xr.DataArray(spectrum, dims=dims, coords=coords)
return dataarray
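# Usage sketch (illustrative): spectrogram of a 60 s trace sampled at 250 Hz.
# The sampling rate is taken from data.attrs if it is not given in the config or request.
#     fs = 250
#     da = xr.DataArray(
#         np.random.rand(fs * 60),
#         dims=["time"],
#         coords={"time": pd.date_range("2017-08-01", periods=fs * 60, freq="4ms")},
#         attrs={"sampling_rate": fs},
#     )
#     Spectrogram(nfft=512, stride=256).forward(data=da)   # dims: ("frequency", "time")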
......@@ -8,6 +8,7 @@ import datetime as dt