"""
Simplification functions for L1-trend filtered time series.
"""
import numpy as np
#from pyacs.gts.Gts import Gts
from scipy.stats import f
import logging
import pyacs.message.message as MESSAGE
import pyacs.message.verbose_message as VERBOSE
import pyacs.message.error as ERROR
import pyacs.message.warning as WARNING
import pyacs.message.debug_message as DEBUG
[docs]
def simplify_l1trend(self, tolerance=.5, components='ENU'):
"""
Remove unnecessary breakpoints from an L1-trend filtered time series.
This function iteratively removes breakpoints and tests if the simplified model
still fits the original time series within a specified tolerance.
Parameters
----------
tolerance : float
Maximum allowed difference (in mm) between original and simplified model.
Default is 0.5 mm.
components : str
Components to process. Default is 'ENU'.
Returns
-------
pyacs.gts.Gts.Gts
Simplified Gts object with unnecessary breakpoints removed.
"""
VERBOSE("Simplifying l1trend for %s with tolerance %.1f mm and components %s" % (self.code, tolerance, components))
# Convert tolerance from mm to m for consistency with PYACS Gts format
tolerance_m = tolerance / 1000.0
# Get breakpoints from the current L1-trend
bp = self.l1trend_to_breakpoints(tol=0.1)
# Count initial breakpoints for each component
initial_bp_counts = {}
for comp in components:
if comp in ['N', 'E', 'U']:
initial_bp_counts[comp] = len(bp[comp][0]) - 2 # Subtract 2 for start and end points
# Initialize new data array
new_data = np.zeros_like(self.data)
new_data[:,0] = self.data[:,0] # Copy dates
new_data[:,4:8] = self.data[:,4:8] # Copy uncertainties
# Process each component
for comp in components:
if comp not in ['N', 'E', 'U']:
continue
comp_idx = {'N': 1, 'E': 2, 'U': 3}[comp]
# Get breakpoints for this component
bp_dates = bp[comp][0]
bp_values = bp[comp][1]
if len(bp_dates) <= 2: # Only start and end points
# No breakpoints to simplify, just interpolate
new_data[:, comp_idx] = np.interp(new_data[:,0], bp_dates, bp_values)
continue
# Start with all breakpoints
current_dates = bp_dates.copy()
current_values = bp_values.copy()
initial_bp_count = len(current_dates) - 2 # Subtract 2 for start and end points
DEBUG("Processing %s component: starting with %d breakpoints" % (comp, initial_bp_count))
# Iteratively remove breakpoints (except first and last)
removed_any = True
iteration = 0
while removed_any and len(current_dates) > 2:
removed_any = False
iteration += 1
# Try removing each breakpoint (except first and last)
for i in range(1, len(current_dates) - 1):
# Create test breakpoints without the current breakpoint
test_dates = np.delete(current_dates, i)
test_values = np.delete(current_values, i)
# Interpolate test model on original dates
test_interp = np.interp(self.data[:,0], test_dates, test_values)
# Compare with original L1-trend values
original_values = self.data[:, comp_idx]
max_diff = np.max(np.abs(test_interp - original_values))
# If within tolerance, remove this breakpoint
if max_diff <= tolerance_m:
current_dates = test_dates
current_values = test_values
removed_any = True
DEBUG("Iteration %d: removed breakpoint %d from %s component (max_diff: %.3f mm)" %
(iteration, i, comp, max_diff * 1000.0))
break # Start over with the new set of breakpoints
# Interpolate final simplified breakpoints on original dates
new_data[:, comp_idx] = np.interp(new_data[:,0], current_dates, current_values)
final_bp_count = len(current_dates) - 2 # Subtract 2 for start and end points
VERBOSE("Initial/Final breakpoints for %s component: %d -> %d" % (comp, initial_bp_counts[comp], final_bp_count))
# Create new Gts object
simplified_gts = self.copy()
simplified_gts.data = new_data
return simplified_gts
[docs]
def simplify_l1trend_with_fisher_test(self, rawts, components='ENU', alpha=0.05):
"""
Simplify an L1-trend filtered time series by comparing to the unfiltered time series using Fisher-Snedecor tests.
This function iteratively removes breakpoints and tests their usefulness by comparing
the fit quality with and without each breakpoint using Fisher-Snedecor tests.
Parameters
----------
rawts : pyacs.gts.Gts.Gts
The unfiltered (raw) time series to compare against.
components : str
Components to process. Default is 'ENU'.
alpha : float
Significance level for the Fisher-Snedecor test. Default is 0.05.
Returns
-------
pyacs.gts.Gts.Gts
Simplified Gts object with unnecessary breakpoints removed.
"""
VERBOSE("Simplifying l1trend with Fisher test for %s using raw time series %s" % (self.code, rawts.code))
VERBOSE("Components: %s, Significance level: %.3f" % (components, alpha))
# Check that self and rawts have the same data dimension
if self.data.shape != rawts.data.shape:
ERROR("l1trend and rawts have different data dimensions for %s" % (self.code), exit=True)
# Check that dates match
if not np.allclose(self.data[:, 0], rawts.data[:, 0], rtol=1e-10):
ERROR("l1trend and rawts have different dates for %s" % (self.code), exit=True)
# Get breakpoints from the current L1-trend
bp = self.l1trend_to_breakpoints(tol='auto')
# Count initial breakpoints for each component
initial_bp_counts = {}
for comp in components:
if comp in ['N', 'E', 'U']:
initial_bp_counts[comp] = len(bp[comp][0]) - 2 # Subtract 2 for start and end points
# Initialize new data array
new_data = np.zeros_like(self.data)
new_data[:, 0] = self.data[:, 0] # Copy dates
new_data[:, 4:8] = self.data[:, 4:8] # Copy uncertainties
# Process each component
for comp in components:
if comp not in ['N', 'E', 'U']:
continue
comp_idx = {'N': 1, 'E': 2, 'U': 3}[comp]
# Get breakpoints for this component
bp_dates = bp[comp][0]
bp_values = bp[comp][1]
if len(bp_dates) <= 2: # Only start and end points
# No breakpoints to simplify, just interpolate
new_data[:, comp_idx] = np.interp(new_data[:, 0], bp_dates, bp_values)
continue
# Start with all breakpoints
current_dates = bp_dates.copy()
current_values = bp_values.copy()
initial_bp_count = len(current_dates) - 2 # Subtract 2 for start and end points
VERBOSE("Processing %s component: starting with %d breakpoints" % (comp, initial_bp_count))
# Iteratively remove breakpoints (except first and last)
removed_any = True
iteration = 0
while removed_any and len(current_dates) > 2:
removed_any = False
iteration += 1
# Try removing each breakpoint (except first and last)
for i in range(1, len(current_dates) - 1):
# Get the segment defined by previous and next breakpoints
sbp_idx = i - 1 # Start breakpoint index
ebp_idx = i + 1 # End breakpoint index
sbp_date = current_dates[sbp_idx]
ebp_date = current_dates[ebp_idx]
# Find indices for this segment in the time series
segment_mask = (self.data[:, 0] >= sbp_date) & (self.data[:, 0] <= ebp_date)
segment_indices = np.where(segment_mask)[0]
if len(segment_indices) < 3: # Need at least 3 points for meaningful test
continue
# Extract data for this segment
segment_dates = self.data[segment_indices, 0]
segment_raw = rawts.data[segment_indices, comp_idx]
segment_l1 = self.data[segment_indices, comp_idx]
# Calculate straight line between sbp and ebp
sbp_value = current_values[sbp_idx]
ebp_value = current_values[ebp_idx]
# Linear interpolation between sbp and ebp
straight_line = np.interp(segment_dates, [sbp_date, ebp_date], [sbp_value, ebp_value])
# Calculate chi2 for both models
# Model 1: straight line between sbp and ebp
chi2_straight = np.sum((segment_raw - straight_line)**2)
# Model 2: current L1-trend (with the breakpoint)
chi2_l1trend = np.sum((segment_raw - segment_l1)**2)
# Calculate degrees of freedom
n_points = len(segment_indices)
df_straight = n_points - 2 # 2 parameters for straight line (slope, intercept)
df_l1trend = n_points - 3 # 3 parameters for L1-trend with breakpoint (slope1, slope2, breakpoint)
# Ensure degrees of freedom are positive
if df_straight <= 0 or df_l1trend <= 0:
continue
# Calculate F-statistic
if chi2_l1trend > 0:
f_stat = (chi2_straight / df_straight) / (chi2_l1trend / df_l1trend)
else:
f_stat = float('inf')
# Calculate p-value using F-distribution
from scipy.stats import f
try:
p_value = 1 - f.cdf(f_stat, df_straight, df_l1trend)
except:
p_value = 1.0 # Conservative approach
# Test if breakpoint is useful
breakpoint_useful = p_value < alpha
VERBOSE("Breakpoint %d (%.3f): F=%.3f, p=%.3f, useful=%s" %
(i, current_dates[i], f_stat, p_value, breakpoint_useful))
# If breakpoint is not useful, remove it
if not breakpoint_useful:
current_dates = np.delete(current_dates, i)
current_values = np.delete(current_values, i)
removed_any = True
VERBOSE("Iteration %d: removed breakpoint %d from %s component (p=%.3f)" %
(iteration, i, comp, p_value))
break # Start over with the new set of breakpoints
# Interpolate final simplified breakpoints on original dates
new_data[:, comp_idx] = np.interp(new_data[:, 0], current_dates, current_values)
final_bp_count = len(current_dates) - 2 # Subtract 2 for start and end points
VERBOSE("Initial/Final breakpoints for %s component: %d -> %d" % (comp, initial_bp_counts[comp], final_bp_count))
# Create new Gts object
simplified_gts = Gts(code=self.code, data=new_data)
return simplified_gts