Source code for pyacs.gts.lib.l1trend.optimal_l1trend_workflow

"""
Optimal L1-trend workflow for GPS time series analysis.

This module provides a complete workflow that combines L1-trend filtering,
simplification, and refinement for optimal results.
"""

import time
import numpy as np
import pyacs.message.verbose_message as VERBOSE
import logging
import os


[docs] def l1trend_optimal_workflow(self, criterion='AICc', component='NEU', log_dir=None): """ Optimal workflow for l1 trend analysis. This workflow combines multiple L1-trend processing steps for optimal results: 1. Initial L1-trend filtering with AICc criterion 2. Breakpoint simplification to remove redundant breakpoints 3. Local refinement using original data for better fit 4. Final simplification to clean up the refined result Parameters ---------- criterion : str, optional Criterion for L1-trend optimization ('AICc', 'BIC', 'Cp') component : str, optional Components to process ('N', 'E', 'U', 'NE', 'NU', 'EU', 'NEU'). Default is 'NEU' log_dir : str, optional Directory for Gts-specific logging. If provided, creates a log file named {self.code}.log Returns ------- Gts Final optimized L1-trend time series """ # import import pyacs.lib.astrotime as at start_time = time.time() # Setup Gts-specific logging if log_dir is provided gts_logger = None if log_dir is not None: # Create log directory if it doesn't exist os.makedirs(log_dir, exist_ok=True) # Create Gts-specific log file log_file = os.path.join(log_dir, f"{self.code}.log") # Configure Gts-specific logger gts_logger = logging.getLogger(f'pyacs.gts.{self.code}') if not gts_logger.handlers: gts_logger.setLevel(logging.INFO) # Prevent propagation to parent loggers to avoid duplicate logging gts_logger.propagate = False file_handler = logging.FileHandler(log_file, mode='a') # append mode file_handler.setLevel(logging.INFO) # Custom formatter with [Gts.code] prefix formatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(name)s] %(message)s') file_handler.setFormatter(formatter) gts_logger.addHandler(file_handler) # Log workflow start with [Gts.code] prefix gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Starting workflow") gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Parameters: criterion={criterion}, component={component}") # Validate component parameter and handle equivalent combinations valid_components = ['N', 'E', 'U', 'NE', 'EN', 'NU', 'UN', 'EU', 'UE', 'NEU', 'NUE', 'ENU', 'EUN', 'UNE', 'UEN'] if component not in valid_components: raise ValueError(f"Invalid component '{component}'. Must be one of: {valid_components}") # Convert component string to list of individual components and sort for consistency component_list = sorted(list(set(component))) # Remove duplicates and sort if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Processing components: {component_list}") # Calculate time series statistics n_points = len(self.data) time_span_years = self.data[-1, 0] - self.data[0, 0] # Convert start and end dates to human readable format start_date_decyear = self.data[0, 0] end_date_decyear = self.data[-1, 0] start_date_readable = at.decyear2datetime(start_date_decyear).strftime('%Y-%m-%d') end_date_readable = at.decyear2datetime(end_date_decyear).strftime('%Y-%m-%d') VERBOSE("Starting optimal L1trend_optimal_workflow for %s %d points, %.2f years span (%s to %s)" % (self.code, n_points, time_span_years, start_date_readable, end_date_readable)) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] {n_points} epochs, {time_span_years:.2f} years span ({start_date_readable} to {end_date_readable})") # Step 1: remove large uncertainty points that are likely outliers VERBOSE("Step 1: Removing large uncertainty points that are likely outliers") #if gts_logger: # gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 1: Removing large uncertainty points") ts_no_large_uncertainty = self.find_large_uncertainty().remove_outliers() outliers_removed = self.data.shape[0] - ts_no_large_uncertainty.data.shape[0] VERBOSE("Removed %d outliers" % outliers_removed) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 1: Removed {outliers_removed} large uncertainty outliers") # Step 2: detrend the time series VERBOSE("Step 2: Detrending the time series") #if gts_logger: # gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 2: Detrending time series") if ts_no_large_uncertainty.data[-1,0] - ts_no_large_uncertainty.data[0,0] > 1: try: detrended_ts = ts_no_large_uncertainty.detrend_median() if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 2: Used detrend_median method") except: VERBOSE("detrend_median failed, using detrend method") detrended_ts = ts_no_large_uncertainty.detrend() else: detrended_ts = ts_no_large_uncertainty.detrend() if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 2: Used detrend method (fallback)") # Step 3: l1trendi with criterion step3_start_time = time.time() VERBOSE("Step 3: Initial L1-trend filtering with criterion %s" % criterion) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 3: Initial L1-trend filtering with criterion {criterion}") # Create a copy for processing only specified components detrended_ts_processed = detrended_ts.copy() # Zero out components that are not being processed component_indices = {'N': 1, 'E': 2, 'U': 3} for comp in ['N', 'E', 'U']: if comp not in component_list: detrended_ts_processed.data[:, component_indices[comp]] = 0.0 l1_aicc = detrended_ts_processed.l1trendi(criterion=criterion, refine=False, component=component) step3_time = time.time() - step3_start_time VERBOSE("Step 3 completed in %.2f seconds" % step3_time) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 3: Completed in {step3_time:.2f} seconds") # Step 3 statistics VERBOSE("Step 3 - Initial L1-trend Statistics:") #if gts_logger: # gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 3: Computing statistics") # Number of breakpoints per component in a single line bp_step3 = l1_aicc.l1trend_to_breakpoints(tol='auto') bp_counts = [] for comp in component_list: if comp in bp_step3: n_bp = len(bp_step3[comp][0]) - 2 # Subtract start and end points bp_counts.append(f"{comp}:{n_bp}") else: bp_counts.append(f"{comp}:0") VERBOSE(" Breakpoints: %s" % ", ".join(bp_counts)) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 3: Breakpoints found: {', '.join(bp_counts)}") # Offset dates if hasattr(l1_aicc, 'offsets_dates') and l1_aicc.offsets_dates: VERBOSE(" Offset dates: %s" % [f"{date:.3f}" for date in l1_aicc.offsets_dates]) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 3: Offset dates: {[f'{date:.3f}' for date in l1_aicc.offsets_dates]}") # Convert to human readable format offset_dates_readable = [] for date in l1_aicc.offsets_dates: try: date_dt = at.decyear2datetime(date) if hasattr(date_dt, '__len__'): date_str = date_dt[0].isoformat()[:10] else: date_str = date_dt.isoformat()[:10] offset_dates_readable.append(date_str) except: offset_dates_readable.append(f"{date:.3f}") VERBOSE(" Offset dates (readable): %s" % offset_dates_readable) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 3: Offset dates: {offset_dates_readable}") else: VERBOSE(" No offset dates found") if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 3: No offset dates found") # Step 4: remove obvious outliers VERBOSE("Step 4: Removing obvious outliers using flag_outliers_using_l1trend") #if gts_logger: # gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 4: Flagging outliers using L1-trend") detrended_ts_outliers_flagged = detrended_ts_processed.flag_outliers_using_l1trend(l1_aicc, threshold=20) # Count outliers by component if detrended_ts_outliers_flagged.outliers: outlier_indices = np.array(detrended_ts_outliers_flagged.outliers) outlier_counts = [] for comp in component_list: comp_idx = {'N': 1, 'E': 2, 'U': 3}[comp] # Count outliers where this component exceeds threshold component_outliers = np.sum(np.abs(detrended_ts_outliers_flagged.data[outlier_indices, comp_idx]) > 0.020) # 20mm threshold outlier_counts.append(f"{comp}:{component_outliers}") VERBOSE(" Outliers removed by component: %s" % ", ".join(outlier_counts)) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 4: Outliers removed by component: {', '.join(outlier_counts)}") else: VERBOSE(" No outliers found") if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 4: No outliers found") dts_no_outliers = detrended_ts_outliers_flagged.remove_outliers() l1_aicc_no_outliers = l1_aicc.copy() l1_aicc_no_outliers.outliers = detrended_ts_outliers_flagged.outliers l1_aicc_no_outliers = l1_aicc_no_outliers.remove_outliers() # Step 5: simplify l1 trend VERBOSE("Step 5: Simplifying L1-trend breakpoints") #if gts_logger: # gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 5: Simplifying L1-trend breakpoints") l1_simplified = l1_aicc_no_outliers.simplify_l1trend() # Get breakpoints after simplification bp_step5 = l1_simplified.l1trend_to_breakpoints(tol='auto') # Count simplified breakpoints by component bp_counts_step5 = [] for comp in component_list: if comp in bp_step5: n_bp = len(bp_step5[comp][0]) - 2 # Subtract start and end points bp_counts_step5.append(f"{comp}:{n_bp}") else: bp_counts_step5.append(f"{comp}:0") VERBOSE(" Simplified breakpoints: %s" % ", ".join(bp_counts_step5)) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 5: New breakpoints: {', '.join(bp_counts_step5)}") # Step 6: refine l1 trend with the original data step6_start_time = time.time() VERBOSE("Step 6: Refining L1-trend using original data") if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 6: Refining L1-trend using original data") # Instead of splitting into segments, process the entire time series # but ensure the refine_l1trend function handles offsets properly VERBOSE("Step 6.1: Refining entire time series with offset awareness") if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 6.1: Refining entire time series with offset awareness") # Refine the entire simplified L1-trend using the full detrended time series l1_refined, H_period, H_cp, H_cp_pb = l1_simplified.refine_l1trend(dts_no_outliers, min_samples_per_segment=4,output='both') # Get refinement information to display refined periods #H_period, H_cp, H_cp_pb = l1_simplified.refine_l1trend(dts_no_outliers, min_samples_per_segment=4, output='info') # Display refined periods in human-readable format if H_period: VERBOSE("Refined periods:") for comp in component_list: if comp in H_period and H_period[comp]: VERBOSE(" %s component:" % comp) for i, period in enumerate(H_period[comp]): start_date = period[0] end_date = period[1] try: start_dt = at.decyear2datetime(start_date) end_dt = at.decyear2datetime(end_date) if hasattr(start_dt, '__len__'): start_str = start_dt[0].isoformat()[:10] else: start_str = start_dt.isoformat()[:10] if hasattr(end_dt, '__len__'): end_str = end_dt[0].isoformat()[:10] else: end_str = end_dt.isoformat()[:10] VERBOSE(" Period %d: %s to %s" % (i+1, start_str, end_str)) except: VERBOSE(" Period %d: %.3f to %.3f" % (i+1, start_date, end_date)) else: VERBOSE(" %s component: No periods refined" % comp) else: VERBOSE("No periods were refined") VERBOSE("Refinement completed on entire time series") if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 6: Refinement completed on entire time series") step6_time = time.time() - step6_start_time if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 6: Completed in {step6_time:.2f} seconds") # Step 7: simplify again VERBOSE("Step 7: Final simplification of refined L1-trend") #if gts_logger: # gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 7: Final simplification of refined L1-trend") # Get breakpoints before simplification bp_before = l1_refined.l1trend_to_breakpoints(tol='auto') l1_final_detrended = l1_refined.simplify_l1trend() # Get breakpoints after simplification bp_after = l1_final_detrended.l1trend_to_breakpoints(tol='auto') # Count breakpoints removed by component bp_removed = [] for comp in component_list: if comp in bp_before and comp in bp_after: n_before = len(bp_before[comp][0]) - 2 # Subtract start and end points n_after = len(bp_after[comp][0]) - 2 n_removed = n_before - n_after bp_removed.append(f"{comp}:{n_removed}") elif comp in bp_before: n_before = len(bp_before[comp][0]) - 2 bp_removed.append(f"{comp}:{n_before}") else: bp_removed.append(f"{comp}:0") VERBOSE(" Breakpoints removed by component: %s" % ", ".join(bp_removed)) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 7: Breakpoints removed by component: {', '.join(bp_removed)}") # Step 8: apply back the velocity removed at step #0 VERBOSE("Step 8: Applying back the velocity removed at step #0") #if gts_logger: # gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 8: Applying back the velocity removed at step #0") velocity = detrended_ts.velocity l1_final = l1_final_detrended.remove_velocity(velocity) # compute a median bias with respect to the original data and remove it if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 8.1: Computing median bias") median_bias = np.median(l1_final.substract_ts_daily(self).data[:, 1:4],axis=0) l1_final.data[:, 1:4] -= median_bias if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 8: Median bias removed: [{median_bias[0]*1000:.3f}, {median_bias[1]*1000:.3f}, {median_bias[2]*1000:.3f}] mm") # Step 9 interpolate the final l1 trend to the original time series VERBOSE("Step 9: Interpolating the final L1-trend to the original time series") if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Step 9: Interpolating the final L1-trend to the original time series") #from pyacs.gts.Gts import Gts #l1_final_interpolated = Gts(code=self.code) l1_final_interpolated = self.copy() l1_final_interpolated.data = np.zeros((self.data.shape[0], 10)) l1_final_interpolated.data[:,4:7] = 0.001 l1_final_interpolated.data[:, 0] = self.data[:, 0] # Interpolate only the processed components, preserve original data for unprocessed components component_indices = {'N': 1, 'E': 2, 'U': 3} for comp in ['N', 'E', 'U']: comp_idx = component_indices[comp] if comp in component_list: # Interpolate the processed component l1_final_interpolated.data[:, comp_idx] = np.interp(self.data[:, 0], l1_final.data[:, 0], l1_final.data[:, comp_idx]) else: # Preserve original data for unprocessed components l1_final_interpolated.data[:, comp_idx] = self.data[:, comp_idx] # Calculate and display final statistics VERBOSE("----------------------------------------------------------") VERBOSE("FINAL STATISTICS") VERBOSE("----------------------------------------------------------") if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Final statistics") # Number of breakpoints per component (from the L1-trend, not the velocity-adjusted version) bp_final = l1_final_detrended.l1trend_to_breakpoints(tol='auto') for comp in component_list: if comp in bp_final: n_bp = len(bp_final[comp][0]) - 2 # Subtract start and end points VERBOSE("Number of breakpoints for %s component: %d" % (comp, n_bp)) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Final: Number of breakpoints for {comp} component: {n_bp}") else: VERBOSE("Number of breakpoints for %s component: 0" % comp) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Final: Number of breakpoints for {comp} component: 0") # Offset dates (from the L1-trend, not the velocity-adjusted version) if hasattr(l1_final_detrended, 'offsets_dates') and l1_final_detrended.offsets_dates: VERBOSE("Offset dates: %s" % [f"{date:.3f}" for date in l1_final_detrended.offsets_dates]) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Final: Offset dates: {[f'{date:.3f}' for date in l1_final_detrended.offsets_dates]}") # Convert to human readable format offset_dates_readable = [] for date in l1_final_detrended.offsets_dates: try: date_dt = at.decyear2datetime(date) if hasattr(date_dt, '__len__'): date_str = date_dt[0].isoformat()[:10] else: date_str = date_dt.isoformat()[:10] offset_dates_readable.append(date_str) except: offset_dates_readable.append(f"{date:.3f}") VERBOSE("Offset dates (readable): %s" % offset_dates_readable) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Final: Offset dates (readable): {offset_dates_readable}") else: VERBOSE("No offset dates found") if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Final: No offset dates found") # Median of fit (residuals between final L1-trend and original data) residuals = l1_final_interpolated.substract_ts_daily(self).data[:, 1:4] * 1000 # Convert to mm # Calculate statistics only for processed components component_indices = {'N': 0, 'E': 1, 'U': 2} # 0-based indices for residuals array processed_residuals = [] processed_comp_names = [] for comp in ['N', 'E', 'U']: if comp in component_list: comp_idx = component_indices[comp] processed_residuals.append(residuals[:, comp_idx]) processed_comp_names.append(comp) if processed_residuals: processed_residuals = np.column_stack(processed_residuals) median_residuals = np.median(processed_residuals, axis=0) mad_residuals = np.median(np.abs(processed_residuals - median_residuals), axis=0) rms_residuals = np.sqrt(np.mean(processed_residuals**2, axis=0)) # Create formatted strings for display median_str = ", ".join([f"{comp}:{val:.2f}" for comp, val in zip(processed_comp_names, median_residuals)]) mad_str = ", ".join([f"{comp}:{val:.2f}" for comp, val in zip(processed_comp_names, mad_residuals)]) rms_str = ", ".join([f"{comp}:{val:.2f}" for comp, val in zip(processed_comp_names, rms_residuals)]) VERBOSE("Median residuals (%s): [%s] mm" % (", ".join(processed_comp_names), median_str)) VERBOSE("MAD residuals (%s): [%s] mm" % (", ".join(processed_comp_names), mad_str)) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Final: MAD residuals ({', '.join(processed_comp_names)}): [{mad_str}] mm") VERBOSE("RMS residuals (%s): [%s] mm" % (", ".join(processed_comp_names), rms_str)) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Final: RMS residuals ({', '.join(processed_comp_names)}): [{rms_str}] mm") else: VERBOSE("No components processed, no residual statistics calculated") if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Final: No components processed, no residual statistics calculated") total_time = time.time() - start_time VERBOSE("Optimal L1-trend workflow completed") VERBOSE("Total workflow time: %.2f seconds" % total_time) if gts_logger: gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Workflow completed successfully") gts_logger.info(f"[{self.code}] [optimal_l1trend_workflow] Total workflow time: {total_time:.2f} seconds") #VERBOSE("Step 6 (refinement) completed in %.2f seconds" % step6_time) return l1_final_interpolated