Source code for pyacs.gts.lib.l1trend.sum_l1_trend

"""
L1-trend summary statistics.

This module provides functions to analyze and summarize L1-trend filtered time series,
including breakpoint statistics, velocity analysis, and segment duration information.
"""

import numpy as np
import pyacs.message.verbose_message as VERBOSE
import pyacs.lib.astrotime as at


[docs] def sum_l1_trend(self): """ Summarize L1-trend filtered time series with comprehensive statistics. This function analyzes a previously L1-trend filtered time series and provides detailed statistics including: - Time series period - Number of breakpoints per component - Velocity statistics (mean, median, maximum) - Median duration between successive breakpoints Parameters ---------- self : Gts object The L1-trend filtered time series to analyze. Returns ------- Gts The original time series object (unchanged) Notes ----- This method is designed to work on time series that have already been processed with L1-trend filtering. It extracts breakpoints and computes velocity statistics for each component (N, E, U). """ VERBOSE("Analyzing L1-trend filtered time series") # Get time series period start_date = self.data[0, 0] end_date = self.data[-1, 0] period_years = end_date - start_date VERBOSE("Time series period: %.3f to %.3f (%.2f years)" % (start_date, end_date, period_years)) # Component names components = ['N', 'E', 'U'] # Analyze each component for i, component in enumerate(components): VERBOSE("\n--- Component %s ---" % component) # Get component data (column i+1: N=1, E=2, U=3) component_data = self.data[:, i+1] # Find breakpoints by detecting significant changes in slope breakpoints = _find_breakpoints(self.data[:, 0], component_data) VERBOSE("Number of breakpoints: %d" % len(breakpoints)) if len(breakpoints) > 0: # Calculate velocities between breakpoints velocities = _calculate_velocities(self.data[:, 0], component_data, breakpoints) # Calculate durations between breakpoints durations = _calculate_durations(self.data[:, 0], breakpoints) # Statistics mean_vel = np.mean(velocities) median_vel = np.median(velocities) max_vel = np.max(np.abs(velocities)) median_duration = np.median(durations) VERBOSE("Velocity statistics:") VERBOSE(" Mean velocity: %.3f mm/year" % (mean_vel * 1000)) VERBOSE(" Median velocity: %.3f mm/year" % (median_vel * 1000)) VERBOSE(" Maximum |velocity|: %.3f mm/year" % (max_vel * 1000)) VERBOSE(" Median duration between breakpoints: %.1f days" % (median_duration * 365.25)) # Additional statistics VERBOSE("Additional statistics:") VERBOSE(" Number of segments: %d" % (len(breakpoints) + 1)) VERBOSE(" Velocity range: [%.3f, %.3f] mm/year" % (np.min(velocities) * 1000, np.max(velocities) * 1000)) VERBOSE(" Duration range: [%.1f, %.1f] days" % (np.min(durations) * 365.25, np.max(durations) * 365.25)) else: VERBOSE("No breakpoints detected - linear trend") # Calculate overall velocity overall_vel = (component_data[-1] - component_data[0]) / (end_date - start_date) VERBOSE("Overall velocity: %.3f mm/year" % (overall_vel * 1000)) VERBOSE("\nL1-trend analysis completed") return self
def _find_breakpoints(t, y, threshold=0.1): """ Find breakpoints in a time series by detecting significant changes in slope. Parameters ---------- t : array_like Time array (decimal years) y : array_like Data array threshold : float Threshold for detecting significant slope changes Returns ------- array Array of breakpoint indices """ if len(t) < 3: return np.array([]) # Calculate first differences (velocities) dt = np.diff(t) dy = np.diff(y) velocities = dy / dt # Calculate second differences (acceleration) d2v = np.diff(velocities) # Find significant changes in velocity # Use a threshold based on the standard deviation of second differences if len(d2v) > 0: std_d2v = np.std(d2v) significant_changes = np.abs(d2v) > threshold * std_d2v # Get indices of significant changes (add 1 because of diff) breakpoints = np.where(significant_changes)[0] + 1 # Filter out breakpoints too close to each other (minimum 5 points apart) if len(breakpoints) > 1: min_gap = 5 filtered_breakpoints = [breakpoints[0]] for bp in breakpoints[1:]: if bp - filtered_breakpoints[-1] >= min_gap: filtered_breakpoints.append(bp) breakpoints = np.array(filtered_breakpoints) else: breakpoints = np.array([]) return breakpoints def _calculate_velocities(t, y, breakpoints): """ Calculate velocities between breakpoints. Parameters ---------- t : array_like Time array (decimal years) y : array_like Data array breakpoints : array_like Array of breakpoint indices Returns ------- array Array of velocities between breakpoints """ velocities = [] # Add start and end points all_points = np.concatenate([[0], breakpoints, [len(t)-1]]) for i in range(len(all_points) - 1): start_idx = all_points[i] end_idx = all_points[i + 1] if end_idx > start_idx: dt = t[end_idx] - t[start_idx] dy = y[end_idx] - y[start_idx] velocity = dy / dt if dt > 0 else 0 velocities.append(velocity) return np.array(velocities) def _calculate_durations(t, breakpoints): """ Calculate durations between breakpoints. Parameters ---------- t : array_like Time array (decimal years) breakpoints : array_like Array of breakpoint indices Returns ------- array Array of durations between breakpoints (in years) """ durations = [] # Add start and end points all_points = np.concatenate([[0], breakpoints, [len(t)-1]]) for i in range(len(all_points) - 1): start_idx = all_points[i] end_idx = all_points[i + 1] if end_idx > start_idx: duration = t[end_idx] - t[start_idx] durations.append(duration) return np.array(durations)