"""MIT License Copyright (c) 2019, Swiss Federal Institute of Technology (ETH Zurich), Matthias Meyer Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.""" from ..global_config import get_setting, setting_exists, set_setting from ..core.graph import StuettNode, configuration from ..convenience import read_csv_with_store, to_csv_with_store, DirectoryStore from ..convenience import indexers_to_request as i2r import os import dask import logging import obspy from obspy.clients.fdsn import Client from obspy import UTCDateTime from obsplus import obspy_to_array from copy import deepcopy import io from tqdm import tqdm import zarr import xarray as xr from PIL import Image import base64 import re from pathlib import Path import warnings # TODO: revisit the following packages import numpy as np import pandas as pd import datetime as dt from pathlib import Path class DataSource(StuettNode): def __init__(self, **kwargs): super().__init__(kwargs=kwargs) def __call__(self, data=None, request=None, delayed=False): if data is not None: if request is None: request = data else: warnings.warning( "Two inputs (data, request) provided to the DataSource but it can only handle a request. Choosing request. " ) # DataSource only require a request # Therefore merge permanent-config and request config = self.config.copy() # TODO: do we need a deep copy? 
if request is not None: config.update(request) # TODO: change when rewriting for general indices if "start_time" in config and config["start_time"] is not None: config["start_time"] = pd.to_datetime( config["start_time"], utc=True ).tz_localize( None ) # TODO: change when xarray #3291 is fixed if "end_time" in config and config["end_time"] is not None: config["end_time"] = pd.to_datetime( config["end_time"], utc=True ).tz_localize( None ) # TODO: change when xarray #3291 is fixed if delayed: return dask.delayed(self.forward)(None, config) else: return self.forward(None, config) def configure(self, requests=None): """ Default configure for DataSource nodes Same as configure from StuettNode but adds is_source flag Arguments: request {list} -- List of requests Returns: dict -- Original request or merged requests """ requests = super().configure(requests) # merging request here requests["requires_request"] = True return requests class GSNDataSource(DataSource): def __init__( self, deployment=None, vsensor=None, position=None, start_time=None, end_time=None, **kwargs, ): super().__init__( deployment=deployment, position=position, vsensor=vsensor, start_time=start_time, end_time=end_time, kwargs=kwargs, ) def forward(self, data=None, request=None): # code from Samuel Weber #### 1 - DEFINE VSENSOR-DEPENDENT COLUMNS #### metadata = Path(get_setting("metadata_directory")).joinpath( "vsensor_metadata/{:s}_{:s}.csv".format( request["deployment"], request["vsensor"] ) ) # if not metadata.exists(): colnames = pd.read_csv(metadata, skiprows=0,) columns_old = colnames["colname_old"].values columns_new = colnames["colname_new"].values columns_unit = colnames["unit"].values if len(columns_old) != len(columns_new): warnings.warn( "WARNING: Length of 'columns_old' ({:d}) is not equal length of 'columns_new' ({:d})".format( len(columns_old), len(columns_new) ) ) if len(columns_old) != len(columns_unit): warnings.warn( "WARNING: Length of 'columns_old' ({:d}) is not equal length of 'columns_new' ({:d})".format( len(columns_old), len(columns_unit) ) ) unit = dict(zip(columns_new, columns_unit)) #### 2 - DEFINE CONDITIONS AND CREATE HTTP QUERY #### # Set server server = get_setting("permasense_server") # Create virtual_sensor virtual_sensor = request["deployment"] + "_" + request["vsensor"] # Create query and add time as well as position selection query = ( "vs[1]={:s}" "&time_format=iso" "&timeline=generation_time" "&field[1]=All" "&from={:s}" "&to={:s}" "&c_vs[1]={:s}" "&c_join[1]=and" "&c_field[1]=position" "&c_min[1]={:02d}" "&c_max[1]={:02d}" ).format( virtual_sensor, pd.to_datetime(request["start_time"], utc=True).strftime( "%d/%m/%Y+%H:%M:%S" ), pd.to_datetime(request["end_time"], utc=True).strftime("%d/%m/%Y+%H:%M:%S"), virtual_sensor, int(request["position"]) - 1, request["position"], ) # query extension for images if request["vsensor"] == "binary__mapped": query = ( query + "&vs[2]={:s}&field[2]=relative_file&c_join[2]=and&c_vs[2]={:s}&c_field[2]=file_complete&c_min[2]=0&c_max[2]=1&vs[3]={:s}&field[3]=generation_time&c_join[3]=and&c_vs[3]={:s}&c_field[3]=file_size&c_min[3]=2000000&c_max[3]=%2Binf&download_format=csv".format( virtual_sensor, virtual_sensor, virtual_sensor, virtual_sensor ) ) # Construct url: url = server + "multidata?" 
+ query # if self.verbose: # print('The GSN http-query is:\n{:s}'.format(url)) #### 3 - ACCESS DATA AND CREATE PANDAS DATAFRAME #### d = [] d = pd.read_csv( url, skiprows=2 ) # skip header lines (first 2) for import: skiprows=2 df = pd.DataFrame(columns=columns_new) df.time = pd.to_datetime(d.generation_time, utc=True) # if depo in ['mh25', 'mh27old', 'mh27new', 'mh30', 'jj22', 'jj25', 'mh-nodehealth']: # d = d.convert_objects(convert_numeric=True) # TODO: rewrite to remove warning for k in list(df): df[k] = pd.to_numeric(df[k], errors="ignore") # df = pd.DataFrame(columns=columns_new) # df.time = timestamp2datetime(d['generation_time']/1000) for i in range(len(columns_old)): if columns_new[i] != "time": setattr(df, columns_new[i], getattr(d, columns_old[i])) df = df.sort_values(by="time") # Remove columns with names 'delete' try: df.drop(["delete"], axis=1, inplace=True) except: pass # Remove columns with only 'null' df = df.replace(r"null", np.nan, regex=True) isnull = df.isnull().all() [df.drop([col_name], axis=1, inplace=True) for col_name in df.columns[isnull]] df = df.set_index("time") df = df.sort_index(axis=1) x = xr.DataArray(df, dims=["time", "name"], name="CSV") try: unit_coords = [] for name in x.coords["name"].values: # name = re.sub(r"\[.*\]", "", name).lstrip().rstrip() u = unit[str(name)] u = re.findall(r"\[(.*)\]", u)[0] # name_coords.append(name) unit_coords.append(u) x = x.assign_coords({"unit": ("name", unit_coords)}) except: # TODO: add a warning or test explicitly if units exist pass return x class SeismicSource(DataSource): def __init__( self, path=None, store=None, station=None, channel=None, start_time=None, end_time=None, use_arclink=False, return_obspy=False, **kwargs, ): # TODO: update description """ Seismic data source to get data from permasense The user can predefine the source's settings or provide them in a request Predefined setting should never be updated (to stay thread safe), but will be ignored if request contains settings Keyword Arguments: config {dict} -- Configuration for the seismic source (default: {{}}) use_arclink {bool} -- If true, downloads the data from the arclink service (authentication required) (default: {False}) return_obspy {bool} -- By default an xarray is returned. If true, an obspy stream will be returned (default: {False}) """ super().__init__( path=path, store=store, station=station, channel=channel, start_time=start_time, end_time=end_time, use_arclink=use_arclink, return_obspy=return_obspy, kwargs=kwargs, ) def forward(self, data=None, request=None): config = request if config["use_arclink"]: try: arclink = get_setting("arclink") except KeyError as err: raise RuntimeError( f"The following error occured \n{err}. 
" "Please provide either the credentials to access arclink or a path to the dataset" ) arclink_user = arclink["user"] arclink_password = arclink["password"] fdsn_client = Client( base_url="http://arclink.ethz.ch", user=arclink_user, password=arclink_password, ) x = fdsn_client.get_waveforms( network="4D", station=config["station"], location="A", channel=config["channel"], starttime=UTCDateTime(config["start_time"]), endtime=UTCDateTime(config["end_time"]), attach_response=True, ) # TODO: potentially resample else: # 20180914 is last full day available in permasense_vault # logging.info('Loading seismic with fdsn') x = self.get_obspy_stream( config, config["path"], config["start_time"], config["end_time"], config["station"], config["channel"], ) x = x.slice(UTCDateTime(config["start_time"]), UTCDateTime(config["end_time"])) # TODO: remove response x.remove_response(output=vel) # x = self.process_seismic_data(x) if not config["return_obspy"]: x = obspy_to_array(x) # we assume that all starttimes are equal starttime = x.starttime.values.reshape((-1,))[0] for s in x.starttime.values.reshape((-1,)): if s != starttime: warnings.warn("Not all starttimes of each seismic channel is equal") # raise RuntimeError( # "Please make sure that starttime of each seismic channel is equal" # ) # change time coords from relative to absolute time starttime = obspy.UTCDateTime(starttime).datetime starttime = pd.to_datetime(starttime, utc=True).tz_localize( None ) # TODO: change when xarray #3291 is fixed timedeltas = pd.to_timedelta(x["time"].values, unit="seconds") xt = starttime + timedeltas x["time"] = pd.to_datetime(xt, utc=True).tz_localize( None ) # TODO: change when xarray #3291 is fixed del x.attrs["stats"] # x.rename({'seed_id':'channels'}) #TODO: rename seed_id to channels return x def process_seismic_data( self, stream, remove_response=True, unit="VEL", station_inventory=None, detrend=True, taper=False, pre_filt=(0.025, 0.05, 45.0, 49.0), water_level=60, apply_filter=True, freqmin=0.002, freqmax=50, resample=False, resample_rate=250, rotation_angle=None, ): # author: Samuel Weber if station_inventory is None: station_inventory = Path(get_setting("metadata_directory")).joinpath( "inventory_stations__MH.xml" ) # print(station_inventory) inv = obspy.read_inventory(str(station_inventory)) # st = stream.copy() st = stream st.attach_response(inv) if detrend: st.detrend("demean") st.detrend("linear") if taper: st.taper(max_percentage=0.05) if remove_response: if hasattr(st[0].stats, "response"): st.remove_response( output=unit, pre_filt=pre_filt, plot=False, zero_mean=False, taper=False, water_level=water_level, ) else: st.remove_response( output=unit, inventory=inv, pre_filt=pre_filt, plot=False, zero_mean=False, taper=False, water_level=water_level, ) if detrend: st.detrend("demean") st.detrend("linear") if taper: st.taper(max_percentage=0.05) if apply_filter: st.filter("bandpass", freqmin=freqmin, freqmax=freqmax) if resample: st.resample(resample_rate) if rotation_angle is None: rotation_angle = np.nan if not np.isnan(rotation_angle): st.rotate("NE->RT", back_azimuth=int(rotation_angle), inventory=inv) return st def get_obspy_stream( self, request, path, start_time, end_time, station, channels, pad=False, verbose=False, fill=0, fill_sampling_rate=1000, old_stationname=False, ): """ Loads the microseismic data for the given timeframe into a miniseed file. 
Arguments: start_time {datetime} -- start timestamp of the desired obspy stream end_time {datetime} -- end timestamp of the desired obspy stream Keyword Arguments: pad {bool} -- If padding is true, the data will be zero padded if the data is not consistent fill {} -- If numpy.nan or fill value: error in the seismic stream will be filled with the value. If None no fill will be used verbose {bool} -- If info should be printed Returns: obspy stream -- obspy stream with up to three channels the stream's channels will be sorted alphabetically """ if not isinstance(channels, list): channels = [channels] # We will get the full hours seismic data and trim it to the desired length afterwards tbeg_hours = pd.to_datetime(start_time).replace( minute=0, second=0, microsecond=0 ) timerange = pd.date_range(start=tbeg_hours, end=end_time, freq="H") non_existing_files_ts = [] # keep track of nonexisting files # drawback of memmap files is that we need to calculate the size beforehand stream = obspy.Stream() idx = 0 ## loop through all hours for i in range(len(timerange)): # start = time.time() h = timerange[i] st_list = obspy.Stream() datayear = timerange[i].strftime("%Y") if old_stationname: station = ( "MHDL" if station == "MH36" else "MHDT" ) # TODO: do not hardcode it filenames = {} for channel in channels: # filenames[channel] = Path(station).joinpath( # datayear, # "%s.D/" % channel, # "4D.%s.A.%s.D." % (station, channel) # + timerange[i].strftime("%Y%m%d_%H%M%S") # + ".miniseed", # ) filenames[channel] = [ station, datayear, "%s.D" % channel, "4D.%s.A.%s.D." % (station, channel) + timerange[i].strftime("%Y%m%d_%H%M%S") + ".miniseed", ] # print(filenames[channel]) # Load either from store or from filename if request["store"] is not None: # get the file relative to the store store = request["store"] filename = "/".join(filenames[channel]) st = obspy.read(io.BytesIO(store[str(filename)])) else: if not Path(path).isdir(): # TODO: should this be an error or only a warning. In a period execution this could stop the whole script raise IOError( "Cannot find the path {}. 
Please provide a correct path to the permasense geophone directory".format( datadir ) ) datadir = Path(path) if not datadir.joinpath(*filenames[channel]).exists(): non_existing_files_ts.append(timerange[i]) warnings.warn( RuntimeWarning( "Warning file not found for {} {}".format( timerange[i], datadir.joinpath(filenames[channel]) ) ) ) if fill is not None: # create empty stream of fill value arr = ( np.ones( ( int( np.ceil( fill_sampling_rate * dt.timedelta(hours=1).total_seconds() ) ), ) ) * fill ) st = obspy.Stream([obspy.Trace(arr)]) else: continue else: filename = str(datadir.joinpath(*filenames[channel])) st = obspy.read(filename) st_list += st stream_h = st_list.merge(method=0, fill_value=fill) start_time_0 = start_time if i == 0 else h end_time_0 = ( end_time if i == len(timerange) - 1 else h + dt.timedelta(hours=1) ) segment_h = stream_h.trim( obspy.UTCDateTime(start_time_0), obspy.UTCDateTime(end_time_0), pad=pad, fill_value=fill, ) stream += segment_h stream = stream.merge(method=0, fill_value=fill) stream.sort( keys=["channel"] ) # TODO: change this so that the order of the input channels list is maintained return stream class MHDSLRFilenames(DataSource): def __init__( self, base_directory=None, store=None, method="directory", start_time=None, end_time=None, force_write_to_remote=False, as_pandas=True, ): """ Fetches the DSLR images from the Matterhorn deployment, returns the image filename(s) corresponding to the end and start time provided in either the config dict or as a request to the __call__() function. Arguments: StuettNode {[type]} -- [description] Keyword Arguments: base_directory {[type]} -- [description] method {str} -- [description] (default: {'directory'}) """ if store is None and base_directory is not None: store = DirectoryStore(base_directory) super().__init__( base_directory=base_directory, store=store, method=method, start_time=start_time, end_time=end_time, force_write_to_remote=force_write_to_remote, as_pandas=as_pandas, ) def forward(self, data=None, request=None): """Retrieves the images for the selected time period from the server. If only a start_time timestamp is provided, the file with the corresponding date will be loaded if available. For periods (when start and end time are given) all available images are indexed first to provide an efficient retrieval. Arguments: start_time {datetime} -- If only start_time is given the neareast available image is return. If also end_time is provided the a dataframe is returned containing image filenames from the first image after start_time until the last image before end_time. Keyword Arguments: end_time {datetime} -- end time of the selected period. see start_time for a description. (default: {None}) Returns: dataframe -- Returns containing the image filenames of the selected period. """ config = request methods = ["directory", "web"] if config["method"].lower() not in methods: raise RuntimeError( f"The {config['method']} output_format is not supported. 
                f"Allowed formats are {methods}"
            )

        if (
            config["base_directory"] is None and config["store"] is None
        ) and config["method"].lower() != "web":
            raise RuntimeError("Please provide a base_directory containing the images")

        if config["method"].lower() == "web":  # TODO: implement
            raise NotImplementedError("web fetch has not been implemented yet")

        start_time = config["start_time"]
        end_time = config["end_time"]

        if start_time is None:
            raise RuntimeError("Please provide at least a start time")

        # if it is not timezone aware make it
        # if start_time.tzinfo is None:
        #     start_time = start_time.replace(tzinfo=timezone.utc)

        # If there is no tmp_dir we can try to load the file directly, otherwise
        # there will be a warning later in this function and the user should
        # set a tmp_dir
        if end_time is None and (
            not setting_exists("user_dir") or not os.path.isdir(get_setting("user_dir"))
        ):
            image_filename = self.get_image_filename(start_time)
            if image_filename is not None:
                return image_filename

        # If we have already loaded the dataframe in the current session we can use it
        if setting_exists("image_list_df"):
            imglist_df = get_setting("image_list_df")
        else:
            filename = "image_integrity.csv"
            success = False
            # first try to load it from remote via store
            if config["store"] is not None:
                if filename in config["store"]:
                    imglist_df = read_csv_with_store(config["store"], filename)
                    success = True
                elif config["force_write_to_remote"]:
                    # try to reload it and write to remote
                    imglist_df = self.image_integrity_store(config["store"])
                    try:
                        to_csv_with_store(config["store"], filename, imglist_df)
                        success = True
                    except Exception as e:
                        print(e)

            # Otherwise we need to load the filename dataframe from disk
            if (
                not success
                and setting_exists("user_dir")
                and os.path.isdir(get_setting("user_dir"))
            ):
                imglist_filename = os.path.join(get_setting("user_dir"), "") + filename

                # If it does not exist in the temporary folder of our application
                # we are going to create it
                if os.path.isfile(imglist_filename):
                    # imglist_df = pd.read_parquet(
                    #     imglist_filename
                    # )  # TODO: avoid too many different formats
                    imglist_df = pd.read_csv(imglist_filename)
                else:
                    # we are going to load the full list => no arguments
                    imglist_df = self.image_integrity_store(config["store"])
                    # imglist_df.to_parquet(imglist_filename)
                    imglist_df.to_csv(imglist_filename, index=False)
            elif not success:
                # if there is no tmp_dir we can load the image list but
                # we should warn the user that this is inefficient
                imglist_df = self.image_integrity_store(config["store"])
                warnings.warn(
                    "No temporary directory was set. 
You can speed up multiple runs of your application by setting a temporary directory" ) # TODO: make the index timezone aware imglist_df.set_index("start_time", inplace=True) imglist_df.index = pd.to_datetime(imglist_df.index, utc=True).tz_localize( None ) # TODO: change when xarray #3291 is fixed imglist_df.sort_index(inplace=True) set_setting("image_list_df", imglist_df) output_df = None if end_time is None: if start_time < imglist_df.index[0]: start_time = imglist_df.index[0] loc = imglist_df.index.get_loc(start_time, method="nearest") output_df = imglist_df.iloc[loc : loc + 1] else: # if end_time.tzinfo is None: # end_time = end_time.replace(tzinfo=timezone.utc) if start_time > imglist_df.index[-1] or end_time < imglist_df.index[0]: # return empty dataframe output_df = imglist_df[0:0] else: if start_time < imglist_df.index[0]: start_time = imglist_df.index[0] if end_time > imglist_df.index[-1]: end_time = imglist_df.index[-1] output_df = imglist_df.iloc[ imglist_df.index.get_loc( start_time, method="bfill" ) : imglist_df.index.get_loc(end_time, method="ffill") + 1 ] if not request["as_pandas"]: output_df = output_df[["filename"]] # TODO: do not get rid of end_time output_df.index.rename("time", inplace=True) # output = output_df.to_xarray(dims=["time"]) output = xr.Dataset.from_dataframe(output_df).to_array() # print(output) # output = xr.DataArray(output_df['filename'], dims=["time"]) else: output = output_df return output # TODO: write test for image_integrity_store def image_integrity_store( self, store, start_time=None, end_time=None, delta_seconds=0 ): """ Checks which images are available on the permasense server Keyword Arguments: start_time {[type]} -- datetime object giving the lower bound of the time range which should be checked. If None there is no lower bound. (default: {None}) end_time {[type]} -- datetime object giving the upper bound of the time range which should be checked. If None there is no upper bound (default: {None}) delta_seconds {int} -- Determines the 'duration' of an image in the output dataframe. start_time = image_time+delta_seconds end_time = image_time-delta_seconds (default: {0}) Returns: DataFrame -- Returns a pandas dataframe with a list containing filename relative to self.base_directory, start_time and end_time start_time and end_time can vary depending on the delta_seconds parameter """ """ Checks which images are available on the permasense server Arguments: start_time: end_time: delta_seconds: Determines the 'duration' of an image in the output dataframe. 
start_time = image_time+delta_seconds end_time = image_time-delta_seconds Returns: DataFrame -- """ if start_time is None: # a random year which is before permasense installation started start_time = dt.datetime(1900, 1, 1) if end_time is None: end_time = dt.datetime.utcnow() tbeg_days = start_time.replace(hour=0, minute=0, second=0) tend_days = end_time.replace(hour=23, minute=59, second=59) delta_t = dt.timedelta(seconds=delta_seconds) num_filename_errors = 0 images_list = [] for key in store.keys(): try: pathkey = Path(key) datekey = pathkey.parent.name dir_date = pd.to_datetime(str(datekey), format="%Y-%m-%d") except: # we do not care for files not matching our format continue if pd.isnull(dir_date): continue # limit the search to the explicit time range if dir_date < tbeg_days or dir_date > tend_days: continue # print(file.stem) start_time_str = pathkey.stem try: _start_time = pd.to_datetime(start_time_str, format="%Y%m%d_%H%M%S") if start_time <= _start_time and _start_time <= end_time: images_list.append( { "filename": str(key), "start_time": _start_time - delta_t, "end_time": _start_time + delta_t, } ) except ValueError: # try old naming convention try: start_time = pd.to_datetime(start_time_str, format="%Y%m%d_%H%M%S") if start_time <= _start_time and _start_time <= end_time: images_list.append( { "filename": str(img_file.relative_to(base_directory)), "start_time": _start_time - delta_t, "end_time": _start_time + delta_t, } ) except ValueError: num_filename_errors += 1 warnings.warn( "Permasense data integrity, the following is not a valid image filename and will be ignored: %s" % img_file ) continue segments = pd.DataFrame(images_list) segments.drop_duplicates(inplace=True, subset="start_time") segments.start_time = pd.to_datetime(segments.start_time, utc=True) segments.end_time = pd.to_datetime(segments.end_time, utc=True) segments.sort_values("start_time") return segments def image_integrity( self, base_directory, start_time=None, end_time=None, delta_seconds=0 ): store = DirectoryStore(base_directory) return self.image_integrity_store(store, start_time, end_time, delta_seconds) def get_image_filename(self, timestamp): """ Checks wether an image exists for exactly the time of timestamp and returns its filename timestamp: datetime object for which the filename should be returned # Returns The filename if the file exists, None if there is no file """ datadir = self.base_directory new_filename = ( datadir + timestamp.strftime("%Y-%m-%d") + "/" + timestamp.strftime("%Y%m%d_%H%M%S") + ".JPG" ) old_filename = ( datadir + timestamp.strftime("%Y-%m-%d") + "/" + timestamp.strftime("%Y-%m-%d_%H%M%S") + ".JPG" ) if os.path.isfile(new_filename): return new_filename elif os.path.isfile(old_filename): return old_filename else: return None def get_nearest_image_url(self, IMGparams, imgdate, floor=False): if floor: date_beg = imgdate - dt.timedelta(hours=4) date_end = imgdate else: date_beg = imgdate date_end = imgdate + dt.timedelta(hours=4) vs = [] # predefine vs list field = [] # predefine field list c_vs = [] c_field = [] c_join = [] c_min = [] c_max = [] vs = vs + ["matterhorn_binary__mapped"] field = field + ["ALL"] # select only data from one sensor (position 3) c_vs = c_vs + ["matterhorn_binary__mapped"] c_field = c_field + ["position"] c_join = c_join + ["and"] c_min = c_min + ["18"] c_max = c_max + ["20"] c_vs = c_vs + ["matterhorn_binary__mapped"] c_field = c_field + ["file_complete"] c_join = c_join + ["and"] c_min = c_min + ["0"] c_max = c_max + ["1"] # create url which retrieves 
the csv data file url = "http://data.permasense.ch/multidata?" url = url + "time_format=" + "iso" url = url + "&from=" + date_beg.strftime("%d/%m/%Y+%H:%M:%S") url = url + "&to=" + date_end.strftime("%d/%m/%Y+%H:%M:%S") for i in range(0, len(vs), 1): url = url + "&vs[%d]=%s" % (i, vs[i]) url = url + "&field[%d]=%s" % (i, field[i]) for i in range(0, len(c_vs), 1): url = url + "&c_vs[%d]=%s" % (i, c_vs[i]) url = url + "&c_field[%d]=%s" % (i, c_field[i]) url = url + "&c_join[%d]=%s" % (i, c_join[i]) url = url + "&c_min[%d]=%s" % (i, c_min[i]) url = url + "&c_max[%d]=%s" % (i, c_max[i]) url = url + "&timeline=%s" % ("generation_time") # print(url) d = pd.read_csv(url, skiprows=2) # print(d) # print(type(d['#data'].values)) d["data"] = [s.replace("&", "&") for s in d["data"].values] d.sort_values(by="generation_time") d["generation_time"] = pd.to_datetime(d["generation_time"], utc=True) if floor: data_str = d["data"].iloc[0] data_filename = d["relative_file"].iloc[0] # print(d['generation_time'].iloc[0]) img_timestamp = d["generation_time"].iloc[0] else: data_str = d["data"].iloc[-1] data_filename = d["relative_file"].iloc[-1] # print(d['generation_time'].iloc[-1]) img_timestamp = d["generation_time"].iloc[-1] file_extension = data_filename[-3:] base_url = "http://data.permasense.ch" # print(base_url + data_str) return base_url + data_str, img_timestamp, file_extension class MHDSLRImages(MHDSLRFilenames): def __init__( self, base_directory=None, store=None, method="directory", output_format="xarray", start_time=None, end_time=None, ): if store is None and base_directory is not None: store = DirectoryStore(base_directory) super().__init__( base_directory=None, store=store, method=method, start_time=start_time, end_time=end_time, ) self.config["output_format"] = output_format def forward(self, data=None, request=None): filenames = super().forward(request=request) if request["output_format"] is "xarray": return self.construct_xarray(filenames) elif request["output_format"] is "base64": return self.construct_base64(filenames) else: output_formats = ["xarray", "base64"] raise RuntimeError( f"The {request['output_format']} output_format is not supported. 
Allowed formats are {output_formats}" ) def construct_xarray(self, filenames): images = [] times = [] for timestamp, element in filenames.iterrows(): key = element.filename img = Image.open(io.BytesIO(self.config["store"][key])) img = np.array(img.convert("RGB")) images.append(np.array(img)) times.append(timestamp) if images: images = np.array(images) else: images = np.empty((0, 0, 0, 0)) data = xr.DataArray( images, coords={"time": times}, dims=["time", "x", "y", "channels"], name="Image", ) data.attrs["format"] = "jpg" return data def construct_base64(self, filenames): images = [] times = [] for timestamp, element in filenames.iterrows(): key = element.filename img = Image.open(io.BytesIO(self.config["store"][key])) img = np.array(img.convert("RGB")) img_base64 = base64.b64encode(img.tobytes()) images.append(img_base64) times.append(timestamp) images = np.array(images).reshape((-1, 1)) data = xr.DataArray( images, coords={"time": times}, dims=["time", "base64"], name="Base64Image" ) data.attrs["format"] = "jpg" return data class Freezer(StuettNode): def __init__(self, store): self.store = store def configure(self, requests): """ Arguments: request {list} -- List of requests Returns: dict -- Original, updated or merged request(s) """ requests = super().configure(requests) # TODO: check if data is available for requested time period # TODO: check how we need to update the boundaries such that we get data that fits and that is available # TODO: with only one start/end_time it might be inefficient for the case where [unavailable,available,unavailable] since we need to load everything # one option could be to duplicate the graph by returning multiple requests... # TODO: make a distinction between requested start_time and freeze_output_start_time # TODO: add node specific hash to freeze_output_start_time (there might be multiple in the graph) <- probably not necessary because we receive a copy of the request which is unique to this node # TODO: maybe the configuration method must add (and delete) the node name in the request? 
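        # Rough sketch of the freeze/thaw round trip that forward() below implements
        # (the zarr store location is hypothetical):
        #
        #   store = zarr.DirectoryStore("frozen_data.zarr")
        #   freezer = Freezer(store)
        #   frozen = freezer(data=x, request={"start_time": ..., "end_time": ...})
        #   # to_zarr() appends x along the "time" dimension of the dataset "frozen";
        #   # open_zarr() reads the accumulated dataset back for inspection.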
# we always require a request to crop out the right time period requests["requires_request"] = True return requests def to_zarr(self, x): x = x.to_dataset(name="frozen") # x = x.chunk({name: x[name].shape for name in list(x.dims)}) # zarr_dataset = zarr.open(self.store, mode='r') x.to_zarr(self.store, append_dim="time") def open_zarr(self, requests): ds_zarr = xr.open_zarr(self.store) print("read", ds_zarr) def forward(self, data=None, request=None): self.to_zarr(data) self.open_zarr(request) return data # TODO: check request start_time and load the data which is available, store the data which is not available # TODO: crop class CsvSource(DataSource): def __init__( self, filename=None, store=None, start_time=None, end_time=None, **kwargs ): super().__init__( filename=filename, store=store, start_time=start_time, end_time=end_time, kwargs=kwargs, ) def forward(self, data=None, request=None): # TODO: Implement properly if request["store"] is not None: # get the file relative to the store store = request["store"] filename = request["filename"] # csv = pd.read_csv(io.StringIO(str(store[filename],'utf-8'))) csv = read_csv_with_store(store, filename) else: csv = pd.read_csv(request["filename"]) csv.set_index("time", inplace=True) csv.index = pd.to_datetime(csv.index, utc=True).tz_localize( None ) # TODO: change when xarray #3291 is fixed x = xr.DataArray(csv, dims=["time", "name"], name="CSV") try: unit_coords = [] name_coords = [] for name in x.coords["name"].values: unit = re.findall(r"\[(.*)\]", name)[0] name = re.sub(r"\[.*\]", "", name).lstrip().rstrip() name_coords.append(name) unit_coords.append(unit) x.coords["name"] = name_coords x = x.assign_coords({"unit": ("name", unit_coords)}) except: # TODO: add a warning or test explicitly if units exist pass if "start_time" not in request: request["start_time"] = x.coords["time"][0] if "end_time" not in request: request["end_time"] = x.coords["time"][-1] x = x.sel(time=slice(request["start_time"], request["end_time"])) return x def to_datetime(x): return pd.to_datetime(x, utc=True).tz_localize( None ) # TODO: change when xarray #3291 is fixed class BoundingBoxAnnotation(DataSource): def __init__( self, filename=None, store=None, converters={"start_time": to_datetime, "end_time": to_datetime}, **kwargs, ): super().__init__( filename=filename, store=store, converters=converters, kwargs=kwargs, ) def forward(self, data=None, request=None): if request["store"] is not None: csv = read_csv_with_store(request["store"], request["filename"]) else: csv = pd.read_csv(request["filename"]) targets = xr.DataArray(csv["__target"], dims=["index"], name="Annotation") for key in csv: if key == "__target": continue targets = targets.assign_coords({key: ("index", csv[key])}) for key in request["converters"]: if key in csv: converter = request["converters"][key] if not callable(converter): raise RuntimeError("Please provide a callable as column converter") targets = targets.assign_coords( {key: ("index", converter(targets[key]))} ) return targets def check_overlap(data0, data1, index_dim, dims=[]): """ checks the overlap of two lists of xarray indexers (dicts with slices per dimension). Both list must be sorted by the index_dim dimension! Arguments: data0 {list} -- First list of xarray indexers data1 {list} -- Second list of xarray indexers. Must be sorted by primarily by slice.start and secondary by slice.stop index_dim {string} -- Default dimension to iterate over. 
Indexers lists must be sorted by this dimension Keyword Arguments: dims {list} -- List of additional dimensions which should be checked for overlap (default: {[]}) Returns: list -- List of indices of which items overlap. [...,[i,j],...] where item i of indexers0 overlaps with item j of indexers1 """ overlap_indices = [] # print(data0.head()) num_overlaps = 0 start_idx = 0 for i in range(len(data0)): # data0_df = data0.iloc[i] data0_start = data0[i][index_dim].start data0_end = data0[i][index_dim].stop # print(data0_df['start_time']) ext = [] for j in range(start_idx, len(data1)): # data1_df = data1.iloc[j] data1_start = data1[j][index_dim].start data1_end = data1[j][index_dim].stop cond0 = data0_end < data1_start if cond0 == True: break # second condition: data0 is after data1, all items before data1 can be ignored (sorted list data0) cond1 = data0_start > data1_end if cond1: # This only holds if data1 is sorted by both start and end index start_idx = j if not (cond0 or cond1): # overlap on index dimension # check other dimensions overlap = True for dim in dims: if dim == index_dim: continue d0_start = data0[i][dim].start d0_end = data0[i][dim].stop d1_start = data1[j][dim].start d1_end = data1[j][dim].stop if (d0_end < d1_start) or (d0_start > d1_end): overlap = False if overlap: overlap_indices.append([int(i), int(j)]) return overlap_indices def get_dataset_slices(dims, dataset_slice, stride={}): # thanks to xbatcher: https://github.com/rabernat/xbatcher/ dim_slices = [] for dim in dims: # if dataset_slice is None: # segment_start = 0 # segment_end = ds.sizes[dim] # else: segment_start = dataset_slice[dim].start segment_end = dataset_slice[dim].stop size = dims[dim] _stride = stride.get(dim, size) if isinstance(dims[dim], pd.Timedelta) or isinstance(dims[dim], dt.timedelta): # TODO: change when xarray #3291 is fixed iterator = pd.date_range( segment_start, segment_end, freq=_stride ).tz_localize(None) segment_end = pd.to_datetime(segment_end).tz_localize(None) else: iterator = range(segment_start, segment_end, _stride) slices = [] # TODO include hopsize/overlapping windows for start in iterator: end = start + size if end <= segment_end: slices.append(slice(start, end)) dim_slices.append(slices) import itertools all_slices = [] for slices in itertools.product(*dim_slices): selector = {key: slice for key, slice in zip(dims, slices)} all_slices.append(selector) return np.array(all_slices) from torch.utils.data import Dataset import torch # from numba import jit # @jit(nopython=True) def annotations_to_slices(annotations): # build label slices label_coords = [] start_identifier = "start_" for coord in annotations.coords: # print(coord) if coord.startswith(start_identifier): label_coord = coord[len(start_identifier) :] if "end_" + label_coord in annotations.coords: label_coords += [label_coord] label_slices = [] for i in range(len(annotations)): # TODO: see if we can still optimzied this selector = {} for coord in label_coords: selector[coord] = slice( annotations["start_" + coord].values[i], annotations["end_" + coord].values[i], ) label_slices.append(selector) return label_coords, label_slices class SegmentedDataset(Dataset): def __init__( self, data, # TODO: currently it should be named data source label, index_dim="time", discard_empty=True, dataset_slice=None, batch_dims={}, segmentation_mode="segments", ): self.data = data self.label = label self.index_dim = index_dim self.discard_empty = discard_empty self.dataset_slice = dataset_slice self.batch_dims = batch_dims 
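        # batch_dims describes the window size per dimension (e.g. a pandas.Timedelta
        # for "time"); dataset_slice bounds the region that get_dataset_slices() above
        # cuts into those windows. A hypothetical configuration could look like:
        #
        #   batch_dims    = {"time": pd.to_timedelta(10, "s")}
        #   dataset_slice = {"time": slice(to_datetime("2017-08-01"),
        #                                  to_datetime("2017-08-02"))}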
self.segmentation_mode = segmentation_mode def compute_label_list(self): """ discard_empty ... trim the dataset to the available labels dataset_slice: which part of the dataset to use from xarray documentation: [...] slices are treated as inclusive of both the start and stop values, unlike normal Python indexing. """ if self.dataset_slice is None or not isinstance(self.dataset_slice, dict): raise RuntimeError("No dataset_slice requested") # load annotation source and datasource # define an dataset index containing all indices of the datasource (e.g. timestamps or time period) which should be in this dataset # TODO: this is inefficient for large datasets if isinstance(self.label, StuettNode): l = self.label() else: l = self.label l = l.sortby([l["start_" + self.index_dim], l["end_" + self.index_dim]]) requested_coords = self.dataset_slice.keys() slices = get_dataset_slices(self.batch_dims, self.dataset_slice) label_coords, label_slices = annotations_to_slices(l) # filter out where we do not get data for the slice # TODO: Is there a way to not loop through the whole dataset? # and the whole label set? but still use xarray indexing label_dict = {} self.classes = [] if self.segmentation_mode == "segments": overlaps = check_overlap( slices, label_slices, self.index_dim, requested_coords ) for o in tqdm(overlaps): # if discard_empty and d.sel(slices[o[0]]).size == 0: if self.discard_empty: # we need to load every single piece to check if it is empty # TODO: loop through dims in batch_dim and check if they are correct try: if self.get_data(slices[o[0]]).size == 0: continue except Exception as e: print(e) continue # TODO: maybe this can be done faster (and cleaner) i = o[0] j = o[1] label = l[j].squeeze().values # print(label, type(label)) if l[j].notnull(): label = str(label) if label not in self.classes: self.classes.append(label) if i not in label_dict: label_dict[i] = {"indexers": slices[i], "labels": [label]} elif label not in label_dict[i]["labels"]: label_dict[i] = { "indexers": label_dict[i]["indexers"], "labels": label_dict[i]["labels"] + [label], } elif self.segmentation_mode == "points": for i in range(len(slices)): # x = d.sel(slices[i]) x = self.get_data(slices[i]) if x.size == 0: continue for j in range(len(label_slices)): # TODO: this might get really slow y = x.sel(label_slices[j]) if y.size > 0: # TODO: maybe this can be done faster (and cleaner) label = str(l[j].values) if label not in self.classes: self.classes.append(label) if i not in label_dict: label_dict[i] = { "indexers": slices[i], "labels": [label], } elif label not in label_dict[i]["labels"]: label_dict[i] = { "indexers": label_dict[i]["indexers"], "labels": label_dict[i]["labels"] + [label], } label_list = pd.DataFrame([label_dict[key] for key in label_dict]) self.classes = {class_name: i for i, class_name in enumerate(self.classes)} # print(label_list) return label_list def check(): from plotly.subplots import make_subplots import plotly.graph_objects as go fig = go.Figure( layout=dict( title=dict(text="A Bar Chart"), xaxis={"type": "date"}, xaxis_range=[ pd.to_datetime("2017-08-01"), pd.to_datetime("2017-08-06"), ], ) ) # fig = make_subplots(rows=3, cols=1, shared_xaxes=True, vertical_spacing = 0.) 
# fig.update_xaxes(range=[pd.to_datetime('2017-08-01'), pd.to_datetime('2017-08-03')]) # fig.update_yaxes(range=[0, 2]) for i, sl in enumerate([slices, label_slices, label_list]): for item in sl: label = "None" if "indexers" in item: label = item["labels"] item = item["indexers"] points = np.array( [ [pd.to_datetime(item["time"].start), 1], [pd.to_datetime(item["time"].stop), 0], ] ) # fig.add_shape( # # Line reference to the axes # go.layout.Shape( # type="rect", # xref="x", # yref="paper", # x0=pd.to_datetime(item['time'].start), # y0=0, # x1=pd.to_datetime(item['time'].stop), # y1=1, # fillcolor="LightSalmon", # opacity=0.5, # layer="below", # line_width=0, # # line=dict( # # color="LightSeaGreen", # # width=3, # # ), # )) fig.add_trace( go.Scatter( x=[ pd.to_datetime(item["time"].start), pd.to_datetime(item["time"].stop), pd.to_datetime(item["time"].stop), pd.to_datetime(item["time"].start), ], y=[0, 0, 1, 1], fill="toself", fillcolor="darkviolet", # marker={'size':0}, mode="lines", hoveron="points+fills", # select where hover is active line_color="darkviolet", showlegend=False, # line_width=0, opacity=0.5, text=str(label), hoverinfo="text+x+y", ) ) # fig.add_trace(go.Scatter(x=points[:,0], y=points[:,1], # fill=None, # mode='lines', # line_color=None, line_width=0, # name="hv", line_shape='hv', # showlegend=False, # hovertext=str(label), # hoveron = 'points+fills', # )) # fig.add_trace(go.Scatter( # x=points[:,0], # y=np.zeros_like(points[:,1]), # showlegend=False, # hovertext=str(label), # hoveron = 'points+fills', # fill='tonexty', # fill area between trace0 and trace1 # mode='lines', line_width=0)) fig.show() # TODO: verify that the segments have the same length? # TODO: get statistics to what was left out def get_data(self, indexers): # TODO: works only for datasources, not for chains if isinstance(self.data, StuettNode): request = i2r(indexers) d = self.data(request) return d.sel(indexers) else: # expecting a xarray d = self.data return d.sel(indexers) def __len__(self): raise NotImplementedError() return len(self.label_list) def __getitem__(self, idx): raise NotImplementedError() # if torch.is_tensor(idx): # idx = idx.tolist() # print(idx) # segment = self.label_list[idx] # print(segment) # indexers = segment["indexers"] # request = { # "start_time": segment["indexers"]["time"].start, # "end_time": segment["indexers"]["time"].stop, # } # data = configuration(self.data(delayed=True), request) # x = data.compute() # torch.Tensor(x.values) # print(x) # return segment # class PytorchDataset(DataSource): # TODO: extends pytorch dataset # def __init__(self, source=None): # """ Creates a pytorch like dataset from a data source and a label source. # Arguments: # DataSource {[type]} -- [description] # config {dict} -- configuration for labels # """ # super().__init__(source=source) # def build_dataset(self): # # load annotation source and datasource # # define an dataset index containing all indices of the datasource (e.g. timestamps or time period) which should be in this dataset # # go through dataset index and and check overlap of datasource indices and annotation indices # # generate new annotation set with regards to the datasourceindices (requires to generate and empty annotation set add new labels to the it) # # if wanted generate intermediate freeze results of datasource and annotations # # go through all items of the datasource # pass # def forward(self, data=None, request=None): # return x
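

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the library API).
# It exercises the windowing helpers defined above with made-up values; run it
# with `python -m <package>.<module>` so the relative imports resolve.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Cut a ten-minute region into 60 s windows with a 30 s hop.
    example_dims = {"time": pd.to_timedelta(60, "s")}  # window length per dimension
    example_region = {
        "time": slice(
            to_datetime("2017-08-01 10:00:00"), to_datetime("2017-08-01 10:10:00")
        )
    }
    example_stride = {"time": pd.to_timedelta(30, "s")}

    windows = get_dataset_slices(example_dims, example_region, stride=example_stride)
    print(len(windows), "windows, first:", windows[0])

    # check_overlap() expects two lists of such indexers, both sorted by the index
    # dimension, and returns [i, j] pairs of entries that overlap.
    annotations = [
        {
            "time": slice(
                to_datetime("2017-08-01 10:02:00"), to_datetime("2017-08-01 10:03:00")
            )
        }
    ]
    print(check_overlap(list(windows), annotations, "time"))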