To receive notifications about scheduled maintenance, please subscribe to the mailing list gitlab-operations@sympa.ethz.ch. You can subscribe at https://sympa.ethz.ch

Commit b24aef98 authored by matthmey's avatar matthmey
Browse files

added minmaxdownsampling

parent 930f2d7c
from .management import *
# from .processing import *
from .processing import *
# from .collection import *
\ No newline at end of file
......@@ -39,21 +39,47 @@ class Freezer(StuettNode):
def __init__(self):
    # No construction-time state: everything this node needs arrives later
    # through configure()/__call__ requests.
    pass
# NOTE(review): this span is a corrupted diff overlay -- it interleaves the old
# signature `configure(self, x=None, requests=None)` with the new one
# `configure(self, requests)` and splits the docstring across both. Only one
# signature should survive; reconstruct the intended version from VCS before
# relying on this code.
def configure(self,x=None, requests=None):
    """ Default configure for DataSource nodes
    Same as configure from StuettNode but adds is_source flag
def configure(self,requests):
    """
    Arguments:
        request {list} -- List of requests

    Returns:
        dict -- Original request or merged requests
        dict -- Original, updated or merged request(s)
    """
    # Delegate merging/normalizing of the incoming requests to the base class.
    requests = super().configure(requests)

    requests['requires_request'] = True #TODO: only if data partially available
    #TODO: check if data is available for requested time period
    #TODO: check how we need to update the boundaries such that we get data that fits and that is available
    #TODO: with only one start/end_time it might be inefficient for the case where [unavailable,available,unavailable] since we need to load everything
    #      one option could be to duplicate the graph by returning multiple requests...
    #TODO: make a distinction between requested start_time and freeze_output_start_time
    #TODO: add node specific hash to freeze_output_start_time (there might be multiple in the graph) <- probably not necessary because we receive a copy of the request which is unique to this node
    #TODO: maybe the configuration method must add (and delete) the node name in the request?

    # we always require a request to crop out the right time period
    requests['requires_request'] = True
    return requests
@dask.delayed
def __call__(self,x=None, requests=None):
    #TODO: check requests if none
    if requests is None:
        # Bypass: without a request there is nothing to freeze or crop,
        # so pass the input through unchanged.
        #TODO: maybe a warning
        return x

    #TODO: check request start_time and load the data which is available, store the data which is not available
    #TODO: crop
    # NOTE(review): when a request IS supplied the method currently falls
    # through and implicitly returns None -- the freeze/crop logic above is
    # not implemented yet.
class SeismicSource(DataSource):
......@@ -99,18 +125,18 @@ class SeismicSource(DataSource):
x = obspy_to_array(st)
print(x)
# print(x)
starttime = obspy.UTCDateTime(x.starttime.values).datetime
starttime = pd.to_datetime(starttime,utc=True)
print('starttime',starttime)
print(x['time'].values)
# print('starttime',starttime)
# print(x['time'].values)
timedeltas = pd.to_timedelta(x['time'].values,unit='seconds')
print('timedeltas',timedeltas)
print(type(starttime),type(timedeltas))
# print('timedeltas',timedeltas)
# print(type(starttime),type(timedeltas))
xt = starttime + timedeltas
print(type(xt),xt)
# print(type(xt),xt)
x['time'] = pd.to_datetime(xt,utc=True)
print('xtime',x['time'])
# print('xtime',x['time'])
return x
......
from ..global_config import get_setting
from ..core.graph import StuettNode
import dask
import numpy as np
import xarray as xr
class MinMaxDownsampling(StuettNode):
    """Downsample along the time axis by keeping each bucket's min and max."""

    def __init__(self, rate=1):
        """
        Arguments:
            rate {int} -- effective (desired) downsampling rate (default: 1)
        """
        # Every bucket contributes two samples (its min and its max), so the
        # internal bucket size must be twice the requested downsampling rate.
        self.rate = 2 * rate

    @dask.delayed
    def __call__(self, x):
        # Non-overlapping windows of `self.rate` samples along 'time'.
        # NOTE(review): the `stride` argument of rolling() is deprecated in
        # newer xarray releases -- confirm the pinned xarray version supports it.
        windows = x.rolling(time=self.rate, stride=self.rate)
        extremes = [windows.min().dropna('time'), windows.max().dropna('time')]

        # TODO: interleave min/max instead of concatenating and re-sorting.
        downsampled = xr.concat(extremes, 'time')
        return downsampled.sortby('time')
class Downsampling(StuettNode):
    """Planned high-level downsampling node.

    TODO: dispatch to one of the concrete downsampling classes
    (e.g. MinMaxDownsampling) depending on the user request.
    """

    def __init__(self):
        # Placeholder: instantiation is intentionally unsupported for now.
        raise NotImplementedError()
\ No newline at end of file
......@@ -8,6 +8,7 @@ import datetime as dt
# initialize global settings with certain default values
_GLOBAL_CONFIG_DICT = {
    # Base URL of the PermaSense data server.
    "permasense_server": "http://data.permasense.ch/",
    # NOTE(review): naive datetime (no tzinfo); other code in this project
    # builds tz-aware UTC datetimes -- confirm mixing is intended before
    # comparing or subtracting against this value.
    "reference_time": dt.datetime(2000,1,1,0,0,0)
}
......
......@@ -9,8 +9,8 @@ stations = ['MH36', 'MH44', 'MH48', 'MH52', 'MH54']
start_time = dt.datetime(2019,7,14,7,7,0,tzinfo=dt.timezone.utc)
end_time = dt.datetime(2019,7,14,7,7,7,tzinfo=dt.timezone.utc)
# start_time = dt.datetime(2017,7,14,7,7,0,tzinfo=dt.timezone.utc)
# end_time = dt.datetime(2017,7,14,7,7,7,tzinfo=dt.timezone.utc)

# NOTE(review): the assignments below override the 2019 window above -- only
# the 2017 time window is actually effective. Remove one pair to avoid confusion.
start_time = dt.datetime(2017,7,14,7,7,0,tzinfo=dt.timezone.utc)
end_time = dt.datetime(2017,7,14,7,7,7,tzinfo=dt.timezone.utc)

offset = dt.timedelta(days=1)
config = {'channel':channels[0],
......@@ -18,3 +18,4 @@ config = {'channel':channels[0],
'start_time':start_time,
'end_time':end_time}
#TODO: use @pytest.mark.parametrize
\ No newline at end of file
import stuett
import datetime as dt
import xarray as xr
import numpy as np
import pandas as pd
from tests.stuett.sample_data import *
import pytest
class TestMinMaxDownsampling(object):
    """Smoke tests for MinMaxDownsampling (no real assertions yet)."""

    def test_minmax(self):
        # Default-rate instance is constructed only to exercise instantiation.
        downsampler_default = stuett.data.MinMaxDownsampling()
        downsampler = stuett.data.MinMaxDownsampling(2)

        # Fixed seed keeps the random fixture reproducible across runs.
        np.random.seed(123)
        source = xr.DataArray(
            np.random.rand(9, 2),
            [('time', pd.date_range('2000-01-01', periods=9)),
             ('channel', ['EHE','EHZ'])])

        result = downsampler(source)
        result = result.compute()
        print(source)
        print(result)
        # TODO: proper test

    def test_seismic(self):
        seismic_source = stuett.data.SeismicSource()
        downsampler = stuett.data.MinMaxDownsampling(2)

        delayed = seismic_source(config)
        delayed = downsampler(delayed)
        delayed.compute()
        # TODO: proper test
# Ad-hoc manual invocation for local debugging; pytest discovers the class
# on its own, so these module-level calls run the test twice under pytest.
TestMinMaxDownsampling().test_minmax()
# TestMinMaxDownsampling().test_seismic()
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment