import datetime as dt
import logging
import warnings

import numpy as np
import pandas as pd

from .management import DataSource
from ..core import configuration

logger = logging.getLogger(__name__)


class DataCollector(DataSource):
    """Select a data path according to the size of a requested index segment."""

    def __init__(self, data_paths=None, granularities=None):
        """Add and choose data path according to its granularity.

        The data collector returns the data path given an index segment
        (index_end - index_start). The index segment is compared against the
        given granularities and the mapped data path is returned.

        For example, for a time series where the index is a datetime object,
        the timedelta of (end_time - start_time) is compared against the given
        list of granularity timedeltas.

        Keyword Arguments:
            data_paths {list} -- a list of data paths, e.g. the leafs of a dask
                graph (default: {None}, treated as an empty list)
            granularities {list} -- a list of granularities sorted in ascending
                order, one per data path (default: {None}, treated as an
                empty list)

        Raises:
            ValueError -- if the two lists differ in length, or if the
                granularities are not sorted in ascending order
        """
        super().__init__()
        # ``None`` sentinels instead of ``[]`` defaults: a literal list default
        # is evaluated once and would be shared by every instance created
        # without arguments.
        self.data_paths = [] if data_paths is None else data_paths
        self.granularities = [] if granularities is None else granularities
        self._validate()

    def _validate(self):
        """Raise ValueError if data paths and granularities are inconsistent."""
        if len(self.data_paths) != len(self.granularities):
            raise ValueError(
                "Each granularity is supposed to have its corresponding data manager"
            )
        if len(self.granularities) > 1 and not self.is_sorted(self.granularities):
            raise ValueError("Granularities should be sorted")

    def forward(self, data=None, request=None):
        """Return the data path matching the size of the requested segment.

        Arguments:
            data -- unused by this method
            request {mapping} -- must provide "start_time" and "end_time"; their
                difference must be comparable with the configured granularities

        Returns:
            the data path paired with the first granularity that is strictly
            greater than (end_time - start_time)

        Raises:
            ValueError -- if data paths and granularities are inconsistent
            AttributeError -- if no granularity is larger than the segment
        """
        # The attributes are public and may have been modified after
        # construction, so validate again before selecting.
        self._validate()

        # TODO: change to generic indices or slices
        granularity = request["end_time"] - request["start_time"]

        data_path = None
        # Lengths were validated equal above, so zip pairs every entry.
        for limit, candidate in zip(self.granularities, self.data_paths):
            logger.debug("%s < %s -> %s", granularity, limit, candidate)
            if granularity < limit:
                data_path = candidate
                break

        if data_path is None:
            # AttributeError kept for backward compatibility with callers.
            raise AttributeError("No data manager can be used for this timeframe")
        return data_path

    def is_sorted(self, l):  # noqa: E741 -- public parameter name kept
        """Check whether a list is sorted in ascending (non-decreasing) order.

        Arguments:
            l {list} -- the list to be checked

        Returns:
            [bool] -- True if every element is <= its successor (also True for
            empty and single-element lists)
        """
        return all(a <= b for a, b in zip(l, l[1:]))