# -*- coding: utf-8 -*-
"""Utility functions not specific to subtropical jet finding."""
from __future__ import division
import numpy as np
import xarray as xr
import xarray.ufuncs as xu
from scipy import interpolate as interp
__author__ = "Penelope Maher, Michael Kelleher"
# Constants to be used within this file
# specify the range and increment over which to calculate IPV
TH_LEV = np.arange(300.0, 501.0, 5)
RAD = np.pi / 180.0 # radians per degree
OM = 7.292e-5 # Angular rotation rate of earth [rad]
GRV = 9.81 # Acceleration due to GRVity [m/s^2]
EARTH_R = 6.371e6 # Radius of earth [m]
R_D = 287.0 # Dry gas constant [J kg^-1 K^-1]
C_P = 1004.0 # Specific heat of dry air [J kg^-1 K^-1]
KPPA = R_D / C_P # Ratio of gas constants
[docs]class NDSlicer(object):
"""
Create an n-dimensional slice list.
Parameters
----------
axis : integer
Axis on which to apply the slice
ndim : integer
Total number of dimensions of array to be sliced
start, stop, step : integer, optional
Index of beginning, stop and step width of the slice [start:stop:step]
default for each is None.
Examples
--------
Create random array, slice it::
x = np.random.randn(5, 3)
# Create slicer equivalent to [1:-1, :]
slc = NDSlicer(0, x.ndim)
print(x)
[[ 0.68470539 0.87880216 -0.45086367]
[ 1.06804045 0.63094676 -0.76633033]
[-1.69841915 0.35207064 -0.4582049 ]
[-0.56431067 0.62833728 -0.04101542]
[-0.02760744 2.02814338 0.13195714]]
print(x[slc[1:-1]])
[[ 1.06804045 0.63094676 -0.76633033]
[-1.69841915 0.35207064 -0.4582049 ]
[-0.56431067 0.62833728 -0.04101542]]
"""
def __init__(self, axis, ndim, start=None, stop=None, step=None):
"""N-Dimensional slice class for numpy arrays."""
self.axis = axis
self.ndim = ndim
self.start = start
self.stop = stop
self.step = step
self.slicer = None
self.__getitem__(slice(start, stop, step))
def __getitem__(self, key):
"""Create an n-dimensional slice list."""
if isinstance(key, slice):
self.start = key.start
self.stop = key.stop
self.step = key.step
elif isinstance(key, int):
self.start = key
self.stop = key + 1
self.step = None
self.slicer = [slice(None)] * self.ndim
self.slicer[self.axis] = slice(self.start, self.stop, self.step)
return tuple(self.slicer)
[docs] def slice(self, start=None, stop=None, step=None):
"""
Create an n-dimensional slice list.
This is a legacy compatibility method, calls `self.__getitem__`.
Parameters
----------
axis : integer
Axis on which to apply the slice
ndim : integer
Total number of dimensions of array to be sliced
start, stop, step : integer, optional
Index of beginning, stop and step width of the slice [start:stop:step]
default for each is None.
Returns
-------
slicer : list
list of slices such that all data at other axes are kept, one axis is sliced
"""
self.__getitem__(slice(start, stop, step))
[docs]def vinterp(data, vcoord, vlevels):
r"""
Perform linear vertical interpolation.
Parameters
----------
data : array_like (>= 2D)
Array of data to be interpolated
vcoord : array_like (>= 2D, where data.shape[1] == vcoord.shape[1])
Array representing the vertical structure (height/pressure/PV/theta/etc.) of
`data`
vlevels : array_like (1D)
Levels, in same units as vcoord, to interpolate to
Returns
-------
out_data : array_like, (data.shape[0], vlevels.shape[0], \*data.shape[2:])
Data on vlevels
Examples
--------
**data** and **vcoord** of the same shape
Given some u-wind data ``uwnd`` that is on (time, pressure, lat, lon), a potential
temperature field ``theta`` on the same dimensions (t, p, lat, lon), and a
few selected potential temperature levels to which u-wind will be interpolated
>>> th_levs = [275, 300, 325, 350, 375]
>>> print(uwnd.shape)
(365, 17, 73, 144)
>>> print(theta.shape)
(365, 17, 73, 144)
uwnd_theta = vinterp(uwnd, theta, th_levs)
>>> print(uwnd_theta.shape)
(365, 5, 73, 144)
This results in u-wind on the requested theta surfaces
**data** is 1D **vcoord** is 4D
Given some potential temperature levels ``th_levs = [275, 300, 325, 350, 375]``
and a 4D array of potential vorticity ipv on (time, theta, lat, lon), and
several selected PV levels
>>> pv_levs = [1.0, 2.0, 3.0]
>>> print(th_levs.shape)
(5, )
>>> print(ipv.shape)
(365, 5, 73, 144)
>>> theta_pv = vinterp(th_levs, ipv, pv_levs)
>>> print(theta_pv.shape)
(365, 3, 73, 144)
This gives potential temperature on potential vorticity surfaces
**data** is 4D **vcoord** is 1D
Given some pressure levels ``levs = [1000., 900., 850., 500.]``
and a 4D array of potential vorticity epv on (time, pressure, lat, lon), and
several selected pressure levels
>>> pres_new = [950.0, 800.0, 700.0]
>>> print(levs.shape)
(4, )
>>> print(epv.shape)
(365, 4, 73, 144)
>>> epv_intrp = vinterp(epv, levs, pres_new)
>>> print(epv_interp.shape)
(365, 3, 73, 144)
This gives potential vorticity on new pressure surfaces.
"""
if vcoord.ndim == 1 and data.ndim > 1:
# This handles the case where vcoord is 1D and data is N-D
v_dim = int(np.where(np.array(data.shape) == vcoord.shape[0])[0])
# numpy.broadcast_to only works for the last axis of an array, swap our shape
# around so that vertical dimension is last, broadcast vcoord to it, then swap
# the axes back so vcoord.shape == data.shape
data_shape = list(data.shape)
data_shape[-1], data_shape[v_dim] = data_shape[v_dim], data_shape[-1]
vcoord = np.broadcast_to(vcoord, data_shape)
vcoord = np.swapaxes(vcoord, -1, v_dim)
vcoord_shape = list(vcoord.shape)
vcoord_shape.pop(1)
valid = np.min([np.prod(vcoord_shape) - np.sum(np.isnan(vcoord[:, 0, ...])),
np.prod(vcoord_shape) - np.sum(np.isnan(vcoord[:, -1, ...]))])
if np.sum(vcoord[:, 0, ...] > vcoord[:, -1, ...]) / valid > 0.80:
# Vcoord data is decreasing on interpolation axis, (at least 80% is)
idx_gt = 1
idx_lt = 0
else:
# Data is increasing on interpolation axis
idx_gt = 0
idx_lt = 1
if data.ndim >= vcoord.ndim:
# Handle case where data has the same dimensions or data has more dimensions
# compared to vcoord (e.g. vcoord is 4D, data is 4D, or vcoord is 1D, data is 4D)
out_shape = list(data.shape)
else:
# Handle case where data has fewer dimensions than vcoord
# (e.g. data is 1-D vcoord is N-D)
out_shape = list(vcoord.shape)
out_shape[1] = vlevels.shape[0]
out_data = np.zeros(out_shape) + np.nan
for lev_idx, lev in enumerate(vlevels):
if idx_gt == 0:
# Case where vcoord data is increasing, find index where vcoord below [:-1]
# is equal or less than desired lev, and vcoord above [1:] is greater than
# lev, this means <data> for lev is between these points, use
# weight to determine exactly where
idx = np.squeeze(np.where(np.logical_and(vcoord[:, :-1, ...] <= lev,
vcoord[:, 1:, ...] > lev)))
else:
# This does the same, but where vcoord is decreasing with index, so find
# where vcoord below [:-1] is greater, and vcoord above [1:] is less or equal
idx = np.squeeze(np.where(np.logical_and(vcoord[:, :-1, ...] > lev,
vcoord[:, 1:, ...] <= lev)))
# Create copies of this index, so they can be modified for weighting functions
# and output array
idx_abve = idx.copy()
idx_belw = idx.copy()
out_idx = idx.copy()
# The interpolation axis index (1) for output is the level index (lev_idx)
out_idx[1, :] = lev_idx
# Weighting function 'above' is index +1 for decreasing, or index +0 for decr.
idx_abve[1, :] += idx_gt
# Weighting function 'below' is index +0 for decreasing, or index +1 for decr.
idx_belw[1, :] += idx_lt
# Change indicies back into tuples so numpy.array.__getitem__ understands them
idx_abve = tuple(idx_abve)
idx_belw = tuple(idx_belw)
out_idx = tuple(out_idx)
# Weighting function for distance above lev
wgt1 = ((lev - vcoord[idx_belw]) / (vcoord[idx_abve] - vcoord[idx_belw]))
# Weighting function for distance below lev
wgt0 = 1.0 - wgt1
if data.ndim >= vcoord.ndim:
# Handle case where data has same or more dimensions than vcoord
out_data[out_idx] = (wgt0 * data[idx_belw] + wgt1 * data[idx_abve])
else:
# Handle case where data has fewer dimensions than vcoord
out_data[out_idx] = (wgt0 * data[idx_belw[1]] + wgt1 * data[idx_abve[1]])
return np.squeeze(out_data)
[docs]def inc_with_z(vcoord, levname):
"""
Find what proportion of `vcoord` is increasing along the `levname` axis.
Parameters
----------
vcoord : :class:`xarray.DataArray`
array of vertical coordinate to test
levname : str
String name of vertical coordinate variable along which to test
Returns
-------
pctinc : float
Percentage of valid points in vcoord which are increasing
with increasing index
"""
_sabove = {levname: slice(1, None)}
_sbelow = {levname: slice(None, -1)}
# number of valid points on the 0th surface
nvalid = vcoord.isel(**{levname: 0}).notnull().sum().drop(levname)
# Places where the vcoord value on the 0th surface is less than the -1th (top)
n_incr = (vcoord.isel(**{levname: 0}) < vcoord.isel(**{levname: -1}))
# Sum the number where v[0] < v[-1], divide by number of valid points
pctinc = n_incr.sum(skipna=True) / nvalid
return pctinc
def _xrvinterp_single(data, vcoord, lev, levname='lev'):
r"""
Perform linear interpolation along vertical axis on an :class:`xarray.DataArray`.
Parameters
----------
data : :class:`xarray.DataArray` (>= 2D)
array of data to be interpolated
vcoord : :class:`xarray.DataArray`
array representing the vertical structure
(height/pressure/PV/theta/etc.) of `data`. This should be >= 2D, and
data[levname].shape == vcoord[levname].shape
lev : float
Level, in same units as vcoord, to interpolate to
levname : string
Name of the vertical level coordinate variable upon
which to interpolate
Returns
-------
out_data : array_like, (data.shape[0], 1, \*data.shape[2:])
Data on `lev`
"""
# Slice shortcut dicts, above looks like [1:], below looks like [:-1]
_sabove = {levname: slice(1, None)}
_sbelow = {levname: slice(None, -1)}
if inc_with_z(vcoord, levname) <= 0.8:
# Fewer than 80% of points are increasing with index, swap directions
_sbelow, _sabove = _sabove, _sbelow
# Original levels from vcoord, should match original levels from data
_olevs = vcoord[levname]
# Selection dictionary to map levelname to coordinates above / below
_coord_above = {levname: _olevs.isel(**_sabove)}
_coord_below = {levname: _olevs.isel(**_sbelow)}
# xarray.DataArray of booleans, true where the level "below" is less than
# or equal to the desired level, false everywhere else
below = (vcoord.isel(**_sbelow) <= lev).drop(levname)
# xarray.DataArray of booleans, true where the level "above" is
# greater than the desired level, false everywhere else
above = (vcoord.isel(**_sabove) > lev).drop(levname)
# Combine, this means select all points where
# vcoord[LEVEL - 1] <= lev && vcoord[LEVEL + 1] > lev
idx = xu.logical_and(above, below)
# This is vcoord[:, 1:, ...], wherever idx is true and NaN everywhere else
ix_ab = xr.where(idx.assign_coords(**_coord_above),
vcoord.isel(**_sabove), np.nan)
# This is vcoord[:, :-1, ...], wherever idx is true and NaN everywhere else
ix_bl = xr.where(idx.assign_coords(**_coord_below),
vcoord.isel(**_sbelow), np.nan)
# Drop the coordinate information for the index xarrays, otherwise the
# maths won't work (+, -, /, *)
ix_ab = ix_ab.drop(levname)
ix_bl = ix_bl.drop(levname)
# Compute linear interpolation weights
wgt1 = ((lev - ix_bl) / (ix_ab - ix_bl))
wgt0 = 1.0 - wgt1
# Perform the linear interpolation, sum over all original levels, to
# eliminate the vertical dimension and remove all the NaNs
out = (wgt0 * data.isel(**_sbelow).drop(levname) +
wgt1 * data.isel(**_sabove).drop(levname)).sum(dim=levname)
return out
[docs]def xrvinterp(data, vcoord, vlevs, levname, newlevname):
r"""
Perform vertical interpolation for several levels for an :class:`xarray.DataArray`.
Parameters
----------
data : :class:`xarray.DataArray` (>= 2D)
array of data to be interpolated
vcoord : :class:`xarray.DataArray`
array representing the vertical structure
(height/pressure/PV/theta/etc.) of `data`. This should be >= 2D, and
data[levname].shape == vcoord[levname].shape
vlevs : array_like (1D)
Levels, in same units as vcoord, to interpolate to
levname : string
Name of the vertical level coordinate variable upon
which to interpolate
newlevname : string
Name of new vertical level coordinate variable
Returns
-------
out_data : array_like, (data.shape[0], vlevs.shape[0], \*data.shape[2:])
Data on vlevels
Notes
-----
If the input vertical coordinate data is increasing / decreasing with
height in different places (e.g. potential vorticity across hemispheres),
mask data and compute each part separately then add them together
(if appropriate) or combine them, or whatever you'd like
to do with them. We're not the boss of you :)
"""
# Use a list-comprehension to assemble all the vertical coordinates
intp = [_xrvinterp_single(data, vcoord, lev, levname) for lev in vlevs]
# Concatenate the length (vlevs.shape[0]) list of xarray.DataArrays
# The concat_dims is default, and avoids weirdness when data rank is small
# (e.g. data.ndim < vcoord.ndim). Assing vlevs to be the values
# for the new coordinate
intp = xr.concat(intp, dim=newlevname).assign_coords(**{newlevname: vlevs})
# Transpose the data, so the new level dimension is where the old level
# dimension used to be in the original data or vcoord xarray.DataArray
# depending on which has higher rank
if data.ndim > vcoord.ndim:
_dims = list(data.dims)
else:
_dims = list(vcoord.dims)
lix = _dims.index(levname)
_dims[lix] = newlevname
intp = intp.transpose(*_dims)
# Use where to mask out values that are extrapolated
intp = intp.where(intp <= data.max()).where(intp >= data.min())
return intp
[docs]def interp_nd(lat, theta_in, data, lat_hr, theta_hr):
"""
Perform interpolation on 2-dimensions on up to 4-dimensional numpy array.
Parameters
----------
lat : array_like
One dimensional latitude coordinate array, matches a dimension of `data`
theta_in : array_like
One dimensional theta (vertical) coordinate array, matches a dimension of `data`
data : array_like
Data to be interpolated to high-resolution grid
lat_hr : array_like
1-D latitude coordinate array that `data` is interpolated to
theta_hr : array_like
1-D vertical coordinate array that `data` is interpolated to
Returns
-------
data_interp : array_like
Interpolated data where lat/theta dimensions are interpolated to `lat_hr` and
`theta_hr`
"""
lat_dim = np.where(np.array(data.shape) == lat.shape[0])[0]
theta_dim = np.where(np.array(data.shape) == theta_in.shape[0])[0]
if data.ndim == 2:
data_f = interp.interp2d(lat, theta_in, data, kind='cubic')
data_interp = data_f(lat_hr, theta_hr)
elif data.ndim == 3:
out_shape = list(data.shape)
out_shape[lat_dim] = lat_hr.shape[0]
out_shape[theta_dim] = theta_hr.shape[0]
data_interp = np.zeros(out_shape)
#For contexts when you have a 3d array but 2 dimensions are the same.
cmn_axis = np.where(np.logical_and(np.array(data.shape) != lat.shape[0],
np.array(data.shape) != theta_in.shape[0]))[0]
# cmn_axis = np.where(out_shape == np.array(data.shape))[0]
for idx0 in range(data.shape[cmn_axis]):
data_f = interp.interp2d(lat, theta_in, data.take(idx0, axis=cmn_axis),
kind='cubic')
slc = [slice(None)] * data_interp.ndim
slc[cmn_axis] = idx0
data_interp[slc] = data_f(lat_hr, theta_hr)
elif data.ndim == 4:
out_shape = list(data.shape)
out_shape[lat_dim] = lat_hr.shape[0]
out_shape[theta_dim] = theta_hr.shape[0]
data_interp = np.zeros(out_shape)
# For contexts when you have a 3d array but 2 dimensions are the same.
cmn_axis = np.where(np.logical_and(np.array(data.shape) != lat.shape[0],
np.array(data.shape) != theta_in.shape[0]))[0]
# cmn_axis = np.where(out_shape == np.array(data.shape))[0][:]
for idx0 in range(data.shape[cmn_axis[0]]):
for idx1 in range(data.shape[cmn_axis[1]]):
data_slice = data.take(idx1, axis=cmn_axis[1]).take(idx0,
axis=cmn_axis[0])
data_f = interp.interp2d(lat, theta_in, data_slice, kind='cubic')
# slc says which axis to place interpolated array on, it's what changes
# with the loops
slc = [slice(None)] * data_interp.ndim
slc[cmn_axis[0]] = idx0
slc[cmn_axis[1]] = idx1
data_interp[slc] = data_f(lat_hr, theta_hr)
return data_interp
[docs]def xrtheta(tair, pvar='level'):
"""
Calculate potential temperature from temperature and pressure coordinate.
Parameters
----------
tair : :class:`xarray.DataArray`
ND array of air temperature [in K]
pvar : string
Pressure level coordinate name for tair
Returns
-------
theta : :class:`xarray.DataArray`
ND array of potential temperature in K, same shape
as `tair` input
"""
# Attempt to figure pressure units
try:
p_units = tair[pvar].attrs['units']
except KeyError:
p_units = None
# Default assumption is that pressure is in Pascals
p_0 = 100000.0
if p_units in ['hPa', 'mb', 'mbar', 'millibar', 'millibars']:
# if pressure is in hPa (or similar), fix p_0
p_0 /= 100.
# Compute and return theta
return tair * (p_0 / tair[pvar]) ** KPPA
[docs]def theta(tair, pres):
"""
Calculate potential temperature from temperature and pressure coordinate.
Parameters
----------
tair : array_like
ND array of air temperature [in K]
pres : array_like
Either ND array of pressure same shape as `tair`, or 1D array of pressure where
its shape is same as one dimension of `tair` [in Pa]
Returns
-------
theta : array_like
ND array of potential temperature in K, same shape as `tair` input
"""
p_0 = 100000.0 # Don't be stupid, make sure pres and p_0 are in Pa!
if tair.ndim == pres.ndim:
p_axis = pres
else:
# Find which coordinate of tair is same shape as pres
zaxis = tair.shape.index(pres.shape[0])
# Generates a list of [None, None, ..., None] whose length is the number of
# dimensions of tair, then set the z-axis element in this list to be a slice
# of None:None This accomplishes same thing as [None, :, None, None] for
# ndim=4, zaxis=1
slice_idx = [None] * tair.ndim
slice_idx[zaxis] = slice(None)
# Create an array of pres so that its shape is (1, NPRES, 1, 1) if zaxis=1, ndim=4
p_axis = pres[slice_idx]
return tair * (p_0 / p_axis) ** KPPA
[docs]def xr_inv_theta(thta, pvar='level'):
"""
Calculate potential temperature from temperature and pressure coordinate.
Parameters
----------
thta : :class:`xarray.DataArray`
ND array of potential temperature [in K]
pvar : string
Pressure level coordinate name for tair
Returns
-------
tair : :class:`xarray.DataArray`
ND array of air temperature in K, same shape as `thta` input
"""
# Attempt to figure pressure units
try:
p_units = thta[pvar].attrs['units']
except KeyError:
p_units = None
# Default assumption is that pressure is in Pascals
p_0 = 100000.0
if p_units in ['hPa', 'mb', 'millibar']:
# if pressure is in hPa (or similar), fix p_0
p_0 /= 100.
# Compute and return theta
return thta * (p_0 / thta[pvar]) ** -KPPA
[docs]def inv_theta(thta, pres):
"""
Calculate potential temperature from temperature and pressure coordinate.
Parameters
----------
thta : array_like
Either ND array of potential temperature same shape as `pres`, or 1D array of
potential temperature where its shape is same as one dimension of `pres` [in K]
pres : array_like
ND array of pressure [in Pa]
Returns
-------
tair : array_like
ND array of air temperature in K, same shape as `tair` input
"""
p_0 = 100000.0 # Don't be stupid, make sure pres and p_0 are in Pa!
if thta.ndim == pres.ndim:
th_axis = thta
else:
# Find which coordinate of tair is same shape as pres
zaxis = pres.shape.index(thta.shape[0])
# Generates a list of [None, None, ..., None] whose length is the number of
# dimensions of tair, then set the z-axis element in this list to be a slice
# of None:None This accomplishes same thing as [None, :, None, None] for
# ndim=4, zaxis=1
slice_idx = [None] * pres.ndim
slice_idx[zaxis] = slice(None)
# Create an array of pres so that its shape is (1, NPRES, 1, 1) if zaxis=1, ndim=4
th_axis = thta[slice_idx]
return th_axis * (p_0 / pres) ** -KPPA
[docs]def lapse_rate(t_air, pres, vaxis=None):
"""
Calculate the lapse rate of temperature in K/km from isobaric temperature data.
Parameters
----------
t_air : array_like
N-D array of temperature on isobaric surfaces in K
pres : array_like
1-D array of pressure levels, matching one dimension of t_air, in hPa
Returns
-------
dtdz : array_like
N-D array of lapse rate in K/km matching dimensionality of t_air
d_z : array_like
N-D array of height differences in km between levels, same shape as t_air
"""
# Common axis is vertical axis
if pres.ndim == 1 or vaxis is None:
ax_com = t_air.shape.index(pres.shape[0])
elif pres.ndim == t_air.ndim:
ax_com = vaxis
else:
raise ValueError('Dimensions do not match: T: {} P: {}'
.format(t_air.ndim, pres.ndim))
# Create slices to use that are correct shape
slc_t = NDSlicer(ax_com, t_air.ndim)
if pres.ndim == t_air.ndim:
slc_p = NDSlicer(ax_com, pres.ndim)
bcast_nd = [slice(None)] * pres.ndim
else:
slc_p = NDSlicer(0, pres.ndim)
# This generates a list of length ndim of t_air, (if 4-D then
# [None, None, None, None]
bcast_nd = [None] * t_air.ndim
# This makes the common axis (vertical) a slice, if ax_com = 1 then it is the
# same as saying pres[None, :, None, None], but allowing ax_com to be automagic
bcast_nd[ax_com] = slice(None)
# Calculate lapse rate in K/km
d_p = (pres[slc_p.slice(1, None)] - pres[slc_p.slice(None, -1)]) # Units = Pa or hPa
if np.max(pres) < 90000.0:
d_p *= 100.0 # Now units should be in Pa
pres_fac = 100.0
else:
pres_fac = 1.0
# rho = p / (Rd * T)
# Hydrostatic approximation dz = -dp/(rho * g)
d_z = -d_p[bcast_nd] / (((pres[bcast_nd] * pres_fac) /
(R_D * t_air))[slc_t.slice(1, None)] * GRV) / 1000.0
# Lapse rate [K/km] (-dt / dz)
dtdz = -(t_air[slc_t.slice(1, None)] - t_air[slc_t.slice(None, -1)]) / d_z
return dtdz, d_z
[docs]def trop_lev_1d(dtdz, d_z, thr=2.0, return_idx=False):
"""
Given 1D arrays for lapse rate and change in height, and a threshold, find tropopause.
Parameters
----------
dtdz : array_like
1D array of lapse rate (in same units as `thr`, usually K km^-1
d_z : array_like
1D array of change of height from previous level, same shape as `dtdz`, in same
units as `dtdz` denominator units
thr : float
Lapse rate threshold for definition of tropopause, WMO/default is 2.0 K km^-1
return_idx : bool
Flag to return index of tropopause level. Default is False, if True output is
`out_mask`, `idx` if False, output is `out_mask`
Returns
-------
out_mask : array_like
1D array, same shape as `dtdz` and `d_z`, of booleans, `True` everywhere except
the tropopause level
"""
lt_thr = dtdz < thr
lt_transition = np.append(False, np.logical_and(np.logical_not(lt_thr[:-1]),
lt_thr[1:]))
start_idx = np.where(np.logical_and(lt_thr, lt_transition))[0]
out_mask = np.array([True] * d_z.shape[0], dtype=bool)
for idx in start_idx:
try:
max_in_2km = dtdz[idx:idx + abs(np.cumsum(d_z[idx:]) - 2.0).argmin()].max()
except ValueError:
continue
if max_in_2km <= thr:
out_mask[idx] = False
if return_idx:
return out_mask, idx
else:
return out_mask
[docs]def find_tropopause_mask(dtdz, d_z, thr=2.0):
"""
Use Reichler et al. 2003 method to calculate tropopause level.
Parameters
----------
dtdz : array_like
1-D array of lapse rate in K/km
d_z : array_like
1-D array (same shape as `dtdz`) of difference in height between levels
thr : float
Lapse rate threshold in K/km (WMO definition is 2.0 K/km)
Returns
-------
trop_level : integer
Index of tropopause level on the vertical axis
"""
# For now, we'll assume if 1-D data: vertical is dim 0, >= 2-D vertical is dim 1
# Loops are slow, but still can't figure out a way beyond them :-(
trop_level = np.empty(dtdz.shape, dtype=bool)
if dtdz.ndim == 1:
return trop_lev_1d(dtdz, d_z, thr)
elif dtdz.ndim == 2:
for ixt in range(dtdz.shape[0]):
trop_level[ixt, :] = trop_lev_1d(dtdz[ixt, :], d_z[ixt, :], thr)
elif dtdz.ndim == 3:
for ixt in range(dtdz.shape[0]):
for ixy in range(dtdz.shape[2]):
trop_level[ixt, :, ixy] = trop_lev_1d(dtdz[ixt, :, ixy], d_z[ixt, :, ixy],
thr)
elif dtdz.ndim == 4:
for ixt in range(dtdz.shape[0]):
for ixy in range(dtdz.shape[2]):
for ixx in range(dtdz.shape[3]):
trop_level[ixt, :, ixy, ixx] = trop_lev_1d(dtdz[ixt, :, ixy, ixx],
d_z[ixt, :, ixy, ixx], thr)
return trop_level
[docs]def get_tropopause(t_air, pres, thr=2.0, vaxis=1):
"""
Return the tropopause temperature and pressure for WMO tropopause.
Parameters
----------
t_air : array_like
ND array of temperature, where axis 1 is vertical axis
pres : array_like
ND array of pressure levels, shape is same as `t_air`
thr : float
Lapse rate threshold, default/WMO definition is 2.0 K km^-1
Returns
-------
trop_temp, trop_pres : array_like
Temperature and pressure at tropopause level, in (N-1)-D arrays, where dimension
dropped is vertical axis, same units as input t_air and pres respectively
"""
# Calculate the lapse rate, gives back lapse rate and d(height)
dtdz, d_z = lapse_rate(t_air, pres, vaxis=vaxis)
# Create tropopause level mask, use only the half levels (every other starting at 1)
trop_level_mask = find_tropopause_mask(dtdz[:, 1::2, ...], d_z[:, 1::2, ...], thr=thr)
# To get the tropopause temp/pres, mask the 4D arrays (at every other level)
# then take the mean across level axis (now only one unmasked point) to give 3D data
trop_temp = np.mean(np.ma.masked_where(trop_level_mask, t_air[:, 1::2, ...]), axis=1)
trop_pres = np.mean(np.ma.masked_where(trop_level_mask, pres[:, 1::2, ...]), axis=1)
return trop_temp, trop_pres
[docs]def get_tropopause_pres(t_air, pres, thr=2.0):
"""
Return the tropopause temperature and pressure for WMO tropopause.
Parameters
----------
t_air : array_like
ND array of temperature, where axis 1 is vertical axis
pres : array_like
1D array of pressure levels, shape is same as `t_air`.shape[1]
thr : float
Lapse rate threshold, default/WMO definition is 2.0 K km^-1
Returns
-------
trop_temp, trop_pres : array_like
Temperature and pressure at tropopause level, in (N-1)-D arrays, where dimension
dropped is vertical axis, same units as input t_air and pres respectively
"""
# Find half pressure levels
pres_hf = (pres[:-1] - pres[1:]) / 2.0 + pres[1:]
# Create full pressure array to interpolate temperature to
pres_full = np.zeros(pres.shape[0] + pres_hf.shape[0])
pres_full[::2] = pres
pres_full[1::2] = pres_hf
# Interpolate temperature to half pressure levels
t_interp = interp.interp1d(pres, t_air, axis=1, kind='linear')(pres_full)
# Broadcast pres_full to 4D, but pressure axis has to be last axis for broadcast_to
# so, use temp shape, where index 1 and -1 are switched
temp_shape = list(t_interp.shape)
temp_shape[1], temp_shape[-1] = temp_shape[-1], temp_shape[1]
pres_full_4d = np.swapaxes(np.broadcast_to(pres_full, temp_shape), -1, 1)
return get_tropopause(t_interp, pres_full_4d, thr=thr, vaxis=1)
[docs]def get_tropopause_theta(theta_in, pres, thr=2.0):
"""
Return the tropopause temperature and pressure for WMO tropopause.
Parameters
----------
theta_in : array_like
1D array of potential temperature
pres : array_like
ND array of pressure levels, with one axis matching shape of theta_in
thr : float
Lapse rate threshold, default/WMO definition is 2.0 K km^-1
Returns
-------
trop_temp, trop_pres : array_like
Temperature and pressure at tropopause level, in (N-1)-D arrays, where dimension
dropped is vertical axis, same units as input t_air and pres respectively
"""
t_air = inv_theta(theta_in, pres)
pres_levs = np.logspace(5, 3, theta_in.shape[0])
t_pres = vinterp(t_air, pres, pres_levs)
return get_tropopause_pres(t_pres, pres_levs, thr=thr)
[docs]def diff_cfd(data, axis=-1, cyclic=False):
"""
Calculate centered finite difference on a field along an axis with even spacing.
Parameters
----------
data : array_like
ND array of data of which to calculate the differences
axis : integer
Axis of `data` on which differences are calculated
cyclic : bool
Flag to indicate whether `data` is cyclic on `axis`
Returns
-------
diff : array_like
ND array of central finite differences of `data` along `axis`
"""
# Calculate centred differences along longitude direction
# Eqivalent to: diff = data[..., 2:] - data[..., :-2] for axis == -1
slc = NDSlicer(axis, data.ndim)
diff = data[slc[2:None]] - data[slc[None:-2]]
if cyclic:
# Cyclic boundary in "East"
# Equiv to diff[..., 0] = data[..., 1:2] - data[..., -1:]
d_1 = (data[slc[1:2]] - data[slc[-1:None]])
diff = np.append(d_1, diff, axis=axis)
# Cyclic boundary in "West"
# Equiv to diff[..., -1] = data[..., 0:1] - data[..., -2:-1]
diff = np.append(diff, (data[slc[0:1]] - data[slc[-2:-1]]), axis=axis)
else:
# Otherwise edges are forward/backward differences
# Boundary in "South", (data[..., 1:2] - data[..., 0:1])
diff = np.append((data[slc[1:2]] - data[slc[0:1]]), diff, axis=axis)
# Boundary in "North" (data[..., -1:] - data[..., -2:-1])
diff = np.append(diff, (data[slc[-1:None]] - data[slc[-2:-1]]), axis=axis)
return diff
[docs]def diff_cfd_xr(data, dim='lon', cyclic=False):
"""
Calculate centered finite difference along an axis with even spacing.
Parameters
----------
data : :class:`xarray.DataArray`
N-D array of data of which to calculate the differences
dim : string
Dimension name of `data` along which differences are calculated
cyclic : bool
Flag to indicate whether `data` is cyclic on `axis`
Returns
-------
diff : :class:`xarray.DataArray`
N-D array of central finite differences
of `data` along `axis`
"""
# For centred differences along longitude direction (e.g.) this is
# eqivalent to: diff = data[..., 2:] - data[..., :-2] for axis == -1
diff = (data.isel(**{dim: slice(2, None)}).drop(dim)
- data.isel(**{dim: slice(None, -2)}).drop(dim))
diff = diff.assign_coords(**{dim: data[dim].isel(**{dim: slice(1, -1)})})
if cyclic:
# Cyclic boundary in "East"
# Equiv to diff[..., 0] = data[..., 1:2] - data[..., -1:]
d_1 = (data.isel(**{dim: 1}).drop(dim) -
data.isel(**{dim: -1})).drop(dim)
# Cyclic boundary in "West"
# Equiv to diff[..., -1] = data[..., 0:1] - data[..., -2:-1]
d_2 = (data.isel(**{dim: 0}).drop(dim) -
data.isel(**{dim: -2})).drop(dim)
else:
# Otherwise edges are forward/backward differences
# Boundary in "South", (data[..., 1:2] - data[..., 0:1])
d_1 = data.isel(**{dim: 1}) - data.isel(**{dim: 0})
d_2 = data.isel(**{dim: -1}) - data.isel(**{dim: -2})
# Assign the coordiate to each "point"
d_1 = d_1.assign_coords(**{dim: data[dim].isel(**{dim: 0})})
d_2 = d_2.assign_coords(**{dim: data[dim].isel(**{dim: -1})})
# Concatinate along the difference dimension
diff = xr.concat((d_1, diff, d_2), dim=dim)
# Use .transpose to make sure diff dims are same as data dims
return diff.transpose(*data.dims)
[docs]def diffz(data, vcoord, axis=None):
"""
Calculate vertical derivative for data on uneven vertical levels.
Parameters
----------
data : array_like
N-D array of input data to be differentiated, where
data.shape[axis] == vcoord.shape[0]
vcoord : array_like
Vertical coordinate, 1D
axis : integer, optional
Axis where data.shape[axis] == vcoord.shape[0]
Returns
-------
dxdz : array_like
N-D array of d(data)/d(vcoord), same shape as input `data`
"""
if axis is None:
# Find matching axis between data and vcoord
try:
axis = data.shape.index(vcoord.shape[0])
except ValueError:
axis = vcoord.shape.index(data.shape[0])
# Create array to hold vertical derivative
dxdz = np.zeros(data.shape)
# Create an n-dimensional broadcast along matching axis, same as [None, :, None, None]
# for axis=1, ndim=4
if vcoord.ndim < data.ndim:
bcast = [np.newaxis] * data.ndim
bcast[axis] = slice(None)
d_z = (vcoord[1:] - vcoord[:-1])
d_z2 = d_z[:-1][bcast]
d_z1 = d_z[1:][bcast]
# Create n-dimensional slicer along matching axis
slc = NDSlicer(axis, data.ndim)
else:
slc_vc = NDSlicer(axis, vcoord.ndim)
d_z = (vcoord[slc_vc[1:None]] - vcoord[slc_vc[None:-1]])
d_z2 = d_z[slc_vc[None:-1]]
d_z1 = d_z[slc_vc[1:None]]
slc = NDSlicer(0, data.ndim)
dxdz[slc[1:-1]] = ((d_z2 * data[slc[2:None]] + (d_z1 - d_z2) * data[slc[1:-1]] -
d_z1 * data[slc[None:-2]]) / (2.0 * d_z1 * d_z2))
# Do forward difference at 0th level [:, 1, :, :] - [:, 0, :, :]
dz1 = vcoord[1] - vcoord[0]
dxdz[slc[0:1]] = (data[slc[0:1]] - data[slc[1:2]]) / dz1
# Do backward difference at Nth level
dz1 = vcoord[-1] - vcoord[-2]
dxdz[slc[-1:None]] = (data[slc[-1:None]] - data[slc[-2:-1]]) / dz1
return dxdz
[docs]def xrdiffz(data, vcoord, dim='lev'):
"""
Calculate vertical derivative for data on uneven vertical levels.
Parameters
----------
data : :class:`xarray.DataArray`
N-D array of input data to be differentiated, where
data.shape[axis] == vcoord.shape[0]
vcoord : :class:`xarray.DataArray`
Vertical coordinate, 1D
dim : string
Vertical dimension name, must exist in both `data` and `vcoord`
Returns
-------
dxdz : :class:`xarray.DataArray`
N-D array of d(data)/d(vcoord), same shape as input `data`
"""
d_z = (vcoord.isel(**{dim: slice(1, None)}).drop(dim) -
vcoord.isel(**{dim: slice(None, -1)}).drop(dim))
d_z1 = d_z.isel(**{dim: slice(1, None)})
d_z2 = d_z.isel(**{dim: slice(None, -1)})
# Central difference for uneven levels
diff = ((d_z2 * data.isel(**{dim: slice(2, None)}).drop(dim) +
(d_z1 - d_z2) * data.isel(**{dim: slice(1, -1)}).drop(dim) -
d_z1 * data.isel(**{dim: slice(None, -2)}).drop(dim)) /
(2.0 * d_z1 * d_z2))
diff = diff.assign_coords(**{dim: data[dim].isel(**{dim: slice(1, -1)})})
# Do forward difference at 0th level [:, 1, :, :] - [:, 0, :, :]
dz1 = (vcoord.isel(**{dim: 1}).drop(dim) -
vcoord.isel(**{dim: 0}).drop(dim))
diff_0 = (data.isel(**{dim: 1}).drop(dim) -
data.isel(**{dim: 0}).drop(dim)) / dz1
diff_0 = diff_0.assign_coords(**{dim: data[dim].isel(**{dim: 0})})
# Do backward difference at Nth level
dz2 = (vcoord.isel(**{dim: -1}).drop(dim) -
vcoord.isel(**{dim: -2}).drop(dim))
diff_1 = (data.isel(**{dim: -1}).drop(dim) -
data.isel(**{dim: -2}).drop(dim)) / dz2
diff_1 = diff_1.assign_coords(**{dim: data[dim].isel(**{dim: -1})})
# Combine all the differences
diff = xr.concat((diff_0, diff, diff_1), dim=dim)
return diff.transpose(*data.dims)
[docs]def convert_radians_latlon(lat, lon):
"""
Convert input lat/lon array to radians if input is degrees, do nothing if radians.
Parameters
----------
lat : array_like
ND array of latitude
lon : array_like
ND array of longitude
Returns
----------
lat : array_like
ND array of latitude in radians
lon : array_like
ND array of longitude in radians
"""
if (np.max(np.abs(lat)) - np.pi / 2.0) > 1.0:
lat_out = lat * RAD
else:
lat_out = lat
if(np.min(lon) < 0 and np.max(lon) > 0 and
np.abs(np.max(np.abs(lon)) - np.pi) > np.pi):
lon_out = lon * RAD
elif np.abs(np.max(np.abs(lon)) - np.pi * 2) > np.pi:
lon_out = lon * RAD
else:
lon_out = lon
return lat_out, lon_out
[docs]def dlon_dlat(lon, lat, cyclic=True):
"""
Calculate distance along lat/lon axes on spherical grid.
Parameters
----------
lat : array_like
ND array of latitude
lon : array_like
ND array of longitude
Returns
----------
dlong : array_like
NLat x Nlon array of horizontal distnaces along longitude axis
dlatg : array_like
NLat x Nlon array of horizontal distnaces along latitude axis
"""
# Check that lat/lon are in radians
lat, lon = convert_radians_latlon(lat, lon)
# Calculate centre finite difference of lon / lat
dlon = lon[2:] - lon[:-2]
dlat = lat[2:] - lat[:-2]
# If we want cyclic data, repeat dlon[0] and dlon[-1] at edges
if cyclic:
dlon = np.append(dlon[0], dlon) # cyclic boundary in East
dlon = np.append(dlon, dlon[-1]) # cyclic boundary in West
lon2d, lat2d = np.meshgrid(lon, lat)
else:
lon2d, lat2d = np.meshgrid(lon[1:-1], lat)
dlat = np.append(lat[1] - lat[0], dlat) # boundary in South
dlat = np.append(dlat, lat[-1] - lat[-2]) # boundary in North
dlong, dlatg = np.meshgrid(dlon, dlat)
# Lon/Lat differences in spherical coords
dlong = dlong * EARTH_R * np.cos(lat2d)
dlatg = dlatg * EARTH_R
return dlong, dlatg
[docs]def xr_dlon_dlat(data, vlon='lon', vlat='lat', cyclic=True):
"""
Calculate distance on lat/lon axes on spherical grid for :class:`xarray.DataArray`.
Parameters
----------
data : :class:`xarray.DataArray`
DataArray with latitude and longitude coordinates
vlon, vlat : string
Variable names of latitude and longitude
Returns
----------
dlong : :class:`xarray.DataArray`
NLat x Nlon array of horizontal distnaces
along longitude axis in m
dlatg : :class:`xarray.DataArray`
NLat x Nlon array of horizontal distances
along latitude axis in m
"""
lon = data[vlon]
lat = data[vlat]
lon, lat = convert_radians_latlon(lon, lat)
dlon = diff_cfd_xr(lon, dim=vlon, cyclic=False).compute()
if cyclic:
# If cyclic, the endpoints are central not fwd/bkw diffs, so assume
# that the grid is regular, and double the edges
dlon[0] *= 2
dlon[-1] *= 2
dlat = diff_cfd_xr(lat, dim=vlat, cyclic=False)
dlon = dlon * EARTH_R * lat.pipe(np.cos)
dlat = dlat * EARTH_R
return dlon, dlat
[docs]def rel_vort(uwnd, vwnd, lat, lon, cyclic=True):
r"""
Calculate the relative vorticity given zonal (uwnd) and meridional (vwnd) winds.
Parameters
----------
uwnd : array_like
Zonal wind >=2 dimensional (time, level, lat, lon)
vwnd : array_like
Meridional wind >=2 dimensional, same dimensions as uwnd
lat : array_like
Latitude array, 1 dimensional with lat.shape[0] == uwnd.shape[-2]
lon : array_like
Longitude array, 1 dimensional with lon.shape[0] == uwnd.shape[-1]
cyclic : boolean
Flag to indicate if data is cyclic in longitude direction
Returns
-------
rel_vort : array_like
Relative vorticity V = U x V (cross product of U and V wind)
If cyclic, returns same dimensions as uwnd & vwnd, if not it is
``(*vwnd.shape[0:-1], vwnd.shape[-1] - 2)``
"""
# Check that lat/lon are in radians
lat, lon = convert_radians_latlon(lat, lon)
# Get dlon and dlat in spherical coords
dlong, dlatg = dlon_dlat(lon, lat, cyclic)
# Generate quasi-broadcasts of lat/lon differences for divisions
if uwnd.ndim == 4:
dlong = dlong[np.newaxis, np.newaxis, ...]
dlatg = dlatg[np.newaxis, np.newaxis, ...]
elif uwnd.ndim == 3:
dlong = dlong[np.newaxis, ...]
dlatg = dlatg[np.newaxis, ...]
elif uwnd.ndim < 2:
raise ValueError('Not enough dimensions '
'for relative vorticity: {}'.format(uwnd.shape))
elif uwnd.ndim > 4:
raise NotImplementedError('Too many dimensions for this method: {} not yet'
'implemented'.format(uwnd.shape))
dvwnd = diff_cfd(vwnd, axis=-1, cyclic=True)
duwnd = diff_cfd(uwnd, axis=-2, cyclic=False)
# Divide vwnd differences by longitude differences
dvdlon = dvwnd / dlong
# Divide uwnd differences by latitude differences
if cyclic:
dudlat = duwnd / dlatg
else:
# If data isn't cyclic, then d(vwnd) and d(lon) will be one dim shorter on last
# dimension, so we need to make up for that in the d(uwnd)/d(lat) field to match
dudlat = duwnd[..., 1:-1] / dlatg
return dvdlon - dudlat
[docs]def xr_rel_vort(uwnd, vwnd, dimvars, cyclic=True):
r"""
Calculate the relative vorticity given zonal (u) and meridional (v) winds.
Parameters
----------
uwnd : :class:`xarray.DataArray`
Array of Zonal wind (with at least lat / lon dimensions)
vwnd : :class:`xarray.DataArray`
Array of Meridional wind with same dimensions as uwnd
cyclic : boolean
Flag to indicate if data is cyclic in longitude direction
Returns
-------
rel_vort : :class:`xarray.DataArray`
Relative vorticity V = U x V (cross product of U and V wind)
with same dimensions as uwnd & vwnd
"""
# Use .get to avoid key errors if dimvars is wrong, just defualt to
# lat / lon, and if those are wrong, it'll fail later
vlon = dimvars.get('lon', 'lon')
vlat = dimvars.get('lat', 'lat')
# Get dlon and dlat in spherical coords
dlong, dlatg = xr_dlon_dlat(uwnd, vlon=vlon, vlat=vlat, cyclic=cyclic)
dvwnd = diff_cfd_xr(vwnd, dim=vlon, cyclic=cyclic)
duwnd = diff_cfd_xr(uwnd, dim=vlat, cyclic=False)
# Divide vwnd differences by longitude differences
dvdlon = dvwnd / dlong
# Divide uwnd differences by latitude differences
dudlat = duwnd / dlatg
return dvdlon - dudlat
[docs]def dth_dp(theta_in, data_in):
"""
Calculate vertical derivative on even (theta) levels.
Parameters
----------
theta_in : array_like, 1D
Vertical coordinate (potential temperature)
data_in : array_like, ND, where data_in.shape[1] == theta_in.shape[0]
Data on theta levels
Returns
-------
out_data : array_like ND, same as data
Centered finite difference for data_in[:, 1:-1, ...], backward/forward for
bottom/top boundaries
"""
# Calculate centred finite differences for theta (1D) and data on theta lvls (>=2D)
dth = theta_in[2:] - theta_in[:-2]
ddata = data_in[:, 2:, ...] - data_in[:, :-2, ...]
# Calculated backward finite difference for bottom and top layers
dth = np.append(theta_in[1] - theta_in[0], dth)
dth = np.append(dth, theta_in[-1] - theta_in[-2])
ddata = np.append((data_in[:, 1, ...] - data_in[:, 0, ...])[:, np.newaxis, ...],
ddata, axis=1)
ddata = np.append(ddata, (data_in[:, -1, ...] -
data_in[:, -2, ...])[:, np.newaxis, ...], axis=1)
if data_in.ndim == 4:
return dth[np.newaxis, :, np.newaxis, np.newaxis] / ddata
elif data_in.ndim == 3:
return dth[np.newaxis, :, np.newaxis] / ddata
elif data_in.ndim == 2:
return dth[np.newaxis, :] / ddata
else:
raise ValueError('Incorrect number of dimensons: {}'.format(data_in.shape))
[docs]def ipv(uwnd, vwnd, tair, pres, lat, lon, th_levels=None):
"""
Calculate isentropic PV on theta surfaces.
Notes
-----
Interpolation assumes pressure is monotonically increasing.
Parameters
----------
uwnd : array_like
3 or 4-D zonal wind component (t, p, y, x) or (p, y, x)
vwnd : array_like
3 or 4-D meridional wind component (t, p, y, x) or (p, y, x)
tair : array_like
3 or 4-D air temperature (t, p, y, x) or (p, y, x)
pres : array_like
1D pressure in Pa
lat : array_like
1D latitude in degrees
lon : array_like
1D longitude in degrees
th_levels : array_like, optional
1D Theta levels on which to calculate PV. Defaults to 300K - 500K by 5K.
Returns
-------
ipv : array_like
3 or 4-D isentropic potential vorticity in units
of m-2 s-1 K kg-1 (e.g. 10^6 PVU)
p_th : array_like
Pressure on isentropic levels [Pa]
u_th : array_like
Zonal wind on isentropic levels [m/s]
"""
if th_levels is None:
th_levels = TH_LEV
# Calculate potential temperature on isobaric (pressure) levels
thta = theta(tair, pres)
# Interpolate zonal, meridional wind, pressure to isentropic from isobaric levels
u_th = vinterp(uwnd, thta, th_levels)
v_th = vinterp(vwnd, thta, th_levels)
p_th = vinterp(pres, thta, th_levels)
# Calculate IPV on theta levels
ipv_out = ipv_theta(u_th, v_th, p_th, lat, lon, th_levels)
return ipv_out, p_th, u_th
[docs]def ipv_theta(uwnd, vwnd, pres, lat, lon, th_levels):
"""
Calculate isentropic PV on theta surfaces from data on theta levels.
Parameters
----------
uwnd : array_like
3 or 4-D zonal wind component (t, p, y, x) or (p, y, x)
vwnd : array_like
3 or 4-D meridional wind component (t, p, y, x) or (p, y, x)
pres : array_like
3 or 4-D pressure in Pa (t, p, y, x) or (p, y, x)
lat : array_like
1D latitude in degrees
lon : array_like
1D longitude in degrees
th_levels : array_like
1D Theta levels on which to calculate PV
Returns
-------
ipv : array_like
3 or 4-D isentropic potential vorticity in units
of m-2 s-1 K kg-1 (e.g. 10^6 PVU)
"""
# Calculate relative vorticity on isentropic levels
rel_v = rel_vort(uwnd, vwnd, lat, lon)
# Calculate d{Theta} / d{pressure} on isentropic levels
dthdp = 1.0 / diffz(pres, th_levels)
# Calculate Coriolis force
# First, get axis matching latitude to input data
lat_bcast = [np.newaxis] * rel_v.ndim
lat_axis = np.where(np.array(rel_v.shape) == lat.shape[0])[0][0]
lat_bcast[lat_axis] = slice(None)
f_cor = 2.0 * OM * np.sin(lat[lat_bcast] * RAD)
# Calculate IPV, then correct for y-derivative problems at poles
ipv_out = -GRV * (rel_v + f_cor) * dthdp
for pole_idx in [0, -1]:
# This sets all points in longitude direction to mean of all points at the pole
ipv_out[..., pole_idx, :] = np.mean(ipv_out[..., pole_idx, :], axis=-1)[..., None]
# Return isentropic potential vorticity
return ipv_out
[docs]def xripv_theta(uwnd, vwnd, pres, dimvars):
"""
Calculate isentropic PV on theta surfaces from data on theta levels.
Parameters
----------
uwnd : :class:`xarray.DataArray`
3 or 4-D zonal wind component (t, theta, y, x) or (theta, y, x)
vwnd : :class:`xarray.DataArray`
3 or 4-D meridional wind component (t, theta, y, x) or (theta, y, x)
pres : :class:`xarray.DataArray`
3 or 4-D pressure in Pa (t, theta, y, x) or (theta, y, x)
dimvars : dict
Mapping of variable names for standard coordinates. This will default
to 'theta' -> 'theta', 'lat' -> 'lat', 'lon' -> 'lon'
Returns
-------
ipv : :class:`xarray.DataArray`
3 or 4-D isentropic potential vorticity in units
of m-2 s-1 K kg-1 (e.g. 10^6 PVU)
"""
th_var = dimvars.get('lev', 'level')
# Calculate relative vorticity on isentropic levels
rel_v = xr_rel_vort(uwnd, vwnd, dimvars, cyclic=True)
# Calculate d{Theta} / d{pressure} on isentropic levels
dthdp = 1.0 / xrdiffz(pres, pres[th_var], dim=th_var)
# Calculate Coriolis force
# First, get axis matching latitude to input data
f_cor = 2.0 * OM * (RAD * uwnd[dimvars['lat']]).pipe(np.sin)
# Calculate IPV, then correct for y-derivative problems at poles
ipv_out = -GRV * (rel_v + f_cor) * dthdp
# Return isentropic potential vorticity
return ipv_out
[docs]def xripv(uwnd, vwnd, tair, dimvars=None, th_levels=None):
"""
Calculate isentropic PV on theta surfaces from :class:`xarray.DataArray`.
Notes
-----
Interpolation assumes pressure is monotonically increasing.
Parameters
----------
uwnd : :class:`xarray.DataArray`
3 or 4-D zonal wind component (t, p, y, x) or (p, y, x)
vwnd : :class:`xarray.DataArray`
3 or 4-D meridional wind component (t, p, y, x) or (p, y, x)
tair : :class:`xarray.DataArray`
3 or 4-D air temperature (t, p, y, x) or (p, y, x)
dimvars : dict
Mapping of variable names for standard coordinates. This will default
to 'lev' -> 'level', 'lat' -> 'lat', 'lon' -> 'lon'
th_levels : array_like, optional
1D array of Theta levels on which to calculate PV. Defaults to 300K - 500K by 5K.
Returns
-------
ipv : :class:`xarray.DataArray`
3 or 4-D isentropic potential vorticity in units
of m-2 s-1 K kg-1 (e.g. 10^6 PVU)
p_th : :class:`xarray.DataArray`
Pressure on isentropic levels [Pa]
u_th : :class:`xarray.DataArray`
Zonal wind on isentropic levels [m/s]
"""
if th_levels is None:
th_levels = TH_LEV
# import pdb;pdb.set_trace()
th_levels = np.float32(th_levels)
if dimvars is None:
dimvars = {'lev': 'level', 'lat': 'lat', 'lon': 'lon'}
vlev = dimvars['lev']
# Calculate potential temperature on isobaric (pressure) levels
thta = xrtheta(tair, pvar=vlev)
# Interpolate zonal, meridional wind, pressure to isentropic from
# isobaric levels
u_th = xrvinterp(uwnd, thta, th_levels, levname=vlev,
newlevname=vlev)
v_th = xrvinterp(vwnd, thta, th_levels, levname=vlev,
newlevname=vlev)
# Check the units of uwnd.level to be sure to use Pa
try:
_punits = uwnd.level['units']
except (KeyError, AttributeError):
_punits = None
if _punits in ['hPa', 'mb', 'millibar']:
scale = 100.
else:
scale = 1.
p_th = xrvinterp(scale * uwnd[vlev], thta, th_levels, levname=vlev,
newlevname=vlev)
# Calculate IPV on theta levels
ipv_out = xripv_theta(u_th, v_th, p_th, dimvars)
return ipv_out, p_th, u_th