Source code for pyacs.gts.lib.format.pos

"""Read and write GAMIT/GLOBK PBO pos files."""


###################################################################
## Loads Gts from GAMIT/GLOBK  pos file format for time series
###################################################################
[docs] def read_pos(self,tsdir='.',tsfile=None, xyz=True, verbose=False): """Read GAMIT/GLOBK PBO pos file and load the time series into this Gts. Parameters ---------- tsdir : str, optional Directory containing pos file(s). Default is '.'. tsfile : str, optional Pos file to load. If None, a file CODE*.pos is sought. Default is None. xyz : bool, optional If True, read XYZ and sx,sy,sz, corr columns. Default is True. verbose : bool, optional If True, print progress. Default is False. Returns ------- Gts self (data, code, X0, Y0, Z0, t0 populated). Notes ----- A pos file contains (almost) all needed info. If tsfile is None, read_pos looks for a file named CODE*.pos. """ # import import numpy as np import pyacs.lib.astrotime import pyacs.lib.coordinates import os import logging import pyacs.message.message as MESSAGE import pyacs.message.verbose_message as VERBOSE import pyacs.message.error as ERROR import pyacs.message.warning as WARNING import pyacs.message.debug_message as DEBUG from icecream import ic # name of the file to be read - if not provided, tries to guess if (tsfile is None): if (self.code is not None): from glob import glob try: pos_file=glob(tsdir+'/*'+self.code.upper()+'*.pos')[0] except: ERROR("Could not find any time series file for code ",self.code, exit=True) else: ERROR("no code or file provided.", exit=True) else: pos_file=tsfile # file self.ifile=os.path.abspath(pos_file) # actual read # get site code from header, handling both 4-character and 9-character IDs import re self.code = None with open(pos_file, "r") as f_pos: for _ in range(40): line = f_pos.readline() if not line: break match = re.match(r"\s*(?:4|9)-character ID\s*:\s*(\S+)", line, flags=re.IGNORECASE) if match: header_id = match.group(1).strip() self.code = header_id[:4].upper() if len(header_id) >= 4 else header_id.upper() break if self.code is None: # fallback for non-standard headers: keep backward compatibility with former behavior import linecache header_id_line = linecache.getline(pos_file, 3) header_id = header_id_line.split(':', 1)[-1].strip() if ':' in header_id_line else header_id_line.strip() self.code = header_id[:4].upper() if len(header_id) >= 4 else header_id.upper() # determine the header length # last header line has *YYYYMMDD header_length = 0 with open(pos_file, "r") as f_pos: for line in f_pos: if line.strip() and not line.startswith('*YYYYMMDD'): header_length += 1 else: header_length += 1 break if header_length == 0: ERROR("Could not determine header length. Returning Gts", exit=True) if xyz: # trust xyz and re-creates dneu data=np.genfromtxt(pos_file,skip_header=header_length,usecols=tuple(range(12))) # reshape to ensure a 2D array if data.ndim == 1: data=data.reshape((1,data.shape[0])) # convert dates to dec.year mjd_data=list(data[:,2]) dec_year=list(map(pyacs.lib.astrotime.mjd2decyear,mjd_data)) # fill data_xyz self.data_xyz=data[:,2:12] # fill data self.data_xyz[:,0]=np.array(dec_year) self.xyz2neu(corr=True) if np.isnan(self.data[:,4:]).any(): self.data[:,4:7]=1.E-3 self.data[:,7:10]=0. else: # for some reason, you prefer to trust dneu try: data=np.genfromtxt(pos_file,skip_header=header_length,usecols=tuple(range(15,24))) # reshape to ensure a 2D array if data.ndim==1: data=data.reshape((1,data.shape[0])) except: # there is a big problem raise IOError("!!! Error: Could not read dN,dE,dU from file: %s " % pos_file ) # convert dates to dec.year mjd_data=list(np.genfromtxt(pos_file,skip_header=header_length,usecols= 2)) dec_year=pyacs.lib.astrotime.mjd2decyear(mjd_data) # XYZ reference from header import linecache [self.X0,self.Y0,self.Z0]=list(map(float , linecache.getline(pos_file, 8).split(':')[-1].split()[:3] )) # fill data self.data = np.zeros( (dec_year.shape[0],10) ) self.data[:,1:]=data self.data[:,0]=np.array(dec_year) # fill t0 self.t0=self.data[0,0] # fill lon, lat lon_radian,lat_radian,self.h=pyacs.lib.coordinates.xyz2geo(self.X0,self.Y0,self.Z0) self.lon=np.degrees(lon_radian) self.lat=np.degrees(lat_radian) # check duplicate or non-ordered entries if self.data.shape[0]>1: if np.min(np.diff(self.data[:,0])) <=0: print("!!! time series not properly ordered by dates or dates duplicated ") self.reorder() # force clean self.offsets_dates=[] self.offsets_values=None self.outliers=[] self.annual=None self.semi_annual=None self.velocity=None return(self)
###################################################################
[docs] def write_pos(self,idir='./pos',add_key='' , force=None, verbose=False): ################################################################### """Write time series in GAMIT/GLOBK PBO pos format. Parameters ---------- idir : str, optional Output directory. Default is './pos'. add_key : str, optional If non-blank, output file is CODE_add_key.pos; otherwise CODE.pos. force : str, optional 'data' or 'data_xyz' to force source; None for default behavior. verbose : bool, optional If True, print progress. Default is False. Returns ------- Gts self. Notes ----- Default (force=None): if both data and data_xyz exist, both are written; if only data, uses X0,Y0,Z0 to write data_xyz; if only data_xyz, recreates data and writes. """ # import import pyacs.lib.coordinates import pyacs.lib.astrotime import numpy as np import os import logging import pyacs.message.message as MESSAGE import pyacs.message.verbose_message as VERBOSE import pyacs.message.error as ERROR import pyacs.message.warning as WARNING import pyacs.message.debug_message as DEBUG # force option if force is not None: if force not in ['data','data_xyz']: ERROR('force parameter must be either data or data_xyz. Returning Gts') return(self) if force == 'data': self.data_xyz = None if force == 'data_xyz': self.data = None # cdata if force != 'data_xyz': if not self.cdata(data=True): WARNING('Can not write .pos file. Problem with .data attribute. Returning Gts') self.cdata(data=True,verbose=True) return(self) ############################################### def __print_header_pos_file(f_pos,code,first_epoch,last_epoch,release_date,XYZ_ref,geo_ref,verbose=False): f_pos.write("PBO Station Position Time Series. Reference Frame : Unknown\n") f_pos.write("Format Version: 1.1.0\n") f_pos.write("4-character ID: %s\n" % code) f_pos.write("Station name : %s \n" % code) f_pos.write("First Epoch : %s\n" % first_epoch) f_pos.write("Last Epoch : %s\n" % last_epoch) f_pos.write("Release Date : %s\n" % release_date) f_pos.write("XYZ Reference position : %s\n" % XYZ_ref) f_pos.write("NEU Reference position : %s (Unknown/WGS84)\n" % geo_ref) f_pos.write("Start Field Description\n") f_pos.write("YYYYMMDD Year, month, day for the given position epoch\n") f_pos.write("HHMMSS Hour, minute, second for the given position epoch\n") f_pos.write("JJJJJ.JJJJJ Modified Julian day for the given position epoch\n") f_pos.write("X X coordinate, Specified Reference Frame, meters\n") f_pos.write("Y Y coordinate, Specified Reference Frame, meters\n") f_pos.write("Z Z coordinate, Specified Reference Frame, meters\n") f_pos.write("Sx Standard deviation of the X position, meters\n") f_pos.write("Sy Standard deviation of the Y position, meters\n") f_pos.write("Sz Standard deviation of the Z position, meters\n") f_pos.write("Rxy Correlation of the X and Y position\n") f_pos.write("Rxz Correlation of the X and Z position\n") f_pos.write("Ryz Correlation of the Y and Z position\n") f_pos.write("Nlat North latitude, WGS-84 ellipsoid, decimal degrees\n") f_pos.write("Elong East longitude, WGS-84 ellipsoid, decimal degrees\n") f_pos.write("Height (Up) Height relative to WGS-84 ellipsoid, m\n") f_pos.write("dN Difference in North component from NEU reference position, meters\n") f_pos.write("dE Difference in East component from NEU reference position, meters\n") f_pos.write("du Difference in vertical component from NEU reference position, meters\n") f_pos.write("Sn Standard deviation of dN, meters\n") f_pos.write("Se Standard deviation of dE, meters\n") f_pos.write("Su Standard deviation of dU, meters\n") f_pos.write("Rne Correlation of dN and dE\n") f_pos.write("Rnu Correlation of dN and dU\n") f_pos.write("Reu Correlation of dEand dU\n") f_pos.write("Soln corresponding to products generated with rapid or final orbit products, in supplemental processing, campaign data processing or reprocessing\n") f_pos.write("End Field Description\n") f_pos.write("*YYYYMMDD HHMMSS JJJJJ.JJJJ X Y Z Sx Sy Sz Rxy Rxz Ryz NLat Elong Height dN dE dU Sn Se Su Rne Rnu Reu Soln\n") ############################################### def __dndedu_to_pos_file(gts,verbose=False): """ Assuming that the Gts instance has dn de du coordinates, returns an list ready to be written a Gamit/Globk .pos file Requires the Gts instance to have high accuracy .lon .lat & .h Since X Y Z sigmas cannot be properly calculated they are set to 0.0 """ ################################# #def __format_cal_pos(lyear,lcal): # lformatted_date1=[] # lformatted_date2=[] # # for i in range(len(lyear)): # lformatted_date1.append(int("%d%02d%02d" % (lyear[i],lcal[i][1],lcal[i][0]))) # uts=pyacs.lib.astrotime.ut2uts(lcal[i][2]) # (h,mn,s,microsecond)=pyacs.lib.astrotime.uts2hmsmicros(np.around(uts)) # lformatted_date2.append(int(("%02d%02d%02d" % (h,mn,s)))) # return(lformatted_date1,lformatted_date2) ################################# def __format_cal_pos(np_decyear): lformatted_date1=[] lformatted_date2=[] for i in range(len(np_decyear)): mdt = pyacs.lib.astrotime.datetime_round_second( pyacs.lib.astrotime.decyear2datetime(np_decyear[i]) ) lformatted_date1.append(int("%d%02d%02d" % (mdt.year,mdt.month,mdt.day))) lformatted_date2.append(int(("%02d%02d%02d" % (mdt.hour,mdt.minute,mdt.second)))) return(lformatted_date1,lformatted_date2) # import import pyacs.lib.coordinates import pyacs.lib.astrotime import numpy as np # check for duplicate records VERBOSE('running reorder') self.reorder() dndedulamphih=np.zeros((gts.data.shape[0],6)) dndedulamphih[:,:3]=self.data[:,1:4] # case dndedu and XYZ exists if isinstance(gts.data,np.ndarray) and isinstance(gts.data_xyz,np.ndarray): VERBOSE("will write NEU data & XYZ data independently") # write data & data_xyz independently XYZ=gts.data_xyz[:,1:10] # case dndedu exits and XYZ does not exists if isinstance(gts.data,np.ndarray) and not isinstance(gts.data_xyz,np.ndarray): # creates data_xyz independently VERBOSE('creating XYZ data from NEU') if gts.X0 is not None: dndedulamphih[:,3]=gts.X0 dndedulamphih[:,4]=gts.Y0 dndedulamphih[:,5]=gts.Z0 # CHECK ENU or NEU tmp_XYZ=list(map(pyacs.lib.coordinates.denu_at_x0y0z0_to_xyz,dndedulamphih[:,1],dndedulamphih[:,0],dndedulamphih[:,2],dndedulamphih[:,3],dndedulamphih[:,4],dndedulamphih[:,5])) tmp_XYZ=np.array(tmp_XYZ) XYZ=np.zeros((gts.data.shape[0],10)) XYZ[:,:3]=tmp_XYZ[:,:3] else: # put 0 values XYZ=np.zeros((gts.data.shape[0],10)) # case dndedu does exist and XYZ exists if isinstance(gts.data_xyz,np.ndarray) and not isinstance(gts.data,np.ndarray): # write data & data_xyz independently VERBOSE('creating NEU data from XYZ') gts.xyz2neu(corr=True) XYZ=gts.data_xyz[:,1:10] # first & second columns - calendar date and time #lcal=map(list,map(pyacs.lib.astrotime.decyear2cal,gts.data[:,0])) #lyear=map(int,gts.data[:,0]) #ldate1,ldate2=__format_cal_pos(lyear,lcal) ldate1,ldate2=__format_cal_pos(gts.data[:,0]) # 3rd column - modified julian day lmjd=list(map(pyacs.lib.astrotime.decyear2mjd,gts.data[:,0])) # column 13-15 - geographical coordinates l_13_15=list(map(list,list(map(pyacs.lib.coordinates.xyz2geo,XYZ[:,0],XYZ[:,1],XYZ[:,2])))) # other columns lpos=[] for i in range(gts.data.shape[0]): if self.data.shape[1]==10: # self.data has date, dn,de,du,sdn,sde,sdu,Rne,Rnu,Reu new_pos=[ldate1[i],ldate2[i],lmjd[i]]+[XYZ[i,0],XYZ[i,1],XYZ[i,2],XYZ[i,3],XYZ[i,4],XYZ[i,5],XYZ[i,6],XYZ[i,7],XYZ[i,8]]+l_13_15[i]+gts.data[i,1:10].tolist()+['pyacs'] else: # self.data has date, dn,de,du,sdn,sde,sdu new_pos=[ldate1[i],ldate2[i],lmjd[i]]+[XYZ[i,0],XYZ[i,1],XYZ[i,2],XYZ[i,3],XYZ[i,4],XYZ[i,5],XYZ[i,6],XYZ[i,7],XYZ[i,8]]+l_13_15[i]+gts.data[i,1:7].tolist()+[0.0,0.0,0.0,'pyacs'] lpos.append(new_pos) return(lpos) ############################################### def __print_content_pos_file(f_pos,lformatted): import numpy as np for i in range(len(lformatted)): [YYYYMMDD,HHMMSS,MJD,X,Y,Z,Sx,Sy,Sz,Rxy,Rxz,Ryz,Elong,NLat,Height,dN,dE,dU,Sn,Se,Su,Rne,Rnu,Reu,Soln]=lformatted[i] f_pos.write\ (" %8d %6d %10.4lf %14.5lf %14.5lf %14.5lf %8.5lf %8.5lf %8.5lf %6.3lf %6.3lf %6.3lf %14.10lf %14.10lf %10.5lf %10.5lf%10.5lf%10.5lf %8.5lf %8.5lf %8.5lf %6.3lf %6.3lf %6.3lf %s\n" % \ (YYYYMMDD,HHMMSS,MJD,X,Y,Z,Sx,Sy,Sz,Rxy,Rxz,Ryz,np.degrees(NLat),np.degrees(Elong),Height,dN,dE,dU,Sn,Se,Su,Rne,Rnu,Reu,Soln)) ############################################### import logging import pyacs.message.message as MESSAGE import pyacs.message.verbose_message as VERBOSE import pyacs.message.error as ERROR import pyacs.message.warning as WARNING import pyacs.message.debug_message as DEBUG if not os.path.isdir(idir): try: os.makedirs(idir, exist_ok=True) except: WARNING("Could not create directory ",idir) # add_key if add_key != '': f_pos_name=idir+'/'+self.code+'_'+add_key+'.pos' else: f_pos_name=idir+'/'+self.code+'.pos' # open pos file if verbose: print("-- writing %s" % os.path.abspath(f_pos_name)) f_pos=open(f_pos_name,'w+') # X0,Y0,Z0 if self.X0 is not None: pass else: if self.lon is not None: self.X0,self.Y0,self.Z0=pyacs.lib.coordinates.geo2xyz(np.radians(self.lon),np.radians(self.lat),self.h) else: print("!!! Cannot produce a proper pos file because a reference position is needed") print("!!! Setting all XYZ values to 0") # populates info for pos file wpos=__dndedu_to_pos_file(self,verbose=verbose) sdecyear=self.data[0,0] (mday,month,ut)=pyacs.lib.astrotime.decyear2cal(sdecyear) uts=pyacs.lib.astrotime.ut2uts(ut) (h,mn,s, _microsecond)=pyacs.lib.astrotime.uts2hmsmicros(uts) first_epoch=("%d%02d%02d %02d%02d%02d"% (int(sdecyear),month,mday,h,mn,s)) edecyear=self.data[0,0] (mday,month,ut)=pyacs.lib.astrotime.decyear2cal(edecyear) uts=pyacs.lib.astrotime.ut2uts(ut) (h,mn,s, _microsecond)=pyacs.lib.astrotime.uts2hmsmicros(uts) last_epoch=("%d%02d%02d %02d%02d%02d"% (int(edecyear),month,mday,h,mn,s)) import datetime current_date=datetime.datetime.today() release_date=("%d%02d%02d %02d%02d%02d"% (current_date.year,current_date.month,current_date.day,current_date.hour,current_date.minute,current_date.second)) if self.X0 is not None: XYZ_ref=(" %14.5lf %14.5lf %14.5lf (unkwnon)" % (self.X0,self.Y0,self.Z0)) else: XYZ_ref='unknown' if self.lon is not None: geo_ref=(" %16.10lf %16.10lf %16.10lf (unkwnon)" % ( self.lat,self.lon,self.h )) else: geo_ref='unknown' __print_header_pos_file(f_pos,self.code,first_epoch,last_epoch,release_date,XYZ_ref,geo_ref) __print_content_pos_file(f_pos,wpos) f_pos.close() return(self)