'''MIT License Copyright (c) 2019, Swiss Federal Institute of Technology (ETH Zurich), Matthias Meyer Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.'''

import stuett
from tests.stuett.sample_data import *

import datetime as dt
import pandas as pd
import base64
import os

import pytest
import zarr
from pathlib import Path

# Directory holding the test fixtures, resolved relative to this file.
test_data_dir = Path(__file__).absolute().parent.joinpath("..", "data")
stuett.global_config.set_setting("user_dir", test_data_dir.joinpath("user_dir/"))


class TestSeismicSource(object):
    """Tests for ``stuett.data.SeismicSource`` (all marked ``slow``)."""

    @pytest.mark.slow
    def test_seismic_source(self):
        """Construct a SeismicSource with and without a config."""
        # first test without config
        stuett.data.SeismicSource()

        # with config
        stuett.data.SeismicSource(config)

    @pytest.mark.slow
    def test_call(self):
        """Call a SeismicSource and compute the returned result."""
        # first test without config
        seismic_source = stuett.data.SeismicSource(use_arclink=True)
        x = seismic_source(config)
        x = x.compute()
        print(x)

        # NOTE(review): this early return makes every statement below
        # unreachable, so no value is actually asserted by this test.
        # It is kept to preserve the current behaviour; confirm whether
        # the assertions below are still expected to hold before
        # re-enabling them.
        return

        assert x.mean() == -55.73599312780376

        # with config
        seismic_source = stuett.data.SeismicSource(config, use_arclink=True)
        request = {"start_time": start_time + offset, "end_time": end_time + offset}
        x = seismic_source(request)
        x = x.compute()
        assert x.mean() == -61.23491399790934

        # TODO: make test to compare start_times


@pytest.mark.slow
def test_gsn():
    """Request one day of data from a GSNDataSource and check its sum."""
    # test_data = pd.read_csv(test_data_dir + 'matterhorn_27_temperature_rock.csv',index_col='time')

    gsn_node = stuett.data.GSNDataSource(
        deployment="matterhorn", position=30, vsensor="temperature_rock"
    )
    x = gsn_node({"start_time": "2017-07-01", "end_time": "2017-07-02"})

    assert x.sum() == 1600959.34

    # TODO: proper testing


def test_delayed():
    """Check that ``delayed=True`` yields a dask ``Delayed`` with equal data."""
    filename = Path(test_data_dir).joinpath(
        "timeseries", "MH30_temperature_rock_2017.csv"
    )
    node = stuett.data.CsvSource(filename)
    x = node(delayed=True)

    from dask.delayed import Delayed

    assert isinstance(x, Delayed)

    x = x.compute()
    non_delayed = node(delayed=False)

    # Delayed and eager evaluation must produce the same mean.
    assert x.mean() == 0.16201109532441677
    assert non_delayed.mean() == 0.16201109532441677


def test_freeze():
    """Pass two CsvSource requests through a Freezer backed by a zarr store.

    The store directory is removed before the test and again afterwards,
    including when the test body raises.
    """
    # with config
    # seismic_source = stuett.data.SeismicSource(config,use_arclink=True)
    filename = Path(test_data_dir).joinpath(
        "timeseries", "MH30_temperature_rock_2017.csv"
    )
    node = stuett.data.CsvSource(filename)

    user_dir = stuett.global_config.get_setting("user_dir")
    store_name = user_dir.joinpath("frozen", "test.zarr")

    import shutil

    # Start from a clean store in case a previous run left data behind.
    shutil.rmtree(store_name, ignore_errors=True)

    try:
        store = zarr.DirectoryStore(store_name)
        # account_name = stuett.global_config.get_setting('azure')['account_name'] if stuett.global_config.setting_exists('azure') else "storageaccountperma8980"
        # account_key = stuett.global_config.get_setting('azure')['account_key'] if stuett.global_config.setting_exists('azure') else None
        # store = zarr.ABSStore(container='hackathon-on-permafrost', prefix='dataset/test.zarr', account_name=account_name, account_key=account_key, blob_service_kwargs={})

        freezer = stuett.data.Freezer(store)

        request = {"start_time": "2017-07-01", "end_time": "2017-08-01"}
        x = node(request=request)
        x = freezer(x)

        request = {"start_time": "2017-09-01", "end_time": "2017-10-01"}
        x = freezer(node(request=request))

        # x = freezer()
        print("final", x)
    finally:
        # Always remove the store so a failing run does not leave files behind.
        shutil.rmtree(store_name, ignore_errors=True)


def test_image_filenames():
    """Call MHDSLRFilenames with several request variants.

    No return value is asserted; the test only checks that the calls
    do not raise.
    """
    # first test without config
    node = stuett.data.MHDSLRFilenames(base_directory=test_data_dir.joinpath("MHDSLR"))

    start_time = dt.datetime(2017, 8, 6, 9, 56, 12, tzinfo=dt.timezone.utc)
    end_time = dt.datetime(2017, 8, 6, 10, 14, 10, tzinfo=dt.timezone.utc)

    config_0 = {
        "channel": channels[0],
        "station": stations[0],
        "start_time": start_time,
        "end_time": end_time,
    }
    node(request=config_0)

    # Copy so the shared sample config is not mutated by the `del` below.
    config_1 = config.copy()
    # this should return an empty list
    node(config_1)

    # Test if we do not provide an end_time
    del config_0["end_time"]
    node(config_0)

    del config_1["end_time"]
    node(config_1)

    config_1["start_time"] = dt.datetime(2018, 8, 6, 20, 0, 0, tzinfo=dt.timezone.utc)
    node(config_1)


def test_mhdslrimage():
    """Compare the base64 output of MHDSLRImages with a reference image file."""
    base_dir = Path(test_data_dir).joinpath("MHDSLR")
    node = stuett.data.MHDSLRImages(base_directory=base_dir)

    start_time = dt.datetime(2017, 8, 6, 9, 50, 12, tzinfo=dt.timezone.utc)
    end_time = dt.datetime(2017, 8, 6, 10, 12, 10, tzinfo=dt.timezone.utc)

    config = {
        "start_time": start_time,
        "end_time": end_time,
    }

    # Default output format: only checked for not raising.
    node(config)

    config["output_format"] = "base64"
    data = node(config)

    # TODO: assert data
    from PIL import Image

    # Use a context manager so the image file handle is closed again.
    with Image.open(base_dir.joinpath("2017-08-06", "20170806_095212.JPG")) as img:
        img_base64 = base64.b64encode(img.tobytes())

    assert data[0].values == img_base64


def test_csv():
    """Load a CSV via CsvSource, by full path and through a DirectoryStore."""
    filename = Path(test_data_dir).joinpath(
        "timeseries", "MH30_temperature_rock_2017.csv"
    )
    node = stuett.data.CsvSource(filename)

    x = node()

    assert len(x) == 8760
    assert x.mean() == 0.16201109532441677

    # Test with store
    directory = Path(test_data_dir).joinpath("timeseries")
    store = stuett.DirectoryStore(directory)
    filename = "MH30_temperature_rock_2017.csv"

    node = stuett.data.CsvSource(filename=filename, store=store)

    x = node()

    assert len(x) == 8760
    assert x.mean() == 0.16201109532441677

    # TODO: test with start and end time


def test_annotations():
    """Load both bounding-box annotation files and check selected entries."""
    filename = Path(test_data_dir).joinpath("annotations", "boundingbox_timeseries.csv")
    node = stuett.data.BoundingBoxAnnotation(filename)

    targets = node()
    assert targets[0] == 'no_visibility'

    filename = Path(test_data_dir).joinpath("annotations", "boundingbox_images.csv")
    node = stuett.data.BoundingBoxAnnotation(filename)

    targets = node()

    # Re-index by start_time so an entry can be selected by timestamp.
    targets = targets.swap_dims({"index": "start_time"})
    targets = targets.sortby("start_time")
    assert targets.sel(start_time='2017-08-04T08:12:11') == 'mountaineer'
    assert targets['start_y'][1] == 16


def test_datasets():
    """Build a SegmentedDataset from CSV data and labels and index it once."""
    filename = Path(test_data_dir).joinpath("annotations", "boundingbox_timeseries.csv")
    label = stuett.data.BoundingBoxAnnotation(filename)

    filename = Path(test_data_dir).joinpath(
        "timeseries", "MH30_temperature_rock_2017.csv"
    )
    data = stuett.data.CsvSource(filename)

    dataset = stuett.data.SegmentedDataset(
        data,
        label,
        dataset_slice={"time": slice("2017-08-01", "2017-08-03")},
        batch_dims={"time": pd.to_timedelta(24, "m")},
    )

    # Only checks that fetching the first item does not raise.
    _ = dataset[0]


if __name__ == "__main__":
    # These two tests used to be invoked unconditionally at module level,
    # which ran them on every import (including pytest collection). They
    # are now only run when the file is executed directly as a script.
    test_csv()
    test_datasets()