test_collection.py 1.56 KB
Newer Older
matthmey's avatar
matthmey committed
1
2
import stuett
from pathlib import Path
matthmey's avatar
matthmey committed
3
import pandas as pd
matthmey's avatar
matthmey committed
4
5
6
7
8
9
10
11
12
13
14
15
16

test_data_dir = Path(__file__).absolute().parent.joinpath("..", "data")
stuett.global_config.set_setting("user_dir", test_data_dir.joinpath("user_dir/"))


def test_collector():
    filename = Path(test_data_dir).joinpath(
        "timeseries", "MH30_temperature_rock_2017.csv"
    )

    node = stuett.data.CsvSource(filename)
    minmax_rate2 = stuett.data.MinMaxDownsampling(rate=2, dim="time")

matthmey's avatar
matthmey committed
17
    print("creating delayed node")
matthmey's avatar
matthmey committed
18
19
    x = node(delayed=True)

matthmey's avatar
matthmey committed
20
21
    print("downsampled delayed node")
    downsampled = minmax_rate2(x, delayed=True)
matthmey's avatar
matthmey committed
22

matthmey's avatar
matthmey committed
23
24
25
    print("DataCollector node")
    data_paths = [x, downsampled]
    granularities = [stuett.to_timedelta(180, "s"), stuett.to_timedelta(2, "d")]
matthmey's avatar
matthmey committed
26
27
    collector_node = stuett.data.DataCollector(data_paths, granularities)

matthmey's avatar
matthmey committed
28
29
    print("Instatiating DataCollector node")
    request = {"start_time": "2017-08-01", "end_time": "2017-08-02"}
matthmey's avatar
matthmey committed
30
31
32
    path = collector_node(request=request)
    # print(type(path))

matthmey's avatar
matthmey committed
33
    print("Configuration node")
matthmey's avatar
matthmey committed
34
35
36

    import dask

matthmey's avatar
matthmey committed
37
    dsk, dsk_keys = dask.base._extract_graph_and_keys([path])
38
    print(dict(dsk))
matthmey's avatar
matthmey committed
39

matthmey's avatar
matthmey committed
40
    path = stuett.core.configuration(path, request)
matthmey's avatar
matthmey committed
41
42
43
44
45

    dsk, dsk_keys = dask.base._extract_graph_and_keys([path])
    print(dsk)

    print(type(path))
matthmey's avatar
matthmey committed
46
    print("executing delayed node")
matthmey's avatar
matthmey committed
47
48
49
50
51
52
53
    print(path.compute())

    # request = {'start_time':'2017-08-01 10:01:00', 'end_time':'2017-08-01 10:02:00'}
    # path = collector_node(request=request)
    # path = stuett.core.configuration(path,request)
    # print(path.compute())

matthmey's avatar
matthmey committed
54
55

test_collector()