To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit d7a6964d authored by matthmey's avatar matthmey
Browse files

added MH Image source

parent 39bf3d51
# stuett # stuett
\ No newline at end of file
## Quickstart
You can install the package with
```
pip install poetry
poetry install
```
If you want to install it into an anaconda environment do
```
conda create -n stuett python==3.7 poetry
conda activate stuett
poetry install
```
This diff is collapsed.
...@@ -22,13 +22,15 @@ toml = "^0.9" ...@@ -22,13 +22,15 @@ toml = "^0.9"
# Dependencies with extras # Dependencies with extras
dask = {extras = ["complete"], version = "^2.6.0"} dask = {extras = ["complete"], version = "^2.6.0"}
xarray = "^0.14.0"
pandas = "^0.25.3" pandas = "^0.25.3"
toolz = "^0.10.0" toolz = "^0.10.0"
obspy = "^1.1.1" obspy = "^1.1.1"
appdirs = "^1.4.3" appdirs = "^1.4.3"
obsplus = "^0.0.2" obsplus = "^0.0.2"
zarr = "^2.3.2" zarr = "^2.3.2"
xarray = { git = "https://github.com/niowniow/xarray.git", branch = "strided_rolling" }
pillow = "^6.2.1"
xarray-extras = "^0.4.2"
# Optional dependencies (extras) # Optional dependencies (extras)
...@@ -36,3 +38,4 @@ zarr = "^2.3.2" ...@@ -36,3 +38,4 @@ zarr = "^2.3.2"
pytest = "^3.0" pytest = "^3.0"
pytest-cov = "^2.4" pytest-cov = "^2.4"
black = {version = "^19.10b0", allows-prereleases = true} black = {version = "^19.10b0", allows-prereleases = true}
flake8 = "^3.7.9"
...@@ -21,4 +21,4 @@ def dat(x): ...@@ -21,4 +21,4 @@ def dat(x):
dask.delayed object -- dask.delayed object --
""" """
return dask.delayed(x) return dask.delayed(x)
\ No newline at end of file
from .graph import * from .graph import *
\ No newline at end of file
import dask import dask
from dask.core import get_dependencies, flatten from dask.core import get_dependencies, flatten
import numpy as np import numpy as np
import copy import copy
class Node(object): class Node(object):
def __init__(self): def __init__(self):
pass pass
def configure(self,requests): def configure(self, requests):
""" Before a task graph is executed each node is configured. """ Before a task graph is executed each node is configured.
The request is propagated from the end to the beginning The request is propagated from the end to the beginning
of the DAG and each nodes "configure" routine is called. of the DAG and each nodes "configure" routine is called.
...@@ -42,16 +43,18 @@ class Node(object): ...@@ -42,16 +43,18 @@ class Node(object):
node from the task graph. node from the task graph.
""" """
if not isinstance(requests,list): if not isinstance(requests, list):
raise RuntimeError('Please provide a **list** of request') raise RuntimeError("Please provide a **list** of request")
if len(requests) > 1: if len(requests) > 1:
raise RuntimeError('Default configuration function cannot handle ' raise RuntimeError(
'multiple requests. Please provide a custom ' "Default configuration function cannot handle "
'configuration implementation') "multiple requests. Please provide a custom "
"configuration implementation"
)
return requests return requests
@dask.delayed @dask.delayed
def __call__(self,x,request=None): def __call__(self, x, request=None):
raise NotImplementedError() raise NotImplementedError()
def get_config(self): def get_config(self):
...@@ -59,8 +62,9 @@ class Node(object): ...@@ -59,8 +62,9 @@ class Node(object):
""" """
raise NotImplementedError() raise NotImplementedError()
class StuettNode(Node): # TODO: define where this class should be (maybe not here)
def configure(self,requests): class StuettNode(Node): # TODO: define where this class should be (maybe not here)
def configure(self, requests):
""" Default configure for stuett nodes """ Default configure for stuett nodes
Expects two keys per request (*start_time* and *tend*) Expects two keys per request (*start_time* and *tend*)
If multiple requests are passed, they will be merged If multiple requests are passed, they will be merged
...@@ -73,24 +77,25 @@ class StuettNode(Node): # TODO: define where this class should be (m ...@@ -73,24 +77,25 @@ class StuettNode(Node): # TODO: define where this class should be (m
Returns: Returns:
dict -- Original request or merged requests dict -- Original request or merged requests
""" """
if not isinstance(requests,list): if not isinstance(requests, list):
raise RuntimeError('Please provide a list of request') raise RuntimeError("Please provide a list of request")
# For time requests we just use the union of both time segments # For time requests we just use the union of both time segments
new_request = requests[0].copy() new_request = requests[0].copy()
key_func = {'start_time':np.minimum, 'end_time':np.maximum} key_func = {"start_time": np.minimum, "end_time": np.maximum}
for r in requests[1:]: for r in requests[1:]:
for key in ['start_time', 'end_time']: for key in ["start_time", "end_time"]:
if key in r: if key in r:
if key in new_request: if key in new_request:
new_request[key] = key_func[key](new_request[key],r[key]) new_request[key] = key_func[key](new_request[key], r[key])
else: else:
new_request[key] = r[key] new_request[key] = r[key]
return new_request return new_request
def configuration(delayed,request,keys=None,default_merge=None):
def configuration(delayed, request, keys=None, default_merge=None):
""" Configures each node of the graph by propagating the request from outputs """ Configures each node of the graph by propagating the request from outputs
to inputs. to inputs.
Each node checks if it can fulfil the request and what it needs to fulfil the request. Each node checks if it can fulfil the request and what it needs to fulfil the request.
...@@ -114,20 +119,16 @@ def configuration(delayed,request,keys=None,default_merge=None): ...@@ -114,20 +119,16 @@ def configuration(delayed,request,keys=None,default_merge=None):
dask.delayed or list -- Config-optimized delayed object or list of delayed objects dask.delayed or list -- Config-optimized delayed object or list of delayed objects
""" """
if not isinstance(delayed, list):
if not isinstance(delayed,list):
collections = [delayed] collections = [delayed]
# dsk = dask.base.collections_to_dsk(collections) # dsk = dask.base.collections_to_dsk(collections)
dsk, dsk_keys = dask.base._extract_graph_and_keys(collections) dsk, dsk_keys = dask.base._extract_graph_and_keys(collections)
dependencies,dependants = dask.core.get_deps(dsk) dependencies, dependants = dask.core.get_deps(dsk)
if keys is None: if keys is None:
keys = dsk_keys keys = dsk_keys
print('dsk',dsk.layers)
# print('keys',keys)
if not isinstance(keys, (list, set)): if not isinstance(keys, (list, set)):
keys = [keys] keys = [keys]
out_keys = [] out_keys = []
...@@ -135,17 +136,19 @@ def configuration(delayed,request,keys=None,default_merge=None): ...@@ -135,17 +136,19 @@ def configuration(delayed,request,keys=None,default_merge=None):
work = list(set(flatten(keys))) work = list(set(flatten(keys)))
if isinstance(request,list): if isinstance(request, list):
if len(request) != len(work): if len(request) != len(work):
raise RuntimeError("When passing multiple request items " raise RuntimeError(
"The number of request items must be same " "When passing multiple request items "
"as the number of keys") "The number of request items must be same "
"as the number of keys"
requests = {work[i]: [request[i]] for i in range(len(request)) } )
requests = {work[i]: [request[i]] for i in range(len(request))}
else: else:
requests = {k: [request] for k in work } requests = {k: [request] for k in work}
remove = {k:False for k in work} remove = {k: False for k in work}
input_requests = {} input_requests = {}
while work: while work:
new_work = [] new_work = []
...@@ -159,7 +162,11 @@ def configuration(delayed,request,keys=None,default_merge=None): ...@@ -159,7 +162,11 @@ def configuration(delayed,request,keys=None,default_merge=None):
# check if we have collected all dependencies so far # check if we have collected all dependencies so far
# we will come back to this node another time # we will come back to this node another time
# TODO: make a better check for the case when dependants[k] is a set, also: why is it a set in the first place..? # TODO: make a better check for the case when dependants[k] is a set, also: why is it a set in the first place..?
if k in dependants and len(dependants[k]) != len(requests[k]) and not isinstance(dependants[k],set): if (
k in dependants
and len(dependants[k]) != len(requests[k])
and not isinstance(dependants[k], set)
):
# print(f'Waiting at {k}', dependants[k], requests[k]) # print(f'Waiting at {k}', dependants[k], requests[k])
continue continue
...@@ -167,27 +174,40 @@ def configuration(delayed,request,keys=None,default_merge=None): ...@@ -167,27 +174,40 @@ def configuration(delayed,request,keys=None,default_merge=None):
# set configuration for this node k # set configuration for this node k
# If we create a delayed object from a class, `self` will be dsk[k][1] # If we create a delayed object from a class, `self` will be dsk[k][1]
if isinstance(dsk[k],tuple) and isinstance(dsk[k][1],Node): # Check if we get a node of type Node class if isinstance(dsk[k], tuple) and isinstance(
dsk[k][1], Node
): # Check if we get a node of type Node class
# current_requests = [r for r in requests[k] if r] # get all requests belonging to this node # current_requests = [r for r in requests[k] if r] # get all requests belonging to this node
current_requests = requests[k] current_requests = requests[k]
new_request = dsk[k][1].configure(current_requests) # Call the class configuration function new_request = dsk[k][1].configure(
if not isinstance(new_request,list): # prepare the request return value current_requests
) # Call the class configuration function
if not isinstance(
new_request, list
): # prepare the request return value
new_request = [new_request] new_request = [new_request]
else: # We didn't get a Node class so there is no else: # We didn't get a Node class so there is no
# custom configuration function: pass through # custom configuration function: pass through
if len(requests[k]) > 1: if len(requests[k]) > 1:
if callable(default_merge): if callable(default_merge):
new_request = default_merge(requests[k]) new_request = default_merge(requests[k])
else: else:
raise RuntimeError("No valid default merger supplied. Cannot merge requests. " raise RuntimeError(
"Either convert your function to a class Node or provide " "No valid default merger supplied. Cannot merge requests. "
"a default merger") "Either convert your function to a class Node or provide "
"a default merger"
)
else: else:
new_request = requests[k] new_request = requests[k]
if 'requires_request' in new_request[0] and new_request[0]['requires_request'] == True: if (
del new_request[0]['requires_request'] "requires_request" in new_request[0]
input_requests[k] = copy.deepcopy(new_request[0]) #TODO: check if we need a deepcopy here! and new_request[0]["requires_request"] == True
):
del new_request[0]["requires_request"]
input_requests[k] = copy.deepcopy(
new_request[0]
) # TODO: check if we need a deepcopy here!
# update dependencies # update dependencies
current_deps = get_dependencies(dsk, k, as_list=True) current_deps = get_dependencies(dsk, k, as_list=True)
...@@ -197,12 +217,16 @@ def configuration(delayed,request,keys=None,default_merge=None): ...@@ -197,12 +217,16 @@ def configuration(delayed,request,keys=None,default_merge=None):
remove[d] = remove[d] and (not new_request[0]) remove[d] = remove[d] and (not new_request[0])
else: else:
requests[d] = new_request requests[d] = new_request
remove[d] = (not new_request[0]) # if we received an empty dictionary flag deps for removal remove[d] = not new_request[
0
] # if we received an empty dictionary flag deps for removal
# only configure each node once in a round! # only configure each node once in a round!
if d not in new_work and d not in work: # TODO: verify this if d not in new_work and d not in work: # TODO: verify this
new_work.append(d) # TODO: Do we need to configure dependency if we'll remove it? new_work.append(
d
) # TODO: Do we need to configure dependency if we'll remove it?
work = new_work work = new_work
# Assembling the configured new graph # Assembling the configured new graph
...@@ -211,25 +235,23 @@ def configuration(delayed,request,keys=None,default_merge=None): ...@@ -211,25 +235,23 @@ def configuration(delayed,request,keys=None,default_merge=None):
for k in input_requests: for k in input_requests:
out[k] += (input_requests[k],) out[k] += (input_requests[k],)
# convert to delayed object # convert to delayed object
from dask.delayed import Delayed from dask.delayed import Delayed
in_keys = list(flatten(keys)) in_keys = list(flatten(keys))
print(in_keys) # print(in_keys)
if len(in_keys) > 1: if len(in_keys) > 1:
collection = [Delayed(key=key,dsk=out) for key in in_keys] collection = [Delayed(key=key, dsk=out) for key in in_keys]
else: else:
collection = Delayed(key=in_keys[0],dsk=out) collection = Delayed(key=in_keys[0], dsk=out)
if isinstance(collection,list): if isinstance(collection, list):
collection = [collection] collection = [collection]
return collection return collection
class Freezer(Node): class Freezer(Node):
def __init__(self,caching=True): def __init__(self, caching=True):
self.caching = caching self.caching = caching
@dask.delayed @dask.delayed
...@@ -244,29 +266,31 @@ class Freezer(Node): ...@@ -244,29 +266,31 @@ class Freezer(Node):
Returns: Returns:
xarray -- Data loaded from cache or input data passed through xarray -- Data loaded from cache or input data passed through
""" """
if isinstance(x,dict): if isinstance(x, dict):
if self.is_cached(x) and self.caching: if self.is_cached(x) and self.caching:
# TODO: load from cache and return it # TODO: load from cache and return it
pass pass
elif not self.caching: elif not self.caching:
raise RuntimeError(f'If caching is disabled cannot perform request {x}') raise RuntimeError(f"If caching is disabled cannot perform request {x}")
else: else:
raise RuntimeError(f'Result is not cached but cached result is requested with {x}') raise RuntimeError(
f"Result is not cached but cached result is requested with {x}"
)
if self.caching: if self.caching:
# TODO: store the input data # TODO: store the input data
pass pass
return x return x
def configure(self,requests): def configure(self, requests):
if self.caching: if self.caching:
return [{}] return [{}]
return config_conflict(requests) return config_conflict(requests)
def optimize_freeze(dsk, keys, request_key='request'): def optimize_freeze(dsk, keys, request_key="request"):
""" Return new dask with tasks removed which are unnecessary because a later stage """ Return new dask with tasks removed which are unnecessary because a later stage
reads from cache reads from cache
``keys`` may be a single key or list of keys. ``keys`` may be a single key or list of keys.
...@@ -285,16 +309,18 @@ def optimize_freeze(dsk, keys, request_key='request'): ...@@ -285,16 +309,18 @@ def optimize_freeze(dsk, keys, request_key='request'):
seen = set() seen = set()
dependencies = dict() dependencies = dict()
if (request_key not in dsk): if request_key not in dsk:
raise RuntimeError(f"Please provide a task graph which includes '{request_key}'") raise RuntimeError(
f"Please provide a task graph which includes '{request_key}'"
)
request = dsk[request_key] request = dsk[request_key]
def is_cached(task,request): def is_cached(task, request):
if isinstance(task,tuple): if isinstance(task, tuple):
if isinstance(task[0],Freezer): if isinstance(task[0], Freezer):
return task[0].is_cached(request) return task[0].is_cached(request)
return False return False
work = list(set(flatten(keys))) work = list(set(flatten(keys)))
cached_keys = [] cached_keys = []
...@@ -303,11 +329,11 @@ def optimize_freeze(dsk, keys, request_key='request'): ...@@ -303,11 +329,11 @@ def optimize_freeze(dsk, keys, request_key='request'):
out_keys += work out_keys += work
deps = [] deps = []
for k in work: for k in work:
if is_cached(dsk[k],request): if is_cached(dsk[k], request):
cached_keys.append(k) cached_keys.append(k)
else: else:
deps.append((k, get_dependencies(dsk, k, as_list=True))) deps.append((k, get_dependencies(dsk, k, as_list=True)))
dependencies.update(deps) dependencies.update(deps)
for _, deplist in deps: for _, deplist in deps:
for d in deplist: for d in deplist:
...@@ -319,7 +345,7 @@ def optimize_freeze(dsk, keys, request_key='request'): ...@@ -319,7 +345,7 @@ def optimize_freeze(dsk, keys, request_key='request'):
out = {k: dsk[k] for k in out_keys} out = {k: dsk[k] for k in out_keys}
# finally we need to replace the input of the caching nodes with the request # finally we need to replace the input of the caching nodes with the request
cached = {k: (out[k][0],request_key) for k in cached_keys} cached = {k: (out[k][0], request_key) for k in cached_keys}
out.update(cached) out.update(cached)
return out, dependencies return out, dependencies
from .management import * from .management import *
from .processing import * from .processing import *
# from .collection import *
\ No newline at end of file # from .collection import *
This diff is collapsed.
...@@ -2,35 +2,39 @@ from ..global_config import get_setting ...@@ -2,35 +2,39 @@ from ..global_config import get_setting
from ..core.graph import StuettNode from ..core.graph import StuettNode
import dask import dask
import numpy as np import numpy as np
import xarray as xr import xarray as xr
class MinMaxDownsampling(StuettNode): class MinMaxDownsampling(StuettNode):
def __init__(self,rate=1): def __init__(self, rate=1):
# since we always choose two values (min and max) per bucket the # since we always choose two values (min and max) per bucket the
# the internal downsampling rate must be of factor two larger than # the internal downsampling rate must be of factor two larger than
# the effective (and desired) downsampling rate # the effective (and desired) downsampling rate
self.rate = rate*2 self.rate = rate * 2
@dask.delayed @dask.delayed
def __call__(self,x): def __call__(self, x):
rolling = x.rolling(time=self.rate,stride=self.rate) rolling = x.rolling(time=self.rate, stride=self.rate)
x_min = rolling.min().dropna('time')
x_max = rolling.max().dropna('time')
x_ds = xr.concat([x_min,x_max],'time') # TODO: better interleave instead of concat # x_min = rolling.construct("time", stride=self.rate).min("time").dropna('time')
# x_max = rolling.construct("time", stride=self.rate).max("time").dropna('time')
x_ds = x_ds.sortby('time') # TODO: try to avoid this by using interleaving x_min = rolling.min().dropna("time")
x_max = rolling.max().dropna("time")
return x_ds x_ds = xr.concat(
[x_min, x_max], "time"
) # TODO: better interleave instead of concat
x_ds = x_ds.sortby("time") # TODO: try to avoid this by using interleaving
return x_ds
class Downsampling(StuettNode): class Downsampling(StuettNode):
def __init__(self): def __init__(self):
raise NotImplementedError() raise NotImplementedError()
# TODO: high level downsampling node which uses one of the other downsampling # TODO: high level downsampling node which uses one of the other downsampling
# classes depending on the user request # classes depending on the user request
pass pass
\ No newline at end of file
import yaml import yaml
from os.path import dirname, abspath, join from os.path import dirname, abspath, join
import os import os
import appdirs import appdirs
import warnings import warnings
import datetime as dt