Commit 2bfd459b authored by matthmey

minor fixes

parent c9acd43b
......@@ -1153,7 +1153,7 @@ class Freezer(StuettNode):
# Note: the chunks used in this freeze module is different from the chunks used to store the data
# One freeze_chunk can include many down to zero data points. Thus the availability of a freeze_chunk
# Does not necessarily mean that there is data available. It means that we have tried to load the data
# from the underlying source once before and stored the result (no matter how much data the source provided)
meta = read_csv_with_store(self.config['store'],self.config['groupname']+'.csv')
......@@ -1168,6 +1168,12 @@ class Freezer(StuettNode):
reload = True
current_chunk = current_chunk + offset
# TODO: create a new path in the graph for every chunk we need to compute. Then, we can let dask handle the
# multiprocessing
# TODO: with only one start/end_time (as done below) it is inefficient for the case where [unavailable,available,unavailable]
# since we need to load everything, also data which is already stored
# one option could be to duplicate the graph by returning multiple requests...
if not reload:
# if we return None the previous node will be cut out of the graph
# and this freeze node gets the request to provide the data
......@@ -1182,9 +1188,7 @@ class Freezer(StuettNode):
requests["requires_request"] = True
# TODO: check how we need to update the boundaries such that we get data that fits and that is available
# TODO: with only one start/end_time it might be inefficient for the case where [unavailable,available,unavailable] since we need to load everything
# one option could be to duplicate the graph by returning multiple requests...
# TODO: add node specific hash to freeze_output_start_time (there might be multiple in the graph) <- probably not necessary because we receive a copy of the request which is unique to this node
# TODO: maybe the configuration method must add (and delete) the node name in the request?
......@@ -1203,7 +1207,9 @@ class Freezer(StuettNode):
ds_zarr = ds_zarr.sortby('time')
indexers = request[f"{self.config['groupname']}_indexers"]
ds_zarr = ds_zarr.sel(indexers)
return ds_zarr
da = xr.DataArray(ds_zarr['frozen'])
return da
def forward(self, data=None, request=None):
if request is not None and 'bypass' in request and request['bypass']:
......@@ -1223,12 +1229,10 @@ class Freezer(StuettNode):
print(current_chunk, end_chunk)
if current_chunk not in meta[dim]:
meta_list.append(current_chunk)
if current_chunk+offset != end_chunk:
# avoid having data points twice
off = offset - pd.to_timedelta('1 ns')
else:
off = offset
# avoid having data points twice
off = offset - pd.to_timedelta('1 ns')
current_indexers = {'time':slice(current_chunk,current_chunk+off)}
# TODO: Issue here is that we call a dask computation within a dask computation
self.to_zarr(data.sel(current_indexers),request)
current_chunk = current_chunk + offset
......@@ -1241,8 +1245,6 @@ class Freezer(StuettNode):
data = self.open_zarr(request)
return data
# TODO: check request start_time and load the data which is available, store the data which is not available
# TODO: crop
class CsvSource(DataSource):
......
......@@ -136,6 +136,13 @@ def test_freeze():
x = freezer(delayed=True)
x = stuett.core.configuration(x, request)
print("from disk", x.compute())
# trying to load a fraction from disk
request = {"start_time": "2017-07-02", "end_time": "2017-07-03"}
x = freezer(delayed=True)
x = stuett.core.configuration(x, request)
print("from disk", x.compute())
exit()
# Note there is a gap
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment