"""MIT License

Copyright (c) 2019, Swiss Federal Institute of Technology (ETH Zurich), Matthias Meyer

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE."""

from ..global_config import get_setting, setting_exists, set_setting
from ..core.graph import StuettNode, configuration
from ..convenience import read_csv_with_store, to_csv_with_store, DirectoryStore

import os

import dask
import logging
import obspy
from obspy.clients.fdsn import Client
from obspy import UTCDateTime
from obsplus import obspy_to_array
from copy import deepcopy
import io

import zarr
import xarray as xr
from PIL import Image
import base64
import re

from pathlib import Path
import warnings

# TODO: revisit the following packages
import numpy as np
import pandas as pd
import datetime as dt


class DataSource(StuettNode):
    def __init__(self, **kwargs):
        super().__init__(kwargs=kwargs)

    def __call__(self, data=None, request=None, delayed=False):
        if data is not None:
            if request is None:
                request = data
            else:
                warnings.warn(
                    "Two inputs (data, request) were provided to the DataSource, but it can only handle a request. Using request."
                )

        # DataSource nodes only require a request,
        # therefore merge the permanent config and the request
        config = self.config.copy()  # TODO: do we need a deep copy?
        if request is not None:
            config.update(request)

        # TODO: change when rewriting for general indices
        if "start_time" in config and config["start_time"] is not None:
            config["start_time"] = pd.to_datetime(
                config["start_time"], utc=True
            ).tz_localize(
                None
            )  # TODO: change when xarray #3291 is fixed
        if "end_time" in config and config["end_time"] is not None:
            config["end_time"] = pd.to_datetime(
                config["end_time"], utc=True
            ).tz_localize(
                None
            )  # TODO: change when xarray #3291 is fixed

        if delayed:
            return dask.delayed(self.forward)(None, config)
        else:
            return self.forward(None, config)

    def configure(self, requests=None):
        """ Default configure for DataSource nodes
            Same as configure from StuettNode but adds is_source flag

        Arguments:
            request {list} -- List of requests

        Returns:
            dict -- Original request or merged requests 
        """
        requests = super().configure(requests)  # merging request here
        requests["requires_request"] = True

        return requests
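
# Illustrative usage sketch (added for clarity, not part of the original code):
# a DataSource subclass merges its permanent config with the per-call request
# and can defer execution via dask. The subclass and values below are assumptions:
#
#     node = GSNDataSource(deployment="matterhorn", vsensor="temperature", position=3)
#     x = node(request={"start_time": "2017-08-01", "end_time": "2017-08-02"})
#     lazy = node(request={"start_time": "2017-08-01", "end_time": "2017-08-02"},
#                 delayed=True)  # returns a dask.delayed object instead of data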


class GSNDataSource(DataSource):
    def __init__(
        self,
        deployment=None,
        vsensor=None,
        position=None,
        start_time=None,
        end_time=None,
        **kwargs,
    ):
        super().__init__(
            deployment=deployment,
            position=position,
            vsensor=vsensor,
            start_time=start_time,
            end_time=end_time,
            kwargs=kwargs,
        )

    def forward(self, data=None, request=None):
        # code from Samuel Weber
        #### 1 - DEFINE VSENSOR-DEPENDENT COLUMNS ####
        metadata = Path(get_setting("metadata_directory")).joinpath(
            "vsensor_metadata/{:s}_{:s}.csv".format(
                request["deployment"], request["vsensor"]
            )
        )
        # fail early with a clear message if the metadata file is missing
        if not metadata.exists():
            raise FileNotFoundError(
                "Could not find the vsensor metadata file {}".format(metadata)
            )

        colnames = pd.read_csv(metadata, skiprows=0)
        columns_old = colnames["colname_old"].values
        columns_new = colnames["colname_new"].values
        columns_unit = colnames["unit"].values
        if len(columns_old) != len(columns_new):
            warnings.warn(
                "WARNING: Length of 'columns_old' ({:d}) is not equal to length of 'columns_new' ({:d})".format(
                    len(columns_old), len(columns_new)
                )
            )
        if len(columns_old) != len(columns_unit):
            warnings.warn(
                "WARNING: Length of 'columns_old' ({:d}) is not equal to length of 'columns_unit' ({:d})".format(
                    len(columns_old), len(columns_unit)
                )
            )

        unit = dict(zip(columns_new, columns_unit))
        #### 2 - DEFINE CONDITIONS AND CREATE HTTP QUERY ####

        # Set server
        server = get_setting("permasense_server")

        # Create virtual_sensor
        virtual_sensor = request["deployment"] + "_" + request["vsensor"]

        # Create query and add time as well as position selection
        query = (
            "vs[1]={:s}"
            "&time_format=iso"
            "&timeline=generation_time"
            "&field[1]=All"
            "&from={:s}"
            "&to={:s}"
            "&c_vs[1]={:s}"
            "&c_join[1]=and"
            "&c_field[1]=position"
            "&c_min[1]={:02d}"
            "&c_max[1]={:02d}"
        ).format(
            virtual_sensor,
            pd.to_datetime(request["start_time"], utc=True).strftime(
                "%d/%m/%Y+%H:%M:%S"
            ),
            pd.to_datetime(request["end_time"], utc=True).strftime("%d/%m/%Y+%H:%M:%S"),
            virtual_sensor,
            int(request["position"]) - 1,
            request["position"],
        )

        # query extension for images
        if request["vsensor"] == "binary__mapped":
            query = (
                query
                + "&vs[2]={:s}&field[2]=relative_file&c_join[2]=and&c_vs[2]={:s}&c_field[2]=file_complete&c_min[2]=0&c_max[2]=1&vs[3]={:s}&field[3]=generation_time&c_join[3]=and&c_vs[3]={:s}&c_field[3]=file_size&c_min[3]=2000000&c_max[3]=%2Binf&download_format=csv".format(
                    virtual_sensor, virtual_sensor, virtual_sensor, virtual_sensor
                )
            )

        # Construct url:
        url = server + "multidata?" + query
        # if self.verbose:
        #     print('The GSN http-query is:\n{:s}'.format(url))

        #### 3 - ACCESS DATA AND CREATE PANDAS DATAFRAME ####
        d = pd.read_csv(
            url, skiprows=2
        )  # skip the first two header lines on import
        df = pd.DataFrame(columns=columns_new)
        df.time = pd.to_datetime(d.generation_time, utc=True)

        # if depo in ['mh25', 'mh27old', 'mh27new', 'mh30', 'jj22', 'jj25', 'mh-nodehealth']:
        # d = d.convert_objects(convert_numeric=True)  # TODO: rewrite to remove warning
        for k in list(df):
            df[k] = pd.to_numeric(df[k], errors="ignore")

        #        df = pd.DataFrame(columns=columns_new)
        #        df.time = timestamp2datetime(d['generation_time']/1000)
        for i in range(len(columns_old)):
            if columns_new[i] != "time":
                df[columns_new[i]] = d[columns_old[i]]

        df = df.sort_values(by="time")

        # Remove columns with names 'delete'
        try:
            df.drop(["delete"], axis=1, inplace=True)
        except KeyError:
            pass

        # Remove columns with only 'null'
        df = df.replace(r"null", np.nan, regex=True)
        isnull = df.isnull().all()
        df.drop(df.columns[isnull], axis=1, inplace=True)

        df = df.set_index("time")
        df = df.sort_index(axis=1)

        x = xr.DataArray(df, dims=["time", "name"], name="CSV")

        try:
            unit_coords = []
            for name in x.coords["name"].values:
                # name = re.sub(r"\[.*\]", "", name).lstrip().rstrip()
                u = unit[str(name)]
                u = re.findall(r"\[(.*)\]", u)[0]

                # name_coords.append(name)
                unit_coords.append(u)

            x = x.assign_coords({"unit": ("name", unit_coords)})
        except (KeyError, IndexError):
            # TODO: add a warning or test explicitly if units exist
            pass

        return x
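
# Illustrative sketch (added for clarity, not part of the original code): the
# returned DataArray has dims (time, name) and, when the metadata provides
# units, a 'unit' coordinate along 'name', e.g.
#
#     x = node(request={"start_time": "2017-08-01", "end_time": "2017-08-02"})
#     x.sel(name="temperature")   # hypothetical column name
#     x.coords["unit"]            # per-column units parsed from the '[...]' suffix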


class SeismicSource(DataSource):
    def __init__(
        self,
        path=None,
        store=None,
        station=None,
        channel=None,
        start_time=None,
        end_time=None,
        use_arclink=False,
        return_obspy=False,
        **kwargs,
    ):  # TODO: update description
        """ Seismic data source to get data from permasense
            The user can predefine the source's settings or provide them in a request
            Predefined setting should never be updated (to stay thread safe), but will be ignored
            if request contains settings

        Keyword Arguments:
            config {dict}       -- Configuration for the seismic source (default: {{}})
            use_arclink {bool}  -- If true, downloads the data from the arclink service (authentication required) (default: {False})
            return_obspy {bool} -- By default an xarray is returned. If true, an obspy stream will be returned (default: {False})
        """
        super().__init__(
            path=path,
            store=store,
            station=station,
            channel=channel,
            start_time=start_time,
            end_time=end_time,
            use_arclink=use_arclink,
            return_obspy=return_obspy,
            kwargs=kwargs,
        )

    def forward(self, data=None, request=None):
        config = request

        if config["use_arclink"]:
            try:
                arclink = get_setting("arclink")
            except KeyError as err:
                raise RuntimeError(
                    f"The following error occured \n{err}. "
                    "Please provide either the credentials to access arclink or a path to the dataset"
                )

            arclink_user = arclink["user"]
            arclink_password = arclink["password"]
            fdsn_client = Client(
                base_url="http://arclink.ethz.ch",
                user=arclink_user,
                password=arclink_password,
            )
            x = fdsn_client.get_waveforms(
                network="4D",
                station=config["station"],
                location="A",
                channel=config["channel"],
                starttime=UTCDateTime(config["start_time"]),
                endtime=UTCDateTime(config["end_time"]),
                attach_response=True,
            )

            # TODO: potentially resample

        else:  # 20180914 is last full day available in permasense_vault
            # logging.info('Loading seismic with fdsn')
            x = self.get_obspy_stream(
                config,
                config["path"],
                config["start_time"],
                config["end_time"],
                config["station"],
                config["channel"],
            )

        x = x.slice(UTCDateTime(config["start_time"]), UTCDateTime(config["end_time"]))

        # TODO: remove response x.remove_response(output=vel)
        # x = self.process_seismic_data(x)

        if not config["return_obspy"]:
            x = obspy_to_array(x)

            # we assume that all starttimes are equal
            starttime = x.starttime.values.reshape((-1,))[0]
            for s in x.starttime.values.reshape((-1,)):
                if s != starttime:
                    raise RuntimeError(
                        "Please make sure that starttime of each seimsic channel is equal"
                    )

            # change time coords from relative to absolute time
            starttime = obspy.UTCDateTime(starttime).datetime
            starttime = pd.to_datetime(starttime, utc=True).tz_localize(
                None
            )  # TODO: change when xarray #3291 is fixed
            timedeltas = pd.to_timedelta(x["time"].values, unit="seconds")
            xt = starttime + timedeltas
            x["time"] = pd.to_datetime(xt, utc=True).tz_localize(
                None
            )  # TODO: change when xarray #3291 is fixed
            del x.attrs["stats"]

        return x
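
    # Illustrative sketch (added for clarity, not part of the original code):
    # by default forward() converts the stream into an xarray.DataArray with
    # absolute time coordinates; setting return_obspy=True in the config or
    # request yields the raw obspy.Stream instead, e.g.
    #
    #     st = node(request={"start_time": "2017-08-01 10:00:00",
    #                        "end_time": "2017-08-01 10:01:00",
    #                        "return_obspy": True})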

    def process_seismic_data(
        self,
        stream,
        remove_response=True,
        unit="VEL",
        station_inventory=None,
        detrend=True,
        taper=False,
        pre_filt=(0.025, 0.05, 45.0, 49.0),
        water_level=60,
        apply_filter=True,
        freqmin=0.002,
        freqmax=50,
        resample=False,
        resample_rate=250,
        rotation_angle=None,
    ):
        # author: Samuel Weber
        if station_inventory is None:
            station_inventory = Path(get_setting("metadata_directory")).joinpath(
                "inventory_stations__MH.xml"
            )

        # print(station_inventory)
        inv = obspy.read_inventory(str(station_inventory))
        # st = stream.copy()
        st = stream
        st.attach_response(inv)

        if detrend:
            st.detrend("demean")
            st.detrend("linear")

        if taper:
            st.taper(max_percentage=0.05)

        if remove_response:
            if hasattr(st[0].stats, "response"):
                st.remove_response(
                    output=unit,
                    pre_filt=pre_filt,
                    plot=False,
                    zero_mean=False,
                    taper=False,
                    water_level=water_level,
                )
            else:
                st.remove_response(
                    output=unit,
                    inventory=inv,
                    pre_filt=pre_filt,
                    plot=False,
                    zero_mean=False,
                    taper=False,
                    water_level=water_level,
                )

        if detrend:
            st.detrend("demean")
            st.detrend("linear")

        if taper:
            st.taper(max_percentage=0.05)

        if apply_filter:
            st.filter("bandpass", freqmin=freqmin, freqmax=freqmax)

        if resample:
            st.resample(resample_rate)

        if rotation_angle is None:
            rotation_angle = np.nan
        if not np.isnan(rotation_angle):
            st.rotate("NE->RT", back_azimuth=int(rotation_angle), inventory=inv)

        return st
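
    # Illustrative sketch (added for clarity, not part of the original code):
    # process_seismic_data() is typically applied to an obspy stream obtained
    # with return_obspy=True; the keyword values shown are the defaults:
    #
    #     st = node.process_seismic_data(st, remove_response=True, unit="VEL",
    #                                    detrend=True, apply_filter=True)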

    def get_obspy_stream(
        self,
        request,
        path,
        start_time,
        end_time,
        station,
        channels,
        pad=False,
        verbose=False,
        fill=0,
        fill_sampling_rate=1000,
        old_stationname=False,
    ):
        """    
        Loads the microseismic data for the given timeframe into a miniseed file.

        Arguments:
            start_time {datetime} -- start timestamp of the desired obspy stream
            end_time {datetime} -- end timestamp of the desired obspy stream
        
        Keyword Arguments:
            pad {bool} -- If True, the data will be zero padded where it is not complete
            fill {} -- Gaps or errors in the seismic stream are filled with this value (e.g. numpy.nan or 0). If None, no fill is used.
            verbose {bool} -- If True, info is printed

        Returns:
            obspy stream -- obspy stream with up to three channels
                            the stream's channels will be sorted alphabetically
        """

        if not isinstance(channels, list):
            channels = [channels]

        # We will get the full hours seismic data and trim it to the desired length afterwards
        tbeg_hours = pd.to_datetime(start_time).replace(
            minute=0, second=0, microsecond=0
        )
        timerange = pd.date_range(start=tbeg_hours, end=end_time, freq="H")

        non_existing_files_ts = []  # keep track of nonexisting files

        # drawback of memmap files is that we need to calculate the size beforehand
        stream = obspy.Stream()

        idx = 0
        ## loop through all hours
        for i in range(len(timerange)):
            # start = time.time()
            h = timerange[i]

            st_list = obspy.Stream()

            datayear = timerange[i].strftime("%Y")
            if old_stationname:
                station = (
                    "MHDL" if station == "MH36" else "MHDT"
                )  # TODO: do not hardcode it
            filenames = {}
            for channel in channels:
                # filenames[channel] = Path(station).joinpath(
                #     datayear,
                #     "%s.D/" % channel,
                #     "4D.%s.A.%s.D." % (station, channel)
                #     + timerange[i].strftime("%Y%m%d_%H%M%S")
                #     + ".miniseed",
                # )
                filenames[channel] = [
                    station,
                    datayear,
                    "%s.D" % channel,
                    "4D.%s.A.%s.D." % (station, channel)
                    + timerange[i].strftime("%Y%m%d_%H%M%S")
                    + ".miniseed",
                ]
                # print(filenames[channel])

                # Load either from store or from filename
                if request["store"] is not None:
                    # get the file relative to the store
                    store = request["store"]
                    filename = "/".join(filenames[channel])
                    st = obspy.read(io.BytesIO(store[str(filename)]))
                else:
                    if not Path(path).is_dir():
                        # TODO: should this be an error or only a warning? In a periodic execution this could stop the whole script
                        raise IOError(
                            "Cannot find the path {}. Please provide a correct path to the permasense geophone directory".format(
                                path
                            )
                        )
                    datadir = Path(path)
                    if not datadir.joinpath(*filenames[channel]).exists():
                        non_existing_files_ts.append(timerange[i])

                        warnings.warn(
                            RuntimeWarning(
                                "Warning: file not found for {} {}".format(
                                    timerange[i], datadir.joinpath(*filenames[channel])
                                )
                            )
                        )

                        if fill is not None:
                            # create empty stream of fill value
                            arr = (
                                np.ones(
                                    (
                                        int(
                                            np.ceil(
                                                fill_sampling_rate
                                                * dt.timedelta(hours=1).total_seconds()
                                            )
                                        ),
                                    )
                                )
                                * fill
                            )
                            st = obspy.Stream([obspy.Trace(arr)])
                        else:
                            continue
                    else:
                        filename = str(datadir.joinpath(*filenames[channel]))
                        st = obspy.read(filename)

                st_list += st

            stream_h = st_list.merge(method=0, fill_value=fill)
            start_time_0 = start_time if i == 0 else h
            end_time_0 = (
                end_time if i == len(timerange) - 1 else h + dt.timedelta(hours=1)
            )
            segment_h = stream_h.trim(
                obspy.UTCDateTime(start_time_0),
                obspy.UTCDateTime(end_time_0),
                pad=pad,
                fill_value=fill,
            )
            stream += segment_h

        stream = stream.merge(method=0, fill_value=fill)

        stream.sort(
            keys=["channel"]
        )  # TODO: change this so that the order of the input channels list is maintained

        return stream
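
# Illustrative usage sketch (added for clarity, not part of the original code;
# path, station and channel values are assumptions):
#
#     seismic_node = SeismicSource(
#         path="/path/to/permasense_vault/seismic_data/",
#         station="MH36",
#         channel=["EHE", "EHN", "EHZ"],
#     )
#     x = seismic_node(request={"start_time": "2017-08-01 10:00:00",
#                               "end_time": "2017-08-01 10:01:00"})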


class MHDSLRFilenames(DataSource):
    def __init__(
        self,
        base_directory=None,
        store=None,
        method="directory",
        start_time=None,
        end_time=None,
        force_write_to_remote=False,
    ):
        """ Fetches the DSLR images from the Matterhorn deployment, returns the image
            filename(s) corresponding to the end and start time provided in either the
            config dict or as a request to the __call__() function.          
        
        Arguments:
            StuettNode {[type]} -- [description]

        Keyword Arguments:
            base_directory {[type]} -- [description]
            method {str}     -- [description] (default: {'directory'})
        """
        super().__init__(
            base_directory=base_directory,
            store=store,
            method=method,
            start_time=start_time,
            end_time=end_time,
            force_write_to_remote=force_write_to_remote,
        )

    def forward(self, data=None, request=None):
        """Retrieves the images for the selected time period from the server. If only a start_time timestamp is provided, 
          the file with the corresponding date will be loaded if available. For periods (when start and end time are given) 
          all available images are indexed first to provide an efficient retrieval.
        
        Arguments:
            start_time {datetime} -- If only start_time is given, the nearest available image is returned. If end_time is also provided, a dataframe is returned containing image filenames from the first image after start_time until the last image before end_time.
        
        Keyword Arguments:
            end_time {datetime} -- end time of the selected period. see start_time for a description. (default: {None})
        
        Returns:
            dataframe -- DataFrame containing the image filenames of the selected period.
        """
        config = request
        methods = ["directory", "web"]
        if config["method"].lower() not in methods:
            raise RuntimeError(
                f"The {config['method']} output_format is not supported. Allowed formats are {methods}"
            )

        if (
            config["base_directory"] is None and config["store"] is None
        ) and config["method"].lower() != "web":
            raise RuntimeError("Please provide a base_directory containing the images")

        if config["method"].lower() == "web":  # TODO: implement
            raise NotImplementedError("web fetch has not been implemented yet")

        start_time = config["start_time"]
        end_time = config["end_time"]

        if start_time is None:
            raise RuntimeError("Please provide a least start time")

        # if it is not timezone aware make it
        # if start_time.tzinfo is None:
        #     start_time = start_time.replace(tzinfo=timezone.utc)

        # If there is no tmp_dir we can try to load the file directly, otherwise
        # there will be a warning later in this function and the user should
        # set a tmp_dir
        if end_time is None and (
            not setting_exists("user_dir") or not os.path.isdir(get_setting("user_dir"))
        ):
            image_filename = self.get_image_filename(start_time)
            if image_filename is not None:
                return image_filename

        # If we have already loaded the dataframe in the current session we can use it
        if setting_exists("image_list_df"):
            imglist_df = get_setting("image_list_df")
        else:
            filename = "image_integrity.csv"
            success = False
            # first try to load it from remote via store
            if config["store"] is not None:
                if filename in config["store"]:
                    imglist_df = read_csv_with_store(config["store"], filename)
                    success = True
                elif config["force_write_to_remote"]:
                    # try to reload it and write to remote
                    imglist_df = self.image_integrity_store(config["store"])
                    try:
                        to_csv_with_store(config["store"], filename, imglist_df, dict(index=False))
                        success = True
                    except Exception as e:
                        print(e)

            # Otherwise we need to load the filename dataframe from disk
            if (
                not success
                and setting_exists("user_dir")
                and os.path.isdir(get_setting("user_dir"))
            ):
                imglist_filename = os.path.join(get_setting("user_dir"), "") + filename

                # If it does not exist in the temporary folder of our application
                # We are going to create it
                if os.path.isfile(imglist_filename):
                    # imglist_df = pd.read_parquet(
                    #     imglist_filename
                    # )  # TODO: avoid too many different formats
                    imglist_df = pd.read_csv(imglist_filename)
                else:
                    # we are going to load the full list => no arguments
                    imglist_df = self.image_integrity(config["base_directory"])
                    # imglist_df.to_parquet(imglist_filename)
                    imglist_df.to_csv(imglist_filename, index=False)
            elif not success:
                # if there is no tmp_dir we can load the image list but
                # we should warn the user that this is inefficient
                imglist_df = self.image_integrity(config["base_directory"])
                warnings.warn(
                    "No temporary directory was set. You can speed up multiple runs of your application by setting a temporary directory"
                )

            # TODO: make the index timezone aware
            imglist_df.set_index("start_time", inplace=True)
            imglist_df.index = pd.to_datetime(imglist_df.index, utc=True).tz_localize(
                None
            )  # TODO: change when xarray #3291 is fixed
            imglist_df.sort_index(inplace=True)

            set_setting("image_list_df", imglist_df)

        if end_time is None:
            if start_time < imglist_df.index[0]:
                start_time = imglist_df.index[0]

            return imglist_df.iloc[
                imglist_df.index.get_loc(start_time, method="nearest")
            ]
        else:
            # if end_time.tzinfo is None:
            #     end_time = end_time.replace(tzinfo=timezone.utc)
            if start_time > imglist_df.index[-1] or end_time < imglist_df.index[0]:
                # return empty dataframe
                return imglist_df[0:0]

            if start_time < imglist_df.index[0]:
                start_time = imglist_df.index[0]
            if end_time > imglist_df.index[-1]:
                end_time = imglist_df.index[-1]

            return imglist_df.iloc[
                imglist_df.index.get_loc(
                    start_time, method="bfill"
                ) : imglist_df.index.get_loc(end_time, method="ffill")
                + 1
            ]
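
    # Illustrative usage sketch (added for clarity, not part of the original
    # code; the directory path is an assumption):
    #
    #     node = MHDSLRFilenames(base_directory="/path/to/timelapse_images/")
    #     files = node(request={"start_time": "2017-08-01",
    #                           "end_time": "2017-08-02"})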

    # TODO: write test for image_integrity_store
    def image_integrity_store(
        self, store, start_time=None, end_time=None, delta_seconds=0
    ):
        """ Checks which images are available on the permasense server
        
        Keyword Arguments:
            start_time {[type]} -- datetime object giving the lower bound of the time range which should be checked. 
                                   If None there is no lower bound. (default: {None})
            end_time {[type]} --   datetime object giving the upper bound of the time range which should be checked.
                                   If None there is no upper bound (default: {None})
            delta_seconds {int} -- Determines the 'duration' of an image in the output dataframe.
                                   start_time  = image_time+delta_seconds
                                   end_time    = image_time-delta_seconds (default: {0})
        
        Returns:
            DataFrame -- Returns a pandas dataframe with a list containing filename relative to self.base_directory, 
                         start_time and end_time start_time and end_time can vary depending on the delta_seconds parameter
        """
        """ Checks which images are available on the permasense server
771

matthmey's avatar
matthmey committed
772
773
774
775
776
777
778
779
780
781
        Arguments:
            start_time:   
            end_time:   
            delta_seconds:  Determines the 'duration' of an image in the output dataframe.
                            start_time  = image_time+delta_seconds
                            end_time    = image_time-delta_seconds
        Returns:
            DataFrame -- 
        """
        if start_time is None:
            # a random year which is before permasense installation started
            start_time = dt.datetime(1900, 1, 1)
        if end_time is None:
            end_time = dt.datetime.utcnow()

        tbeg_days = start_time.replace(hour=0, minute=0, second=0)
        tend_days = end_time.replace(hour=23, minute=59, second=59)

        delta_t = dt.timedelta(seconds=delta_seconds)
        num_filename_errors = 0
        images_list = []

        for key in store.keys():
            try:
                pathkey = Path(key)
                datekey = pathkey.parent.name
                dir_date = pd.to_datetime(str(datekey), format="%Y-%m-%d")
            except (ValueError, TypeError):
                # we do not care about files not matching our format
                continue

            if pd.isnull(dir_date):
                continue

            # limit the search to the explicit time range
            if dir_date < tbeg_days or dir_date > tend_days:
                continue

            # print(file.stem)
            start_time_str = pathkey.stem
            try:
                _start_time = pd.to_datetime(start_time_str, format="%Y%m%d_%H%M%S")
                if start_time <= _start_time and _start_time <= end_time:
                    images_list.append(
                        {
                            "filename": str(key),
                            "start_time": _start_time - delta_t,
                            "end_time": _start_time + delta_t,
                        }
                    )
            except ValueError:
                # try old naming convention
                try:
                    # the old naming convention uses a dashed date format
                    _start_time = pd.to_datetime(start_time_str, format="%Y-%m-%d_%H%M%S")

                    if start_time <= _start_time and _start_time <= end_time:
                        images_list.append(
                            {
                                "filename": str(img_file.relative_to(base_directory)),
                                "start_time": _start_time - delta_t,
                                "end_time": _start_time + delta_t,
                            }
                        )
                except ValueError:
                    num_filename_errors += 1
                    warnings.warn(
                        "Permasense data integrity: the following is not a valid image filename and will be ignored: %s"
                        % key
                    )
                    continue

        segments = pd.DataFrame(images_list)
        segments.drop_duplicates(inplace=True, subset="start_time")
        segments.start_time = pd.to_datetime(segments.start_time, utc=True)
        segments.end_time = pd.to_datetime(segments.end_time, utc=True)
        segments = segments.sort_values("start_time")

        return segments

    def image_integrity(
        self, base_directory, start_time=None, end_time=None, delta_seconds=0
    ):
        store = DirectoryStore(base_directory)
        return self.image_integrity_store(store, start_time, end_time, delta_seconds)
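
    # Illustrative sketch (added for clarity, not part of the original code):
    # both integrity helpers return a DataFrame with the columns
    # 'filename', 'start_time' and 'end_time', e.g.
    #
    #     df = node.image_integrity("/path/to/timelapse_images/")  # path is an assumption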

    def get_image_filename(self, timestamp):
        """ Checks wether an image exists for exactly the time of timestamp and returns its filename

            timestamp: datetime object for which the filename should be returned

        # Returns
            The filename if the file exists, None if there is no file
        """

        # the base directory is stored in the node's config (set in __init__)
        datadir = self.config["base_directory"]
        new_filename = (
            datadir
            + timestamp.strftime("%Y-%m-%d")
            + "/"
            + timestamp.strftime("%Y%m%d_%H%M%S")
            + ".JPG"
        )
        old_filename = (
            datadir
            + timestamp.strftime("%Y-%m-%d")
            + "/"
            + timestamp.strftime("%Y-%m-%d_%H%M%S")
            + ".JPG"
        )

        if os.path.isfile(new_filename):
            return new_filename
        elif os.path.isfile(old_filename):
            return old_filename
        else:
            return None

    def get_nearest_image_url(self, IMGparams, imgdate, floor=False):
        if floor:
            date_beg = imgdate - dt.timedelta(hours=4)
            date_end = imgdate
        else:
            date_beg = imgdate
            date_end = imgdate + dt.timedelta(hours=4)

        # predefine the query selection lists
        vs = []  # virtual sensors
        field = []  # fields
        c_vs = []
        c_field = []
        c_join = []
        c_min = []
        c_max = []

        vs = vs + ["matterhorn_binary__mapped"]
        field = field + ["ALL"]
        # select only data from one sensor position
        c_vs = c_vs + ["matterhorn_binary__mapped"]
        c_field = c_field + ["position"]
        c_join = c_join + ["and"]
        c_min = c_min + ["18"]
        c_max = c_max + ["20"]

        c_vs = c_vs + ["matterhorn_binary__mapped"]
        c_field = c_field + ["file_complete"]
        c_join = c_join + ["and"]
        c_min = c_min + ["0"]
        c_max = c_max + ["1"]

        # create url which retrieves the csv data file
        url = "http://data.permasense.ch/multidata?"
        url = url + "time_format=" + "iso"
        url = url + "&from=" + date_beg.strftime("%d/%m/%Y+%H:%M:%S")
        url = url + "&to=" + date_end.strftime("%d/%m/%Y+%H:%M:%S")
        for i in range(0, len(vs), 1):
            url = url + "&vs[%d]=%s" % (i, vs[i])
            url = url + "&field[%d]=%s" % (i, field[i])

        for i in range(0, len(c_vs), 1):
            url = url + "&c_vs[%d]=%s" % (i, c_vs[i])
            url = url + "&c_field[%d]=%s" % (i, c_field[i])
            url = url + "&c_join[%d]=%s" % (i, c_join[i])
            url = url + "&c_min[%d]=%s" % (i, c_min[i])
            url = url + "&c_max[%d]=%s" % (i, c_max[i])

        url = url + "&timeline=%s" % ("generation_time")

        # print(url)
        d = pd.read_csv(url, skiprows=2)

        # print(d)

        # print(type(d['#data'].values))
        d["data"] = [s.replace("&amp;", "&") for s in d["data"].values]

        d = d.sort_values(by="generation_time")
        d["generation_time"] = pd.to_datetime(d["generation_time"], utc=True)

        if floor:
            data_str = d["data"].iloc[0]
            data_filename = d["relative_file"].iloc[0]
            # print(d['generation_time'].iloc[0])
            img_timestamp = d["generation_time"].iloc[0]
        else:
            data_str = d["data"].iloc[-1]
            data_filename = d["relative_file"].iloc[-1]
            # print(d['generation_time'].iloc[-1])
            img_timestamp = d["generation_time"].iloc[-1]

        file_extension = data_filename[-3:]
        base_url = "http://data.permasense.ch"
        # print(base_url + data_str)

        return base_url + data_str, img_timestamp, file_extension
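
    # Illustrative sketch (added for clarity, not part of the original code):
    # note that the IMGparams argument is currently unused by the method.
    #
    #     url, ts, ext = node.get_nearest_image_url(
    #         None, pd.to_datetime("2017-08-01 12:00", utc=True), floor=True
    #     )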


class MHDSLRImages(MHDSLRFilenames):
    def __init__(
        self,
        base_directory=None,
        store=None,
        method="directory",
        output_format="xarray",
        start_time=None,
        end_time=None,
    ):
        super().__init__(
            base_directory=base_directory,
            store=store,
            method=method,
            start_time=start_time,
            end_time=end_time,
        )
        self.config["output_format"] = output_format

    def forward(self, data=None, request=None):
        filenames = super().forward(request=request)

        if request["output_format"] is "xarray":
            return self.construct_xarray(filenames)
        elif request["output_format"] is "base64":
            return self.construct_base64(filenames)
        else:
            output_formats = ["xarray", "base64"]
            raise RuntimeError(
                f"The {request['output_format']} output_format is not supported. Allowed formats are {output_formats}"
            )
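
# Illustrative usage sketch (added for clarity, not part of the original
# module; the path is an assumption):
#
#     node = MHDSLRImages(base_directory="/path/to/timelapse_images/",
#                         output_format="xarray")
#     images = node(request={"start_time": "2017-08-01",
#                            "end_time": "2017-08-02"})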
