Source code for pyacs.gts.lib.l1trend.refinement

"""
Main refinement function for L1-trend analysis.
"""

import numpy as np
import pyacs.lib.astrotime as at
#from pyacs.gts.Gts import Gts

import logging
import pyacs.message.message as MESSAGE
import pyacs.message.verbose_message as VERBOSE
import pyacs.message.error as ERROR
import pyacs.message.warning as WARNING
import pyacs.message.debug_message as DEBUG

from .check_trend import check_l1_trend
from .optimal_refinement import optimal_pwlf_refinement, optimal_pwlf_refinement_fast

########################################################################################################################
[docs] def refine_l1trend(self, rawts, lcomponent='ENU', min_samples_per_segment=10, threshold_bias_res_detect=10, threshold_vel=8, min_sample_segment_refinement=1, norm='L2', output='ts'): ######################################################################################################################## """ Refine results from l1trend by optimizing the date of breakpoints for suspected periods. The algorithm first identifies periods where the l1trend model might be improved. For every candidate period, the algorithm checks different metrics to decide if the model should be refined. Finally, for the selected periods, it optimizes the date of breakpoints. Parameters ---------- rawts : pyacs.gts.Gts.Gts Raw time series, originally processed by l1trend lcomponent : str Component to refine (default: 'ENU') min_samples_per_segment : int Minimum number of samples for a segment to be considered as a candidate for improvement (default: 10) threshold_bias_res_detect : float Threshold to detect bias residuals (default: 10) threshold_vel : float Threshold to detect high velocities (default: 8) min_sample_segment_refinement : int Minimum number of samples for a segment in the refined model (default: 1) norm : str Norm to be minimized for improvement (default: 'L2') output : str Output type 'ts' for the refined Gts, 'info' for the periods suspected, 'both' for both (default: 'ts') Returns ------- pyacs.gts.Gts.Gts or tuple Refined Gts object or information about suspected periods """ # check that self and rawts have the same data dimension if self.data.shape != rawts.data.shape: ERROR("l1trend and rawts have different data dimensions for %s" % (self.code), exit=True) # too short time series if self.data.shape[0] < 3: WARNING("Time series is too short for refinement for %s. Keeping untouched" % (self.code)) H_period, H_cp, H_cp_pb = {}, {}, {} if output == 'ts': return self if output == 'both': return self, H_period, H_cp, H_cp_pb if output == 'info': return H_period, H_cp, H_cp_pb VERBOSE("----------------------------------------------------------") VERBOSE("Refining l1trend results using the new strategy for %s" % (self.code)) VERBOSE("----------------------------------------------------------") VERBOSE("Parameters used for refinement") VERBOSE("min_samples_per_segment: %d" % min_samples_per_segment) VERBOSE("threshold_bias_res_detect: %d" % threshold_bias_res_detect) VERBOSE("threshold_vel: %d" % threshold_vel) VERBOSE("min_sample_segment_refinement: %d" % min_sample_segment_refinement) VERBOSE("norm: %s" % norm) VERBOSE("----------------------------------------------------------") # Simplify l1trend first # JMN 15/08/2025 : removed, this function should refine the actual time series provided by the user #VERBOSE("Cleaning l1trend breakpoints for %s" % (self.code)) #l1ts = self.simplify_l1trend(tolerance=.5, components='NEU') l1ts = self.copy() # Check for suspicious periods H_period, H_cp, H_cp_pb = check_l1_trend(rawts, l1ts, component=lcomponent, min_samples_per_segment=min_samples_per_segment, threshold_bias_res_detect=threshold_bias_res_detect, threshold_vel=threshold_vel) # Add periods with large offsets to suspicious periods VERBOSE("Checking for large offsets to add to suspicious periods") large_offset_threshold = 5.0 # mm - threshold for considering an offset as "large" for component in lcomponent: if component not in H_period: H_period[component] = [] # Check if there are offsets in the time series if hasattr(rawts, 'offsets_values') and rawts.offsets_values is not None: comp_idx = {'N': 1, 'E': 2, 'U': 3}[component] # Get offset values for this component offset_values = rawts.offsets_values[:, comp_idx] * 1000 # Convert to mm # Find large offsets large_offset_indices = np.where(np.abs(offset_values) > large_offset_threshold)[0] if len(large_offset_indices) > 0: VERBOSE("Found %d large offsets (>%.1f mm) for component %s" % (len(large_offset_indices), large_offset_threshold, component)) # Get offset dates offset_dates = rawts.offsets_dates for idx in large_offset_indices: if idx < len(offset_dates): offset_date = offset_dates[idx] offset_magnitude = offset_values[idx] # Create a period around the offset (e.g., ±30 days) offset_period_days = 30 offset_period_years = offset_period_days / 365.25 period_start = offset_date - offset_period_years period_end = offset_date + offset_period_years # Ensure period is within time series bounds period_start = max(period_start, rawts.data[0, 0]) period_end = min(period_end, rawts.data[-1, 0]) # Add to suspicious periods if not already present new_period = [period_start, period_end] # Check if this period overlaps significantly with existing periods period_exists = False for existing_period in H_period[component]: # Check for overlap overlap_start = max(new_period[0], existing_period[0]) overlap_end = min(new_period[1], existing_period[1]) if overlap_start < overlap_end: # There is overlap, check if it's significant overlap_duration = overlap_end - overlap_start new_period_duration = new_period[1] - new_period[0] if overlap_duration / new_period_duration > 0.5: # More than 50% overlap period_exists = True break if not period_exists: H_period[component].append(new_period) VERBOSE("Added offset period for %s component: [%.3f, %.3f] (offset: %.1f mm)" % (component, period_start, period_end, offset_magnitude)) else: VERBOSE("Offset period overlaps with existing suspicious period, skipping") else: VERBOSE("No large offsets found for component %s" % component) else: VERBOSE("No offset information available for component %s" % component) # Convert H_period decimal years to seconds H_period_seconds = {} for component in H_period.keys(): H_period_seconds[component] = [] for period in H_period[component]: start_seconds = at.decyear2seconds(period[0]) end_seconds = at.decyear2seconds(period[1]) H_period_seconds[component].append([start_seconds, end_seconds]) # Convert H_cp to seconds H_cp_seconds = {} for component in H_cp.keys(): H_cp_start_end = l1ts.data[H_cp[component],0] # Insert l1ts.data[0,0] at the first and l1ts.data[-1,0] at the last element H_cp_start_end = np.insert(H_cp_start_end, 0, l1ts.data[0,0]) H_cp_start_end = np.insert(H_cp_start_end, -1, l1ts.data[-1,0]) H_cp_seconds[component] = at.decyear2seconds(H_cp_start_end) # Convert l1ts.data[:,0] from decimal year to seconds for time step calculation t_seconds = at.decyear2seconds(l1ts.data[:,0]) # Calculate time step for limiting search windows if len(t_seconds) > 1: time_step = np.median(np.diff(t_seconds)) max_extension = 50 * time_step # 50 samples worth of time #VERBOSE("Time step: %.1f seconds, max extension: %.1f seconds (50 samples)" % (time_step, max_extension)) else: max_extension = None # Build H_period_test to add adjacent segments to the suspected periods H_period_test = {} for component in H_period_seconds.keys(): H_period_test[component] = [] for period in H_period_seconds[component]: start_seconds = period[0] end_seconds = period[1] # Find the previous and next elements from H_cp_seconds cp_seconds = H_cp_seconds[component] # Find the index of the closest breakpoint before start_seconds prev_idx = 0 for i, cp_time in enumerate(cp_seconds): if cp_time < start_seconds: prev_idx = i else: break # Find the index of the closest breakpoint after end_seconds next_idx = len(cp_seconds) - 1 for i, cp_time in enumerate(cp_seconds): if cp_time > end_seconds: next_idx = i break # Replace start and end with previous and next breakpoints new_start = cp_seconds[prev_idx] new_end = cp_seconds[next_idx] # Apply 30-sample restriction to prevent excessive search windows if max_extension is not None: # Limit new_start to max_extension before start_seconds min_start = start_seconds - max_extension if new_start < min_start: new_start = min_start VERBOSE("Limited new_start to 30 samples before original period start: %.1f -> %.1f seconds" % (cp_seconds[prev_idx], new_start)) # Limit new_end to max_extension after end_seconds max_end = end_seconds + max_extension if new_end > max_end: new_end = max_end VERBOSE("Limited new_end to 30 samples after original period end: %.1f -> %.1f seconds" % (cp_seconds[next_idx], new_end)) H_period_test[component].append([new_start, new_end]) # Merge periods in H_period_test only if corresponding periods in H_period_seconds are contiguous H_period_for_test = {} for component in H_period_test.keys(): H_period_for_test[component] = [] if len(H_period_test[component]) == 0: continue # Get corresponding periods from H_period_seconds seconds_periods = H_period_seconds[component] # Sort periods by start time (both H_period_test and seconds periods) sorted_test_periods = sorted(H_period_test[component], key=lambda x: x[0]) sorted_seconds_periods = sorted(seconds_periods, key=lambda x: x[0]) # Initialize with the first period merged_periods = [sorted_test_periods[0]] merged_seconds_indices = [0] # Track which seconds periods are merged # Check each subsequent period for contiguity in H_period_seconds for i, current_test_period in enumerate(sorted_test_periods[1:], 1): last_merged = merged_periods[-1] last_seconds_idx = merged_seconds_indices[-1] # Check if corresponding periods in H_period_seconds are contiguous tolerance = 1e-6 # 1 microsecond tolerance in seconds current_seconds_period = sorted_seconds_periods[i] last_seconds_period = sorted_seconds_periods[last_seconds_idx] if abs(current_seconds_period[0] - last_seconds_period[1]) <= tolerance: # Seconds periods are contiguous, merge the test periods merged_periods[-1] = [last_merged[0], current_test_period[1]] merged_seconds_indices[-1] = i # Update to the latest merged index else: # Seconds periods are not contiguous, add as new period merged_periods.append(current_test_period) merged_seconds_indices.append(i) H_period_for_test[component] = merged_periods # Copy l1ts to refine_l1ts for refinement refine_l1ts = l1ts.copy() # Process each component for component in H_period_for_test.keys(): if len(H_period_for_test[component]) == 0: VERBOSE("No test periods for component %s" % component) continue VERBOSE("----------------------------------------------------------") VERBOSE("Processing component %s with %d test periods" % (component, len(H_period_for_test[component]))) VERBOSE("----------------------------------------------------------") # Get component index comp_idx = {'N': 1, 'E': 2, 'U': 3}[component] for i, period in enumerate(H_period_for_test[component]): start_seconds = float(period[0]) # Ensure scalar value end_seconds = float(period[1]) # Ensure scalar value # Convert seconds to datetime for human-readable output start_dt = at.seconds2datetime(start_seconds) end_dt = at.seconds2datetime(end_seconds) VERBOSE("----------------------------------------------------------") VERBOSE("Processing period %d/%d for component %s: [%s, %s]" % (i+1, len(H_period_for_test[component]), component, start_dt.isoformat()[:10], end_dt.isoformat()[:10])) VERBOSE("----------------------------------------------------------") # Extract time window corresponding to the tested period mask = (t_seconds >= start_seconds) & (t_seconds <= end_seconds) period_indices = np.where(mask)[0] if len(period_indices) < 3: VERBOSE("Period too short, skipping") continue # Extract data for the time window t_period = t_seconds[period_indices] y_period = rawts.data[period_indices, comp_idx] * 1E3 # Convert to mm l1_period = l1ts.data[period_indices, comp_idx] * 1E3 # Convert to mm # Get start and end values from l1ts y0 = l1_period[0] yn = l1_period[-1] #VERBOSE("Time window: %d points, start: %.3f mm, end: %.3f mm" % # (len(period_indices), y0, yn)) # Find the indexes in t_seconds corresponding to bp_dates bp_dates = H_cp_seconds[component] bp_indices = [] for bp_time in bp_dates: # Find the closest index in t_seconds closest_idx = np.argmin(np.abs(t_seconds - bp_time)) bp_indices.append(closest_idx) bp_values = l1ts.data[bp_indices, comp_idx] # Find breakpoints within the period period_bp_indices = [] for i, bp_idx in enumerate(bp_indices): if bp_idx >= period_indices[0] and bp_idx <= period_indices[-1]: period_bp_indices.append(i) period_bp_dates = bp_dates[period_bp_indices] period_bp_values = bp_values[period_bp_indices] # Count interior breakpoints (excluding start and end of the period) # The period boundaries are defined by the previous and next breakpoints from H_cp_seconds period_start = t_period[0] period_end = t_period[-1] # Count breakpoints that are strictly within the period (not at boundaries) interior_bp_count = 0 for bp_date in period_bp_dates: if period_start < bp_date < period_end: interior_bp_count += 1 nbp = interior_bp_count VERBOSE("Number of interior breakpoints in period: %d" % nbp) if nbp <= 0: VERBOSE("No breakpoints to refine, skipping") continue # Store original nbp for potential retry original_nbp = nbp # Limit nbp for performance - exhaustive search becomes too slow for n>=4 max_nbp = 3 # Limit to 3 breakpoints for reasonable performance if nbp > max_nbp: VERBOSE("Too many breakpoints (%d), limiting to %d for performance" % (nbp, max_nbp)) nbp = max_nbp # Create weights for error calculation based on bias detection # Points in segments where bias was detected get higher weight weights = np.ones(len(period_indices)) # Check if this period overlaps with any bias-detected segments period_start = t_period[0] period_end = t_period[-1] # Get bias-detected segments for this component if component in H_period_seconds: bias_segments = H_period_seconds[component] for bias_start, bias_end in bias_segments: # Check if there's overlap between the current period and bias segment overlap_start = max(period_start, bias_start) overlap_end = min(period_end, bias_end) if overlap_start < overlap_end: # There is overlap # Find indices within the overlap overlap_mask = (t_period >= overlap_start) & (t_period <= overlap_end) overlap_indices = np.where(overlap_mask)[0] # Give higher weight to points in bias-detected segments weights[overlap_indices] = 5.0 # 5x higher weight # Handle both array and single datetime object cases bias_start_dt = at.seconds2datetime(bias_start) bias_end_dt = at.seconds2datetime(bias_end) bias_start_str = bias_start_dt[0].isoformat()[:10] if hasattr(bias_start_dt, '__len__') else bias_start_dt.isoformat()[:10] bias_end_str = bias_end_dt[0].isoformat()[:10] if hasattr(bias_end_dt, '__len__') else bias_end_dt.isoformat()[:10] VERBOSE("Period overlaps with bias segment (%s to %s), applying 5x weight to %d points" % (bias_start_str, bias_end_str, len(overlap_indices))) # Function to run optimization with given number of breakpoints def run_optimization(n_breakpoints): if n_breakpoints <= 3 and len(period_indices) < 200: VERBOSE("Running optimal_pwlf_refinement (exhaustive search) with %d breakpoints" % n_breakpoints) return optimal_pwlf_refinement(t_period, y_period, y0, yn, n_breakpoints, weights) else: VERBOSE("Running optimal_pwlf_refinement_fast because exhaustive search would be too slow with %d breakpoints and %d points" % (n_breakpoints, len(period_indices))) return optimal_pwlf_refinement_fast(t_period, y_period, y0, yn, weights) # First optimization attempt with original nbp try: optimal_bp, optimal_vals, min_error = run_optimization(nbp) except Exception as e: VERBOSE("Error in optimal_pwlf_refinement: %s" % str(e)) import sys; sys.exit() continue original_error = np.sum(weights * (y_period - l1_period)**2) VERBOSE("Original error: %.6f, Optimal error with %d breakpoints: %.6f" % (original_error, nbp, min_error)) # Test if optimal_pwlf_refinement significantly improves fit improvement_ratio = original_error / min_error if min_error > 0 else float('inf') significant_improvement = improvement_ratio > 1.10 # 10% improvement threshold # Show breakpoint dates in human readable format bp_dates_readable = [] for bp in optimal_bp: bp_dt = at.seconds2datetime(bp) bp_str = bp_dt[0].isoformat()[:10] if hasattr(bp_dt, '__len__') else bp_dt.isoformat()[:10] bp_dates_readable.append(bp_str) VERBOSE("Optimal breakpoints for %s component: %s" % (component, bp_dates_readable)) if significant_improvement: VERBOSE("Significant improvement detected (ratio: %.2f), updating refine_l1ts" % improvement_ratio) # Interpolate optimal solution back to the period indices optimal_interp = np.interp(t_period, optimal_bp, optimal_vals) # Insert optimal solution into refine_l1ts refine_l1ts.data[period_indices, comp_idx] = optimal_interp / 1E3 # Convert back to m VERBOSE("Successfully updated refine_l1ts for period %d" % (i+1)) else: VERBOSE("No significant improvement with %d breakpoints (ratio: %.2f), trying with %d breakpoints" % (nbp, improvement_ratio, nbp + 1)) # Try with one more breakpoint if we haven't reached the limit if nbp + 1 <= max_nbp and nbp + 1 < len(period_indices) - 1: try: optimal_bp_plus1, optimal_vals_plus1, min_error_plus1 = run_optimization(nbp + 1) improvement_ratio_plus1 = original_error / min_error_plus1 if min_error_plus1 > 0 else float('inf') significant_improvement_plus1 = improvement_ratio_plus1 > 1.20 VERBOSE("Error with %d breakpoints: %.6f (ratio: %.2f)" % (nbp + 1, min_error_plus1, improvement_ratio_plus1)) # Show breakpoint dates in human readable format bp_dates_readable = [] for bp in optimal_bp_plus1: bp_dt = at.seconds2datetime(bp) bp_str = bp_dt[0].isoformat()[:10] if hasattr(bp_dt, '__len__') else bp_dt.isoformat()[:10] bp_dates_readable.append(bp_str) VERBOSE("Optimal breakpoints for %s component: %s" % (component, bp_dates_readable)) if significant_improvement_plus1 and improvement_ratio_plus1 > improvement_ratio: VERBOSE("Better improvement with %d breakpoints (ratio: %.2f), updating refine_l1ts" % (nbp + 1, improvement_ratio_plus1)) # Use the solution with nbp + 1 breakpoints optimal_interp = np.interp(t_period, optimal_bp_plus1, optimal_vals_plus1) refine_l1ts.data[period_indices, comp_idx] = optimal_interp / 1E3 # Convert back to m VERBOSE("Successfully updated refine_l1ts for period %d with %d breakpoints" % (i+1, nbp + 1)) else: VERBOSE("No better improvement with %d breakpoints, keeping original" % (nbp + 1)) except Exception as e: VERBOSE("Error in optimal_pwlf_refinement with %d breakpoints: %s" % (nbp + 1, str(e))) VERBOSE("Keeping original solution") else: VERBOSE("Cannot try %d breakpoints (limit reached or too many points), keeping original" % (nbp + 1)) # Track which segments have been improved improved_segments = set() # Simplify using Fisher test only on improved segments VERBOSE("Simplifying using Fisher test on improved segments") # Create a copy of refine_l1ts for simplification simplified_ts = refine_l1ts.copy() # For each component, identify segments that were improved and apply Fisher test for comp in ['N', 'E', 'U']: comp_idx = {'N': 1, 'E': 2, 'U': 3}[comp] # Get breakpoints for this component bp = simplified_ts.l1trend_to_breakpoints(tol='auto') bp_dates = bp[comp][0] bp_values = bp[comp][1] if len(bp_dates) <= 2: # Only start and end points continue # For each breakpoint (except first and last), check if it's in an improved segment for i in range(1, len(bp_dates) - 1): # Get the segment defined by previous and next breakpoints sbp_date = bp_dates[i - 1] ebp_date = bp_dates[i + 1] # Find indices for this segment in the time series segment_mask = (simplified_ts.data[:, 0] >= sbp_date) & (simplified_ts.data[:, 0] <= ebp_date) segment_indices = np.where(segment_mask)[0] if len(segment_indices) < 3: # Need at least 3 points for meaningful test continue # Check if this segment was improved during refinement # We can do this by comparing the current values with the original l1ts values segment_current = simplified_ts.data[segment_indices, comp_idx] segment_original = self.data[segment_indices, comp_idx] # Calculate if there was significant change in this segment segment_diff = np.sum((segment_current - segment_original)**2) segment_improvement = segment_diff > 1e-12 # Small threshold to detect changes if segment_improvement: improved_segments.add((comp, sbp_date, ebp_date)) VERBOSE("Segment %s (%.3f to %.3f) was improved, will apply Fisher test" % (comp, sbp_date, ebp_date)) # Apply Fisher test only on improved segments improved_segments = False if improved_segments: VERBOSE("Applying Fisher test on %d improved segments" % len(improved_segments)) # For each improved segment, create a temporary Gts and apply Fisher test for comp, sbp_date, ebp_date in improved_segments: comp_idx = {'N': 1, 'E': 2, 'U': 3}[comp] # Find indices for this segment segment_mask = (simplified_ts.data[:, 0] >= sbp_date) & (simplified_ts.data[:, 0] <= ebp_date) segment_indices = np.where(segment_mask)[0] if len(segment_indices) < 3: continue # Create temporary Gts for this segment segment_l1ts = simplified_ts.copy() segment_l1ts.data = simplified_ts.data[segment_indices, :] segment_rawts = rawts.copy() segment_rawts.data = rawts.data[segment_indices, :] # Apply Fisher test on this segment try: segment_simplified = segment_l1ts.simplify_l1trend_with_fisher_test( rawts=segment_rawts, components=comp, alpha=0.05 ) # Update the main simplified_ts with the results from this segment simplified_ts.data[segment_indices, comp_idx] = segment_simplified.data[:, comp_idx] VERBOSE("Fisher test applied to segment %s (%.3f to %.3f)" % (comp, sbp_date, ebp_date)) except Exception as e: VERBOSE("Error applying Fisher test to segment %s (%.3f to %.3f): %s" % (comp, sbp_date, ebp_date, str(e))) VERBOSE("Keeping original segment values") else: VERBOSE("No improved segments found, skipping Fisher test") VERBOSE("Refinement completed for %s" % (self.code)) if output == 'ts': return simplified_ts if output == 'both': return simplified_ts, H_period, H_cp, H_cp_pb if output == 'info': return H_period, H_cp, H_cp_pb