#-------------------------------------------------------------------------------
# Module : mathutils
# Purpose : Mathematical routines
# Author : P. Rebischung
# Created : 22-Aug-2014
#
# Changes :
#
# Routines : - dot : Matrix/matrix, matrix/vector and vector/vector multiplication
# - syminv : Invert a symmetric matrix
# - sympinv : Pseudo-invert a positive semi-definite symmetric matrix
# - rms : Compute RMS of a vector
# - trend : Fit linear model to a time series
# - detrend : Remove linear trend from a time series
# - cart2geo : Transform cartesian coordinates into geographical coordinates
# - geo2cart : Transform geographical coordinates into cartesian coordinates
# - xyz2enh : Compute rotation matrices from geocentric to topocentric coordinates
# - vondrak : Pass a Vondrak filter through a time series
# - lombscargle : Compute Lomb-Scargle periodogram of a time series
# - kolmo2d : 2D Kolmogorov-Smirnov test wrt uniform distribution over [0,1] x [0,1]
# - q2c : Compute correlation matrix from covariance matrix
# - chebychev : Compute Chebychev polynomials
# - spline_smooth : Cubic spline smoothing of data
#-------------------------------------------------------------------------------
# LIBRARIES
#-------------------------------------------------------------------------------
import sys
from math import *
import numpy
from scipy import linalg
from scipy import interpolate as interp
from .constants import *
#-------------------------------------------------------------------------------
# Routine : dot
# Purpose : Matrix/matrix, matrix/vector and vector/vector multiplication
# Author : P. Rebischung
# Created : 02-Aug-2013
#
# Changes :
#
# Input : - A : Matrix or vector
# - B : Matrix or vector
# Output : - C : A*B
#-------------------------------------------------------------------------------
[docs]
def dot(A, B):
# Matrix/matrix multiplication
if ((A.ndim == 2) and (B.ndim == 2)):
if (A.flags['C_CONTIGUOUS'] and B.flags['C_CONTIGUOUS']):
C = linalg.blas.dgemm(1.0, A, B)
elif (A.flags['C_CONTIGUOUS']):
C = linalg.blas.dgemm(1.0, A, B.T, trans_b=True)
elif (B.flags['C_CONTIGUOUS']):
C = linalg.blas.dgemm(1.0, A.T, B, trans_a=True)
else:
C = linalg.blas.dgemm(1.0, A.T, B.T, trans_a=True, trans_b=True)
# Matrix/vector multiplication
elif (A.ndim == 2):
if (A.flags['C_CONTIGUOUS']):
C = linalg.blas.dgemv(1.0, A, B)
else:
C = linalg.blas.dgemv(1.0, A.T, B, trans=True)
# Vector/vector multiplication
else:
C = linalg.blas.ddot(A, B)
return C
#-------------------------------------------------------------------------------
# Routine : syminv
# Purpose : Invert a symmetric matrix
# Author : P. Rebischung
# Created : 02-Aug-2013
#
# Changes :
#
# Input : - M : Matrix
# Output : - Q : Inverse matrix
#-------------------------------------------------------------------------------
[docs]
def syminv(M):
Q = linalg.lapack.dpotri(linalg.lapack.dpotrf(M)[0])[0]
Q = Q + Q.T
n = len(Q)
Q[list(range(n)), list(range(n))] = Q[list(range(n)), list(range(n))] / 2
return Q
#-------------------------------------------------------------------------------
# Routine : sympinv
# Purpose : Pseudo-invert a positive semi-definite symmetric matrix
# Author : P. Rebischung
# Created : 02-Aug-2013
#
# Changes :
#
# Input : - M : Matrix
# Output : - Q : Pseudo-inverse
#-------------------------------------------------------------------------------
[docs]
def sympinv(M):
(l, v, info) = linalg.lapack.dsyev(M)
t = sys.float_info.epsilon * numpy.max(l)
l[numpy.nonzero(l <= t)[0]] = numpy.inf
return dot(v / l, v.T)
#-------------------------------------------------------------------------------
# Routine : rms
# Purpose : Compute RMS of a vector
# Author : P. Rebischung
# Created : 07-Jul-2011
#
# Changes :
#
# Input : - x : Data vector
# Output : - s : RMS
#-------------------------------------------------------------------------------
[docs]
def rms(x):
return sqrt(dot(x, x) / len(x))
#-------------------------------------------------------------------------------
# Routine : trend
# Purpose : Fit linear model to a time series
# Author : P. Rebischung
# Created : 17-Nov-2011
#
# Changes :
#
# Input : - t : Dates
# - x : Time series
# Output : - a : Coefficients of the linear fit
#-------------------------------------------------------------------------------
[docs]
def trend(t, x):
n = len(t)
St = sum(t)
Sx = sum(x)
Stx = sum(t * x)
Stt = sum(t ** 2)
d = n * Stt - St ** 2
a = numpy.zeros(2)
a[0] = (Stt * Sx - St * Stx) / d
a[1] = (n * Stx - St * Sx) / d
return a
#-------------------------------------------------------------------------------
# Routine : detrend
# Purpose : Remove linear trend from a time series
# Author : P. Rebischung
# Created : 17-Nov-2011
#
# Changes :
#
# Input : - t : Dates
# - x : Time series
# Output : - d : Detrended time series
#-------------------------------------------------------------------------------
[docs]
def detrend(t, x):
a = trend(t, x)
d = x - a[0] - a[1] * t
return d
#-------------------------------------------------------------------------------
# Routine : cart2geo
# Purpose : Transform cartesian coordinates into geographical coordinates
# Author : P. Rebischung
# Created : 25-May-2011
#
# Changes :
#
# Input : - X : [X, Y, Z] (cartesian coordinates in m)
# Output : - phi : Latitudes (rad)
# - lam : Longitudes (rad)
# - h : Ellipsoidal heights (m)
#-------------------------------------------------------------------------------
[docs]
def cart2geo(X):
reshape = False
if (X.shape == (3,)):
reshape = True
X.resize(1, 3)
p = numpy.sqrt(X[..., 0] ** 2 + X[..., 1] ** 2)
r = numpy.sqrt(X[..., 0] ** 2 + X[..., 1] ** 2 + X[..., 2] ** 2)
u = numpy.arctan2(X[..., 2] / p * (1 - fe + ee ** 2 * ae / r), 1)
lam = 2 * numpy.arctan2(X[..., 1], X[..., 0] + p)
phi = numpy.arctan2(X[..., 2] * (1 - fe) + ee ** 2 * ae * numpy.sin(u) ** 3, (1 - fe) * (p - ee ** 2 * ae * numpy.cos(u) ** 3))
h = p * numpy.cos(phi) + X[..., 2] * numpy.sin(phi) - ae * numpy.sqrt(1 - ee ** 2 * numpy.sin(phi) ** 2)
# lam = numpy.arctan2(X[...,1], X[...,0])
# phi = numpy.arctan2(X[...,2], numpy.sqrt(X[...,0]**2 + X[...,1]**2))
# h = numpy.sqrt(X[...,0]**2 + X[...,1]**2 + X[...,2]**2) - ae
if (reshape):
X.resize(3)
phi = phi[0]
lam = lam[0]
h = h[0]
return (phi, lam, h)
#-------------------------------------------------------------------------------
# Routine : geo2cart
# Purpose : Transform geographical coordinates into cartesian coordinates
# Author : P. Rebischung
# Created : 25-May-2011
#
# Changes :
#
# Input : - phi : Latitudes (rad)
# - lam : Longitudes (rad)
# - h : Ellipsoidal heights (m)
# Output : - X : [X, Y, Z] (cartesian coordinates in m)
#-------------------------------------------------------------------------------
[docs]
def geo2cart(phi, lam, h):
N = ae / numpy.sqrt(1 - (ee * numpy.sin(phi)) ** 2)
if (type(phi) == type(float())):
X = numpy.zeros(3,)
else:
X = numpy.zeros(phi.shape + (3,))
X[..., 0] = (N + h) * numpy.cos(phi) * numpy.cos(lam)
X[..., 1] = (N + h) * numpy.cos(phi) * numpy.sin(lam)
X[..., 2] = (N * (1 - ee ** 2) + h) * numpy.sin(phi)
# X[...,0] = ae * numpy.cos(phi) * numpy.cos(lam)
# X[...,1] = ae * numpy.cos(phi) * numpy.sin(lam)
# X[...,2] = ae * numpy.sin(phi)
return X
#-------------------------------------------------------------------------------
# Routine : xyz2enh
# Purpose : Compute rotation matrices from geocentric to topocentric coordinates
# Author : P. Rebischung
# Created : 25-May-2011
#
# Changes :
#
# Input : - X : [X, Y, Z] (cartesian coordinates in m)
# Output : - R : Rotation matrices from geocentric to topocentric coordinates
#-------------------------------------------------------------------------------
[docs]
def xyz2enh(X):
reshape = False
if (X.shape == (3,)):
reshape = True
X.resize(1, 3)
(phi, lam, h) = cart2geo(X)
R = numpy.zeros(phi.shape + (3, 3))
R[..., 0, 0] = -numpy.sin(lam)
R[..., 0, 1] = numpy.cos(lam)
R[..., 1, 0] = -numpy.sin(phi) * numpy.cos(lam)
R[..., 1, 1] = -numpy.sin(phi) * numpy.sin(lam)
R[..., 1, 2] = numpy.cos(phi)
R[..., 2, 0] = numpy.cos(phi) * numpy.cos(lam)
R[..., 2, 1] = numpy.cos(phi) * numpy.sin(lam)
R[..., 2, 2] = numpy.sin(phi)
if (reshape):
X.resize(3)
R.resize((3, 3))
return R
#-------------------------------------------------------------------------------
# Routine : enh2uvh
# Purpose : Compute rotation matrices from topocentric frames of two stations to their UVH frame
# Author : P. Rebischung
# Created : 04-Oct-2016
#
# Changes :
#
# Input : - X1 : Cartesian coordinates of first station (m)
# - X2 : Cartesian coordinates of second station (m)
# Output : - R1 : Rotation matrix from ENH frame to UVH frame of first station
# - R2 : Rotation matrix from ENH frame to UVH frame of second station
#-------------------------------------------------------------------------------
[docs]
def enh2uvh(X1, X2):
(p1, l1, h1) = cart2geo(X1)
(p2, l2, h2) = cart2geo(X2)
az = atan2(sin(l2 - l1), cos(p1) * tan(p2) - sin(p1) * cos(l2 - l1))
s = sin(az)
c = cos(az)
R1 = numpy.zeros((3, 3))
R1[0, 0] = s
R1[0, 1] = c
R1[1, 0] = -c
R1[1, 1] = s
R1[2, 2] = 1
az = atan2(sin(l1 - l2), cos(p2) * tan(p1) - sin(p2) * cos(l1 - l2))
s = sin(az + pi)
c = cos(az + pi)
R2 = numpy.zeros((3, 3))
R2[0, 0] = s
R2[0, 1] = c
R2[1, 0] = -c
R2[1, 1] = s
R2[2, 2] = 1
return (R1, R2)
#-------------------------------------------------------------------------------
# Routine : vondrak
# Purpose : Pass a Vondrak filter through a time series
# Author : P. Rebischung
# Created : 17-Dec-2011
#
# Changes :
#
# Input : - t : Dates
# - x : Time series
# - fc : Cutoff frequency
# - return_partials : True if the partial derivatives dd/dy should be
# returned. Default is False.
# Output : - xs : Filtered time series
#-------------------------------------------------------------------------------
[docs]
def vondrak(t, x, fc, return_partials=False):
import numpy
import scipy.linalg as linalg
eps = (7.223147119819503 * fc) ** 6 / (len(t) - 3)
num = 6 * numpy.sqrt(t[2:-1] - t[1:-2])
den = numpy.sqrt(t[-1] - t[0])
a = numpy.hstack((0, 0, 0, num / den / ((t[0:-3] - t[1:-2]) * (t[0:-3] - t[2:-1]) * (t[0:-3] - t[3:])), 0, 0, 0))
b = numpy.hstack((0, 0, 0, num / den / ((t[1:-2] - t[0:-3]) * (t[1:-2] - t[2:-1]) * (t[1:-2] - t[3:])), 0, 0, 0))
c = numpy.hstack((0, 0, 0, num / den / ((t[2:-1] - t[0:-3]) * (t[2:-1] - t[1:-2]) * (t[2:-1] - t[3:])), 0, 0, 0))
d = numpy.hstack((0, 0, 0, num / den / ((t[3:] - t[0:-3]) * (t[3:] - t[1:-2]) * (t[3:] - t[2:-1])), 0, 0, 0))
d0 = eps + a[3:] ** 2 + b[2:-1] ** 2 + c[1:-2] ** 2 + d[0:-3] ** 2
d1 = a[3:-1] * b[3:-1] + b[2:-2] * c[2:-2] + c[1:-3] * d[1:-3]
d2 = a[3:-2] * c[3:-2] + b[2:-3] * d[2:-3]
d3 = a[3:-3] * d[3:-3]
A = numpy.diag(d0) + numpy.diag(d1, 1) + numpy.diag(d1, -1) + numpy.diag(d2, 2) + numpy.diag(d2, -2) + numpy.diag(d3, 3) + numpy.diag(d3, -3)
if (return_partials):
A = eps * syminv(A)
xs = dot(A, x)
return (xs, A)
else:
xs = linalg.solve(A, eps * x)
return xs
#-------------------------------------------------------------------------------
# Routine : lombscargle
# Purpose : Compute Lomb-Scargle periodogram of a time series
# Author : P. Rebischung
# Created : 17-Dec-2011
#
# Changes :
#
# Input : - t : Dates
# - x : Time series
# - sf : Oversampling factor. Default is 4.
# - f : Frequencies. Default is None (automatically set).
# - normalize : True for a normalized periodogram. Default is False.
# Output : - f : Frequencies
# - p : Powers
#-------------------------------------------------------------------------------
[docs]
def lombscargle(t, x, sf=4, f=None, normalize=False):
# Length and time span of the series
n = len(x)
T = t[-1] - t[0]
# Normalize time series if needed
if (normalize):
x = x - numpy.mean(x)
x = x / numpy.std(x)
# Build list of frequencies
if (type(f).__name__ == 'NoneType'):
f0 = (n - 1) / (n * T * sf)
fc = (n - 1) / (2 * T)
f = numpy.arange(f0, fc + f0, f0)
# Initialize periodogram
p = numpy.zeros(len(f))
# Loop over frequencies
for i in range(len(f)):
w = 2 * pi * f[i]
# Compute some sums
xc = numpy.sum(x * numpy.cos(w * t))
xs = numpy.sum(x * numpy.sin(w * t))
cc = numpy.sum(numpy.cos(w * t) ** 2)
ss = numpy.sum(numpy.sin(w * t) ** 2)
cs = numpy.sum(numpy.cos(w * t) * numpy.sin(w * t))
# Offset
tau = atan2(2 * cs, cc - ss) / (2 * w)
ctau = cos(w * tau)
stau = sin(w * tau)
# Power at frequency i
p[i] = (ctau * xc + stau * xs) ** 2 / (ctau ** 2 * cc + 2 * ctau * stau * cs + stau ** 2 * ss) + (ctau * xs - stau * xc) ** 2 / (ctau ** 2 * ss - 2 * ctau * stau * cs + stau ** 2 * cc)
return (f, p)
#-------------------------------------------------------------------------------
# Routine : kolmo2d
# Purpose : 2D Kolmogorov-Smirnov test wrt uniform distribution over [0,1] x [0,1]
# Author : P. Rebischung
# Created : 08-Feb-2012
#
# Changes :
#
# Input : - x, y : Coordinates of data points
# Output : - D : Maximal absolute difference between theoretical and empirical
# cumulative distributions
#-------------------------------------------------------------------------------
[docs]
def kolmo2d(x, y):
# Initializations
D = 0
n = len(x)
# Loop over data points
for i in range(n):
# Count number of other data points in each quadrant
n1a = numpy.sum(numpy.logical_and(x < x[i], y < y[i]))
n2a = numpy.sum(numpy.logical_and(x < x[i], y > y[i]))
n3a = numpy.sum(numpy.logical_and(x > x[i], y < y[i]))
n4a = numpy.sum(numpy.logical_and(x > x[i], y > y[i]))
n1b = numpy.sum(numpy.logical_and(x <= x[i], y <= y[i]))
n2b = numpy.sum(numpy.logical_and(x <= x[i], y >= y[i]))
n3b = numpy.sum(numpy.logical_and(x >= x[i], y <= y[i]))
n4b = numpy.sum(numpy.logical_and(x >= x[i], y >= y[i]))
# Theoretical probabilities for each quadrant
p1 = x[i] * y[i]
p2 = x[i] * (1 - y[i])
p3 = (1 - x[i]) * y[i]
p4 = (1 - x[i]) * (1 - y[i])
# Compare empirical and theoretical distributions
d1a = abs(p1 - float(n1a) / float(n))
d1b = abs(p1 - float(n1b) / float(n))
d2a = abs(p2 - float(n2a) / float(n))
d2b = abs(p2 - float(n2b) / float(n))
d3a = abs(p3 - float(n3a) / float(n))
d3b = abs(p3 - float(n3b) / float(n))
d4a = abs(p4 - float(n4a) / float(n))
d4b = abs(p4 - float(n4b) / float(n))
# Update max absolute difference if neccesary
d = max([d1a, d1b, d2a, d2b, d3a, d3b, d4a, d4b])
if (d > D):
D = d
return D
#-------------------------------------------------------------------------------
# Routine : Q2C
# Purpose : Compute correlation matrix from covariance matrix
# Author : P. Rebischung
# Created : 21-Feb-2012
#
# Changes :
#
# Input : - Q : Covariance matrix
# Output : - C : Correlation matrix
#-------------------------------------------------------------------------------
[docs]
def Q2C(Q):
f = 1. / numpy.sqrt(numpy.diag(Q))
return f * (Q * f).T
#-------------------------------------------------------------------------------
# Routine : chebychev
# Purpose : Compute Chebychev polynomials
# Author : P. Rebischung
# Created : 14-Mar-2012
#
# Changes :
#
# Input : - x : Array of abscissas in [-1,1]
# - n : Number of polynomials to compute
# Output : - y : Values of polynomials in x
#-------------------------------------------------------------------------------
[docs]
def chebychev(x, n):
# Initialization
y = numpy.zeros((len(x), n))
# Degree 0 and 1 polynomials
y[:, 0] = 1
if (n > 1):
y[:, 1] = x
# Degree >=2 polynomials
for i in range(2, n):
y[:, i] = 2 * x * y[:, i - 1] - y[:, i - 2]
return y
#-------------------------------------------------------------------------------
# Routine : spline_smooth
# Purpose : Cubic spline smoothing of data
# Author : P. Rebischung
# Created : 26-Sep-2013
#
# Changes :
#
# Input : - x : Abscissas
# - y : Ordinates
# - s : Sigmas
# - l : Smoothing parameter
# - return_partials : True if the partial derivatives dd/dy should be
# returned. Default is False.
# Output : - d : Smoothed ordinates
#-------------------------------------------------------------------------------
[docs]
def spline_smooth(x, y, s, l, return_partials=False):
# Useful things
n = len(x) - 1
mu = 2 * (1 - l) / (3 * l)
h = x[1:] - x[:-1]
p = 2 * (h[:-1] + h[1:])
r = 3 / h
f = -(r[:-1] + r[1:])
# Some matrices
S = numpy.diag(s)
R = numpy.diag(p) + numpy.diag(h[1:-1], 1) + numpy.diag(h[1:-1], -1)
Q = numpy.zeros((n + 1, n - 1))
Q[0:n - 1, :] = Q[0:n - 1, :] + numpy.diag(r[:-1])
Q[1:n + 0, :] = Q[1:n + 0, :] + numpy.diag(f)
Q[2:n + 1, :] = Q[2:n + 1, :] + numpy.diag(r[1:])
# Compute the partial derivatives dd/dy
D = syminv(mu * dot(Q.T, dot(S, Q)) + R)
D = numpy.eye(n + 1) - mu * dot(S, dot(Q, dot(D, Q.T)))
# Smoothed data
d = dot(D, y)
if (return_partials):
return (d, D)
else:
return d