"""
Functions for checking and analyzing L1-trend model quality.
"""
import numpy as np
import pyacs.lib.astrotime as at
import logging
import pyacs.message.message as MESSAGE
import pyacs.message.verbose_message as VERBOSE
import pyacs.message.error as ERROR
import pyacs.message.warning as WARNING
import pyacs.message.debug_message as DEBUG
[docs]
def check_l1_trend(ts, l1ts, component='ENU', min_samples_per_segment=4, threshold_bias_res_detect=40, threshold_vel=8, plot=False):
"""
Inspect the result from l1trend model of a time series.
Returns a list periods ([sdate,edate]) where bad modelling is suspected.
Parameters
----------
ts : pyacs.gts.Gts.Gts
Raw time series
l1ts : pyacs.gts.Gts.Gts
L1-trend time series
component : str
Components to be analyzed. Default: 'EN'
min_samples_per_segment : int
Minimum number of samples for a segment to be inspected (default: 6)
threshold_bias_res_detect : float
Threshold to detect bias residuals (default: 40)
threshold_vel : float
Instantaneous velocity threshold for a segment to be considered in detection (default: 8 mm/yr)
plot : bool
Plot the results (default: False)
Returns
-------
tuple
(H_period, H_cp, H_cp_pb)
- H_period: Dictionary of suspicious periods for each component
- H_cp: Dictionary of breakpoint indices for each component
- H_cp_pb: Dictionary of problematic breakpoint indices for each component
"""
# initialize
H_period = {}
H_cp = {}
H_cp_pb = {}
VERBOSE("Searching suspicious periods for %s" % (l1ts.code))
# define n_median_filter
n_median_filter = np.min([11, ts.data.shape[0]-1])
if n_median_filter % 2 == 0:
n_median_filter = n_median_filter - 1
# East component
if 'E' in component:
VERBOSE('Component E')
t = ts.data[:, 0]
y = ts.data[:, 2] * 1.E3
l1y = l1ts.data[:, 2] * 1.E3
fy = ts.median_filter(n=n_median_filter).data[:, 2] * 1.E3
lidx = make_node_ts(t, l1y)
H_cp['E'] = lidx
VERBOSE("Searching suspicious periods for component E ")
H_period['E'], H_cp_pb['E'] = compute_measure_bias(t, l1y, fy - l1y, lidx,
min_samples_per_segment=min_samples_per_segment,
threshold_bias_res_detect=threshold_bias_res_detect,
threshold_vel=threshold_vel)
VERBOSE("#%d periods suspected for component E" % len(H_period['E']))
# North component
if 'N' in component:
VERBOSE('Component N')
t = ts.data[:, 0]
y = ts.data[:, 1] * 1.E3
l1y = l1ts.data[:, 1] * 1.E3
fy = ts.median_filter(n=n_median_filter).data[:, 1] * 1.E3
lidx = make_node_ts(t, l1y)
H_cp['N'] = lidx
VERBOSE("Searching suspicious periods for component N ")
H_period['N'], H_cp_pb['N'] = compute_measure_bias(t, l1y, fy - l1y, lidx,
min_samples_per_segment=min_samples_per_segment,
threshold_bias_res_detect=threshold_bias_res_detect,
threshold_vel=threshold_vel)
VERBOSE("#%d periods suspected for component N" % len(H_period['N']))
# Up component
if 'U' in component:
VERBOSE('Component U')
t = ts.data[:, 0]
y = ts.data[:, 3] * 1.E3
l1y = l1ts.data[:, 3] * 1.E3
fy = ts.median_filter(n=n_median_filter).data[:, 3] * 1.E3
lidx = make_node_ts(t, l1y)
H_cp['U'] = lidx
VERBOSE("Searching suspicious periods for component U ")
H_period['U'], H_cp_pb['U'] = compute_measure_bias(t, l1y, fy - l1y, lidx,
min_samples_per_segment=min_samples_per_segment,
threshold_bias_res_detect=threshold_bias_res_detect,
threshold_vel=threshold_vel)
VERBOSE("#%d periods suspected for component U" % len(H_period['U']))
if plot:
ts.plot(superimposed=l1ts, lperiod=H_period)
return H_period, H_cp, H_cp_pb
[docs]
def measure_bias(t, y):
"""
Provide a measure of potential bias in a residual time series (0-100).
Parameters
----------
t : ndarray
Time array.
y : ndarray
Residual values.
Returns
-------
tuple
(res, resv): bias score and median absolute residual.
"""
nsample = y.shape[0]
mid_date = (t[-1] + t[0]) / 2
lidx_1 = np.where(t < mid_date)[0]
lidx_2 = np.where(t > mid_date)[0]
n_positif1 = np.where(y[lidx_1] >= 0)[0].shape[0]
n_negatif2 = np.where(y[lidx_2] <= 0)[0].shape[0]
n_negatif1 = lidx_1.shape[0] - n_positif1
n_positif2 = lidx_2.shape[0] - n_negatif2
res = np.max([np.fabs(((n_positif1 + n_negatif2) / nsample * 100.) - 50.) * 2,
np.fabs(((n_negatif1 + n_positif2) / nsample * 100.) - 50.) * 2])
resv = np.median(np.fabs(y))
return res, resv
[docs]
def make_node_ts(x, y, threshold=.5):
"""
Find breakpoints assuming y is piecewise linear.
Parameters
----------
x : ndarray
Time/abscissa.
y : ndarray
Values (piecewise linear).
threshold : float, optional
Threshold on change in slope to declare breakpoint.
Returns
-------
ndarray
Indices of breakpoints in the time series.
"""
dy = np.diff(y) / (np.diff(x))
cp = np.where(np.fabs(np.diff(dy)) > threshold)[0]
return cp + 1
[docs]
def compute_measure_bias(t, l1y, y, cp, min_samples_per_segment=6, threshold_bias_res_detect=40, threshold_vel=8,
verbose=True):
"""
Compute fit indicators from residual (obs minus piecewise linear).
Parameters
----------
t : ndarray
Time array.
l1y : ndarray
L1-trend values.
y : ndarray
Residual (obs - piecewise_linear).
cp : ndarray
Breakpoint indices.
min_samples_per_segment : int, optional
Minimum samples per segment (default 6).
threshold_bias_res_detect : float, optional
Threshold for bias detection (default 40).
threshold_vel : float, optional
Velocity threshold in mm/yr (default 8).
verbose : bool, optional
Verbose mode.
Returns
-------
tuple
(list of suspicious periods [sdate, edate], list of problematic breakpoint indices).
"""
datetime_t = at.decyear2datetime(t)
rms_l1 = np.median(np.fabs(y))
# computes average velocity
ivel = np.diff(l1y) / np.diff(t)
median_vel = np.median(ivel)
median_vel_change = np.median(np.fabs(ivel - median_vel))
lperiod = []
lcp = []
for i in np.arange(cp.shape[0] - 1):
wt = t[cp[i]:cp[i + 1] + 1]
wy = y[cp[i]:cp[i + 1] + 1]
if wt.shape[0] < min_samples_per_segment:
continue
mb, resv = measure_bias(wt, wy)
# threshold for velocity 4 * median scatter
rvel = np.fabs(np.mean(np.diff(l1y[cp[i]:cp[i + 1] + 1]) / np.diff(wt)) - median_vel) / median_vel_change
if mb > threshold_bias_res_detect and rvel > threshold_vel:
VERBOSE((" %s-%s nsample: %3d velocity_bias_measure: %.1lf rms_l1: %.1lf vs %.1lf rvel %.1lf " % \
(datetime_t[cp[i]].isoformat()[:10], datetime_t[cp[i + 1]].isoformat()[:10], wt.shape[0], mb,
resv, rms_l1, rvel)))
lperiod.append([t[cp[i]], t[cp[i + 1]]])
lcp.append(cp[i])
return lperiod, lcp