Source code for yapc.classify_photons

#!/usr/bin/env python
u"""
classify_photons.py
Written by Aimee Gibbons and Tyler Sutterley (12/2022)
Yet Another Photon Classifier for ATL03 Geolocated Photon Data

PYTHON DEPENDENCIES:
    numpy: Scientific Computing Tools For Python
        https://numpy.org
        https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
    scipy: Scientific Tools for Python
        https://docs.scipy.org/doc/
    scikit-learn: Machine Learning in Python
        http://scikit-learn.org/stable/index.html
        https://github.com/scikit-learn/scikit-learn

UPDATE HISTORY:
    Updated 12/2022: place some imports behind try/except statement
    Updated 06/2022: added option for setting the minimum KNN value
        can use a dynamic window height by setting win_h to 0
    Updated 04/2022: can weight using only height differences
    Updated 10/2021: half the perimeter for weighting the distances
        scale weights by the selected number of neighbors (K)
        check that the telemetry band height is positive
    Updated 09/2021: add option for setting aspect ratio of window
        add option to return selection window dimensions
    Updated 08/2021: update algorithm to match current GSFC version
    Updated 05/2021: use int64 to fix numpy deprecation warning
    Written 05/2021
"""
import warnings
import numpy as np
import yapc._dist_metrics as _dist_metrics

# attempt imports
try:
    import sklearn.neighbors
except (AttributeError, ImportError, ModuleNotFoundError) as exc:
    warnings.filterwarnings("always")
    warnings.warn("scikit-learn not available")
    warnings.warn("Some functions will throw an exception if called")
# ignore warnings
warnings.filterwarnings("ignore")

# PURPOSE: create distance metric for windowed classifier
[docs]def windowed_manhattan(u, v, window=[], w=[]): """ Create a windowed Manhattan distance metric Parameters ---------- u: float Input array v: float Input array for distance window: float or list, default [] distance window for reducing neighbors w: float or list, default [] weights for each value """ # verify dimensions u = np.atleast_1d(u) v = np.atleast_1d(v) # calculate Manhattan (rectilinear) distances l1_diff = np.abs(u - v) # broadcast window to dimensions if using a square window window = np.broadcast_to(np.atleast_1d(window),l1_diff.shape) w = np.broadcast_to(np.atleast_1d(w),l1_diff.shape) for d,wnd in enumerate(window): if (l1_diff[d] >= wnd): l1_diff[d] = np.inf # scale differences by weights with np.errstate(invalid='ignore'): l1_diff[d] *= w[d] return l1_diff.sum()
# PURPOSE: calculate distances between points as matrices
[docs]def distance_matrix(u, v, p=1, window=[], w=[]): """ Calculate distances between two collections of points Parameters ---------- u: float First collection of coordinates v: float Second collection of coordinates p: int, default 1 Power for calculating distance - ``1``: Manhattan distances - ``2``: Euclidean distances window: float or list, default [] Distance window for reducing neighbors w: float or list, default [] weights for each value """ M,s = np.shape(u) N,s = np.shape(v) # allocate for output distance matrix D = np.zeros((M,N)) # broadcast window to dimensions if using a square window window = np.broadcast_to(np.atleast_1d(window),(s,)) w = np.broadcast_to(np.atleast_1d(w),(s,)) for d in range(s): ii, = np.dot(d,np.ones((1,N))).astype(np.int64) jj, = np.dot(d,np.ones((1,M))).astype(np.int64) dx = np.abs(u[:,ii] - v[:,jj].T) # window differences for dimension dx[dx >= window[d]] = np.inf # scale differences by weights # add differences to total distance matrix with np.errstate(invalid='ignore'): D += np.power(w[d]*dx, p) # convert distances to output units return np.power(D, 1.0/p)
# PURPOSE: use the GSFC YAPC k-nearest neighbors algorithm to determine # weights for each photon event
[docs]def classify_photons(x, h, h_win_width, indices, **kwargs): """ Use the NASA GSFC YAPC k-nearest neighbors algorithm to determine weights for each photon event Parameters ---------- x: float along-track x coordinates for photon events h: float photon event heights h_win_width: float height of (possibly 2) telemetry bands indices: int indices of photon events to classify K: int, default 0 number of values for KNN algorithm - Set to 0 for dynamic selection of neighbors min_knn: int, default 5 minimum number of values for KNN algorithm min_ph: int, default 3 minimum number of photons to be valid min_xspread: float, default 1.0 minimum along-track spread of photon events min_hspread: float, default 0.01 minimum window of heights for photon events win_x: float, default 15.0 along-track length of window win_h: float, default 6.0 height of window - Set to 0 for dynamic window height aspect: float, default 0.0 aspect ratio of x and h window - Set to 0 for pre-defined window dimensions method: str, default 'linear' algorithm for computing photon event weights - ``'ball_tree'``: use scikit.learn.BallTree with custom distance metric - ``'linear'``: use a brute-force approach with linear algebra - ``'brute'``: use a brute-force approach metric: str, default 'height' metric for computing distances - ``'height'``: height differences - ``'manhattan'``: manhattan distances return_window: bool, default False return the width and height of the selection window return_K: bool, default False return the dynamically selected number of values """ # set default keyword arguments kwargs.setdefault('K', 0) kwargs.setdefault('min_knn', 5) kwargs.setdefault('min_ph', 3) kwargs.setdefault('min_xspread', 1.0) kwargs.setdefault('min_hspread', 0.01) kwargs.setdefault('win_x', 15.0) kwargs.setdefault('win_h', 6.0) kwargs.setdefault('aspect', 0.0) kwargs.setdefault('method', 'linear') kwargs.setdefault('metric', 'height') kwargs.setdefault('return_window', False) kwargs.setdefault('return_K', False) # number of photon events n_pe = len(h[indices]) # output photon weights pe_weights = np.zeros((n_pe)) # number of values for KNN algorithm if (kwargs['K'] == 0): K = np.max([kwargs['min_knn'], np.sqrt(n_pe)/2]).astype(int) else: K = np.copy(kwargs['K']) # check that number of photons is greater than criteria # number of points but be greater than or equal to k min_ph_check = (n_pe >= kwargs['min_ph']) & (n_pe >= (K+1)) if np.logical_not(min_ph_check) and kwargs['return_window'] and kwargs['return_K']: # return empty weights, window sizes and selection window return (pe_weights, 0.0, 0.0, K) if np.logical_not(min_ph_check) and kwargs['return_window']: # return empty weights and window sizes return (pe_weights, 0.0, 0.0) if np.logical_not(min_ph_check) and kwargs['return_K']: # return empty weights and selection window return (pe_weights, K) elif np.logical_not(min_ph_check): # return empty weights return pe_weights # along-track spread of photon events xspread = np.max(x[indices]) - np.min(x[indices]) # height spread of photon events hspread = np.max(h[indices]) - np.min(h[indices]) # check that spread widths are greater than criteria spread_check = (xspread >= kwargs['min_xspread']) & \ (hspread >= kwargs['min_hspread']) & \ (h_win_width >= 0.0) if np.logical_not(spread_check) and kwargs['return_window'] and kwargs['return_K']: # return empty weights, window sizes and selection window return (pe_weights, 0.0, 0.0, K) if np.logical_not(spread_check) and kwargs['return_window']: # return empty weights and window sizes return (pe_weights, 0.0, 0.0) if np.logical_not(spread_check) and kwargs['return_K']: # return empty weights and selection window return (pe_weights, K) elif np.logical_not(spread_check): # return empty weights return pe_weights # use pre-defined window size or adaptive window size if (kwargs['aspect'] > 0.0): # photon density density = n_pe/(xspread*h_win_width) # minimum area to contain minimum number of photon events area_min = kwargs['min_ph']/density # minimum length of a square containing minimum number of photons length_min = np.sqrt(area_min) # calculate horizontal and vertical window sizes with aspect ratio # x window length will be aspect times the h window length win_x = np.power(kwargs['aspect'], 0.5)*length_min win_h = np.power(kwargs['aspect'], -0.5)*length_min elif (kwargs['win_h'] == 0): # use adaptive window height win_x = np.copy(kwargs['win_x']) win_h = h_win_width/n_pe*K else: # pre-defined window size win_x = np.copy(kwargs['win_x']) win_h = np.copy(kwargs['win_h']) # reduce to a buffered window xmin = np.min(x[indices]) - win_x/2.0 xmax = np.max(x[indices]) + win_x/2.0 hmin = np.min(h[indices]) - win_h/2.0 hmax = np.max(h[indices]) + win_h/2.0 iwin, = np.nonzero((x >= xmin) & (x <= xmax) & (h >= hmin) & (h <= hmax)) # normalization for weights if (kwargs['metric'] == 'height'): dist_norm = K*win_h/2.0 dist_weights = np.array([0.0, 1.0]) else: dist_norm = K*(win_x/2.0 + win_h/2.0) dist_weights = np.array([1.0, 1.0]) # method of calculating photon event weights if (kwargs['method'] == 'ball_tree'): # use BallTree with custom metric to calculate photon event weights # window for nearest neighbors window = np.array([win_x/2.0,win_h/2.0]) # create ball tree with photon events # using a cythonized callable distance metric tree = sklearn.neighbors.BallTree(np.c_[x[iwin],h[iwin]], metric=_dist_metrics.windowed_manhattan, window=window, w=dist_weights) # K nearest neighbors with windowed manhattan metric # use K+1 to remove identity distances (d=0) dist,_ = tree.query(np.c_[x[indices],h[indices]], k=K+1, return_distance=True) # calculate photon event weights and normalize by window size valid = np.all(np.isfinite(dist),axis=1) inv_dist = np.sum(win_x/2.0 + win_h/2.0 - dist[:,1:],axis=1) pe_weights[valid] = inv_dist[valid]/dist_norm elif (kwargs['method'] == 'linear'): # use brute force with linear algebra to calculate photon event weights # window for nearest neighbors window = np.array([win_x/2.0,win_h/2.0]) # calculate distance matrix between points dist = distance_matrix(np.c_[x[indices],h[indices]], np.c_[x[iwin],h[iwin]], p=1, window=window, w=dist_weights) # sort distances and get K nearest neighbors # use K+1 to remove identity distances (d=0) dist_sort = np.sort(dist, axis=1)[:,1:K+1] # calculate inverse distance of photon events in window inv_dist = win_x/2.0 + win_h/2.0 - dist_sort # calculate photon event weights and normalize by window size valid = np.all(np.isfinite(dist_sort),axis=1) pe_weights[valid] = np.sum(inv_dist[valid,:], axis=1)/dist_norm elif (kwargs['method'] == 'brute'): # use brute force approach to calculate photon event weights for j,i in enumerate(indices): # all photon events in buffer excluding source photon ii = sorted(set(iwin) - set([i])) # distance of photon events to source photon dx = np.abs(x[ii] - x[i]) dh = np.abs(h[ii] - h[i]) # indices of photons within window n, = np.nonzero((dx < (win_x/2.0)) & (dh < (win_h/2.0))) # skip iteration if there are less than K within window if (len(n) < K): continue # calculate inverse distance of photon events in window # sort distances and get K nearest neighbors if (kwargs['metric'] == 'height'): inv_dist = win_h/2.0 - dh[n] k_sort = np.argsort(dh[n])[:K] else: inv_dist = win_x/2.0 - dx[n] + win_h/2.0 - dh[n] k_sort = np.argsort(dx[n] + dh[n])[:K] # sum of the K largest weights (normalized by the window size) pe_weights[j] = np.sum(inv_dist[k_sort])/dist_norm # check if returning both the weights and auxiliary data if kwargs['return_window'] and kwargs['return_K']: # return the weights, window size and dynamically selected K return (pe_weights, win_x, win_h, K) elif kwargs['return_window']: # return the weights and window size return (pe_weights, win_x, win_h) elif kwargs['return_K']: # return the weights and dynamically selected K return (pe_weights, K) else: # return the weights return pe_weights