'''MIT License

Copyright (c) 2019, Swiss Federal Institute of Technology (ETH Zurich), Matthias Meyer

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.'''

import numpy as np
import pandas as pd
import warnings
import datetime as dt

from .management import DataSource
from ..core import configuration

matthmey's avatar
matthmey committed
32

matthmey's avatar
matthmey committed
33
34
35
36
37
38
39
40
41
42
43
44
45
class DataCollector(DataSource):
    def __init__(self, data_paths=None, granularities=None):
        """Add and choose data path according to its granularity.
           The data collector returns the data path given an index segment (index_end - index_start).
           The index segment is compared against the given granularities and the mapped data path is
           returned. For example, for a time series where the index is a datetime object, the timedelta
           of (end_time - start_time) is compared against the given list of granularity timedeltas.

        Keyword Arguments:
            data_paths {list}       -- a list of data paths, e.g. the leafs of a dask graph;
                                       None is treated as an empty list (default: {None})
            granularities {list}    -- a list of sorted granularities, one per data path;
                                       None is treated as an empty list (default: {None})

        Raises:
            ValueError -- if the two lists differ in length or the granularities are not sorted
        """
        super().__init__()

        # None stands in for "empty" so that instances created without arguments
        # each get their own list instead of sharing one mutable default object.
        self.data_paths = [] if data_paths is None else data_paths
        self.granularities = [] if granularities is None else granularities

        self._check_granularity_mapping()

    def _check_granularity_mapping(self):
        """Verify that data paths and granularities form a valid, sorted one-to-one mapping.

        Raises:
            ValueError -- if the two lists differ in length or the granularities are not sorted
        """
        if len(self.data_paths) != len(self.granularities):
            raise ValueError(
                "Each granularity is supposed to have its corresponding data manager"
            )
        if len(self.granularities) > 1 and not self.is_sorted(self.granularities):
            raise ValueError("Granularities should be sorted")

    def forward(self, data=None, request=None):
        """Return the data path whose granularity is the first one exceeding the requested span.

        The requested span is request["end_time"] - request["start_time"]. The granularities
        are scanned in order and the data path of the first granularity that is strictly
        greater than the span is returned.

        Keyword Arguments:
            data {any}      -- not used by this method (default: {None})
            request {dict}  -- must provide the keys "start_time" and "end_time" (default: {None})

        Raises:
            ValueError     -- if the two lists differ in length or the granularities are not sorted
            AttributeError -- if no granularity is greater than the requested span, or the
                              selected data path is None

        Returns:
            the selected entry of data_paths
        """
        # The attributes are public and may have been changed since construction,
        # so the mapping is checked again on every call.
        self._check_granularity_mapping()

        # TODO: change to generic indices or slices
        granularity = request["end_time"] - request["start_time"]

        data_path = None
        for upper_bound, candidate in zip(self.granularities, self.data_paths):
            # Strict comparison: a span equal to a granularity falls through to the next one.
            if granularity < upper_bound:
                data_path = candidate
                break

        if data_path is None:
            raise AttributeError("No data manager can be used for this timeframe")

        return data_path

    def is_sorted(self, l):
        """Check whether a list is sorted

        Arguments:
            l {list} -- the list to be determined whether sorted

        Returns:
            [bool] -- True if every element is less than or equal to its successor
                      (empty and single-element lists count as sorted)
        """
        return all(a <= b for a, b in zip(l, l[1:]))