Source code for pyacs.gts.lib.primitive.split

###################################################################
[docs] def split(self, split_dates, verbose=True): ################################################################### """Split time series at given dates. Dates <= split date define the left segment. Dates are converted to integer seconds via at.decyear2seconds. Parameters ---------- split_dates : list or numpy.ndarray Decimal-year dates for split points. verbose : bool, optional Verbose mode. Default is True. Returns ------- list of Gts Gts objects from the split. """ # import import inspect import numpy as np import pyacs.gts import pyacs.lib.astrotime as at import logging import pyacs.message.message as MESSAGE import pyacs.message.verbose_message as VERBOSE import pyacs.message.error as ERROR import pyacs.message.warning as WARNING import pyacs.message.debug_message as DEBUG # check data is not None from pyacs.gts.lib.errors import GtsInputDataNone try: if self.data is None: # raise exception raise GtsInputDataNone(inspect.stack()[0][3], __name__, self) except GtsInputDataNone as error: # print PYACS ERROR ERROR(error) return [] # check that data_xyz and data are consistent if self.data_xyz is not None: if self.data_xyz.shape != self.data.shape: ERROR(".data and .data_xyz are not consistent for time series %s. Returning empty list" % self.code) return [] # convert split_dates to numpy array if it's a list if isinstance(split_dates, list): split_dates = np.array(split_dates) # sort split_dates to ensure proper order split_dates = np.sort(split_dates) # convert all dates to integer seconds np_date_s = at.decyear2seconds(self.data[:, 0], rounding='second') split_dates_s = at.decyear2seconds(split_dates, rounding='second') # create list to store split Gts objects split_gts_list = [] # get start and end dates of the time series start_date_s = np_date_s[0] end_date_s = np_date_s[-1] # create segments based on split dates segment_boundaries = [start_date_s] + list(split_dates_s) + [end_date_s] if verbose: VERBOSE("Splitting time series %s into %d segments" % (self.code, len(segment_boundaries) - 1)) # create segments for i in range(len(segment_boundaries) - 1): start_boundary = segment_boundaries[i] end_boundary = segment_boundaries[i + 1] # find indices for this segment # convention: dates <= date_offset define the left segment if i == 0: # first segment: include data <= end_boundary segment_indices = np.where(np_date_s <= end_boundary)[0] elif i == len(segment_boundaries) - 2: # last segment: include data > start_boundary segment_indices = np.where(np_date_s > start_boundary)[0] else: # middle segments: include data > start_boundary and <= end_boundary segment_indices = np.where((np_date_s > start_boundary) & (np_date_s <= end_boundary))[0] # check if segment has data if len(segment_indices) == 0: if verbose: WARNING("Segment %d (%.3f to %.3f) has no data, skipping" % (i + 1, at.datetime2decyear(at.seconds2datetime(start_boundary)), at.datetime2decyear(at.seconds2datetime(end_boundary)))) continue # create new Gts for this segment new_gts = self.copy() # case .data_xyz if new_gts.data_xyz is not None: new_gts.data_xyz = self.data_xyz[segment_indices, :] # re-generate NEU time series new_gts.xyz2neu(corr=False) else: new_gts.data = self.data[segment_indices, :] # handle outliers: keep only outliers that are in this segment if hasattr(self, 'outliers') and self.outliers: segment_outliers = [] for outlier_idx in self.outliers: if outlier_idx in segment_indices: # adjust index for the new segment new_outlier_idx = np.where(segment_indices == outlier_idx)[0][0] segment_outliers.append(new_outlier_idx) new_gts.outliers = segment_outliers else: new_gts.outliers = [] # handle offsets_dates: keep only offsets that are in this segment if hasattr(self, 'offsets_dates') and self.offsets_dates: segment_offsets = [] for offset_date in self.offsets_dates: offset_date_s = at.decyear2seconds(offset_date, rounding='second') if start_boundary < offset_date_s <= end_boundary: segment_offsets.append(offset_date) new_gts.offsets_dates = segment_offsets else: new_gts.offsets_dates = [] # update code to indicate segment new_gts.code = f"{self.code}_segment_{i+1}" if verbose: VERBOSE("Created segment %d: %s with %d points (%.3f to %.3f)" % (i + 1, new_gts.code, len(segment_indices), at.datetime2decyear(at.seconds2datetime(start_boundary)), at.datetime2decyear(at.seconds2datetime(end_boundary)))) split_gts_list.append(new_gts) if verbose: VERBOSE("Split completed: %d segments created" % len(split_gts_list)) return split_gts_list