To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit 15bbcb32 authored by matthmey's avatar matthmey
Browse files

added optional dask.delayed and spectrogram

parent e51d964c
......@@ -6,7 +6,7 @@
You can install the package with
```
pip install poetry
poetry install
poetry install
```
If you want to install it into an Anaconda environment, do
......
......@@ -141,7 +141,7 @@ description = "Parallel PyData with Task Scheduling"
name = "dask"
optional = false
python-versions = ">=3.6"
version = "2.7.0"
version = "2.8.0"
[package.dependencies]
[package.dependencies.PyYaml]
......@@ -162,7 +162,7 @@ version = ">=2.0"
[package.dependencies.fsspec]
optional = true
version = ">=0.5.1"
version = ">=0.6.0"
[package.dependencies.numpy]
optional = true
......@@ -182,9 +182,9 @@ version = ">=0.7.3"
[package.extras]
array = ["numpy (>=1.13.0)", "toolz (>=0.7.3)"]
bag = ["cloudpickle (>=0.2.1)", "fsspec (>=0.5.1)", "toolz (>=0.7.3)", "partd (>=0.3.10)"]
complete = ["pyyaml", "bokeh (>=1.0.0)", "cloudpickle (>=0.2.1)", "distributed (>=2.0)", "fsspec (>=0.5.1)", "numpy (>=1.13.0)", "pandas (>=0.21.0)", "partd (>=0.3.10)", "toolz (>=0.7.3)"]
dataframe = ["numpy (>=1.13.0)", "pandas (>=0.21.0)", "toolz (>=0.7.3)", "partd (>=0.3.10)", "fsspec (>=0.5.1)"]
bag = ["cloudpickle (>=0.2.1)", "fsspec (>=0.6.0)", "toolz (>=0.7.3)", "partd (>=0.3.10)"]
complete = ["pyyaml", "bokeh (>=1.0.0)", "cloudpickle (>=0.2.1)", "distributed (>=2.0)", "fsspec (>=0.6.0)", "numpy (>=1.13.0)", "pandas (>=0.21.0)", "partd (>=0.3.10)", "toolz (>=0.7.3)"]
dataframe = ["numpy (>=1.13.0)", "pandas (>=0.21.0)", "toolz (>=0.7.3)", "partd (>=0.3.10)", "fsspec (>=0.6.0)"]
delayed = ["cloudpickle (>=0.2.1)", "toolz (>=0.7.3)"]
diagnostics = ["bokeh (>=1.0.0)"]
distributed = ["distributed (>=2.0)"]
......@@ -204,12 +204,12 @@ marker = "extra == \"complete\""
name = "distributed"
optional = false
python-versions = ">=3.6"
version = "2.7.0"
version = "2.8.0"
[package.dependencies]
click = ">=6.6"
cloudpickle = ">=0.2.2"
dask = ">=2.5.2"
dask = ">=2.7.0"
msgpack = "*"
psutil = ">=5.0"
pyyaml = "*"
......@@ -260,7 +260,7 @@ marker = "extra == \"complete\""
name = "fsspec"
optional = false
python-versions = ">=3.5"
version = "0.5.2"
version = "0.6.0"
[[package]]
category = "main"
......@@ -459,8 +459,8 @@ category = "main"
description = "NumPy is the fundamental package for array computing with Python."
name = "numpy"
optional = false
python-versions = ">=3.5"
version = "1.17.4"
python-versions = ">=2.7,!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*"
version = "1.16.5"
[[package]]
category = "main"
......@@ -864,17 +864,12 @@ description = "N-D labeled arrays and datasets in Python"
name = "xarray"
optional = false
python-versions = ">=3.6"
version = "0.14.0+55.g675b60bf"
version = "0.14.0"
[package.dependencies]
numpy = ">=1.14"
pandas = ">=0.24"
[package.source]
reference = "675b60bfe507caaad715bb8f3ff322289278ad98"
type = "git"
url = "https://github.com/niowniow/xarray.git"
[[package]]
category = "main"
description = "Advanced / experimental algorithms for xarray"
......@@ -934,7 +929,7 @@ docs = ["sphinx", "jaraco.packaging (>=3.2)", "rst.linker (>=1.9)"]
testing = ["pathlib2", "contextlib2", "unittest2"]
[metadata]
content-hash = "32baf8d99e4f420eec3193fb9d7da2541d92e08a0147c6717a8434b04e9ef1ff"
content-hash = "9a05d00236e28e47fe7ae6369984909e9cc2ec0373f77da360918d73521793be"
python-versions = "^3.7" # Compatible python versions must be declared here
[metadata.files]
......@@ -1019,16 +1014,16 @@ cycler = [
{file = "cycler-0.10.0.tar.gz", hash = "sha256:cd7b2d1018258d7247a71425e9f26463dfb444d411c39569972f4ce586b0c9d8"},
]
dask = [
{file = "dask-2.7.0-py3-none-any.whl", hash = "sha256:ce97aa2bc079bf43d43dbef9408f93f9a6287851b7f71188298e816f6ffce7dd"},
{file = "dask-2.7.0.tar.gz", hash = "sha256:0dfcb2bd7963b34fbebaa0dd393bae9a6323a6f33cf1260f3ab0ee93ba9fa89f"},
{file = "dask-2.8.0-py3-none-any.whl", hash = "sha256:d3cf6f11abafb3087337f410d34977c1a773fcae51667e74df5044884fd791f6"},
{file = "dask-2.8.0.tar.gz", hash = "sha256:000f1d8cea21e73d4691718d9224903e9ba37fbbe756c8e7d11d4067ef9e0609"},
]
decorator = [
{file = "decorator-4.4.1-py2.py3-none-any.whl", hash = "sha256:5d19b92a3c8f7f101c8dd86afd86b0f061a8ce4540ab8cd401fa2542756bce6d"},
{file = "decorator-4.4.1.tar.gz", hash = "sha256:54c38050039232e1db4ad7375cfce6748d7b41c29e95a081c8a6d2c30364a2ce"},
]
distributed = [
{file = "distributed-2.7.0-py3-none-any.whl", hash = "sha256:3bbbe31951b6bbc338f767c652add1669cdb7b4b8f486bcc51029956af19ef4b"},
{file = "distributed-2.7.0.tar.gz", hash = "sha256:f099a2b552d42913f8cf3a6f5a9e1a15fbab0354e5bd49e9da04199fea2a6c63"},
{file = "distributed-2.8.0-py3-none-any.whl", hash = "sha256:87c29bc33613b653e703d3214bd6b3a3fce677a51b8482b71173de29e6942cea"},
{file = "distributed-2.8.0.tar.gz", hash = "sha256:37f8a89bb499b7858a2396e3fdd2e5997dece543725d3791ce239d960a647710"},
]
entrypoints = [
{file = "entrypoints-0.3-py2.py3-none-any.whl", hash = "sha256:589f874b313739ad35be6e0cd7efde2a4e9b6fea91edcc34e58ecbb8dbe56d19"},
......@@ -1043,7 +1038,8 @@ flake8 = [
{file = "flake8-3.7.9.tar.gz", hash = "sha256:45681a117ecc81e870cbf1262835ae4af5e7a8b08e40b944a8a6e6b895914cfb"},
]
fsspec = [
{file = "fsspec-0.5.2.tar.gz", hash = "sha256:6531a5fa9ea6bf27a5180d225558a8a7aa5d7c3cbf7e8b146dd37ac699017937"},
{file = "fsspec-0.6.0-py3-none-any.whl", hash = "sha256:3f32b291ba9f531c2c5952730782a4eb61930ac8b8ab765e53a1fbaf9e1f1d8d"},
{file = "fsspec-0.6.0.tar.gz", hash = "sha256:5108f9192b7b2c6a03e69d5084d5fc88c05d4312724a38efce37c9f3a6d360fa"},
]
future = [
{file = "future-0.18.2.tar.gz", hash = "sha256:b1bead90b70cf6ec3f0710ae53a525360fa360d306a86583adc6bf83a4db537d"},
......@@ -1290,27 +1286,29 @@ numexpr = [
{file = "numexpr-2.7.0.tar.gz", hash = "sha256:37324b5981b8962102bdc8640c4f05f5589da5d1df2702418783085cb78ca217"},
]
numpy = [
{file = "numpy-1.17.4-cp35-cp35m-macosx_10_6_intel.whl", hash = "sha256:ede47b98de79565fcd7f2decb475e2dcc85ee4097743e551fe26cfc7eb3ff143"},
{file = "numpy-1.17.4-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:43bb4b70585f1c2d153e45323a886839f98af8bfa810f7014b20be714c37c447"},
{file = "numpy-1.17.4-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:c7354e8f0eca5c110b7e978034cd86ed98a7a5ffcf69ca97535445a595e07b8e"},
{file = "numpy-1.17.4-cp35-cp35m-win32.whl", hash = "sha256:64874913367f18eb3013b16123c9fed113962e75d809fca5b78ebfbb73ed93ba"},
{file = "numpy-1.17.4-cp35-cp35m-win_amd64.whl", hash = "sha256:6ca4000c4a6f95a78c33c7dadbb9495c10880be9c89316aa536eac359ab820ae"},
{file = "numpy-1.17.4-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:75fd817b7061f6378e4659dd792c84c0b60533e867f83e0d1e52d5d8e53df88c"},
{file = "numpy-1.17.4-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:7d81d784bdbed30137aca242ab307f3e65c8d93f4c7b7d8f322110b2e90177f9"},
{file = "numpy-1.17.4-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:fe39f5fd4103ec4ca3cb8600b19216cd1ff316b4990f4c0b6057ad982c0a34d5"},
{file = "numpy-1.17.4-cp36-cp36m-win32.whl", hash = "sha256:e467c57121fe1b78a8f68dd9255fbb3bb3f4f7547c6b9e109f31d14569f490c3"},
{file = "numpy-1.17.4-cp36-cp36m-win_amd64.whl", hash = "sha256:8d0af8d3664f142414fd5b15cabfd3b6cc3ef242a3c7a7493257025be5a6955f"},
{file = "numpy-1.17.4-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:9679831005fb16c6df3dd35d17aa31dc0d4d7573d84f0b44cc481490a65c7725"},
{file = "numpy-1.17.4-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:acbf5c52db4adb366c064d0b7c7899e3e778d89db585feadd23b06b587d64761"},
{file = "numpy-1.17.4-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:3d52298d0be333583739f1aec9026f3b09fdfe3ddf7c7028cb16d9d2af1cca7e"},
{file = "numpy-1.17.4-cp37-cp37m-win32.whl", hash = "sha256:475963c5b9e116c38ad7347e154e5651d05a2286d86455671f5b1eebba5feb76"},
{file = "numpy-1.17.4-cp37-cp37m-win_amd64.whl", hash = "sha256:0c0763787133dfeec19904c22c7e358b231c87ba3206b211652f8cbe1241deb6"},
{file = "numpy-1.17.4-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:683828e50c339fc9e68720396f2de14253992c495fdddef77a1e17de55f1decc"},
{file = "numpy-1.17.4-cp38-cp38-manylinux1_i686.whl", hash = "sha256:e2e9d8c87120ba2c591f60e32736b82b67f72c37ba88a4c23c81b5b8fa49c018"},
{file = "numpy-1.17.4-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:a8f67ebfae9f575d85fa859b54d3bdecaeece74e3274b0b5c5f804d7ca789fe1"},
{file = "numpy-1.17.4-cp38-cp38-win32.whl", hash = "sha256:0a7a1dd123aecc9f0076934288ceed7fd9a81ba3919f11a855a7887cbe82a02f"},
{file = "numpy-1.17.4-cp38-cp38-win_amd64.whl", hash = "sha256:ada4805ed51f5bcaa3a06d3dd94939351869c095e30a2b54264f5a5004b52170"},
{file = "numpy-1.17.4.zip", hash = "sha256:f58913e9227400f1395c7b800503ebfdb0772f1c33ff8cb4d6451c06cabdf316"},
{file = "numpy-1.16.5-cp27-cp27m-macosx_10_9_x86_64.whl", hash = "sha256:37fdd3bb05caaaacac58015cfa38e38b006ee9cef1eaacdb70bb68c16ac7db1d"},
{file = "numpy-1.16.5-cp27-cp27m-manylinux1_i686.whl", hash = "sha256:f42e21d8db16315bc30b437bff63d6b143befb067b8cd396fa3ef17f1c21e1a0"},
{file = "numpy-1.16.5-cp27-cp27m-manylinux1_x86_64.whl", hash = "sha256:4208b225ae049641a7a99ab92e84ce9d642ded8250d2b6c9fd61a7fa8c072561"},
{file = "numpy-1.16.5-cp27-cp27m-win32.whl", hash = "sha256:4d790e2a37aa3350667d8bb8acc919010c7e46234c3d615738564ddc6d22026f"},
{file = "numpy-1.16.5-cp27-cp27m-win_amd64.whl", hash = "sha256:1594aec94e4896e0688f4f405481fda50fb70547000ae71f2e894299a088a661"},
{file = "numpy-1.16.5-cp27-cp27mu-manylinux1_i686.whl", hash = "sha256:2c5a556272c67566e8f4607d1c78ad98e954fa6c32802002a4a0b029ad8dd759"},
{file = "numpy-1.16.5-cp27-cp27mu-manylinux1_x86_64.whl", hash = "sha256:3a96e59f61c7a8f8838d0f4d19daeba551c5f07c5cdd5c81e8e9d4089ade0042"},
{file = "numpy-1.16.5-cp35-cp35m-macosx_10_9_x86_64.whl", hash = "sha256:612297115bade249a118616c065597ff2e5e1f47ed220d7ba71f3e6c6ebcd814"},
{file = "numpy-1.16.5-cp35-cp35m-manylinux1_i686.whl", hash = "sha256:dbc9e9a6a5e0c4f57498855d4e30ef8b599c0ce13fdf9d64299197508d67d9e8"},
{file = "numpy-1.16.5-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:fada0492dd35412cd96e0578677e9a4bdae8f102ef2b631301fcf19066b57119"},
{file = "numpy-1.16.5-cp35-cp35m-win32.whl", hash = "sha256:ada1a1cd68b9874fa480bd287438f92bd7ce88ca0dd6e8d56c70f2b3dab97314"},
{file = "numpy-1.16.5-cp35-cp35m-win_amd64.whl", hash = "sha256:27aa457590268cb059c47daa8c55f48c610ce81da8a062ec117f74efa9124ec9"},
{file = "numpy-1.16.5-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:03b28330253904d410c3c82d66329f29645eb54a7345cb7dd7a1529d61fa603f"},
{file = "numpy-1.16.5-cp36-cp36m-manylinux1_i686.whl", hash = "sha256:911d91ffc6688db0454d69318584415f7dfb0fc1b8ac9b549234e39495684230"},
{file = "numpy-1.16.5-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:ceb353e3ae840ce76256935b18c17236ca808509f231f41d5173d7b2680d5e77"},
{file = "numpy-1.16.5-cp36-cp36m-win32.whl", hash = "sha256:e6ce7c0051ed5443f8343da2a14580aa438822ae6526900332c4564f371d2aaf"},
{file = "numpy-1.16.5-cp36-cp36m-win_amd64.whl", hash = "sha256:9a2b950bca9faca0145491ae9fd214c432f2b1e36783399bc2c3732e7bcc94f4"},
{file = "numpy-1.16.5-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:00836128feaf9a7c7fedeea05ad593e7965f523d23fe3ffbf20cfffd88e9f2b1"},
{file = "numpy-1.16.5-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:3d6a354bb1a1ce2cabd47e0bdcf25364322fb55a29efb59f76944d7ee546d8b6"},
{file = "numpy-1.16.5-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:f7fb27c0562206787011cf299c03f663c604b58a35a9c2b5218ba6485a17b145"},
{file = "numpy-1.16.5-cp37-cp37m-win32.whl", hash = "sha256:46469e7fcb689036e72ce61c3d432ed35eb4c71b5119e894845b434b0fae5813"},
{file = "numpy-1.16.5-cp37-cp37m-win_amd64.whl", hash = "sha256:fb207362394567343d84c0462ec3ba203a21c78be9a0fdbb94982e76859ec37e"},
{file = "numpy-1.16.5.zip", hash = "sha256:8bb452d94e964b312205b0de1238dd7209da452343653ab214b5d681780e7a0c"},
]
obsplus = [
{file = "obsplus-0.0.2-py3-none-any.whl", hash = "sha256:76a9ee599205450785e3a37f41d547d115d7bbc18c3a4ae0374775f04ba6d80a"},
......@@ -1586,7 +1584,10 @@ urllib3 = [
{file = "urllib3-1.25.7-py2.py3-none-any.whl", hash = "sha256:a8a318824cc77d1fd4b2bec2ded92646630d7fe8619497b142c84a9e6f5a7293"},
{file = "urllib3-1.25.7.tar.gz", hash = "sha256:f3c5fd51747d450d4dcf6f923c81f78f811aab8205fda64b0aba34a4e48b0745"},
]
xarray = []
xarray = [
{file = "xarray-0.14.0-py3-none-any.whl", hash = "sha256:9a4f97c6a7fdf9a6dd873ac679a86abfa1910263a85774d69bc3c0fa1e7967f5"},
{file = "xarray-0.14.0.tar.gz", hash = "sha256:a8b93e1b0af27fa7de199a2d36933f1f5acc9854783646b0f1b37fed9b4da091"},
]
xarray-extras = [
{file = "xarray_extras-0.4.2.tar.gz", hash = "sha256:f458174d2adc66a947dbee6929242f56c1afa5a1c507b982d5ec4f4ee7e31e69"},
]
......
......@@ -25,6 +25,7 @@ dask = {extras = ["complete"], version = "^2.6.0"}
pandas = "^0.25.3"
toolz = "^0.10.0"
obspy = "^1.1.1"
numpy = "1.16.5"
appdirs = "^1.4.3"
obsplus = "^0.0.2"
zarr = "^2.3.2"
......
......@@ -53,9 +53,14 @@ class Node(object):
)
return requests
@dask.delayed
def __call__(self, x, request=None):
raise NotImplementedError()
def __call__(self, data=None, request=None, delayed=False):
if delayed:
return dask.delayed(self.forward(data=data, request=request))
else:
return self.forward(data=data, request=request)
def forward(self, x, request):
raise NotImplementedError
def get_config(self):
""" returns a dictionary of configurations to recreate the state
......@@ -66,8 +71,12 @@ class Node(object):
class StuettNode(Node): # TODO: define where this class should be (maybe not here)
def __init__(self, **kwargs):
self.config = locals().copy()
while "kwargs" in self.config and self.config["kwargs"]:
while "kwargs" in self.config:
if "kwargs" not in self.config["kwargs"]:
self.config.update(self.config["kwargs"])
break
self.config.update(self.config["kwargs"])
del self.config["kwargs"]
del self.config["self"]
......
......@@ -23,12 +23,32 @@ import warnings
import numpy as np
import pandas as pd
import datetime as dt
from pathlib import Path
class DataSource(StuettNode):
def __init__(self, **kwargs):
    """Forward every keyword argument to StuettNode, which records it as config."""
    super().__init__(kwargs=kwargs)
def __call__(self, data=None, request=None, delayed=False):
    """Resolve the effective request and run (or lazily schedule) ``forward``.

    A DataSource takes no data input: if only ``data`` is given, it is
    reinterpreted as the request; if both are given, ``request`` wins and a
    warning is emitted.

    Arguments:
        data {dict} -- Treated as ``request`` when ``request`` is None.
        request {dict} -- Per-call settings merged over the permanent config.
        delayed {bool} -- If True, return a dask.delayed task instead of the
            computed result (default: {False}).
    """
    if data is not None:
        if request is None:
            request = data
        else:
            # BUG FIX: the stdlib function is warnings.warn;
            # warnings.warning raises AttributeError at runtime.
            warnings.warn(
                "Two inputs (data, request) provided to the DataSource but it can only handle a request. Choosing request. "
            )

    # DataSource only require a request
    # Therefore merge permanent config and request
    config = self.config.copy()  # TODO: do we need a deep copy?
    if request is not None:
        config.update(request)

    if delayed:
        # BUG FIX: wrap the callable, then call it — dask.delayed(f)(x)
        # builds a lazy task. The original dask.delayed(self.forward(config))
        # executed forward eagerly and merely wrapped the finished value,
        # defeating the purpose of delayed=True.
        return dask.delayed(self.forward)(config)
    else:
        return self.forward(config)
def configure(self, requests=None):
""" Default configure for DataSource nodes
......@@ -49,6 +69,7 @@ class DataSource(StuettNode):
class SeismicSource(DataSource):
def __init__(
self,
path=None,
station=None,
channel=None,
start_time=None,
......@@ -68,6 +89,7 @@ class SeismicSource(DataSource):
return_obspy {bool} -- By default an xarray is returned. If true, an obspy stream will be returned (default: {False})
"""
super().__init__(
path=path,
station=station,
channel=channel,
start_time=start_time,
......@@ -77,15 +99,18 @@ class SeismicSource(DataSource):
kwargs=kwargs,
)
@dask.delayed
def __call__(self, request=None):
config = self.config.copy()
if request is not None:
config.update(request)
def forward(self, request=None):
config = request
if config["use_arclink"]:
arclink = get_setting("arclink")
try:
arclink = get_setting("arclink")
except KeyError as err:
raise RuntimeError(
f"The following error occured \n{err}. "
"Please provide either the credentials to access arclink or a path to the dataset"
)
arclink_user = arclink["user"]
arclink_password = arclink["password"]
fdsn_client = Client(
......@@ -102,26 +127,37 @@ class SeismicSource(DataSource):
endtime=UTCDateTime(config["end_time"]),
attach_response=True,
)
# TODO: remove response x.remove_response(output=vel)
x = x.slice(
UTCDateTime(config["start_time"]), UTCDateTime(config["end_time"])
)
# TODO: potentially resample
else: # 20180914 is last full day available in permasense_vault
# logging.info('Loading seismic with fdsn')
x = self.get_obspy_stream(
config["path"],
config["start_time"],
config["end_time"],
config["station"],
config["channel"],
)
x = x.slice(UTCDateTime(config["start_time"]), UTCDateTime(config["end_time"]))
# TODO: remove response x.remove_response(output=vel)
# x = self.process_seismic_data(x)
if not config["return_obspy"]:
x = obspy_to_array(x)
# we assume that all starttimes are equal
starttime = x.starttime.values.squeeze()[0]
for s in x.starttime.values.squeeze():
if s != starttime:
raise RuntimeError(
"Please make sure that starttime of each seimsic channel is equal"
)
# change time coords from relative to absolute time
starttime = obspy.UTCDateTime(x.starttime.values).datetime
starttime = obspy.UTCDateTime(starttime).datetime
starttime = pd.to_datetime(starttime, utc=True).tz_localize(
None
) # TODO: change when xarray #3291 is fixed
......@@ -134,8 +170,86 @@ class SeismicSource(DataSource):
return x
def process_seismic_data(
    self,
    stream,
    remove_response=True,
    unit="VEL",
    station_inventory=None,
    detrend=True,
    taper=False,
    pre_filt=(0.025, 0.05, 124.0, 125.0),
    water_level=60,
    apply_filter=True,
    freqmin=0.002,
    freqmax=125,
    resample=False,
    resample_rate=250,
    rotation_angle=None,
):
    """Apply a standard preprocessing chain to an obspy stream, in place.

    Steps (each optional): attach station responses, detrend, taper,
    remove instrument response, bandpass filter, resample, and rotate
    NE components to RT.

    Arguments:
        stream -- obspy Stream to process; it is modified in place and
            also returned.
        remove_response {bool} -- Deconvolve the instrument response.
        unit {str} -- Output unit passed to remove_response (e.g. "VEL").
        station_inventory -- Path to a StationXML inventory; defaults to
            ``inventory_stations__MH.xml`` in the metadata directory.
        detrend {bool} -- Remove mean and linear trend before/after
            response removal.
        taper {bool} -- Apply a 5% cosine taper before/after response removal.
        pre_filt {tuple} -- Pre-filter corners for response removal.
        water_level {int} -- Water level for response deconvolution.
        apply_filter {bool} -- Apply a bandpass filter (freqmin..freqmax).
        resample {bool} -- Resample to ``resample_rate`` Hz.
        rotation_angle -- Back azimuth in degrees for NE->RT rotation;
            None or NaN skips rotation.

    Returns:
        The processed obspy Stream (same object as ``stream``).
    """
    # author: Samuel Weber
    if station_inventory is None:
        station_inventory = Path(get_setting("metadata_directory")).joinpath(
            "inventory_stations__MH.xml"
        )

    inv = obspy.read_inventory(str(station_inventory))
    # NOTE: the stream is processed in place (no copy is taken).
    st = stream
    st.attach_response(inv)

    # BUG FIX: detrending was guarded by `taper`, leaving the `detrend`
    # parameter unused — with the defaults (detrend=True, taper=False) no
    # detrending ever ran. Guard detrend calls with `detrend` instead.
    if detrend:
        st.detrend("demean")
        st.detrend("linear")
    if taper:
        st.taper(max_percentage=0.05)

    if remove_response:
        # Use the already-attached response when available, otherwise pass
        # the inventory explicitly.
        if hasattr(st[0].stats, "response"):
            st.remove_response(
                output=unit,
                pre_filt=pre_filt,
                plot=False,
                zero_mean=False,
                taper=False,
                water_level=water_level,
            )
        else:
            st.remove_response(
                output=unit,
                inventory=inv,
                pre_filt=pre_filt,
                plot=False,
                zero_mean=False,
                taper=False,
                water_level=water_level,
            )

    # Detrend/taper again after response removal (same fix as above).
    if detrend:
        st.detrend("demean")
        st.detrend("linear")
    if taper:
        st.taper(max_percentage=0.05)

    if apply_filter:
        st.filter("bandpass", freqmin=freqmin, freqmax=freqmax)

    if resample:
        st.resample(resample_rate)

    if rotation_angle is None:
        rotation_angle = np.nan
    if not np.isnan(rotation_angle):
        st.rotate("NE->RT", back_azimuth=int(rotation_angle), inventory=inv)

    return st
def get_obspy_stream(
self,
path,
start_time,
end_time,
station,
......@@ -144,6 +258,7 @@ class SeismicSource(DataSource):
verbose=False,
fill=0,
fill_sampling_rate=1000,
old_stationname=False,
):
"""
Loads the microseismic data for the given timeframe into a miniseed file.
......@@ -164,15 +279,12 @@ class SeismicSource(DataSource):
if not isinstance(channels, list):
channels = [channels]
datadir = (
os.path.join(get_setting("permasense_vault_dir"), "")
+ "geophones/binaries/PS/%s/" % station
)
datadir = Path(path)
if not os.path.isdir(datadir):
# TODO: should this be an error or only a warning. In a period execution this could stop the whole script
raise IOError(
"Cannot find the path {}. Please provide a correct path to the permasense_vault directory".format(
"Cannot find the path {}. Please provide a correct path to the permasense geophone directory".format(
datadir
)
)
......@@ -197,15 +309,19 @@ class SeismicSource(DataSource):
st_list = obspy.Stream()
datayear = timerange[i].strftime("%Y/")
sn = "MHDL" if station == "MH36" else "MHDT" # TODO: do not hardcode it
if old_stationname:
station = (
"MHDL" if station == "MH36" else "MHDT"
) # TODO: do not hardcode it
filenames = {}
for channel in channels:
filenames[channel] = (
datadir
+ datayear
+ "%s.D/PS.%s.A.%s.D." % (channel, sn, channel)
filenames[channel] = datadir.joinpath(
station,
datayear,
"%s.D/" % channel,
"4D.%s.A.%s.D." % (station, channel)
+ timerange[i].strftime("%Y%m%d_%H%M%S")
+ ".miniseed"
+ ".miniseed",
)
# print(filenames[channel])
if not os.path.isfile(filenames[channel]):
......@@ -238,7 +354,7 @@ class SeismicSource(DataSource):
else:
continue
else:
st = obspy.read(filenames[channel])
st = obspy.read(str(filenames[channel]))
st_list += st
......@@ -264,7 +380,7 @@ class SeismicSource(DataSource):
return stream
class MHDSLRFilenames(StuettNode):
class MHDSLRFilenames(DataSource):
def __init__(
self, base_directory=None, method="directory", start_time=None, end_time=None
):
......@@ -286,7 +402,7 @@ class MHDSLRFilenames(StuettNode):
end_time=end_time,
)
def __call__(self, request=None):
def forward(self, request=None):
"""Retrieves the images for the selected time period from the server. If only a start_time timestamp is provided,
the file with the corresponding date will be loaded if available. For periods (when start and end time are given)
all available images are indexed first to provide an efficient retrieval.
......@@ -300,10 +416,7 @@ class MHDSLRFilenames(StuettNode):
Returns:
dataframe -- Returns containing the image filenames of the selected period.
"""
config = self.config.copy() # TODO: do we need a deep copy?
if request is not None:
config.update(request)
config = request
methods = ["directory", "web"]
if config["method"].lower() not in methods:
raise RuntimeError(
......@@ -375,12 +488,17 @@ class MHDSLRFilenames(StuettNode):
if end_time is None:
if start_time < imglist_df.index[0]:
start_time = imglist_df.index[0]
return imglist_df.iloc[
imglist_df.index.get_loc(start_time, method="nearest")
]
else:
# if end_time.tzinfo is None:
# end_time = end_time.replace(tzinfo=timezone.utc)
print(imglist_df)
if start_time > imglist_df.index[-1] or end_time < imglist_df.index[0]:
# return empty dataframe
return imglist_df[0:0]
if start_time < imglist_df.index[0]:
start_time = imglist_df.index[0]
......@@ -630,21 +748,17 @@ class MHDSLRImages(MHDSLRFilenames):
)
self.config["output_format"] = output_format
def __call__(self, request):
config = self.config.copy() # TODO: do we need a deep copy?
if request is not None:
config.update(request)
filenames = super().__call__(request)
def forward(self, request):
filenames = super().forward(request=request)
if config["output_format"] is "xarray":
if request["output_format"] is "xarray":
return self.construct_xarray(filenames)
elif config["output_format"] is "base64":
elif request["output_format"] is "base64":
return self.construct_base64(filenames)
else:
output_formats = ["xarray", "base64"]
raise RuntimeError(
f"The {config['output_format']} output_format is not supported. Allowed formats are {output_formats}"
f"The {request['output_format']} output_format is not supported. Allowed formats are {output_formats}"
)
def construct_xarray(self, filenames):
......@@ -722,19 +836,14 @@ class Freezer(StuettNode):
def open_zarr(self, requests):
ds_zarr = xr.open_zarr(self.store)
print('read',ds_zarr)
print("read", ds_zarr)
def __call__(self, x=None, requests=None):
if x is not None: # TODO: check if this is always good
if requests is None:
requests = x
else:
raise RuntimeError("No input provided")
def forward(self, data=None, request=None):
self.to_zarr(x)
self.open_zarr(requests)
self.to_zarr(data)
self.open_zarr(request)
return x
return data
# TODO: check request start_time and load the data which is available, store the data which is not available
# TODO: crop
......@@ -745,13 +854,8 @@ class CsvSource(DataSource):
filename=filename, start_time=start_time, end_time=end_time, kwargs=kwargs
)
def __call__(self, request=None):
# TODO: This will stay but the rest will move to forward()
config = self.config.copy()
if request is not None:
config.update(request)
csv = pd.read_csv(self.config["filename"])
def forward(self, request):
csv = pd.read_csv(request["filename"])
csv.set_index("time", inplace=True)
csv.index = pd.to_datetime(csv.index, utc=True).tz_localize(
None
......@@ -775,12 +879,12 @@ class CsvSource(DataSource):
# TODO: add a warning or test explicitly if units exist
pass
if "start_time" not in config:
config["start_time"] = x.coords["time"][0]
if "end_time" not in config:
config["end_time"] = x.coords["time"][-1]
if "start_time" not in request:
request["start_time"] = x.coords["time"][0]
if "end_time" not in request:
request["end_time"] = x.coords["time"][-1]
x = x.sel(time=slice(config["start_time"], config["end_time"]))
x = x.sel(time=slice(request["start_time"], request["end_time"]))
return x
......@@ -808,12 +912,8 @@ class BoundingBoxAnnotation(DataSource):
kwargs=kwargs,
)