Source code for STJ_PV.stj_metric

# -*- coding: utf-8 -*-
"""Calculate the position of the subtropical jet in both hemispheres."""
import subprocess
import yaml
import numpy as np
import numpy.polynomial as poly
from scipy import signal as sig
from scipy.signal import argrelextrema

from netCDF4 import num2date, date2num
import pandas as pd
import xarray as xr
from xarray import ufuncs as xu
from STJ_PV import utils

try:
    from eddy_terms import Kinetic_Eddy_Energies
except ModuleNotFoundError:
    print('Eddy Terms Function not available, STJKangPolvani not available')

try:
    GIT_ID = subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()
except subprocess.CalledProcessError:
    GIT_ID = 'NONE'


[docs]class STJMetric: """ Generic Class containing Sub Tropical Jet metric methods and attributes. Parameters ---------- name : string Name of this type of method data : :class:`~STJ_PV.input_data.InputData` Object containing required input data props : :class:`~STJ_PV.run_stj.JetFindRun` Properties about the current jet finding attempt, including log file """ def __init__(self, name=None, data=None, props=None): """Initialize a subtropical jet metric.""" self.name = name self.data = data self.props = props.config self.log = props.log self.out_data = {} self.time = None self.hemis = None self.debug_data = {} self.plot_idx = 0 def _drop_vars(self, out_var): """Drop coordinate variables that may not match.""" for drop_var in ['pv', self.data.cfg['lat']]: if drop_var in self.out_data[out_var].coords: self.out_data[out_var] = self.out_data[out_var].drop(drop_var)
[docs] def save_jet(self): """Save jet position to file.""" # Setup metadata for output variables props = { 'lat': { 'standard_name': 'jet_latitude', 'descr': 'Latitude of subtropical jet', 'units': 'degrees_north', }, 'intens': { 'standard_name': 'jet_intensity', 'descr': 'Intensity of subtropical jet', 'units': 'm s-1', }, 'theta': { 'standard_name': 'jet_theta', 'descr': 'Theta level of subtropical jet', 'units': 'K', }, } for out_var in self.out_data: # Clean up dimension labels self._drop_vars(out_var) prop_name = out_var.split('_')[0] self.out_data[out_var] = self.out_data[out_var].assign_attrs(props[prop_name]) out_dset = xr.Dataset(self.out_data) self.log.info("WRITE TO {output_file}".format(**self.props)) file_attrs = {'commit-id': GIT_ID, 'run_props': yaml.safe_dump(self.props)} out_dset = out_dset.assign_attrs(file_attrs) out_dset.to_netcdf(self.props['output_file'] + '.nc')
[docs] def set_hemis(self, shemis): r""" Select hemisphere data. This function sets `self.hemis` to be an length N list of slices such that only the desired hemisphere is selected with N-D data (e.g. uwind and ipv) along all other axes. It also returns the latitude for the selected hemisphere, an index to select the hemisphere in output arrays, and the extrema function to find min/max of PV derivative in a particular hemisphere. Parameters ---------- shemis : boolean If true - use southern hemisphere data, if false, use NH data Returns ------- extrema : function Function used to identify extrema in meridional PV gradient, either :func:`scipy.signal.argrelmax` if SH, or :func:`scipy.signal.argrelmin` for NH lats : tuple (start, end) of latitude, for use with `xarray.DataArray.sel(lat=slice(\*lats))` hem_s : string Abbreviation for hemisphere (NH or SH) """ lats = [self.props.get('min_lat', 0), self.props.get('max_lat', 90)] lat_dec = self.data[self.data.cfg['lat']][0] > self.data[self.data.cfg['lat']][-1] if shemis: _lstart = -90 _lend = 0 extrema = sig.argrelmax hem_s = 'sh' if lats[0] >= 0 and lats[1] >= 0: # Lats are positive, multiply by -1 to get positive for SH lats = (-lats[0], -lats[1]) else: _lstart = 0 _lend = 90 extrema = sig.argrelmin hem_s = 'nh' if lats[0] <= 0 and lats[1] <= 0: # Lats are negative, multiply by -1 to get positive for NH lats = (-lats[0], -lats[1]) if lat_dec: _lstart, _lend = _lend, _lstart self.hemis = {self.data.cfg['lat']: slice(_lstart, _lend)} return extrema, lats, hem_s
[docs] def compute(self): """Compute all dask arrays in `self.out_data`.""" for vname in self.out_data: self._drop_vars(vname) try: self.out_data[vname] = self.out_data[vname].compute() except (AttributeError, TypeError): self.props.log.info( 'Compute fail data at %s not dask array it is %s', vname, type(self.out_data[vname]), )
[docs] def append(self, other): """Append another metric's intensity, latitude, and theta positon to this one.""" for var_name in self.out_data: self.out_data[var_name] = xr.concat( (self.out_data[var_name], other.out_data[var_name]), dim=self.data.cfg['time'], )
[docs]class STJPV(STJMetric): """ Subtropical jet position metric using dynamic tropopause on isentropic levels. Parameters ---------- props : :class:`~STJ_PV.run_stj.JetFindRun` Class containing properties about the current search for the STJ data : :class:`~STJ_PV.input_data.InputData` Input data class containing a year (or more) of required data """ def __init__(self, props, data): """Initialise Metric using PV Gradient Method.""" name = 'PVGrad' super(STJPV, self).__init__(name=name, props=props, data=data) # Some config options should be properties for ease of access self.pv_lev = self.props['pv_value'] self.fit_deg = self.props['fit_deg'] self.min_lat = self.props['min_lat'] if self.props['poly'].lower() in ['cheby', 'cby', 'cheb', 'chebyshev']: self.pfit = poly.chebyshev.chebfit self.pder = poly.chebyshev.chebder self.peval = poly.chebyshev.chebval elif self.props['poly'].lower() in ['leg', 'legen', 'legendre']: self.pfit = poly.legendre.legfit self.pder = poly.legendre.legder self.peval = poly.legendre.legval elif self.props['poly'].lower() in ['poly', 'polynomial']: self.pfit = poly.polynomial.polyfit self.pder = poly.polynomial.polyder self.peval = poly.polynomial.polyval # Initialise latitude & theta output dicts self.out_data = {} def _poly_deriv(self, lat, data, deriv=1): """ Calculate the `deriv`^th derivative of a one-dimensional array w.r.t. latitude. Parameters ---------- data : array_like 1D array of data, same shape as `self.data.lat` y_s, y_e : integers, optional Start and end indices of subset, default is None deriv : integer, optional Number of derivatives of `data` to take Returns ------- poly_der : array_like 1D array of 1st derivative of data w.r.t. latitude between indices y_s and y_e """ # Determine where data is valid...Intel's Lin Alg routines fail when trying to do # a least squares fit on array with np.nan, use where it's valid to do the fit valid = np.isfinite(data) try: poly_fit = self.pfit(lat[valid], data[valid], self.fit_deg) except TypeError as err: # This can happen on fitting the polynomial: # `raise TypeError("expected non-empty vector for x")` # If that's the error we get, just set the position to 0, # which is later masked, otherwise raise the error if 'non-empty' in err.args[0]: poly_fit = np.zeros(self.fit_deg) else: raise poly_der = self.peval(lat, self.pder(poly_fit, deriv)) return poly_der, (poly_fit, lat[valid])
[docs] def isolate_pv(self, pv_lev): """ Get the potential temperature, zonal wind and zonal wind shear for a PV level. Parameters ---------- pv_lev : float PV value (for a particular hemisphere, >0 for NH, <0 for SH) on which to interpolate potential temperature and wind theta_bnds : tuple, optional Start and end theta levels to use for interpolation. Default is None, if None, use all theta levels, otherwise restrict so theta_bnds[0] <= theta <= theta_bnds[1] Returns ------- theta_xpv : array_like N-1 dimensional array (where `self.data.ipv` is N-D) of potential temperature on `pv_lev` PVU uwnd_xpv : array_like N-1 dimensional array (where `self.data.uwnd` is N-D) of zonal wind on `pv_lev` PVU ushear : array_like Wind shear between uwnd_xpv and "surface", meaning the lowest valid level """ if 'theta_s' in self.data.cfg and 'theta_e' in self.data.cfg: theta_bnds = (self.data.cfg['theta_s'], self.data.cfg['theta_e']) assert theta_bnds[0] < theta_bnds[1], 'Start level not strictly less than end' theta_bnds = slice(*theta_bnds) else: theta_bnds = slice(None) lev_name = self.data.cfg['lev'] lev_subset = {lev_name: theta_bnds} # Create a copy of the level subset so we can add the latitude hemisphere # subset to do both at once on the 4D arrays, without interfering with the 1D sel # because if the 'lat' dim isn't a dim on the self.data[lev_name] array, the # selection will raise an error _latlev = lev_subset.copy() _latlev.update(self.hemis) _pv = self.data.ipv.sel(**_latlev).load() _uwnd = self.data.uwnd.sel(**_latlev).load() self.log.info(' COMPUTING THETA ON %.1e', pv_lev) theta_xpv = utils.xrvinterp( self.data[lev_name].sel(**lev_subset), _pv, pv_lev, levname=lev_name, newlevname='pv', ).load() self.log.info(' COMPUTING UWND ON %.1e', pv_lev) uwnd_xpv = utils.xrvinterp( _uwnd, _pv, pv_lev, levname=lev_name, newlevname='pv' ).load() self.log.info(' COMPUTING SHEAR FROM %.1e', pv_lev) ushear = self._get_max_shear(uwnd_xpv.squeeze(dim='pv')).load() return theta_xpv.squeeze(dim='pv'), uwnd_xpv.squeeze(dim='pv'), ushear
[docs] def find_jet(self, shemis=True, debug=False): """ Find the subtropical jet using input parameters. Parameters ---------- shemis : logical, optional If True, find jet position in Southern Hemisphere, If False, find N.H. jet debug : logical, optional Enter debug mode if true, returns d(theta) / d(lat) values, polynomial fit, and jet latitude """ if shemis and self.pv_lev < 0 or not shemis and self.pv_lev > 0: pv_lev = np.array([self.pv_lev]) * 1e-6 else: pv_lev = -1 * np.array([self.pv_lev]) * 1e-6 extrema, lats, hem_s = self.set_hemis(shemis) self.log.info('COMPUTING THETA/UWND ON %.1f PVU', pv_lev * 1e6) # Get theta on PV==pv_level theta_xpv, uwnd_xpv, ushear = self.isolate_pv(pv_lev) # Shortcut for latitude variable name, since it's used a lot vlat = self.data.cfg['lat'] # Restrict theta and shear between our min / max latitude from config file # that was processed by self.set_hemis _theta = theta_xpv.sel(**{vlat: slice(*lats)}) if _theta[vlat].shape[0] == 0: # If the selection is empty along the latitude axis, that means # the selection is the wrong way around, so flip it before moving on lats = lats[::-1] _theta = theta_xpv.sel(**{vlat: slice(*lats)}) _shear = ushear.sel(**{vlat: slice(*lats)}) self.log.info('COMPUTING JET POSITION FOR %s in %d', hem_s, self.data.year) # Set up computation of all the jet latitudes at once using self.find_single_jet # The input_core_dims is a list of lists, that tells xarray/dask that the # arguments _theta, _theta.lat, and _shear are passed to self.find_single_jet # with that dimension intact. The kwargs argument passes keyword args to the # self.find_single_jet if not debug: jet_lat = xr.apply_ufunc( self.find_single_jet, _theta, _theta[vlat], _shear, input_core_dims=[[vlat], [vlat], [vlat]], vectorize=True, dask='parallelized', kwargs={'extrema': extrema}, ) else: dtheta, theta_fit, jet_lat = self._debug_jet_loop(_theta, _shear, extrema) # Select the data for level and intensity by the latitudes generated jet_theta = theta_xpv.sel(**{vlat: jet_lat}) jet_intens = uwnd_xpv.sel(**{vlat: jet_lat}) # This masks our xarrays of intrest where the jet_lat == 0.0, which is set # whenever there is invalid data for a particular cell jet_intens = jet_intens.where(jet_lat != 0.0) jet_theta = jet_theta.where(jet_lat != 0.0) jet_lat = jet_lat.where(jet_lat != 0.0) # If we're interested in mean / median, take those if self.props['zonal_opt'].lower() == 'mean': jet_intens = jet_intens.mean(dim=self.data.cfg['lon']) jet_theta = jet_theta.mean(dim=self.data.cfg['lon']) jet_lat = jet_lat.mean(dim=self.data.cfg['lon']) elif self.props['zonal_opt'].lower() == 'median': jet_intens = jet_intens.median(dim=self.data.cfg['lon']) jet_theta = jet_theta.median(dim=self.data.cfg['lon']) jet_lat = jet_lat.median(dim=self.data.cfg['lon']) # Put the parameters into place for this hemisphere self.out_data['intens_{}'.format(hem_s)] = jet_intens self.out_data['theta_{}'.format(hem_s)] = jet_theta self.out_data['lat_{}'.format(hem_s)] = jet_lat if debug: output = dtheta, theta_fit, _theta, jet_lat else: output = None return output
def _debug_jet_loop(self, _theta, _shear, extrema): """Loop over each time/lon in _theta rather than xarray.apply_ufunc.""" dims = _theta.shape tht_fit_shape = (self.props['fit_deg'] + 1, dims[0], dims[-1]) dtheta = np.zeros(dims) theta_fit = np.zeros(tht_fit_shape) jet_lat = np.zeros((dims[0], dims[-1])) lat = _theta[self.data.cfg['lat']].values dims_names = (self.data.cfg['time'], self.data.cfg['lon']) coords = {dim_name: _theta[dim_name] for dim_name in dims_names} _theta = _theta.compute() _shear = _shear.compute() for tix in range(dims[0]): for xix in range(dims[-1]): # Find derivative of dynamical tropopause _info = self.find_single_jet( _theta[tix, :, xix].values, lat, _shear[tix, :, xix].values, extrema, debug=True, ) jet_lat[tix, xix] = _info[0] dtheta[tix, :, xix] = _info[2] theta_fit[:, tix, xix] = _info[3][0] jet_lat = xr.DataArray(jet_lat, coords=coords, dims=dims_names) _dims = (self.data.cfg['time'], self.data.cfg['lat'], self.data.cfg['lon']) _coords = {dim_name: _theta[dim_name] for dim_name in _dims} dtheta = xr.DataArray(dtheta, coords=_coords, dims=_dims) _dims = (_dims[0], _dims[-1]) _coords = {dim_name: _theta[dim_name] for dim_name in _dims} _coords['deg'] = np.arange(theta_fit.shape[0]) _dims = ('deg', *_dims) theta_fit = xr.DataArray(theta_fit, coords=_coords, dims=_dims) return dtheta, theta_fit, jet_lat def _get_max_shear(self, uwnd_xpv): """Get maximum wind-shear between surface and PV surface.""" # Our zonal wind data is on isentropic levels. Lower levels are bound to be below # the surface in some places, so we need to use the lowest valid wind level as # the surface, so do some magic to make that happen. _lev = self.data.cfg['lev'] if self.data[_lev].shape[0] != self.data.chunks[_lev][0]: # re-chunk wind to ensure continuity along vertical axis self.data = self.data.chunk({_lev: -1}) uwnd_sfc = xr.apply_ufunc( lowest_valid, self.data.uwnd, input_core_dims=[[self.data.cfg['lev']]], vectorize=True, dask='parallelized', output_dtypes=[float], ) return uwnd_xpv - uwnd_sfc.sel(**self.hemis)
[docs] def find_single_jet(self, theta_xpv, lat, ushear, extrema, debug=False): """ Find jet location for a 1D array of theta on latitude. Parameters ---------- theta_xpv : array_like Theta on PV level as a function of latitude lat : array_like 1D array of latitude same shape as theta_xpv from :py:meth:`~isolate_pv` ushear : array_like 1D array along latitude axis of maximum surface - troposphere u-wind shear debug : boolean If True, returns debugging information about how jet position is found, if False (default) returns only jet location Returns ------- jet_loc : int If debug is False, Index of jet location on latitude axis jet_loc, jet_loc_all, dtheta, theta_fit, lat, y_s, y_e : tuple If debug is True, return lots of stuff TODO: document this better!! """ # Find derivative of dynamical tropopause dtheta, theta_fit = self._poly_deriv(lat, theta_xpv) jet_loc_all = extrema(dtheta)[0].astype(int) select = self.select_jet(jet_loc_all, ushear) if np.max(np.abs(theta_fit[0])) == 0.0: # This means there was a TypeError in _poly_deriv so probably # none of the theta_xpv data is valid for this time/lon, so # set the output latitude to be 0, so it can be masked out out_lat = 0.0 else: out_lat = lat[select] if debug: output = out_lat, jet_loc_all, dtheta, theta_fit, lat else: output = out_lat return output
[docs] def select_jet(self, locs, ushear): """ Select correct jet latitude given list of possible jet locations. Parameters ---------- locs : list List of indicies of jet locations ushear : array_like 1D array along latitude axis of maximum surface - troposphere u-wind shear Returns ------- jet_loc : int Index of the jet location. Between [`0`, `lat.shape[0] - 1`] Notes ----- * If the list of locations is empty, return ``0`` as the location, this is interpreted by :py:meth:`~find_jet` as missing. * If the list of locations is exactly one return that location. * Otherwise use the location with maximum zonal wind shear between lowest provided level and the dynamical tropopause. """ if len(locs) == 0: # A jet has not been identified at this time/location, set the position # to zero so it can be masked out when the zonal median is performed jet_loc = 0 elif len(locs) == 1: # This essentially converts a list of length 1 to a single int jet_loc = locs[0] elif len(locs) > 1: # The jet location, if multiple peaks are identified, should be the one # with maximum wind shear between the jet level and the surface ushear_max = np.argmax(ushear[locs]) jet_loc = locs[ushear_max] return jet_loc
[docs]class STJDavisBirner(STJMetric): """ Subtropical jet position metric using the method of Davis and Birner 2016. Parameters ---------- props : :py:meth:`~STJ_PV.run_stj.JetFindRun` Class containing properties about the current search for the STJ data : :py:meth:`~STJ_PV.input_data.InputData` Input data class containing a year (or more) of required data Notes ----- Based on "Climate Model Biases in the Width of the Tropical Belt Parameters", Davis and Birner (2016) The logic for the method is to subtract the surface wind and then find the max in the upper level wind. """ # https://github.com/TropD/pytropd/blob/master/pytropd/metrics.py def __init__(self, props, data): """Initialise Metric using Davis and Birner 2016 method.""" name = 'DavisBirner' super(STJDavisBirner, self).__init__(name=name, props=props, data=data) # Some config options should be properties for ease of access self.upper_p_level = self.props['upper_p_level'] self.lower_p_level = self.props['lower_p_level'] self.surf_p_level = self.props['surface_p_level'] if self.data[self.data.cfg['lev']].units in ['mb', 'millibars', 'hPa']: self.lower_p_level /= 100.0 self.upper_p_level /= 100.0 self.surf_p_level /= 100.0
[docs] def find_jet(self, shemis=True): """ Find the subtropical jet using method from Davis and Birner (2016). doi:10.1175/JCLI-D-15-0336.1 Parameters ---------- shemis : logical, optional If True, find jet position in Southern Hemisphere, if False, find N.H. jet """ _, hlats, hem_s = self.set_hemis(shemis) cfg = self.data.cfg if self.data[cfg['lev']][0] > self.data[cfg['lev']][-1]: subset = {cfg['lev']: slice(self.lower_p_level, self.upper_p_level)} else: subset = {cfg['lev']: slice(self.upper_p_level, self.lower_p_level)} subset.update({cfg['lat']: slice(*hlats)}) uwnd_p = self.data.uwnd.sel(**subset) if uwnd_p[cfg['lat']].shape[0] == 0: # Reverse order of lat selection if none are found subset[cfg['lat']] = slice(*hlats[::-1]) uwnd_p = self.data.uwnd.sel(**subset) # Subtract the surface wind from the wind in 400-100 layer uwnd_p = uwnd_p - self.data.uwnd.sel(**{cfg['lev']: self.surf_p_level}) dims = uwnd_p.shape self.log.info('COMPUTING JET POSITION FOR %d TIMES HEMIS: %s', dims[0], hem_s) if self.props['zonal_opt'].lower() == 'mean': uzonal = uwnd_p.mean(dim=cfg['lon']) else: uzonal = uwnd_p jet_info = xr.apply_ufunc( self.find_max_wind_surface, uzonal, uzonal[cfg['lat']], input_core_dims=[[cfg['lev'], cfg['lat']], [cfg['lat']]], vectorize=True, dask='allowed', output_core_dims=[[], []], output_dtypes=[float, float], ) # Put the parameters into place for this hemisphere self.out_data['lat_{}'.format(hem_s)] = jet_info[0] self.out_data['intens_{}'.format(hem_s)] = jet_info[1]
[docs] def find_max_wind_surface(self, uzonal, lat, test_plot=False): """ Find most equatorward maximum wind on column maximum wind surface. Parameters ---------- uzonal : array_like Zonal mean zonal wind shemis : bool Flag to indicate hemisphere, true for SH """ max_wind_surface = np.max(uzonal, axis=0) # for the given maximum wind surface, find local # maxima and then keep most equatorward. turning_points = argrelextrema(max_wind_surface, np.greater_equal)[0] turning_lats = lat[turning_points] # Take the argmin of the absolute value of the latitudes, use that index # This finds the most equatorward latitude, regardless of hemisphere lat_idx = turning_points[np.abs(turning_lats).argmin()] if lat_idx not in [0, 1, lat.shape[0] - 1, lat.shape[0]]: # If the selected index is away from the boundaries, interpolate using # a quadratic to find the "real" maximum location nearby = slice(lat_idx - 1, lat_idx + 2) # Cast lat and max_wind_surface as arrays of float32 # because linalg can't handle float16 pfit = np.polyfit(lat[nearby], max_wind_surface[nearby], deg=2) # Take first derivative of the quadratic, solve for maximum # to get the jet latitude stj_lat = -pfit[1] / (2 * pfit[0]) # Evaluate 2nd degree polynomial at the maximum to get intensity stj_intens = np.polyval(pfit, stj_lat) else: stj_lat = lat[lat_idx] stj_intens = max_wind_surface[lat_idx] # test_plot = False if test_plot: self.test_plot(lat, max_wind_surface, lat_idx, stj_lat, stj_intens) return stj_lat, stj_intens
[docs] def test_plot(self, lat, max_wind_surface, lat_idx, stj_lat, stj_intens): """Make a test plot for this method.""" print("jet intensity is: ", stj_intens) import matplotlib.pyplot as plt plot_line = False if max(lat) < 0: hem = 'SH' plt.subplot(2, 1, 1) if stj_lat < -30.0: plot_line = True else: hem = 'NH' plt.subplot(2, 1, 2) if stj_lat > 30.0: plot_line = True if plot_line: plt.plot(lat, max_wind_surface) plt.plot(stj_lat, max_wind_surface[lat_idx], 'x') plt.title(hem) filename = 'test_davis.png' plt.savefig(filename)
[docs]class STJMaxWind(STJMetric): """ Subtropical jet position metric: maximum zonal mean zonal wind on a pressure level. Parameters ---------- props : :py:meth:`~STJ_PV.run_stj.JetFindRun` Class containing properties about the current search for the STJ data : :py:meth:`~STJ_PV.input_data.InputData` Input data class containing a year (or more) of required data """ def __init__(self, props, data): """Initialise Metric using PV Gradient Method.""" name = 'UMax' super(STJMaxWind, self).__init__(name=name, props=props, data=data) # Some config options should be properties for ease of access self.pres_lev = self.props['pres_level'] if self.data[self.data.cfg['lev']].units in ['mb', 'hPa', 'millibars']: self.pres_lev /= 100.0 self.min_lat = self.props['min_lat']
[docs] def find_jet(self, shemis=True): """ Find the subtropical jet using input parameters. Parameters ---------- shemis : logical, optional If True, find jet position in Southern Hemisphere, if False, find N.H. jet """ _, hlats, hem_s = self.set_hemis(shemis) vlat = self.data.cfg['lat'] self.log.info( 'COMPUTING JET POSITION FOR %d TIMES HEMIS: %s', self.data.uwnd.shape[0], hem_s, ) # Setup a dict to get the desired pressure level and hemisphere _latlev_select = {self.data.cfg['lev']: self.pres_lev, vlat: slice(*hlats)} # Select the latitudes and level uwnd_hem = self.data.uwnd.sel(**_latlev_select) if uwnd_hem[vlat].shape[0] == 0: # This probably means that lat is decreasing with increasing index # so swap the hemisphere latitudes around and re-try the selection _latlev_select[vlat] = slice(*hlats[::-1]) uwnd_hem = self.data.uwnd.sel(**_latlev_select) # Find the maximum zonal mean zonal wind at the level set in config uwnd_max = uwnd_hem.argmax(dim=vlat) # Use those indicies to find the latitude and intensity of the max wind # Use the latitude coordinate from the hemisphere restricted uwnd so that the # isel works properly (otherwise it's off by the nuber of gridpoints excluded) jet_lat = uwnd_hem[vlat].isel(**{vlat: uwnd_max.load()}) jet_intens = uwnd_hem.isel(**{vlat: uwnd_max.load()}) # Put the parameters into place for this hemisphere, taking the zonal mean first self.out_data['lat_{}'.format(hem_s)] = jet_lat.mean(dim=self.data.cfg['lon']) self.out_data['intens_{}'.format(hem_s)] = jet_intens.mean( dim=self.data.cfg['lon'] )
[docs]class STJKangPolvani(STJMetric): """ Subtropical jet position metric: Kang and Polvani 2010. Parameters ---------- props : :py:meth:`~STJ_PV.run_stj.JetFindRun` Class containing properties about the current search for the STJ data : :py:meth:`~STJ_PV.input_data.InputData` Input data class containing a year (or more) of required data """ def __init__(self, props, data): """Initialise Metric using Kang and Polvani Method.""" name = 'KangPolvani' super(STJKangPolvani, self).__init__(name=name, props=props, data=data) self.wh_200 = 20000.0 / self.data.cfg["pfac"] self.wh_1000 = 100000.0 / self.data.cfg["pfac"]
[docs] def find_jet(self, shemis=True): """ Find the subtropical jet using input parameters. Parameters ---------- shemis : logical, optional If True, find jet position in Southern Hemisphere, if False, find N.H. jet """ _, hlats, hem_s = self.set_hemis(shemis) del_f = self.get_flux_div(hlats) self.get_jet_lat(del_f, hem_s)
[docs] def get_flux_div(self, lats): """Calculate the meridional eddy momentum flux divergence.""" _select = {self.data.cfg["lev"]: self.wh_200} _select.update(self.hemis) uwnd = self.data["uwnd"].sel(**_select) vwnd = self.data["vwnd"].sel(**_select) lat = uwnd[self.data.cfg["lat"]].values k_e = Kinetic_Eddy_Energies(uwnd, vwnd, self.data.cfg) k_e.get_components(zonal=True, time=False) k_e.calc_momentum_flux() del_f = k_e.del_f return del_f
[docs] def get_jet_lat(self, del_f, hem_s): """Find the 200hpa zero crossing of meridional eddy momentum flux divergence.""" uwnd = self.data["uwnd"].sel(**self.hemis).mean(dim=self.data.cfg["lon"]) _vlat = self.data.cfg["lat"] lat = uwnd[_vlat] # signchange = ((np.roll(np.sign(del_f), 1) - np.sign(del_f)) != 0).values _delsign = del_f.pipe(np.sign) signchange = (_delsign.roll({_vlat: 1}, roll_coords=False) - _delsign) != 0 stj_lat = np.zeros(uwnd.shape[0]) stj_int = np.zeros(uwnd.shape[0]) _uwnd200 = uwnd.sel(**{self.data.cfg["lev"]: self.wh_200}) _uwnd1000 = uwnd.sel(**{self.data.cfg["lev"]: self.wh_1000}) # Don't consider endpoints as having a sign change _no_endpoints = {_vlat: slice(1, -1)} signchange = signchange.isel(**_no_endpoints) # And restrict the wind in the same way _uwnd200 = _uwnd200.isel(**_no_endpoints) _uwnd1000 = _uwnd1000.isel(**_no_endpoints) stj_lat = xr.apply_ufunc( self.find_single_jet, _uwnd200, _uwnd1000, _uwnd200[_vlat], signchange, input_core_dims=[[_vlat], [_vlat], [_vlat], [_vlat]], vectorize=True, dask='allowed', ) # Output the monthly mean of daily S for comparing the method jet_data = xr.DataArray( stj_lat, coords=[self.data["uwnd"][self.data.cfg["time"]]], dims=[self.data.cfg["time"]], ) jet_intens = _uwnd200.sel(**{_vlat: jet_data}) _resample = {self.data.cfg["time"]: "MS"} jet_data_mm = jet_data.resample(**_resample).mean() jet_intens_mm = jet_intens.resample(**_resample).mean() self.out_data["lat_{}".format(hem_s)] = jet_data_mm self.out_data["intens_{}".format(hem_s)] = jet_intens_mm
[docs] def find_single_jet(self, u_high, u_low, lat, sign_change): """ Get streamfunction inflection point with the largest zonal-mean zonal wind shear. """ shear = u_high[sign_change] - u_low[sign_change] return lat[sign_change][shear.argmax()]
[docs] def get_jet_loc(self, data, expected_lat, lat): """Get jet location based on sign changes of Del(f).""" signchange = ((np.roll(np.sign(data), 1) - np.sign(data)) != 0).values idx = (np.abs(lat[signchange] - expected_lat)).argmin() return lat[signchange][idx]
[docs] def loop_jet_lat(self, data, expected_lat, lat): """Get jet location at multiple times.""" return np.array( [ self.get_jet_loc(data[tidx, :], expected_lat[tidx], lat) for tidx in range(data.shape[0]) ] )
[docs]def lowest_valid(col): """ Given 1-D array find lowest (along axis) valid data. Parameters ---------- col : array_like 1D array of data Returns ------- valid : col.dtype Lowest (in index) valid value in `col` """ return col[np.isfinite(col).argmax()]
[docs]def get_season(month): """Map month index to index of season [DJF -> 0, MAM -> 1, JJA -> 2, SON -> 3].""" seasons = np.array([0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 0]) return seasons[month]