Commit c9e2e7e4 authored by matthmey

optimized freeze

parent e2e35c66
......@@ -29,7 +29,7 @@ numpy = "1.16.5"
appdirs = "^1.4.3"
obsplus = { git = "https://github.com/niosh-mining/obsplus" }
zarr = "^2.3.2"
xarray = { git = "https://github.com/niowniow/xarray.git", branch = "strided_rolling" }
xarray = { git = "https://github.com/niowniow/xarray.git" }
pillow = "^6.2.1"
xarray-extras = "^0.4.2"
lttb = "^0.2.0"
......
......@@ -114,16 +114,23 @@ class StuettNode(Node): # TODO: define where this class should be (maybe not he
requests {list} -- List of requests
Returns:
dict -- Original request or merged requests
dict -- Original request or merged requests or None if no request is needed
"""
if not isinstance(requests, list):
raise RuntimeError("Please provide a list of requests")
# For time requests we just use the union of both time segments
new_request = requests[0].copy()
new_request = None
key_func = {"start_time": np.minimum, "end_time": np.maximum}
for r in requests[1:]:
for r in requests:
if r is None:
continue
if new_request is None:
new_request = r
continue
for key in ["start_time", "end_time"]:
if key in r:
if key in new_request:
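As an aside, the time-union rule above can be illustrated in isolation; the snippet below is a standalone sketch (not part of stuett) that merges a list of requests the same way, taking the earliest start_time and the latest end_time and skipping None entries.

import numpy as np
import pandas as pd

# Standalone illustration of the merge rule: union of the time segments.
key_func = {"start_time": np.minimum, "end_time": np.maximum}
requests = [
    {"start_time": pd.Timestamp("2017-08-01"), "end_time": pd.Timestamp("2017-08-02")},
    None,  # None entries are skipped
    {"start_time": pd.Timestamp("2017-07-31"), "end_time": pd.Timestamp("2017-08-03")},
]
merged = None
for r in requests:
    if r is None:
        continue
    if merged is None:
        merged = dict(r)
        continue
    for key in ("start_time", "end_time"):
        if key in r and key in merged:
            merged[key] = key_func[key](merged[key], r[key])
# merged now spans 2017-07-31 .. 2017-08-03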
......@@ -259,13 +266,20 @@ def configuration(delayed, request, keys=None, default_merge=None):
# update dependencies
current_deps = get_dependencies(dsk, k, as_list=True)
for i, d in enumerate(current_deps):
to_be_removed = False
if new_request[0] is None:
to_be_removed = True
elif "remove_dependencies" in new_request[0]:
to_be_removed = new_request[0]["remove_dependencies"]
del new_request[0]["remove_dependencies"]
if d in requests:
requests[d] += new_request
remove[d] = remove[d] and (new_request[0] is None)
remove[d] = remove[d] and to_be_removed
else:
requests[d] = new_request
# if we received None
remove[d] = new_request[0] is None
remove[d] = to_be_removed
# only configure each node once in a round!
if d not in new_work and d not in work: # TODO: verify this
......@@ -280,6 +294,8 @@ def configuration(delayed, request, keys=None, default_merge=None):
# After we have acquired all requests we can input the required_requests as an input node to the requiring node
# we assume that the last argument is the request
for k in input_requests:
if k not in out:
continue
# Here we assume that we always receive the same tuple of (bound method, data, request)
# If the interface changes this will break #TODO: check for all cases
......
......@@ -31,6 +31,7 @@ from copy import deepcopy
from pathlib import Path
import dask
# TODO: revisit the following packages
import numpy as np
import obspy
......@@ -100,10 +101,11 @@ class DataSource(StuettNode):
Returns:
dict -- Original request or merged requests
"""
requests = super().configure(requests) # merging request here
requests["requires_request"] = True
request = super().configure(requests) # merging request here
if request is not None:
request["requires_request"] = True
return requests
return request
class GSNDataSource(DataSource):
......@@ -126,6 +128,9 @@ class GSNDataSource(DataSource):
)
def forward(self, data=None, request=None):
if request is None:
return None
# code from Samuel Weber
#### 1 - DEFINE VSENSOR-DEPENDENT COLUMNS ####
metadata = Path(get_setting("metadata_directory")).joinpath(
......@@ -231,7 +236,7 @@ class GSNDataSource(DataSource):
isnull = df.isnull().all()
[df.drop([col_name], axis=1, inplace=True) for col_name in df.columns[isnull]]
df['time'] = pd.to_datetime(df['time'])
df["time"] = pd.to_datetime(df["time"])
df = df.set_index("time")
df = df.sort_index(axis=1)
......@@ -694,7 +699,9 @@ class MHDSLRFilenames(DataSource):
# try to reload it and write to remote
imglist_df = self.image_integrity_store(config["store"])
try:
to_csv_with_store(config["store"], filename, imglist_df, dict(index=False))
to_csv_with_store(
config["store"], filename, imglist_df, dict(index=False)
)
success = True
except Exception as e:
print(e)
......@@ -866,10 +873,11 @@ class MHDSLRFilenames(DataSource):
continue
segments = pd.DataFrame(images_list)
segments.drop_duplicates(inplace=True, subset="start_time")
segments.start_time = pd.to_datetime(segments.start_time, utc=True)
segments.end_time = pd.to_datetime(segments.end_time, utc=True)
segments.sort_values("start_time")
if not segments.empty:
segments.drop_duplicates(inplace=True, subset="start_time")
segments.start_time = pd.to_datetime(segments.start_time, utc=True)
segments.end_time = pd.to_datetime(segments.end_time, utc=True)
segments.sort_values("start_time")
return segments
......@@ -1072,23 +1080,27 @@ class MHDSLRImages(MHDSLRFilenames):
class Freezer(StuettNode):
def __init__(self, store, groupname, dim, offset, sort_after_load=False):
# synchronizer = zarr.ThreadSynchronizer()
synchronizer = zarr.ThreadSynchronizer()
# synchronizer = zarr.ProcessSynchronizer(filename+'.sync')
if dim != 'time':
raise NotImplementedError('Currently Freezer only supports `time` dimension')
try:
meta = read_csv_with_store(store,groupname+'.csv')
except Exception as e:
print(e)
# create meta
# TODO: use zarr store (with xarray dataset) since we need to make use of the synchronizers
meta = pd.DataFrame(columns=[dim])
to_csv_with_store(store,groupname+'.csv',meta,{'index':False})
if dim != "time":
raise NotImplementedError(
"Currently, Freezer only supports the `time` dimension"
)
super().__init__(store=store, groupname=groupname, dim=dim, offset=offset, sort_after_load=sort_after_load)
super().__init__(
store=store,
groupname=groupname,
dim=dim,
offset=offset,
synchronizer=synchronizer,
sort_after_load=sort_after_load,
)
# create meta
# TODO: use zarr store (with xarray dataset) since we need to make use of the synchronizers
# meta = pd.DataFrame(columns=[dim])
# to_csv_with_store(store,groupname+'.csv',meta,{'index':False})
# Init storage if non-existent
......@@ -1118,14 +1130,16 @@ class Freezer(StuettNode):
else:
return self.forward(data, config)
def to_chunk_indexers(self,index, offset):
offset = self.config['offset']
def to_chunk_indexers(self, index, offset):
offset = self.config["offset"]
# TODO: make this index type dependent
reference_value = pd.to_datetime('2000-01-01')
reference_value = pd.to_datetime("2000-01-01")
index = pd.to_datetime(index)
chunk_index = np.floor((index - reference_value)/offset)*offset+reference_value
chunk_index = (
np.floor((index - reference_value) / offset) * offset + reference_value
)
return chunk_index
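To make the alignment formula concrete, here is the same computation run standalone, assuming offset is a pandas Timedelta (here one day):

import numpy as np
import pandas as pd

reference_value = pd.to_datetime("2000-01-01")
offset = pd.to_timedelta("1 D")
index = pd.to_datetime("2017-08-02 13:37")
# Floor the timestamp onto a grid of `offset`-sized chunks anchored at the reference value.
chunk_index = np.floor((index - reference_value) / offset) * offset + reference_value
# chunk_index == Timestamp('2017-08-02 00:00:00'), the start of the daily chunk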
......@@ -1140,57 +1154,80 @@ class Freezer(StuettNode):
"""
requests = super().configure(requests)
dim = self.config['dim']
dim = self.config["dim"]
# TODO: change to indexers
# if dim not in request['indexers']:
# # return ...
# start_index = request['indexers'][dim].start
# end_index = request['indexers'][dim].stop
offset = self.config['offset']
# end_index = request['indexers'][dim].stop
offset = self.config["offset"]
start_chunk = self.to_chunk_indexers(requests['start_time'],offset)
end_chunk = self.to_chunk_indexers(requests['end_time'],offset)
start_chunk = self.to_chunk_indexers(requests["start_time"], offset)
end_chunk = self.to_chunk_indexers(requests["end_time"], offset)
# Note: the chunks used in this freeze module are different from the chunks used to store the data.
# One freeze_chunk can include anywhere from many down to zero data points. Thus, the availability of a freeze_chunk
# does not necessarily mean that there is data available. It means that we have tried to load the data
# from the underlying source once before and stored the result (no matter how much data the source provided)
meta = read_csv_with_store(self.config['store'],self.config['groupname']+'.csv')
reload = False
try:
# meta = read_csv_with_store(store,groupname+'.csv')
# meta = xr.open_zarr(
# self.config["store"], group=self.config["groupname"] + "_meta"
# )
meta = zarr.open_array(
self.config["store"],
path=self.config["groupname"] + "_meta",
mode="a",
synchronizer=self.config["synchronizer"],
)
except Exception as e:
print(e)
# Reload everything!
reload = True
# meta = xr.open_zarr(self.config['store'],group=self.config['groupname']+'_meta')
if start_chunk > end_chunk:
raise RuntimeError('start_index is greater than end_index. Freezer cannot handle it')
raise RuntimeError(
"start_index is greater than end_index. Freezer cannot handle it"
)
reload = False
current_chunk = start_chunk
while current_chunk <= end_chunk:
if current_chunk not in meta[dim]:
reload = True
current_chunk = current_chunk + offset
if not reload:
current_chunk = start_chunk
print(current_chunk)
print(meta)
# print(meta.shape,meta[:])
while current_chunk <= end_chunk:
if current_chunk.to_datetime64() not in meta:
reload = True
current_chunk = current_chunk + offset
# TODO: create a new path in the graph for every chunk we need to compute. Then, we can let dask handle the
# multiprocessing
# TODO: with only one start/end_time (as done below) it is inefficient for the case [unavailable, available, unavailable]
# since we need to load everything, including data that is already stored
# one option could be to duplicate the graph by returning multiple requests...
# TODO: create a new path in the graph for every chunk we need to compute. Then, we can let dask handle the
# multiprocessing
# TODO: with only one start/end_time (as done below) it is inefficient for the case [unavailable, available, unavailable]
# since we need to load everything, including data that is already stored
# one option could be to duplicate the graph by returning multiple requests...
print("Returning None", reload)
if not reload:
# if we return None the previous node will be cut out of the graph
# and this freeze node gets the request to provide the data
return None
requests[f"{self.config['groupname']}_indexers"] = {
"time": slice(requests["start_time"], requests["end_time"])
}
requests["requires_request"] = True
requests["remove_dependencies"] = True
print("removing dependencies")
return requests
# else we will give the previous node the request to load/compute the values for the required chunks
# additionally we will update the request for this node to contain the original indexers
# additionally we will update the request for this node to contain the original indexers
# requests['frozen_indexers'] = requests['indexers'] # TODO: change to indexers
requests[f"{self.config['groupname']}_indexers"] = {'time':slice(requests['start_time'],requests['end_time'])}
requests['start_time'] = start_chunk
requests['end_time'] = end_chunk + offset
requests[f"{self.config['groupname']}_indexers"] = {
"time": slice(requests["start_time"], requests["end_time"])
}
requests["start_time"] = start_chunk
requests["end_time"] = end_chunk + offset
requests["requires_request"] = True
# TODO: add node specific hash to freeze_output_start_time (there might be multiple in the graph) <- probably not necessary because we receive a copy of the request which is unique to this node
# TODO: maybe the configuration method must add (and delete) the node name in the request?
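A minimal standalone sketch of the chunk bookkeeping used here, assuming a zarr v2 store (the path name is illustrative): frozen chunk start times live in a small datetime64 zarr array that is created on first use and appended to afterwards.

import numpy as np
import zarr

store = zarr.TempStore()                  # stand-in for self.config["store"]
synchronizer = zarr.ThreadSynchronizer()
chunk_start = np.datetime64("2017-08-02")

try:
    # Open the bookkeeping array if it already exists.
    meta = zarr.open_array(store, path="freezer_meta", mode="a", synchronizer=synchronizer)
except Exception:
    meta = None

if meta is None or chunk_start not in meta[:]:
    if meta is None:
        # First chunk ever frozen: create the array with a single timestamp.
        meta = zarr.open_array(
            store, path="freezer_meta", mode="w",
            shape=1, dtype="M8[ns]", fill_value=chunk_start,
            synchronizer=synchronizer,
        )
        meta[:] = np.array([chunk_start], dtype="M8[ns]")
    else:
        # Record that this chunk has been frozen.
        meta.append(np.array([chunk_start], dtype="M8[ns]"))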
......@@ -1200,54 +1237,121 @@ class Freezer(StuettNode):
x = data.to_dataset(name="frozen")
# x = x.chunk({name: x[name].shape for name in list(x.dims)})
# zarr_dataset = zarr.open(self.store, mode='r')
x.to_zarr(request['store'],group=self.config['groupname'], append_dim="time")
# try:
# ds_zarr = xr.open_zarr(request['store'],group=self.config['groupname'])
# # print('loaded data',ds_zarr)
# except:
# pass
# # print('data',data)
x.to_zarr(
request["store"],
group=self.config["groupname"],
append_dim="time",
synchronizer=self.config["synchronizer"],
)
def open_zarr(self, request):
ds_zarr = xr.open_zarr(request['store'],group=self.config['groupname'])
def open_zarr(self, request, indexers=None):
print("############################# opening zarr ")
ds_zarr = xr.open_zarr(request["store"], group=self.config["groupname"])
# TODO: this might be inefficient (depending on how xarray sorts)
ds_zarr = ds_zarr.sortby("time")
if f"{self.config['groupname']}_indexers" in request:
ds_zarr = ds_zarr.sortby('time')
indexers = request[f"{self.config['groupname']}_indexers"]
ds_zarr = ds_zarr.sel(indexers)
current_indexers = request[f"{self.config['groupname']}_indexers"]
elif indexers is not None:
current_indexers = indexers
else:
current_indexers = None
if current_indexers is not None:
ds_zarr = ds_zarr.sel(current_indexers)
da = xr.DataArray(ds_zarr['frozen'])
da = xr.DataArray(ds_zarr["frozen"])
return da
def forward(self, data=None, request=None):
if request is not None and 'bypass' in request and request['bypass']:
if request is not None and "bypass" in request and request["bypass"]:
return data
dim = self.config['dim']
if data is not None:
indexers = request[f"{self.config['groupname']}_indexers"]
start_chunk = self.to_chunk_indexers(indexers['time'].start,self.config['offset'])
end_chunk = self.to_chunk_indexers(indexers['time'].stop,self.config['offset'])
dim = self.config["dim"]
indexers = request[f"{self.config['groupname']}_indexers"]
start_chunk = self.to_chunk_indexers(
indexers["time"].start, self.config["offset"]
)
end_chunk = self.to_chunk_indexers(
indexers["time"].stop, self.config["offset"]
)
meta = read_csv_with_store(self.config['store'],self.config['groupname']+'.csv')
current_chunk = start_chunk
offset = self.config['offset']
meta_list = []
while current_chunk <= end_chunk:
# print(current_chunk, end_chunk)
if current_chunk not in meta[dim]:
meta_list.append(current_chunk)
# avoid having data points twice
off = offset - pd.to_timedelta('1 ns')
current_indexers = {'time':slice(current_chunk,current_chunk+off)}
# TODO: Issue here is that we call a dask computation within a dask computation
# print(current_indexers)
# print(data)
data_sel = data.sel(current_indexers)
self.to_zarr(data_sel,request)
# meta = read_csv_with_store(self.config['store'],self.config['groupname']+'.csv')
current_chunk = current_chunk + offset
meta_update = pd.DataFrame(meta_list,columns=[dim])
meta = pd.concat([meta,meta_update])
to_csv_with_store(self.config['store'],self.config['groupname']+'.csv',meta,{'index':False})
current_chunk = start_chunk
offset = self.config["offset"]
meta_list = []
chunk_list = []
# if meta:
# meta["time"] = pd.to_datetime(meta["time"].values)
# print(meta)
while current_chunk <= end_chunk:
# avoid having data points twice
off = offset - pd.to_timedelta("1 ns")
current_indexers = {dim: slice(current_chunk, current_chunk + off)}
try:
# meta = xr.open_zarr(
# self.config["store"],
# group=self.config["groupname"] + "_meta",
# synchronizer=self.config["synchronizer"],
# )
meta = zarr.open_array(
self.config["store"],
path=self.config["groupname"] + "_meta",
mode="a",
synchronizer=self.config["synchronizer"],
)
except Exception:
meta = None
if meta is None or current_chunk.to_datetime64() not in meta:
print(data)
data_sel = data.sel(current_indexers)
# TODO: Issue here is that we call a dask computation within a dask computation
self.to_zarr(data_sel, request)
# TODO: include this metadata array into the frozen dataset?
# meta_list.append(
# current_chunk
# )
if meta is None:
meta = zarr.open_array(
self.config["store"],
path=self.config["groupname"] + "_meta",
mode="w",
shape=1,
fill_value=current_chunk.to_datetime64(),
dtype="M8[ns]",
synchronizer=self.config["synchronizer"],
)
meta[:] = np.array([current_chunk.to_datetime64()])
else:
meta.append(np.array([current_chunk.to_datetime64()]))
# new_meta = xr.Dataset(
# {"timestamps": ([dim], [0])}, coords={dim: [current_chunk]}
# )
# new_meta.to_zarr(
# self.config["store"],
# group=self.config["groupname"] + "_meta",
# append_dim=dim,
# synchronizer=self.config["synchronizer"],
# )
else:
data_sel = self.open_zarr(request, current_indexers)
data = data.sel(indexers)
else:
data = self.open_zarr(request)
print('::::::::::::::::::meta')
print(meta)
print(meta[:])
print(meta.shape)
chunk_list.append(data_sel)
current_chunk = current_chunk + offset
# print('list',meta_list)
# TODO: sort meta data zarr storage
# meta_update = pd.DataFrame(meta_list,columns=[dim])
# meta = pd.concat([meta,meta_update])
# to_csv_with_store(self.config['store'],self.config['groupname']+'.csv',meta,{'index':False})
# print(meta)
new_data = xr.concat(chunk_list, dim=dim)
# TODO: this might be inefficient (depending on how xarray sorts)
new_data = new_data.sortby(dim)
data = new_data.sel(indexers)
return data
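For orientation, a hypothetical way to instantiate the node (import path, store location, and group name are assumptions; offset is taken to be a pandas Timedelta, matching the chunk arithmetic above):

import pandas as pd
import zarr

from stuett.data import Freezer   # assumed import path

store = zarr.DirectoryStore("freezer_cache.zarr")
freezer = Freezer(
    store=store,
    groupname="seismic_raw",          # illustrative group name
    dim="time",                       # only `time` is supported
    offset=pd.to_timedelta("1 D"),    # one freeze chunk per day
)
request = {
    "start_time": pd.Timestamp("2017-08-01"),
    "end_time": pd.Timestamp("2017-08-02"),
}
# configure() widens the request to chunk boundaries (or cuts out the upstream
# node when every chunk is already frozen); forward() then writes any missing
# chunks to the zarr group and returns the slice selected by the indexers.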
......@@ -1452,7 +1556,6 @@ def get_dataset_slices(dims, dataset_slice, stride={}):
return np.array(all_slices)
# from numba import jit
# @jit(nopython=True)
......
......@@ -31,6 +31,7 @@ import xarray as xr
import scipy.signal
import pandas as pd
import lttb
import warnings
class MinMaxDownsampling(StuettNode):
......@@ -55,14 +56,11 @@ class MinMaxDownsampling(StuettNode):
x_min = rolling.min().dropna(dim)
x_max = rolling.max().dropna(dim)
# TODO: better interleave instead of concat
x_ds = xr.concat(
[x_min, x_max], dim
)
x_ds = xr.concat([x_min, x_max], dim)
# TODO: try to avoid this by using interleaving
x_ds = x_ds.sortby(dim)
x_ds = x_ds.sortby(dim)
return x_ds
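The stride argument to rolling comes from the forked xarray branch pinned in pyproject.toml; with a stock xarray, roughly the same min/max downsampling can be sketched with coarsen (illustrative only):

import numpy as np
import xarray as xr

# Keep the min and max of every block of 10 samples, preserving the extremes
# of the signal much like MinMaxDownsampling above.
da = xr.DataArray(
    np.random.randn(1000),
    dims="time",
    coords={"time": np.arange(1000)},
)
coarse = da.coarsen(time=10, boundary="trim")
x_min = coarse.min()
x_max = coarse.max()
x_ds = xr.concat([x_min, x_max], "time").sortby("time")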
......@@ -105,17 +103,19 @@ class LTTBDownsampling(StuettNode):
# adapt downsampling rate
n_out = data.sizes[dim] / self.rate
# print('n_out',n_out)
rate = np.ceil(data.sizes[dim] / (n_out-2)).astype(np.int)
rate = np.ceil(data.sizes[dim] / (n_out - 2)).astype(np.int)
# print(rate)
# rate = data.sizes[dim] // (n_out-2)
# rate = self.rate
end = data.isel({dim:-1})
data = data.isel({dim:slice(0,-1)})
rolling = data.rolling({dim: rate},stride=rate)
end = data.isel({dim: -1})
data = data.isel({dim: slice(0, -1)})
rolling = data.rolling(dim={dim: rate}, stride=rate)
# print(rolling)
# print(data.sizes['time']//self.rate)
index_bins = data[dim].rolling({dim: rate},stride=rate).construct("buckets")
index_bins = (
data[dim].rolling(dim={dim: rate}, stride=rate).construct("buckets")
)
# print('index',index_bins)
value_bins = rolling.construct("buckets")
......@@ -137,8 +137,8 @@ class LTTBDownsampling(StuettNode):
prev = None
argmaxes = []
for i in range(value_bins.sizes[dim]):
current_value = value_bins.isel({dim:i})
current_index = index_bins.isel({dim:i})
current_value = value_bins.isel({dim: i})
current_index = index_bins.isel({dim: i})
# print(i,current_value.sizes['buckets'])
# print('cv',current_value)
if i == 0:
......@@ -150,14 +150,14 @@ class LTTBDownsampling(StuettNode):
continue
# prev_data/time contains only one entry on `dim`-axis
a = prev_value
a_time = prev_index
a = prev_value
a_time = prev_index
# current_data/time contains multiple entries on `dim`-axis
bs = current_value
bs = current_value
bs_time = current_index
if i < value_bins.sizes[dim]-1:
next_value = value_mean.isel({dim:i+1})
next_index = index_mean.isel({dim:i+1})
if i < value_bins.sizes[dim] - 1:
next_value = value_mean.isel({dim: i + 1})
next_index = index_mean.isel({dim: i + 1})
else:
next_value = end
next_index = end[dim]
......@@ -166,11 +166,11 @@ class LTTBDownsampling(StuettNode):
bs_minus_a = bs - a
c_minus_p_value = current_value - prev_value
c_minus_p_index = current_index - prev_index
a_minus_bs = a - bs
p_minus_c_value = prev_value - current_value
p_minus_c_index = prev_index - current_index
P_i, P_v = prev_index.astype(np.float), prev_value
C_i, C_v = current_index.astype(np.float), current_value
N_i, N_v = next_index.astype(np.float), next_value
......@@ -188,16 +188,21 @@ class LTTBDownsampling(StuettNode):
# print(current_value.shape)
# print(areas.shape)
# axis_num = current_value.get_axis_num('buckets')
arg = areas.argmax('buckets')
# arg = areas.argmax()
# print('c',current_value)
# print('oi',arg)
c = current_value.isel({'buckets':arg})
try:
arg = areas.argmax("buckets", skipna=False)
c = current_value.isel({"buckets": arg})
except ValueError as e:
warnings.warn(str(e))
# TODO: find a better solution to the ValueError("All-NaN slice encountered")
c = 0
# print(c)
argmaxes.append(c)
argmaxes.append(end)
array = xr.concat(argmaxes,'time')
array = xr.concat(argmaxes, "time")
# print(array)
# TODO: integrate nicely
......@@ -207,7 +212,9 @@ class LTTBDownsampling(StuettNode):
if len(data.squeeze().dims) > 1:
raise RuntimeError("Can only work with arrays of one dimension")
d = np.array([data["time"].values.astype(np.int64), data.squeeze().values]).T
d = np.array(
[data["time"].values.astype(np.int64), data.squeeze().values]
).T
small_data = lttb.downsample(d, n_out=n_out)
array = xr.DataArray(
......@@ -219,6 +226,7 @@ class LTTBDownsampling(StuettNode):
array.attrs = data.attrs
return array
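For reference, the lttb package can also be exercised on its own; the snippet below mirrors the (time, value) array construction above with synthetic data (illustrative only):

import numpy as np
import lttb

# lttb.downsample expects an (n, 2) array of [x, y] rows with increasing x.
t = np.arange(10000, dtype=np.int64)              # e.g. timestamps cast to int64
y = np.sin(t / 500.0) + 0.1 * np.random.randn(t.size)
d = np.array([t, y]).T
small = lttb.downsample(d, n_out=500)             # 500 visually representative points
assert small.shape == (500, 2)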
class Downsampling(StuettNode):
def __init__(self