"""
Main L1-trend filtering function.
"""
import numpy as np

try:
    from trendfilter import trend_filter
except ImportError:
    # BUG FIX: was a bare `except:`, which also swallows SystemExit /
    # KeyboardInterrupt and masks unrelated errors inside trendfilter.
    from pathlib import Path

    # NOTE(review): writing a bokeh theme file in the fallback path of a
    # failed `trendfilter` import looks unrelated to the import itself --
    # confirm this recovery step is intentional.
    # BUG FIX: theme_path was a plain str; str has no .exists()/.write_text(),
    # so this branch always raised AttributeError. Wrap it in pathlib.Path.
    theme_path = Path('bokeh_theme.yaml')
    if not theme_path.exists():
        theme_path.write_text(
            "attrs:\n"
            " Axis:\n"
            " major_label_text_font_size: '12pt'\n"
            " axis_label_text_font_size: '14pt'\n"
            " Line:\n"
            " line_width: 3\n"
            " Title:\n"
            " text_font_size: '12pt'\n"
            " Legend:\n"
            " background_fill_alpha: 0.8\n",
            encoding='utf-8'
        )
    # Retry the import after the fallback setup.
    from trendfilter import trend_filter

import pyacs.lib.astrotime as at
import logging
import pyacs.message.message as MESSAGE
import pyacs.message.verbose_message as VERBOSE
import pyacs.message.error as ERROR
import pyacs.message.warning as WARNING
import pyacs.message.debug_message as DEBUG
from time import time
import os
from datetime import datetime
def l1trendi(self,
             alpha='auto',
             ndays_mf=1,
             criterion='BIC',
             lcomponent='ENU',
             algo='golden',
             pre_process=4500,
             bounds=(-2, 1),
             tol=0.01,
             refine=True,
             min_samples_per_segment=5,
             threshold_bias_res_detect=5,
             threshold_vel=3,
             min_sample_segment_refinement=1,
             norm='L2',
             verbose=False,
             log_dir=None,
             component=None):
    """
    Performs l1 trend filter on GPS time series. For each component, optimal filter parameter is searched using specified criterion.

    Uses https://pypi.org/project/trendfilter/

    Parameters
    ----------
    alpha : float or str
        Either hyperparameter as float or 'auto' to find optimal filtering using criterion
    ndays_mf : int
        Parameter for a median filter to be applied prior to l1-trend filtering
    criterion : str
        Criterion to select automatic optimal l1-trend filtering. Options: 'BIC' (default), 'AICc', 'Cp'
    lcomponent : str
        List of components to be filtered. Other components will remain unchanged
    algo : str
        Algorithm to find optimal alpha. Options: 'golden' (default) or 'custom'
    pre_process : float
        Pre-process time series to avoid convergence problems in l1trend. It corrects for unrealistically large
        velocity using the threshold provided (mm/yr). Correction is then added again after l1_trend.
        Default is 4500 (4.5 m/yr)
    bounds : sequence of two floats
        Bounds for golden section search algorithm. Default is (-2, 1)
    tol : float
        Tolerance for golden section search algorithm. Default is 0.01
    refine : bool
        If True, will run refinement after l1 trend filtering
    min_samples_per_segment : int
        Minimum number of samples per segment for refinement
    threshold_bias_res_detect : float
        Threshold for bias detection in refinement
    threshold_vel : float
        Threshold for velocity detection in refinement
    min_sample_segment_refinement : int
        Minimum number of samples per segment in refinement
    norm : str
        Norm to use for refinement. Options: 'L2' (default)
    verbose : bool
        Enable verbose output for debugging
    log_dir : str or None
        If provided, directory where the log file will be created. The log file will be named 'SITE_l1trendi.log'
        where SITE is the site code. If None, no file logging is performed.
    component : str or None
        Components to process ('N', 'E', 'U', 'NE', 'EN', 'NU', 'UN', 'EU', 'UE', 'NEU', etc.).
        If None, uses lcomponent parameter. Used for component-specific statistics calculation.

    Returns
    -------
    Gts
        l1-trend filtered time series as new Gts instance

    Raises
    ------
    ValueError
        If invalid parameters are provided
    RuntimeError
        If processing fails
    """
    from .preprocessing import pre_process_test
    from .optimization import best_l1trend_golden, best_l1trend_custom

    # ------------------------------------------------------------------
    # Optional per-Gts file logging
    # ------------------------------------------------------------------
    gts_logger = None
    if log_dir is not None:
        # Create log directory if it doesn't exist
        os.makedirs(log_dir, exist_ok=True)
        # One log file per site code
        log_file = os.path.join(log_dir, f"{self.code}.log")
        gts_logger = logging.getLogger(f'pyacs.gts.{self.code}')
        if not gts_logger.handlers:
            gts_logger.setLevel(logging.INFO)
            # Prevent propagation to parent loggers to avoid duplicate records
            gts_logger.propagate = False
            file_handler = logging.FileHandler(log_file, mode='a')  # append mode
            file_handler.setLevel(logging.INFO)
            # Custom formatter with [Gts.code] prefix
            formatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(name)s] %(message)s')
            file_handler.setFormatter(formatter)
            gts_logger.addHandler(file_handler)
        # Log method start with [Gts.code] prefix
        gts_logger.info(f"[{self.code}] [l1trendi] Starting method")
        gts_logger.info(f"[{self.code}] [l1trendi] Parameters: criterion={criterion}, alpha={alpha}, algo={algo}")

    # ------------------------------------------------------------------
    # Period string used in all messages
    # ------------------------------------------------------------------
    str_sdate = at.decyear2datetime(self.data[0, 0]).isoformat()[:10]
    str_edate = at.decyear2datetime(self.data[-1, 0]).isoformat()[:10]
    str_period = f"[{str_sdate} - {str_edate}]"
    VERBOSE(f"Starting l1trend processing for {self.code} {str_period} {lcomponent} {criterion} at {datetime.now().isoformat()}")

    # ------------------------------------------------------------------
    # Validate input parameters
    # ------------------------------------------------------------------
    if criterion not in ['BIC', 'AICc', 'Cp']:
        error_msg = f"Invalid criterion: {criterion}. Must be one of ['BIC', 'AICc', 'Cp']"
        ERROR(error_msg)
        raise ValueError(error_msg)
    if algo not in ['golden', 'custom']:
        error_msg = f"Invalid algorithm: {algo}. Must be one of ['golden', 'custom']"
        ERROR(error_msg)
        raise ValueError(error_msg)
    if not isinstance(ndays_mf, int) or ndays_mf < 1:
        error_msg = f"Invalid ndays_mf: {ndays_mf}. Must be a positive integer"
        ERROR(error_msg)
        raise ValueError(error_msg)
    if not isinstance(pre_process, (int, float)) or pre_process <= 0:
        error_msg = f"Invalid pre_process: {pre_process}. Must be positive"
        ERROR(error_msg)
        raise ValueError(error_msg)

    # Data column index -> component names (1=N, 2=E, 3=U)
    COMPONENT_MAP = {
        1: {'full': 'North', 'short': 'N'},
        2: {'full': 'East', 'short': 'E'},
        3: {'full': 'Up', 'short': 'U'}
    }

    # Determine which components to use for statistics:
    # `component` overrides `lcomponent` when provided.
    if component is not None:
        valid_components = ['N', 'E', 'U', 'NE', 'EN', 'NU', 'UN', 'EU', 'UE', 'NEU', 'NUE', 'ENU', 'EUN', 'UNE', 'UEN']
        if component not in valid_components:
            raise ValueError(f"Invalid component '{component}'. Must be one of: {valid_components}")
        stats_components = sorted(list(set(component)))  # remove duplicates and sort
    else:
        stats_components = sorted(list(set(lcomponent)))  # use lcomponent if component is None

    # Index of the requested criterion in the optimizer output
    CRITERION_INDEX = {
        'BIC': -1,
        'AICc': -2,
        'Cp': -3
    }

    start_time = time()
    try:
        # Working copy; `self` is never modified
        gts = self.copy()

        # Apply median filter if requested
        if ndays_mf > 1:
            VERBOSE(f"Applying median filter with window size {ndays_mf}")
            gts = self.median_filter(ndays_mf)

        # Output Gts; filtered components overwrite columns in place
        ngts = gts.copy()
        x = ngts.data[:, 0]

        # Too few data points: nothing meaningful to filter
        if ngts.data.shape[0] <= 2:
            warning_msg = f"Too few data points ({ngts.data.shape[0]}) in Gts {ngts.code}. Returning original Gts {str_period}"
            WARNING(warning_msg)
            return ngts

        # ----------------------------------------------------------
        # Process each requested component
        # ----------------------------------------------------------
        for i in [1, 2, 3]:
            if COMPONENT_MAP[i]['short'] not in lcomponent:
                continue
            comp_name = COMPONENT_MAP[i]['full']
            VERBOSE(f"Processing {comp_name} component for {gts.code} {str_period}")

            # --- Fixed user-supplied alpha ---
            # BUG FIX: was isinstance(alpha, float), which silently ignored an
            # int alpha (it then failed the == 'auto' test and did nothing).
            # bool is excluded because it is a subclass of int.
            if isinstance(alpha, (int, float)) and not isinstance(alpha, bool):
                VERBOSE(f"Using fixed alpha={alpha} for {comp_name} component")
                if gts_logger:
                    gts_logger.info(f"[{self.code}] [l1trendi] Using fixed alpha={alpha} for {comp_name} component")
                # trendfilter works in mm; data is in m, hence the 1.E3/1.E-3
                ngts.data[:, i] = trend_filter(x, gts.data[:, i] * 1.E3, l_norm=1, alpha_2=alpha)['y_fit'] * 1.E-3
                continue

            # --- Automatic alpha search ---
            if alpha == 'auto':
                criterion_idx = CRITERION_INDEX[criterion]
                VERBOSE(f"Searching optimal alpha using {criterion} criterion for {comp_name} component")
                if gts_logger:
                    gts_logger.info(f"[{self.code}] [l1trendi] Searching optimal alpha using {criterion} criterion for {comp_name} component")

                # Pre-process: detect unrealistically large offsets and, if
                # found, split the series and recurse on each segment.
                if pre_process > 0:
                    VERBOSE(f"Optimal alpha: Running pre-process detection for large offsets in {comp_name} component")
                    if gts_logger:
                        gts_logger.info(f"[{self.code}] [l1trendi] Optimal alpha: Running pre-process detection for large offsets in {comp_name} component")
                    lidx_offset = pre_process_test(gts, COMPONENT_MAP[i]['short'], threshold=pre_process)
                    if len(lidx_offset) > 0:
                        # Split at the first detected offset only; the
                        # recursion handles any further offsets per segment.
                        idx_offset = lidx_offset[0]
                        date_offset = at.decyear2datetime(gts.data[idx_offset, 0]).isoformat()[:19]
                        warning_msg = (f"Optimal alpha: Splitting Gts {self.code} {comp_name} component at {date_offset}")
                        if gts_logger:
                            gts_logger.info(f"[{self.code}] [l1trendi] {warning_msg}")
                        else:
                            VERBOSE(warning_msg)
                        # Process each segment independently (refine=False:
                        # refinement runs once on the merged result)
                        segments = [
                            (gts.data[:idx_offset, :], "before"),
                            (gts.data[idx_offset:, :], "after")
                        ]
                        results = []
                        for segment_data, position in segments:
                            segment_gts = gts.copy()
                            segment_gts.data = segment_data
                            result = segment_gts.l1trendi(
                                alpha=alpha, ndays_mf=ndays_mf, criterion=criterion,
                                lcomponent=COMPONENT_MAP[i]['short'], algo=algo,
                                pre_process=pre_process, bounds=bounds, tol=tol,
                                refine=False,
                                min_samples_per_segment=min_samples_per_segment,
                                threshold_bias_res_detect=threshold_bias_res_detect,
                                threshold_vel=threshold_vel,
                                min_sample_segment_refinement=min_sample_segment_refinement,
                                norm=norm,
                                log_dir=log_dir
                            )
                            results.append(result)
                        # Merge the two filtered segments and record the
                        # offset date (midpoint of the two epochs around it)
                        ngts.data[:, i] = np.hstack((results[0].data[:, i], results[1].data[:, i]))
                        date_offset_dec_year = (ngts.data[idx_offset, 0] + ngts.data[idx_offset - 1, 0]) / 2
                        ngts.offsets_dates.append(date_offset_dec_year)
                        continue

                # --- Process without splitting ---
                try:
                    # Mask restricting the statistics to meaningful samples:
                    # components outside stats_components only contribute
                    # where their values are non-zero.
                    current_comp = COMPONENT_MAP[i]['short']
                    component_mask = np.ones(len(x), dtype=bool)  # default: all points active
                    if current_comp not in stats_components:
                        component_mask = np.abs(gts.data[:, i]) > 1e-10
                    if algo == 'golden':
                        ngts.data[:, i] = best_l1trend_golden(x, gts.data[:, i] * 1.E3, criterion_idx, bounds=bounds, tol=tol, component_mask=component_mask) * 1.E-3
                        VERBOSE(f"Optimal alpha: Successfully processed {comp_name} component with algo golden")
                        if gts_logger:
                            gts_logger.info(f"[{self.code}] [l1trendi] Optimal alpha: Successfully processed {comp_name} component with algo golden")
                    else:
                        ngts.data[:, i] = best_l1trend_custom(x, gts.data[:, i] * 1.E3, criterion_idx, component_mask=component_mask)[0] * 1.E-3
                        VERBOSE(f"Optimal alpha: Successfully processed {comp_name} component with algo custom")
                        if gts_logger:
                            gts_logger.info(f"[{self.code}] [l1trendi] Optimal alpha: Successfully processed {comp_name} component with algo custom")
                except Exception as e:
                    warning_msg = f"Failed in algo {algo} search to process {comp_name} component: {str(e)}"
                    if gts_logger:
                        gts_logger.error(f"[{self.code}] [l1trendi] {warning_msg}")
                    # NOTE: ERROR(..., exit=True) terminates the process; the
                    # continue below only runs if it does not.
                    ERROR(warning_msg, exit=True)
                    continue

        # ----------------------------------------------------------
        # Report detected offsets
        # ----------------------------------------------------------
        ngts.offsets_dates = sorted(set(ngts.offsets_dates))
        mesg = f"List of offsets detected for {ngts.code}"
        if gts_logger:
            gts_logger.info(f"[{self.code}] [l1trendi] {mesg}")
        else:
            VERBOSE(mesg)
        for date_offset in ngts.offsets_dates:
            mesg = f"{date_offset:.5f} {at.decyear2datetime(date_offset).isoformat()[:19]}"
            if gts_logger:
                gts_logger.info(f"[{self.code}] [l1trendi] {mesg}")
            VERBOSE(mesg)

        # ----------------------------------------------------------
        # Optional refinement pass
        # ----------------------------------------------------------
        if refine:
            VERBOSE(f"Applying refinement for {ngts.code} {lcomponent} {str_period}")
            if gts_logger:
                gts_logger.info(f"[{self.code}] [l1trendi] Applying refinement for {ngts.code} {lcomponent} {str_period}")
            ngts = ngts.refine_l1trend(
                self,
                min_samples_per_segment=min_samples_per_segment,
                threshold_bias_res_detect=threshold_bias_res_detect,
                threshold_vel=threshold_vel,
                min_sample_segment_refinement=min_sample_segment_refinement,
                norm=norm,
                lcomponent=lcomponent
            )

        # Log completion
        elapsed = time() - start_time
        VERBOSE(f"Completed l1trendi processing for {self.code} in {elapsed:.1f} seconds")
        if gts_logger:
            gts_logger.info(f"[{self.code}] [l1trendi] Completed processing in {elapsed:.1f} seconds")
            gts_logger.info(f"[{self.code}] [l1trendi] Method completed successfully")
        return ngts
    except Exception as e:
        error_msg = f"Failed to process Gts {self.code}: {str(e)}"
        if gts_logger:
            gts_logger.error(f"[{self.code}] [l1trendi] {error_msg}")
        # BUG FIX: ERROR() was placed after the raise and therefore never ran.
        # Report first (without exit=True, so the documented RuntimeError is
        # actually raised), then raise with the original exception chained.
        ERROR(error_msg)
        raise RuntimeError(f"l1trend processing failed: {str(e)}") from e