collection.py 2.57 KB
Newer Older
matthmey's avatar
matthmey committed
1
2
3
4
5
6
7
8
import numpy as np
import pandas as pd
import warnings
import datetime as dt

from .management import DataSource
from ..core import configuration

matthmey's avatar
matthmey committed
9

matthmey's avatar
matthmey committed
10
11
12
13
14
15
16
17
18
19
20
21
22
class DataCollector(DataSource):
    def __init__(self, data_paths=None, granularities=None):
        """Add and choose data path according to its granularity.
           The data collector returns the data path given an index segment (index_end - index_start).
           The index segment is compared against the given granularities and the mapped data path is
           returned. For example, for a time series where the index is a datetime object, the timedelta
           of (end_time - start_time) is compared against the given list of granularity timedeltas.

        Keyword Arguments:
            data_paths {list}       -- a list of data paths, e.g. the leafs of a dask graph
                                       (default: {None}, treated as an empty list)
            granularities {list}    -- a list of granularities sorted in non-decreasing order,
                                       one per data path (default: {None}, treated as an empty list)

        Raises:
            ValueError -- if the number of data paths differs from the number of granularities,
                          or if the granularities are not sorted
        """
        super().__init__()

        # ``None`` defaults (instead of ``[]``) so that instances created without
        # arguments do not all share the same list objects.
        self.data_paths = [] if data_paths is None else data_paths
        self.granularities = [] if granularities is None else granularities

        self._check_configuration()

    def _check_configuration(self):
        """Validate that data paths and granularities are consistent.

        Raises:
            ValueError -- if the number of data paths differs from the number of granularities,
                          or if the granularities are not sorted
        """
        if len(self.data_paths) != len(self.granularities):
            raise ValueError(
                "Each granularity is supposed to have its corresponding data manager"
            )
        if len(self.granularities) > 1 and not self.is_sorted(self.granularities):
            raise ValueError("Granularities should be sorted")

    def forward(self, data=None, request=None):
        """Return the data path whose granularity fits the requested segment.

        The segment ``request["end_time"] - request["start_time"]`` is compared against the
        granularities in order, and the data path of the first granularity strictly greater
        than the segment is returned.

        Keyword Arguments:
            data    -- not used by this method (default: {None})
            request {dict} -- must provide "start_time" and "end_time"; their difference must be
                              comparable with the granularities (default: {None})

        Returns:
            the data path mapped to the first granularity larger than the requested segment

        Raises:
            ValueError     -- if data paths and granularities are inconsistent or unsorted
            AttributeError -- if no granularity is larger than the requested segment
                              (or the matching data path is None)
        """
        # The attributes are public and may have been changed after construction,
        # so the configuration is re-validated on every call.
        self._check_configuration()

        # TODO: change to generic indices or slices
        granularity = request["end_time"] - request["start_time"]

        data_path = next(
            (
                path
                for path, limit in zip(self.data_paths, self.granularities)
                if granularity < limit
            ),
            None,
        )

        if data_path is None:
            raise AttributeError("No data manager can be used for this timeframe")

        return data_path

    def is_sorted(self, l):
        """Check whether a list is sorted in non-decreasing order.

        Arguments:
            l {list} -- the sequence to check; it must support slicing

        Returns:
            [bool] -- True if every element is <= its successor; empty and
                      single-element sequences count as sorted
        """
        return all(a <= b for a, b in zip(l, l[1:]))