Commit 930f2d7c authored by matthmey's avatar matthmey
Browse files

added configuration and seismic data sources

parent f8f45ba5
# Byte-compiled / optimized / DLL files
# C extensions
# Distribution / packaging
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
# Installer logs
# Unit test / coverage reports
# Translations
# Django stuff:
# Sphinx documentation
# PyBuilder
# Rep Ignores
This diff is collapsed.
name = "stuett"
version = "0.1.0"
description = "Data processing framework for PermaSense project"
license = "MIT"
authors = [
"Matthias Meyer <>"
readme = '' # Markdown files are supported
repository = ""
homepage = ""
keywords = ['data analysis', 'data processing']
python = "^3.7" # Compatible python versions must be declared here
toml = "^0.9"
# Dependencies with extras
dask = {extras = ["complete"], version = "^2.6.0"}
xarray = "^0.14.0"
pandas = "^0.25.3"
toolz = "^0.10.0"
obspy = "^1.1.1"
appdirs = "^1.4.3"
obsplus = "^0.0.2"
# Optional dependencies (extras)
pytest = "^3.0"
pytest-cov = "^2.4"
black = {version = "^19.10b0", allows-prereleases = true}
from __future__ import absolute_import
from . import data
from . import global_config
import dask
# TODO: make it a proper decorator with arguments etc
def dat(x):
    """ Helper function to transform input callable into
    dask.delayed object

    From low german 'stütt dat!' which means "support it!"

    Arguments:
        x {callable} -- Any input callable which is supported
            by dask.delayed

    Returns:
        dask.delayed object -- the lazily-evaluated wrapper around x
    """
    return dask.delayed(x)
from .graph import *
\ No newline at end of file
import dask
from dask.core import get_dependencies, flatten
import numpy as np
import copy
class Node(object):
    """Base class for all task-graph nodes."""

    def __init__(self):
        pass

    def configure(self, requests):
        """ Before a task graph is executed each node is configured.

        The request is propagated from the end to the beginning
        of the DAG and each nodes "configure" routine is called.
        The request can be updated to reflect additional requirements,
        The return value gets passed to predecessors.

        Essentially the following question must be answered:
        What do I need to fulfil the request of my successor?

        Here, you must not configure the internal parameters of the
        Node otherwise it would not be thread-safe. You can however
        introduce a new key 'requires_request' in the request being
        returned. This request will then be passed as an argument
        to the __call__ function.

        Best practice is to configure the Node on initialization with
        runtime independent configurations and define all runtime
        dependant configurations here.

        Arguments:
            requests {List} -- List of requests (i.e. dictionaries).

        Returns:
            dict -- The (updated) request. If updated modifications
                must be made on a copy of the input. The return value
                must be a dictionary.
                If multiple requests are input to this function they
                must be merged.
                If nothing needs to be requested an empty dictionary
                can be return. This removes all dependencies of this
                node from the task graph.
        """
        if not isinstance(requests, list):
            raise RuntimeError('Please provide a **list** of request')

        if len(requests) > 1:
            raise RuntimeError('Default configuration function cannot handle '
                               'multiple requests. Please provide a custom '
                               'configuration implementation')

        return requests

    def __call__(self, x, request=None):
        """Execute the node on input x; must be implemented by subclasses."""
        raise NotImplementedError()

    def get_config(self):
        """ returns a dictionary of configurations to recreate the state """
        raise NotImplementedError()
class StuettNode(Node):  # TODO: define where this class should be (maybe not here)
    def configure(self, requests):
        """ Default configure for stuett nodes

        Expects two keys per request (*start_time* and *end_time*)
        If multiple requests are passed, they will be merged:
        start_time = minimum of all requests' start_time
        end_time = maximum of all requests' end_time

        Arguments:
            requests {list} -- List of requests

        Returns:
            dict -- Original request or merged requests
        """
        if not isinstance(requests, list):
            raise RuntimeError('Please provide a list of request')

        # For time requests we just use the union of both time segments
        new_request = requests[0].copy()
        key_func = {'start_time': np.minimum, 'end_time': np.maximum}
        for r in requests[1:]:
            for key in ['start_time', 'end_time']:
                if key in r:
                    if key in new_request:
                        # widen the span so it covers both requests
                        new_request[key] = key_func[key](new_request[key], r[key])
                    else:
                        # key only present in the later request: adopt it
                        new_request[key] = r[key]

        return new_request
def configuration(delayed,request,keys=None,default_merge=None):
""" Configures each node of the graph by propagating the request from outputs
to inputs.
Each node checks if it can fulfil the request and what it needs to fulfil the request.
If a node requires additional configurations to fulfil the request it can set the
'requires_request' flag in the returned request and this function will add the
return request as a new input to the node's __call__(). See also Node.configure()

Arguments:
delayed {dask.delayed or list} -- Delayed object or list of delayed objects
request {dict or list} -- request (dict), list of requests
default_merge {callable} -- request merge function

Keyword Arguments:
keys {[type]} -- [description] (default: {None})

Raises:
RuntimeError: [description]
RuntimeError: [description]

Returns:
dask.delayed or list -- Config-optimized delayed object or list of delayed objects
"""
# NOTE(review): this chunk lost its indentation and apparently several
# lines (notably `else:` branches) during extraction; the comments below
# flag each suspect spot. Verify every flagged branch against the original.
if not isinstance(delayed,list):
collections = [delayed]
# dsk = dask.base.collections_to_dsk(collections)
dsk, dsk_keys = dask.base._extract_graph_and_keys(collections)
dependencies,dependants = dask.core.get_deps(dsk)
if keys is None:
keys = dsk_keys
# print('keys',keys)
if not isinstance(keys, (list, set)):
keys = [keys]
out_keys = []
seen = set()
# `work` holds the keys still to configure this round; starts at the outputs
work = list(set(flatten(keys)))
if isinstance(request,list):
if len(request) != len(work):
raise RuntimeError("When passing multiple request items "
"The number of request items must be same "
"as the number of keys")
# one (list-wrapped) request per output key
requests = {work[i]: [request[i]] for i in range(len(request)) }
# NOTE(review): an `else:` appears to be missing before the next line
# (single request shared by all output keys)
requests = {k: [request] for k in work }
remove = {k:False for k in work}
input_requests = {}
while work:
new_work = []
out_keys += work
deps = []
for k in work:
# if k not in requests:
# # there wasn't any request stored use initial config
# requests[k] = [config]
# check if we have collected all dependencies so far
# we will come back to this node another time
# TODO: make a better check for the case when dependants[k] is a set, also: why is it a set in the first place..?
if k in dependants and len(dependants[k]) != len(requests[k]) and not isinstance(dependants[k],set):
# print(f'Waiting at {k}', dependants[k], requests[k])
# NOTE(review): the body of this "wait" branch (presumably a
# `continue`) is missing in this chunk
# print(f"configuring {k}",requests[k])
# set configuration for this node k
# If we create a delayed object from a class, `self` will be dsk[k][1]
if isinstance(dsk[k],tuple) and isinstance(dsk[k][1],Node): # Check if we get a node of type Node class
# current_requests = [r for r in requests[k] if r] # get all requests belonging to this node
current_requests = requests[k]
new_request = dsk[k][1].configure(current_requests) # Call the class configuration function
if not isinstance(new_request,list): # prepare the request return value
new_request = [new_request]
else: # We didn't get a Node class so there is no
# custom configuration function: pass through
if len(requests[k]) > 1:
if callable(default_merge):
new_request = default_merge(requests[k])
# NOTE(review): an `else:` appears to be missing before the raise
raise RuntimeError("No valid default merger supplied. Cannot merge requests. "
"Either convert your function to a class Node or provide "
"a default merger")
# NOTE(review): an `else:` (single request: pass through) appears
# to be missing before the next line
new_request = requests[k]
if 'requires_request' in new_request[0] and new_request[0]['requires_request'] == True:
del new_request[0]['requires_request']
input_requests[k] = copy.deepcopy(new_request[0]) #TODO: check if we need a deepcopy here!
# update dependencies
current_deps = get_dependencies(dsk, k, as_list=True)
for i, d in enumerate(current_deps):
if d in requests:
# dependency already collected a request: accumulate
requests[d] += new_request
remove[d] = remove[d] and (not new_request[0])
# NOTE(review): an `else:` appears to be missing before the next line
requests[d] = new_request
remove[d] = (not new_request[0]) # if we received an empty dictionary flag deps for removal
# only configure each node once in a round!
if d not in new_work and d not in work: # TODO: verify this
new_work.append(d) # TODO: Do we need to configure dependency if we'll remove it?
work = new_work
# Assembling the configured new graph
out = {k: dsk[k] for k in out_keys if not remove[k]}
# After we have aquired all requests we can input the required_requests as a input node to the requiring node
for k in input_requests:
# append the stored request as an extra argument of the task tuple
out[k] += (input_requests[k],)
# convert to delayed object
from dask.delayed import Delayed
in_keys = list(flatten(keys))
if len(in_keys) > 1:
collection = [Delayed(key=key,dsk=out) for key in in_keys]
# NOTE(review): an `else:` appears to be missing before the next line
collection = Delayed(key=in_keys[0],dsk=out)
if isinstance(collection,list):
collection = [collection]
return collection
class Freezer(Node):
"""Pass-through node that can cache its input and later serve requests
from that cache. NOTE(review): indentation and some branch bodies were
lost in extraction; `is_cached` is not defined in this chunk."""
def __init__(self,caching=True):
# caching -- when False the node is a pure pass-through and any
# cache-lookup request raises
self.caching = caching
def __call__(self, x):
"""If caching is enabled load a cached result or stores the input data and returns it

Arguments:
x {xarray or dict} -- Either the xarray data to be passed through (and cached)
or request dictionary containing information about the data
to be loaded

Returns:
xarray -- Data loaded from cache or input data passed through
"""
if isinstance(x,dict):
if self.is_cached(x) and self.caching:
# TODO: load from cache and return it
elif not self.caching:
raise RuntimeError(f'If caching is disabled cannot perform request {x}')
# NOTE(review): an `else:` introducing the following raise appears
# to have been lost in extraction
raise RuntimeError(f'Result is not cached but cached result is requested with {x}')
if self.caching:
# TODO: store the input data
return x
def configure(self,requests):
# With caching enabled nothing is needed from upstream: returning an
# empty request removes this node's dependencies from the task graph
if self.caching:
return [{}]
# NOTE(review): `config_conflict` is not defined in this chunk --
# verify it exists (or was meant to be a call to super().configure)
return config_conflict(requests)
def optimize_freeze(dsk, keys, request_key='request'):
""" Return new dask with tasks removed which are unnecessary because a later stage
reads from cache

``keys`` may be a single key or list of keys.

Returns:
dsk: culled dask graph
dependencies: Dict mapping {key: [deps]}. Useful side effect to accelerate
other optimizations, notably fuse.
"""
if not isinstance(keys, (list, set)):
keys = [keys]
out_keys = []
seen = set()
dependencies = dict()
if (request_key not in dsk):
raise RuntimeError(f"Please provide a task graph which includes '{request_key}'")
request = dsk[request_key]
# Helper: a task counts as cached when it is a Freezer whose result for
# this request is already stored
def is_cached(task,request):
if isinstance(task,tuple):
if isinstance(task[0],Freezer):
return task[0].is_cached(request)
return False
work = list(set(flatten(keys)))
cached_keys = []
while work:
new_work = []
out_keys += work
deps = []
for k in work:
# NOTE(review): lines appear to be missing in this loop (e.g. the
# non-cached branch that records dependencies, and the body that
# fills `seen`/`new_work`/`cached_keys`); verify against the original
if is_cached(dsk[k],request):
deps.append((k, get_dependencies(dsk, k, as_list=True)))
for _, deplist in deps:
for d in deplist:
if d not in seen:
work = new_work
out = {k: dsk[k] for k in out_keys}
# finally we need to replace the input of the caching nodes with the request
cached = {k: (out[k][0],request_key) for k in cached_keys}
return out, dependencies
from .management import *
# from .processing import *
# from .collection import *
\ No newline at end of file
from ..global_config import get_setting
from ..core.graph import StuettNode
import os
import dask
import logging
import obspy
from obspy.clients.fdsn import Client
from obspy import UTCDateTime
from obsplus import obspy_to_array
#TODO: revisit the following packages
import numpy as np
import pandas as pd
import datetime as dt
import warnings
class DataSource(StuettNode):
    def __init__(self):
        pass

    def configure(self, requests=None):
        """ Default configure for DataSource nodes

        Same as configure from StuettNode but additionally flags that the
        (merged) request must be passed to __call__ at runtime.

        Keyword Arguments:
            requests {list} -- List of requests (default: {None})

        Returns:
            dict -- Original request or merged requests
        """
        requests = super().configure(requests)
        # Sources always need the merged request handed to __call__
        requests['requires_request'] = True

        return requests
class Freezer(StuettNode):
    def __init__(self):
        pass

    def configure(self, x=None, requests=None):
        """ Default configure for Freezer nodes

        Same as configure from StuettNode but additionally flags that the
        (merged) request must be passed to __call__ at runtime.

        Keyword Arguments:
            x -- unused placeholder kept for interface compatibility
            requests {list} -- List of requests (default: {None})

        Returns:
            dict -- Original request or merged requests
        """
        requests = super().configure(requests)
        requests['requires_request'] = True  # TODO: only if data partially available

        return requests
class SeismicSource(DataSource):
def __init__(self, config={}, use_arclink=False):
    """ Seismic data source to get data from permasense

    The user can predefine the source's settings or provide them in a request
    Predefined setting should never be updated (to stay thread safe), but will be ignored
    if request contains settings

    Keyword Arguments:
        config {dict} -- configuration for seismic sources (default: {{}})
        use_arclink {bool} -- load data via an FDSN client instead of the
            local vault (default: {False})
    """
    # Copy the incoming dict: `config={}` is a shared mutable default and
    # we assign keys on self.config below, which would otherwise leak
    # state across instances (and mutate the caller's dict).
    self.config = dict(config)
    self.use_arclink = use_arclink

    if 'source' not in self.config:
        self.config['source'] = None

    # TODO: make fallback to permasense_vault

    if use_arclink:
        arclink_user = get_setting('arclink_user')
        arclink_password = get_setting('arclink_password')
        # NOTE(review): base_url is empty in this chunk -- confirm the
        # intended FDSN endpoint before relying on this client.
        self.fdsn_client = Client(base_url='', user=arclink_user, password=arclink_password)
def __call__(self,request=None):
"""Load the configured seismic stream and return it with absolute,
UTC-localised time coordinates (via obsplus.obspy_to_array).

Keyword Arguments:
request {dict} -- runtime settings; presumably merged into the
predefined config -- the merging line is missing in this chunk
(default: {None})
"""
#'Loading seismic with fdsn')
print('loading with seismic')
# TODO: check if request contains settings
config = self.config.copy()
if request is not None:
# NOTE(review): the body of this branch is missing in this chunk --
# presumably it updates `config` from `request`; verify in the original
if self.use_arclink:
st = self.fdsn_client.get_waveforms(network='4D', station=config['station'], location='A',
channel=config['channel'], starttime=UTCDateTime(config['start_time']), endtime=UTCDateTime(config['end_time']), attach_response=True)
else: #20180914 is last full day available in permasense_vault
st = self.get_obspy_stream(config['start_time'],config['end_time'],config['station'],config['channel'])
x = obspy_to_array(st)
# Convert the relative time axis (seconds offsets, see unit below) into
# absolute UTC timestamps anchored at the trace start time
starttime = obspy.UTCDateTime(x.starttime.values).datetime
starttime = pd.to_datetime(starttime,utc=True)
timedeltas = pd.to_timedelta(x['time'].values,unit='seconds')
xt = starttime + timedeltas
x['time'] = pd.to_datetime(xt,utc=True)
return x
def get_obspy_stream(self,
Loads the microseismic data for the given timeframe into a miniseed file.
tbeg {datetime} -- start timestamp of the desired obspy stream
tend {datetime} -- end timestamp of the desired obspy stream
Keyword Arguments:
pad {bool} -- If padding is true, the data will be zero padded if the data is not consistent
fill {} -- If numpy.nan or fill value: error in the seismic stream will be filled with the value. If None no fill will be used
verbose {bool} -- If info should be printed
obspy stream -- obspy stream with up to three channels
the stream's channels will be sorted alphabetically
if not isinstance(channels,list):
channels = [channels]
datadir = os.path.join(get_setting('permasense_vault_dir'), '') + 'geophones/binaries/PS/%s/'%station
if not os.path.isdir(datadir):
# TODO: should this be an error or only a warning. In a period execution this could stop the whole script
raise IOError('Cannot find the path {}. Please provide a correct path to the permasense_vault directory'.format(datadir))
# We will get the full hours seismic data and trim it to the desired length afterwards
tbeg_hours = pd.to_datetime(tbeg).replace(minute=0,second=0,microsecond=0)
timerange = pd.date_range(start=tbeg_hours,end=tend, freq='H')
non_existing_files_ts = [] # keep track of nonexisting files
# drawback of memmap files is that we need to calculate the size beforehand