Source code for pyacs.gts.lib.model.detrend_hectorp

from __future__ import annotations

[docs] def detrend_hectorp(self, component='NEU', **kwargs): """Detrend a time series using the Hector model (HECTOR from Machiel Bos). References: Bos et al. (2013), J. Geodesy 87(4), 351-360. See https://gitlab.com/machielsimonbos/hectorp and estimatetrend wiki. Parameters ---------- component : str, optional Component(s) to detrend ('N', 'E', 'U' or 'NEU'). Default is 'NEU'. **kwargs : dict, optional Additional arguments for the Hector control file (estimatetrend). Returns ------- Gts Residual time series with velocity (and sigmas), offsets attributes. """ import numpy as np from pyacs.gts.Gts import Gts import inspect import os import logging import pyacs.message.message as MESSAGE import pyacs.message.verbose_message as VERBOSE import pyacs.message.error as ERROR import pyacs.message.warning as WARNING import pyacs.message.debug_message as DEBUG # check if hectorp is installed by running os command estimatetrend import subprocess result = subprocess.run(['estimatetrend', '--help'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) if result.returncode != 0: ERROR("hectorp is not installed. Please install it from https://gitlab.com/machielsimonbos/hectorp", exit=True) # function to convert a Gts object to a HectorP .mom format def gts2mom(gts): import pyacs.lib.astrotime as at import numpy as np date_mom = at.decyear2mjd(gts.data[:, 0]) # force mid-day (integer MJD day) date_mom = np.floor(date_mom).astype(int) + 0.5 pos_mom = gts.data[:, 1:4]*1.E3 # header header = "sampling period 1.000000" # offsets if gts.offsets_dates is not None : for date in gts.offsets_dates: header += "\noffset %f " % (at.decyear2mjd(date)) # create output directory if it does not exist import os if not os.path.exists('tmp_hectorp'): os.makedirs('tmp_hectorp') # north out_mom_north = 'tmp_hectorp/'+gts.code + "_north.mom" np.savetxt(out_mom_north, np.c_[date_mom, pos_mom[:, 0]], fmt="%10.5f %15.6f", header=header) # east out_mom_east = 'tmp_hectorp/'+gts.code + "_east.mom" np.savetxt(out_mom_east, np.c_[date_mom, pos_mom[:, 1]], fmt="%10.5f %15.6f", header=header) # up out_mom_up = 'tmp_hectorp/'+gts.code + "_up.mom" np.savetxt(out_mom_up, np.c_[date_mom, pos_mom[:, 2]], fmt="%10.5f %15.6f", header=header) # parse estimatetrend report def parse_estimatetrend_report(text: str) -> Dict[str, Any]: """ Parse 'estimatetrend' text output and extract: - noise_models: list of dicts - bias: dict(value, sigma, epoch) - trend: dict(value, sigma, unit) - seasonal: dict(period -> {cos/sin/amp/pha: {value, sigma, unit/deg}}) - offsets: list of dicts(epoch, value, sigma, unit) """ import re from dataclasses import dataclass, asdict from pathlib import Path from typing import Any, Dict, List, Optional, Union @dataclass class NoiseModel: name: str fraction: Optional[float] = None sigma: Optional[float] = None sigma_unit: Optional[str] = None d: Optional[float] = None kappa: Optional[float] = None one_minus_phi: Optional[float] = None def _to_float(x: str) -> float: # Handles " 3.0183", "-1.0000", "0.0000", etc. return float(x.strip()) out: Dict[str, Any] = { "noise_models": [], "bias": None, "trend": None, "seasonal": {}, # keyed by period (float) "offsets": [], } # -------- Noise Models block -------- # We'll locate "Noise Models" section and parse model sub-blocks like: # FlickerGGM: # fraction = 0.36643 # sigma = 3.0183 mm/yr^0.25 # d = 0.5000 (fixed) # kappa = -1.0000 (fixed) # 1-phi = 0.0000 (fixed) # # White: # fraction = 0.63357 # sigma = 0.9079 mm # ... noise_section_match = re.search( r"(?ms)^Noise Models\s*\n-+\s*\n(?P<body>.*?)(?:\n\s*\n|^bias\s*:|^trend\s*:|^offset\b|$)", text, ) if noise_section_match: body = noise_section_match.group("body") # Split into model blocks by headers "Name:" # Keep header lines using a regex finditer. model_header_re = re.compile(r"(?m)^(?P<name>[A-Za-z0-9_\-]+)\s*:\s*$") headers = list(model_header_re.finditer(body)) for i, h in enumerate(headers): name = h.group("name") start = h.end() end = headers[i + 1].start() if i + 1 < len(headers) else len(body) block = body[start:end] nm = NoiseModel(name=name) m = re.search(r"(?m)^\s*fraction\s*=\s*(?P<v>[+-]?\d+(?:\.\d+)?)\s*$", block) if m: nm.fraction = _to_float(m.group("v")) m = re.search( r"(?m)^\s*sigma\s*=\s*(?P<v>[+-]?\d+(?:\.\d+)?)\s*(?P<u>.+?)\s*$", block, ) if m: nm.sigma = _to_float(m.group("v")) nm.sigma_unit = m.group("u").strip() m = re.search(r"(?m)^\s*d\s*=\s*(?P<v>[+-]?\d+(?:\.\d+)?)\b", block) if m: nm.d = _to_float(m.group("v")) m = re.search(r"(?m)^\s*kappa\s*=\s*(?P<v>[+-]?\d+(?:\.\d+)?)\b", block) if m: nm.kappa = _to_float(m.group("v")) m = re.search(r"(?m)^\s*1-phi\s*=\s*(?P<v>[+-]?\d+(?:\.\d+)?)\b", block) if m: nm.one_minus_phi = _to_float(m.group("v")) out["noise_models"].append(asdict(nm)) # -------- bias -------- # bias : 4.320 +/- 0.946 (at 55965.50) m = re.search( r"(?m)^\s*bias\s*:\s*(?P<v>[+-]?\d+(?:\.\d+)?)\s*\+/-\s*(?P<s>[+-]?\d+(?:\.\d+)?)\s*\(at\s*(?P<t>[+-]?\d+(?:\.\d+)?)\)\s*$", text, ) if m: out["bias"] = {"value": _to_float(m.group("v")), "sigma": _to_float(m.group("s")), "epoch": _to_float(m.group("t"))} # -------- trend -------- # trend: 0.955 +/- 0.164 mm/days m = re.search( r"(?m)^\s*trend\s*:\s*(?P<v>[+-]?\d+(?:\.\d+)?)\s*\+/-\s*(?P<s>[+-]?\d+(?:\.\d+)?)\s*(?P<u>.+?)\s*$", text, ) if m: out["trend"] = {"value": _to_float(m.group("v")), "sigma": _to_float(m.group("s")), "unit": m.group("u").strip()} # -------- seasonal terms -------- # cos 365.250 : -0.387 +/- 0.139 mm # sin 365.250 : -0.158 +/- 0.141 mm # amp 365.250 : 0.442 +/- 0.135 mm # pha 365.250 : -111.046 +/- 108.700 degrees seasonal_re = re.compile( r"(?m)^\s*(?P<kind>cos|sin|amp|pha)\s+(?P<per>\d+(?:\.\d+)?)\s*:\s*(?P<v>[+-]?\d+(?:\.\d+)?)\s*\+/-\s*(?P<s>[+-]?\d+(?:\.\d+)?)\s*(?P<u>mm|degrees)\s*$" ) for m in seasonal_re.finditer(text): kind = m.group("kind") per = float(m.group("per")) v = _to_float(m.group("v")) s = _to_float(m.group("s")) u = m.group("u").strip() if per not in out["seasonal"]: out["seasonal"][per] = {} out["seasonal"][per][kind] = {"value": v, "sigma": s, "unit": u} # -------- offsets -------- # offset at 56129.5000 : -4.77 +/- 0.61 mm offset_re = re.compile( r"(?m)^\s*offset\s+at\s+(?P<t>[+-]?\d+(?:\.\d+)?)\s*:\s*(?P<v>[+-]?\d+(?:\.\d+)?)\s*\+/-\s*(?P<s>[+-]?\d+(?:\.\d+)?)\s*(?P<u>mm)\s*$" ) for m in offset_re.finditer(text): out["offsets"].append( {"epoch": _to_float(m.group("t")), "value": _to_float(m.group("v")), "sigma": _to_float(m.group("s")), "unit": m.group("u").strip()} ) return out def run_hectorp(gts, component='ENU', **kwargs): import os import subprocess from icecream import ic import numpy as np import pyacs.lib.astrotime as at import logging import pyacs.message.message as MESSAGE import pyacs.message.verbose_message as VERBOSE import pyacs.message.error as ERROR import pyacs.message.warning as WARNING import pyacs.message.debug_message as DEBUG # check tmp_hector exists if not os.path.exists('tmp_hectorp'): os.makedirs('tmp_hectorp') # new data new_data = np.zeros((gts.data.shape[0],3)) # create control file base = gts.code default_ctl = { "DataFile": f"{base}_north.mom", "DataDirectory": os.getcwd()+'/tmp_hectorp', "OutputFile": os.getcwd()+'/tmp_hectorp/'+f"{base}_north.out", "TimeUnit": "days", "interpolate": "no", "PhysicalUnit": "mm", "ScaleFactor": 1.0, "periodicsignals": "365.25 182.625", "DegreePolynomial": 1, "NoiseModels": "GGM White", "GGM_1mphi": "6.9e-06", "useRMLE" : "no", "Verbose": "yes" } # update with user-provided parameters default_ctl.update(kwargs) link_path = 'estimatetrend.ctl' # North component if 'N' in component: default_ctl["DataFile"] = f"{base}_north.mom" default_ctl["OutputFile"] = os.getcwd()+'/tmp_hectorp/'+f"{base}_north.out" # write control file for noth component ctl_file = 'tmp_hectorp/'+gts.code + "_north.ctl" if os.path.exists(ctl_file): os.remove(ctl_file) with open(ctl_file, "w") as f: for k, v in default_ctl.items(): f.write(f"{k} {v}\n") # run HectorP command # Remove existing file/symlink (including broken symlink) if os.path.lexists(link_path): os.unlink(link_path) os.symlink(ctl_file, link_path) # run HectorP command cmd = ['estimatetrend'] VERBOSE("Running HectorP for north component (%d data) with Noise Models = %s" % (gts.data.shape[0], default_ctl["NoiseModels"])) completed = subprocess.run(cmd, check=True, text=True, capture_output=True) # get HectorP output results_north = parse_estimatetrend_report(completed.stdout) # check processing is OK if results_north["trend"] is None: ERROR("HectorP processing failed for north component. ", exit=True) # populate new_data new_data[:, 0] = np.genfromtxt(default_ctl["OutputFile"], usecols=2)*1.E-3 # write completed.stdout to file with open(os.path.join('tmp_hectorp', f"{base}_north.log"), "w") as f: f.write(completed.stdout) # East component if 'E' in component: default_ctl["DataFile"] = f"{base}_east.mom" default_ctl["OutputFile"] = os.getcwd()+'/tmp_hectorp/'+f"{base}_east.out" # write control file for east component ctl_file = 'tmp_hectorp/'+gts.code + "_east.ctl" if os.path.exists(ctl_file): os.remove(ctl_file) with open(ctl_file, "w") as f: for k, v in default_ctl.items(): f.write(f"{k} {v}\n") # run HectorP command # Remove existing file/symlink (including broken symlink) if os.path.lexists(link_path): os.unlink(link_path) os.symlink(ctl_file, link_path) cmd = ['estimatetrend'] VERBOSE("Running HectorP for east component (%d data) with Noise Models = %s" % (gts.data.shape[0], default_ctl["NoiseModels"])) completed = subprocess.run(cmd, check=True, text=True, capture_output=True) # get HectorP output results_east = parse_estimatetrend_report(completed.stdout) if results_east["trend"] is None: ERROR("HectorP processing failed for east component. ", exit=True) # populate new_data new_data[:, 1] = np.genfromtxt(default_ctl["OutputFile"], usecols=2)*1.E-3 # write completed.stdout to file with open(os.path.join('tmp_hectorp', f"{base}_east.log"), "w") as f: f.write(completed.stdout) # Up component if 'U' in component: default_ctl["DataFile"] = f"{base}_up.mom" default_ctl["OutputFile"] = os.getcwd()+'/tmp_hectorp/'+f"{base}_up.out" # write control file for up component ctl_file = 'tmp_hectorp/'+gts.code + "_up.ctl" if os.path.exists(ctl_file): os.remove(ctl_file) with open(ctl_file, "w") as f: for k, v in default_ctl.items(): f.write(f"{k} {v}\n") # run HectorP command # Remove existing file/symlink (including broken symlink) if os.path.lexists(link_path): os.unlink(link_path) os.symlink(ctl_file, link_path) cmd = ['estimatetrend'] VERBOSE("Running HectorP for up component (%d data) with Noise Models = %s" % (gts.data.shape[0], default_ctl["NoiseModels"])) completed = subprocess.run(cmd, check=True, text=True, capture_output=True) # get HectorP output results_up = parse_estimatetrend_report(completed.stdout) if results_up["trend"] is None: ERROR("HectorP processing failed for up component. ", exit=True) # populate new_data new_data[:, 2] = np.genfromtxt(default_ctl["OutputFile"], usecols=2)*1.E-3 # write completed.stdout to file with open(os.path.join('tmp_hectorp', f"{base}_up.log"), "w") as f: f.write(completed.stdout) # Populate and return residual gts new_gts = gts.copy() new_gts.data[:, 1:4] = new_gts.data[:, 1:4] - new_data # robust velocity assignment: keep existing values when available, fill only processed components if getattr(new_gts, "velocity", None) is not None and len(new_gts.velocity) >= 6: velocity = np.array(new_gts.velocity[:6], dtype=float) else: velocity = np.full(6, np.nan, dtype=float) if "results_north" in locals() and results_north.get("trend") is not None: velocity[0] = results_north["trend"].get("value", np.nan) velocity[3] = results_north["trend"].get("sigma", np.nan) if "results_east" in locals() and results_east.get("trend") is not None: velocity[1] = results_east["trend"].get("value", np.nan) velocity[4] = results_east["trend"].get("sigma", np.nan) if "results_up" in locals() and results_up.get("trend") is not None: velocity[2] = results_up["trend"].get("value", np.nan) velocity[5] = results_up["trend"].get("sigma", np.nan) new_gts.velocity = velocity * 1.E-3 # convert from mm/yr to m/yr # offsets if gts.offsets_dates is not None: np_offsets_values = np.zeros((len(gts.offsets_dates), 7)) # epoch + 3 components + 3 sigmas if "results_north" in locals() and results_north.get("offsets") != []: for i, offset in enumerate(results_north["offsets"]): np_offsets_values[i, 0] = at.mjd2decyear(offset.get("epoch", np.nan)) np_offsets_values[i, 1] = offset.get("value", np.nan) * 1.E-3 # convert from mm to m np_offsets_values[i, 4] = offset.get("sigma", np.nan) * 1.E-3 # convert from mm to m if "results_east" in locals() and results_east.get("offsets") != []: for i, offset in enumerate(results_east["offsets"]): np_offsets_values[i, 0] = at.mjd2decyear(offset.get("epoch", np.nan)) np_offsets_values[i, 2] = offset.get("value", np.nan) * 1.E-3 # convert from mm to m np_offsets_values[i, 5] = offset.get("sigma", np.nan) * 1.E-3 # convert from mm to m if "results_up" in locals() and results_up.get("offsets") != []: for i, offset in enumerate(results_up["offsets"]): np_offsets_values[i, 0] = at.mjd2decyear(offset.get("epoch", np.nan)) np_offsets_values[i, 3] = offset.get("value", np.nan) * 1.E-3 # convert from mm to m np_offsets_values[i, 6] = offset.get("sigma", np.nan) * 1.E-3 # convert from mm to m new_gts.offsets_values = np_offsets_values # annual and semi annual terms np_annual = np.zeros(6) np_semi_annual = np.zeros(6) if "results_north" in locals() and results_north.get("seasonal") != []: amp = results_north["seasonal"].get(365.25, {}).get("amp", {}).get("value", np.nan) pha = results_north["seasonal"].get(365.25, {}).get("pha", {}).get("value", np.nan) np_annual[0] = amp * np.cos(pha/180.*np.pi) * 1.E-3 # convert from mm to m np_annual[1] = amp * np.sin(pha/180.*np.pi) * 1.E-3 # convert from mm to m if "results_east" in locals() and results_east.get("seasonal") != []: amp = results_east["seasonal"].get(365.25, {}).get("amp", {}).get("value", np.nan) pha = results_east["seasonal"].get(365.25, {}).get("pha", {}).get("value", np.nan) np_annual[2] = amp * np.cos(pha/180.*np.pi) * 1.E-3 # convert from mm to m np_annual[3] = amp * np.sin(pha/180.*np.pi) * 1.E-3 # convert from mm to m if "results_up" in locals() and results_up.get("seasonal") != []: amp = results_up["seasonal"].get(365.25, {}).get("amp", {}).get("value", np.nan) pha = results_up["seasonal"].get(365.25, {}).get("pha", {}).get("value", np.nan) np_annual[4] = amp * np.cos(pha/180.*np.pi) * 1.E-3 # convert from mm to m np_annual[5] = amp * np.sin(pha/180.*np.pi) * 1.E-3 # convert from mm to m if np_annual.any() != 0: new_gts.annual = np_annual # semi_annual if "results_north" in locals() and results_north.get("seasonal") != []: amp = results_north["seasonal"].get(182.625, {}).get("amp", {}).get("value", np.nan) pha = results_north["seasonal"].get(182.625, {}).get("pha", {}).get("value", np.nan) np_semi_annual[0] = amp * np.cos(pha/180.*np.pi) * 1.E-3 # convert from mm to m np_semi_annual[1] = amp * np.sin(pha/180.*np.pi) * 1.E-3 # convert from mm to m if "results_east" in locals() and results_east.get("seasonal") != []: amp = results_east["seasonal"].get(182.625, {}).get("amp", {}).get("value", np.nan) pha = results_east["seasonal"].get(182.625, {}).get("pha", {}).get("value", np.nan) np_semi_annual[2] = amp * np.cos(pha/180.*np.pi) * 1.E-3 # convert from mm to m np_semi_annual[3] = amp * np.sin(pha/180.*np.pi) * 1.E-3 # convert from mm to m if "results_up" in locals() and results_up.get("seasonal") != []: amp = results_up["seasonal"].get(182.625, {}).get("amp", {}).get("value", np.nan) pha = results_up["seasonal"].get(182.625, {}).get("pha", {}).get("value", np.nan) np_semi_annual[4] = amp * np.cos(pha/180.*np.pi) * 1.E-3 # convert from mm to m np_semi_annual[5] = amp * np.sin(pha/180.*np.pi) * 1.E-3 # convert from mm to m if np_semi_annual.any() != 0: new_gts.semi_annual = np_semi_annual # populate the noise attribute - todo return new_gts # MAIN gts2mom(self) new_gts = run_hectorp(self, component=component, **kwargs) return new_gts