To receive notifications about scheduled maintenance, please subscribe to the mailing-list gitlab-operations@sympa.ethz.ch. You can subscribe to the mailing-list at https://sympa.ethz.ch

Commit c50bbfda authored by matthmey's avatar matthmey
Browse files

fix image loader timezone issues

parent aae5d294
...@@ -224,8 +224,9 @@ def configuration(delayed, request, keys=None, default_merge=None): ...@@ -224,8 +224,9 @@ def configuration(delayed, request, keys=None, default_merge=None):
else: else:
new_request = requests[k] new_request = requests[k]
if (new_request[0] is not None and if (
"requires_request" in new_request[0] new_request[0] is not None
and "requires_request" in new_request[0]
and new_request[0]["requires_request"] == True and new_request[0]["requires_request"] == True
): ):
del new_request[0]["requires_request"] del new_request[0]["requires_request"]
...@@ -240,7 +241,7 @@ def configuration(delayed, request, keys=None, default_merge=None): ...@@ -240,7 +241,7 @@ def configuration(delayed, request, keys=None, default_merge=None):
remove[d] = remove[d] and (new_request[0] is None) remove[d] = remove[d] and (new_request[0] is None)
else: else:
requests[d] = new_request requests[d] = new_request
# if we received None # if we received None
remove[d] = new_request[0] is None remove[d] = new_request[0] is None
# only configure each node once in a round! # only configure each node once in a round!
...@@ -258,11 +259,11 @@ def configuration(delayed, request, keys=None, default_merge=None): ...@@ -258,11 +259,11 @@ def configuration(delayed, request, keys=None, default_merge=None):
for k in input_requests: for k in input_requests:
# Here we assume that we always receive the same tuple of (bound method, data, request) # Here we assume that we always receive the same tuple of (bound method, data, request)
# If the interface changes this will break #TODO: check for all cases # If the interface changes this will break #TODO: check for all cases
if isinstance(out[k][2], tuple): if isinstance(out[k][2], tuple):
# TODO: find a better inversion of to_task_dask() # TODO: find a better inversion of to_task_dask()
if out[k][2][0] == dict: if out[k][2][0] == dict:
my_dict = {item[0]:item[1] for item in out[k][2][1] } my_dict = {item[0]: item[1] for item in out[k][2][1]}
my_dict.update(input_requests[k]) my_dict.update(input_requests[k])
out[k] = out[k][:2] + (my_dict,) out[k] = out[k][:2] + (my_dict,)
else: else:
......
...@@ -646,7 +646,9 @@ class MHDSLRFilenames(DataSource): ...@@ -646,7 +646,9 @@ class MHDSLRFilenames(DataSource):
# TODO: make the index timezone aware # TODO: make the index timezone aware
imglist_df.set_index("start_time", inplace=True) imglist_df.set_index("start_time", inplace=True)
imglist_df.index = pd.to_datetime(imglist_df.index, utc=True) imglist_df.index = pd.to_datetime(imglist_df.index, utc=True).tz_localize(
None
) # TODO: change when xarray #3291 is fixed
imglist_df.sort_index(inplace=True) imglist_df.sort_index(inplace=True)
set_setting("image_list_df", imglist_df) set_setting("image_list_df", imglist_df)
......
...@@ -113,7 +113,7 @@ def test_merging(): ...@@ -113,7 +113,7 @@ def test_merging():
x = bypass(x) x = bypass(x)
x = node(x, delayed=True) x = node(x, delayed=True)
x_b = node(x, delayed=True) x_b = node(x, delayed=True)
x = merge([x_b, x],delayed=True) x = merge([x_b, x], delayed=True)
# create a configuration file # create a configuration file
config = {"start_time": 0, "end_time": 1} config = {"start_time": 0, "end_time": 1}
...@@ -127,26 +127,25 @@ def test_merging(): ...@@ -127,26 +127,25 @@ def test_merging():
# TODO: Test default_merge # TODO: Test default_merge
# TODO: # TODO:
try: try:
''' """
This still fails. Configuration cannot handle "apply" nodes in the dask graph. This still fails. Configuration cannot handle "apply" nodes in the dask graph.
The problem arises if we have a branch that needs to merge request and the The problem arises if we have a branch that needs to merge request and the
apply node is the one which receives the two requests. Then it requires a apply node is the one which receives the two requests. Then it requires a
default merge handler, which is not supplied (and does not need to) because the default merge handler, which is not supplied (and does not need to) because the
underlying function in this case supplies the merger. underlying function in this case supplies the merger.
''' """
x = source(delayed=True)(dask_key_name="source") x = source(delayed=True)(dask_key_name="source")
x = bypass(x)(dask_key_name="bypass") x = bypass(x)(dask_key_name="bypass")
x = node(x, delayed=True)(dask_key_name="node") x = node(x, delayed=True)(dask_key_name="node")
x_b = node(x, delayed=True)(dask_key_name="branch") # branch x_b = node(x, delayed=True)(dask_key_name="branch") # branch
x = merge([x_b, x],delayed=True)(dask_key_name="merge") x = merge([x_b, x], delayed=True)(dask_key_name="merge")
# create a configuration file # create a configuration file
config = {"start_time": 0, "end_time": 1} config = {"start_time": 0, "end_time": 1}
import dask import dask
dsk, dsk_keys = dask.base._extract_graph_and_keys([x]) dsk, dsk_keys = dask.base._extract_graph_and_keys([x])
...@@ -163,4 +162,5 @@ def test_merging(): ...@@ -163,4 +162,5 @@ def test_merging():
print(e) print(e)
pass pass
test_merging()
\ No newline at end of file test_merging()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment