Source code for profiler.utils.offsets

""" Python functions for calculating offsets and distances
    from a line 
"""

import numpy as np

from IPython import embed

[docs] def calc_dist_offset(lons:np.ndarray, lats:np.ndarray, endpoints:tuple, debug:bool=False): """ Calculate the distnace from shore and offset from a line for a given line Args: lons (np.ndarray): longitudes lats (np.ndarray): latitudes endpoints (tuple, optional): endpoints of the "line". Defaults to None. lonendpts latendpts debug (bool, optional): If True, print debug information. Defaults to False. Returns: tuple: dist, offset dist = distance from the start of the line offset = offset from the line (normal) """ # Endpoints lonendpts, latendpts = endpoints # Unpack lon0, lon1 = lonendpts lat0, lat1 = latendpts # Constants nm2km = 1.852 deg2min = 60. deg2rad = np.pi/180 deg2km=deg2min*nm2km # Calculate angle of new coordinate system relative to east dyy = (lat1-lat0) dxx = np.cos(1/2*(lat1+lat0)*deg2rad)*(lon1-lon0) theta = np.arctan2(dyy,dxx) #print(f"calc_dist_offset: theta={theta} rad, {theta*180/np.pi} deg") # Calculate x, y of lon, lat relative to start of line dy = (lats-lat0)*deg2km; dx = np.cos(1/2*(lat1+lat0)*deg2rad)*(lons-lon0)*deg2km #dx = np.cos(lat0*deg2rad)*(lons-lon0)*deg2km if debug: embed(header='calc_dist_offset debug') # Calculate dist, offset in new coordinate system by rotating z=dx+1j*dy zhat=z*np.exp(-1j*theta) if np.any(np.isnan(zhat)): embed(header='NaN in zhat') raise ValueError('calc_dist_offset: NaN in zhat') # Finish dist=np.real(zhat) offset=np.imag(zhat) # Return return dist, offset
[docs] def jsonify(obj, debug=False): """ Recursively process an object so it can be serialised in json format. Taken from linetools. WARNING - the input object may be modified if it's a dictionary or list! Parameters ---------- obj : any object debug : bool, optional Returns ------- obj - the same obj is json_friendly format (arrays turned to lists, np.int64 converted to int, np.float64 to float, and so on). """ if isinstance(obj, np.float64): obj = float(obj) elif isinstance(obj, np.float32): obj = float(obj) elif isinstance(obj, np.int32): obj = int(obj) elif isinstance(obj, np.int64): obj = int(obj) elif isinstance(obj, np.int16): obj = int(obj) elif isinstance(obj, np.bool_): obj = bool(obj) elif isinstance(obj, np.str_): obj = str(obj) elif isinstance(obj, np.ndarray): # Must come after Quantity obj = obj.tolist() elif isinstance(obj, dict): for key, value in obj.items(): obj[key] = jsonify(value, debug=debug) elif isinstance(obj, list): for i,item in enumerate(obj): obj[i] = jsonify(item, debug=debug) elif isinstance(obj, tuple): obj = list(obj) for i,item in enumerate(obj): obj[i] = jsonify(item, debug=debug) obj = tuple(obj) if debug: print(type(obj)) return obj
[docs] def savejson(filename:str, obj:dict, overwrite=False, indent=None, easy_to_read=False, **kwargs): """ Save a python object to filename using the JSON encoder. Parameters ---------- filename : str obj : object Frequently a dict overwrite : bool, optional indent : int, optional Input to json.dump easy_to_read : bool, optional Another approach and obj must be a dict kwargs : optional Passed to json.dump Returns ------- """ if os.path.lexists(filename) and not overwrite: raise IOError('%s exists' % filename) if easy_to_read: if not isinstance(obj, dict): raise IOError("This approach requires obj to be a dict") with io.open(filename, 'w', encoding='utf-8') as f: f.write(json.dumps(obj, sort_keys=True, indent=4, separators=(',', ': '), **kwargs)) else: if filename.endswith('.gz'): with gzip.open(filename, 'wt') as fh: json.dump(obj, fh, indent=indent, **kwargs) else: with open(filename, 'wt') as fh: json.dump(obj, fh, indent=indent, **kwargs)
[docs] def loadjson(filename): """Load a python object saved with savejson. Args: filename (str): The path to the JSON file. Returns: obj: The loaded Python object. """ if filename.endswith('.gz'): with gzip.open(filename, "rb") as f: obj = json.loads(f.read().decode("ascii")) else: with open(filename, 'rt') as fh: obj = json.load(fh) return obj
[docs] def merge_dicts(dict_list:list): result = {} for d in dict_list: result.update(d) return result
[docs] def match_ids(IDs, match_IDs, require_in_match=True): """ Match input IDs to another array of IDs (usually in a table) Return the rows aligned with input IDs Parameters ---------- IDs : ndarray IDs that are to be found in match_IDs match_IDs : ndarray IDs to be searched require_in_match : bool, optional Require that each of the input IDs occurs within the match_IDs Returns ------- rows : ndarray Rows in match_IDs that match to IDs, aligned -1 if there is no match """ rows = -1 * np.ones_like(IDs).astype(int) # Find which IDs are in match_IDs in_match = np.in1d(IDs, match_IDs) if require_in_match: if np.sum(~in_match) > 0: raise IOError("qcat.match_ids: One or more input IDs not in match_IDs") rows[~in_match] = -1 # IDs_inmatch = IDs[in_match] # Find indices of input IDs in meta table -- first instance in meta only! xsorted = np.argsort(match_IDs) ypos = np.searchsorted(match_IDs, IDs_inmatch, sorter=xsorted) indices = xsorted[ypos] rows[in_match] = indices return rows
[docs] def round_to_day(dt): # Convert to unix timestamp in nanoseconds ts = dt.astype('datetime64[ns]').astype('int64') # Get number of nanoseconds in a day day_ns = 24 * 60 * 60 * 1_000_000_000 # Round to nearest day rounded_ts = ((ts + day_ns//2) // day_ns) * day_ns # Convert back to datetime64 return np.array(rounded_ts, dtype='datetime64[ns]')