Source code for geowombat.radiometry.sr
import datetime
import logging
import math
import xml.etree.ElementTree as ET
from collections import namedtuple
from datetime import datetime as dtime
import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr
from rasterio.fill import fillnodata
from scipy.ndimage import zoom
from ..core import ndarray_to_xarray
from ..core.properties import get_sensor_info
from ..handler import add_handler
from ..moving import moving_window
from .angles import parse_sentinel_angles, relative_azimuth
from .sixs import AOT, SixS
logger = logging.getLogger(__name__)
logger = add_handler(logger)
[docs]def coeffs_to_array(coeffs, band_names):
"""Converts coefficients to a DataArray."""
return xr.DataArray(
data=[coeffs[bi] for bi in band_names],
coords={'band': band_names},
dims='band',
)
[docs]def p_r(m, r, rphase, cos_solar_za, cos_sensor_za):
"""Calculates atmospheric reflectance due to Rayleigh scattering.
Args:
m (float): The air mass.
r (float): The Rayleigh optical depth.
rphase (float): The Rayleigh phase function.
cos_solar_za (DataArray): The cosine of the solar zenith angle.
cos_sensor_za (DataArray): The cosine of the sensor zenith angle.
Returns:
``xarray.DataArray``
"""
rphase_stack = xr.concat([rphase] * len(m.band), dim='band')
rphase_stack.coords['band'] = m.band.values
cos_solar_za_stack = xr.concat([cos_solar_za] * len(m.band), dim='band')
cos_solar_za_stack.coords['band'] = m.band.values
cos_sensor_za_stack = xr.concat([cos_sensor_za] * len(m.band), dim='band')
cos_sensor_za_stack.coords['band'] = m.band.values
return rphase_stack * (
(1.0 - np.exp(-m * r))
/ (4.0 * (cos_solar_za_stack + cos_sensor_za_stack))
)
[docs]def t_sv(r, cos_zenith):
"""Calculates atmospheric transmittance of sun-surface path.
Args:
r (float): The Rayleigh optical depth.
cos_zenith (DataArray): The cosine of the zenith angle.
Returns:
``xarray.DataArray``
"""
cos_zenith_stack = xr.concat([cos_zenith] * len(r.band), dim='band')
cos_zenith_stack.coords['band'] = r.band.values
cose1 = np.exp(-r / cos_zenith_stack)
cose2 = np.exp(0.52 * r / cos_zenith_stack)
return cose1 + cose1 * (cose2 - 1.0)
[docs]def s_atm(r):
"""Calculates atmospheric backscattering ratio to count multiple
reflections between the surface and atmosphere.
Args:
r (float): The Rayleigh optical depth.
Returns:
``float``
"""
return (0.92 * r) * np.exp(-r)
def _format_coeff(dataframe, sensor, key):
bands_dict = dict(
l5={
'1': 'blue',
'2': 'green',
'3': 'red',
'4': 'nir',
'5': 'swir1',
'6': 'th',
'7': 'swir2',
},
l7={
'1': 'blue',
'2': 'green',
'3': 'red',
'4': 'nir',
'5': 'swir1',
'6VCID1': 'th1',
'6VCID2': 'th2',
'7': 'swir2',
'8': 'pan',
},
l8={
'1': 'coastal',
'2': 'blue',
'3': 'green',
'4': 'red',
'5': 'nir',
'6': 'swir1',
'7': 'swir2',
'8': 'pan',
'9': 'cirrus',
'10': 'th1',
'11': 'th2',
},
l9={
'1': 'coastal',
'2': 'blue',
'3': 'green',
'4': 'red',
'5': 'nir',
'6': 'swir1',
'7': 'swir2',
'8': 'pan',
'9': 'cirrus',
'10': 'th1',
'11': 'th2',
},
)
sensor_dict = bands_dict[sensor]
dataframe_ = dataframe[dataframe.iloc[:, 0].str.startswith(key)].values
pairs = {}
for di in range(dataframe_.shape[0]):
bd = dataframe_[di, 0]
cf = dataframe_[di, 1]
# e.g., REFLECTANCE_ADD_BAND_1 -> 1
band_name = ''.join(bd.split('_')[3:])
if band_name in sensor_dict:
var_band = sensor_dict[band_name]
pairs[var_band] = float(cf)
if not pairs:
logger.warning(' No metadata coefficients were acquired.')
return pairs
[docs]class MetaData(object):
"""A class for sensor metadata."""
[docs] @staticmethod
def get_landsat_coefficients(meta_file):
"""Gets coefficients from a Landsat metadata file.
Args:
meta_file (str): The text metadata file.
Returns:
``namedtuple``:
sensor, m_l, a_l, m_p, a_p, date_acquired, sza
"""
associations = {
'LANDSAT_5': 'l5',
'LANDSAT_7': 'l7',
'LANDSAT_8': 'l8',
'LANDSAT_9': 'l9',
}
MetaCoeffs = namedtuple(
'MetaCoeffs',
'sensor m_l a_l m_p a_p date_acquired sza left bottom right top',
)
df = pd.read_csv(meta_file, sep='=')
df.iloc[:, 0] = df.iloc[:, 0].str.strip()
df.iloc[:, 1] = df.iloc[:, 1].str.strip()
spacecraft_id = dict(
df[df.iloc[:, 0].str.startswith('SPACECRAFT_ID')].values
)
spacecraft_id['SPACECRAFT_ID'] = spacecraft_id[
'SPACECRAFT_ID'
].replace('"', '')
sensor = associations[spacecraft_id['SPACECRAFT_ID']]
m_l = _format_coeff(df, sensor, 'RADIANCE_MULT_BAND_')
a_l = _format_coeff(df, sensor, 'RADIANCE_ADD_BAND_')
m_p = _format_coeff(df, sensor, 'REFLECTANCE_MULT_BAND_')
a_p = _format_coeff(df, sensor, 'REFLECTANCE_ADD_BAND_')
ux_dict = dict(
df[
df.iloc[:, 0].str.startswith('CORNER_UL_PROJECTION_X_PRODUCT')
].values
)
lx_dict = dict(
df[
df.iloc[:, 0].str.startswith('CORNER_LR_PROJECTION_X_PRODUCT')
].values
)
uy_dict = dict(
df[
df.iloc[:, 0].str.startswith('CORNER_UL_PROJECTION_Y_PRODUCT')
].values
)
ly_dict = dict(
df[
df.iloc[:, 0].str.startswith('CORNER_LR_PROJECTION_Y_PRODUCT')
].values
)
left = float(ux_dict['CORNER_UL_PROJECTION_X_PRODUCT'])
bottom = float(ly_dict['CORNER_LR_PROJECTION_Y_PRODUCT'])
right = float(lx_dict['CORNER_LR_PROJECTION_X_PRODUCT'])
top = float(uy_dict['CORNER_UL_PROJECTION_Y_PRODUCT'])
solar_elev = dict(
df[df.iloc[:, 0].str.startswith('SUN_ELEVATION')].values
)
solar_elev = solar_elev['SUN_ELEVATION'].replace('"', '')
solar_zenith = 90.0 - float(solar_elev)
date_acquired_ = dict(
df[df.iloc[:, 0].str.startswith('DATE_ACQUIRED')].values
)
date_acquired_ = date_acquired_['DATE_ACQUIRED'].replace('"', '')
year, month, day = date_acquired_.split('-')
year = int(year)
month = int(month)
day = int(day)
scene_center_time = dict(
df[df.iloc[:, 0].str.startswith('SCENE_CENTER_TIME')].values
)
scene_center_time = scene_center_time['SCENE_CENTER_TIME'].replace(
'"', ''
)
hour = int(scene_center_time.split(':')[0])
date_acquired = dtime(
year, month, day, hour, tzinfo=datetime.timezone.utc
)
return MetaCoeffs(
sensor=sensor,
m_l=m_l,
a_l=a_l,
m_p=m_p,
a_p=a_p,
date_acquired=date_acquired,
sza=solar_zenith,
left=left,
bottom=bottom,
right=right,
top=top,
)
[docs] @staticmethod
def get_sentinel_coefficients(meta_file):
"""Gets coefficients from a Sentinel metadata file.
Args:
meta_file (str): The XML metadata file.
Returns:
``namedtuple``:
sensor, date_acquired, sza
"""
solar_zenith = None
MetaCoeffs = namedtuple(
'MetaCoeffs',
'sensor date_acquired sza grid_sza grid_saa grid_vza grid_vaa',
)
tree = ET.parse(meta_file)
root = tree.getroot()
# Get the sensor
for child in root:
if child.tag.split('}')[-1] == 'General_Info':
for segment in child:
if segment.tag == 'TILE_ID':
tile_id = segment.text
break
sensor = tile_id.split('_')[0].lower()
# Solar zenith angle
for child in root:
if child.tag.split('}')[-1] == 'Geometric_Info':
for segment in child:
if segment.tag == 'Tile_Angles':
for sub_segment in segment:
if sub_segment.tag == 'Mean_Sun_Angle':
for angle in sub_segment:
if angle.tag == 'ZENITH_ANGLE':
solar_zenith = float(angle.text)
break
# Acquisition date
for child in root:
if child.tag.split('}')[-1] == 'General_Info':
for segment in child:
if segment.tag == 'SENSING_TIME':
year, month, day = segment.text.split('-')
break
date_acquired = dtime(
int(year),
int(month),
int(day[:2]),
int(day[3:5]),
tzinfo=datetime.timezone.utc,
)
sza, saa = parse_sentinel_angles(meta_file, 'solar', 0)
vza, vaa = parse_sentinel_angles(meta_file, 'view', 0)
def grid_to_data_array(grid, band_names):
if len(grid.shape) == 2:
grid = grid[np.newaxis, :, :]
return xr.DataArray(
grid,
dims=('band', 'y', 'x'),
coords={
'band': band_names,
'y': range(0, grid.shape[1]),
'x': range(0, grid.shape[2]),
},
)
return MetaCoeffs(
sensor=sensor,
date_acquired=date_acquired,
sza=solar_zenith,
grid_sza=grid_to_data_array(sza, ['sza']),
grid_saa=grid_to_data_array(saa, ['saa']),
grid_vza=grid_to_data_array(
vza,
[
'coastal',
'blue',
'green',
'red',
'nir1',
'nir2',
'nir3',
'nir',
'rededge',
'water',
'cirrus',
'swir1',
'swir2',
],
),
grid_vaa=grid_to_data_array(
vaa,
[
'coastal',
'blue',
'green',
'red',
'nir1',
'nir2',
'nir3',
'nir',
'rededge',
'water',
'cirrus',
'swir1',
'swir2',
],
),
)
[docs]class LinearAdjustments(object):
"""A class for linear bandpass adjustments."""
def __init__(self):
self.coefficients = dict(
s2a=dict(
l5=None,
l7=None,
l8=dict(
alphas=dict(
coastal=-0.0002,
blue=-0.004,
green=-0.0009,
red=0.0009,
nir=-0.0001,
swir1=-0.0011,
swir2=-0.0012,
),
betas=dict(
coastal=0.9959,
blue=0.9778,
green=1.0053,
red=0.9765,
nir=0.9983,
swir1=0.9987,
swir2=1.003,
),
),
),
s2b=dict(
l5=None,
l7=None,
l8=dict(
alphas=dict(
coastal=-0.0002,
blue=-0.004,
green=-0.0008,
red=0.001,
nir=0.0,
swir1=-0.0003,
swir2=0.0004,
),
betas=dict(
coastal=0.9959,
blue=0.9778,
green=1.0075,
red=0.9761,
nir=0.9966,
swir1=1.0,
swir2=0.9867,
),
),
),
l5=dict(
l7=None,
l8=dict(
alphas=dict(
blue=-0.0095,
green=-0.0016,
red=-0.0022,
nir=-0.0021,
swir1=-0.003,
swir2=0.0029,
pan=-0.00443,
),
betas=dict(
blue=0.9785,
green=0.9542,
red=0.9825,
nir=1.0073,
swir1=1.0171,
swir2=0.9949,
pan=0.9717,
),
),
s2=None,
),
l7=dict(
l5=None,
l8=dict(
alphas=dict(
blue=-0.0095,
green=-0.0016,
red=-0.0022,
nir=-0.0021,
swir1=-0.003,
swir2=0.0029,
pan=-0.00443,
),
betas=dict(
blue=0.9785,
green=0.9542,
red=0.9825,
nir=1.0073,
swir1=1.0171,
swir2=0.9949,
pan=0.9717,
),
),
s2=None,
),
)
[docs] def bandpass(
self,
data,
sensor=None,
to='l8',
band_names=None,
scale_factor=1,
src_nodata=0,
dst_nodata=0,
):
"""Applies a bandpass adjustment by applying a linear function to
surface reflectance values.
Args:
data (DataArray): The data to adjust.
sensor (Optional[str]): The sensor to adjust.
to (Optional[str]): The sensor to adjust to.
band_names (Optional[list]): The bands to adjust. If not given, all bands are adjusted.
scale_factor (Optional[float]): A scale factor to apply to the input data.
src_nodata (Optional[int or float]): The input 'no data' value.
dst_nodata (Optional[int or float]): The output 'no data' value.
Reference:
Sentinel-2 and Landsat 8:
https://hls.gsfc.nasa.gov/algorithms/bandpass-adjustment/
See :cite:`chastain_etal_2019` for further details
Landsat 7 and Landsat 8:
See :cite:`roy_etal_2016` (Table 2)
Returns:
``xarray.DataArray``
Examples:
>>> import geowombat as gw
>>> from geowombat.radiometry import LinearAdjustments
>>>
>>> la = LinearAdjustments()
>>>
>>> # Adjust all Sentinel-2 bands to Landsat 8
>>> with gw.config.update(sensor='s2'):
>>> with gw.open('sentinel-2.tif') as ds:
>>> ds_adjusted = la.bandpass(ds, to='l8')
"""
attrs = data.attrs.copy()
# Set 'no data' as nans
data = data.where(data != src_nodata)
if scale_factor == 1.0:
scale_factor = data.gw.scale_factor
# Scale the reflectance data
if scale_factor != 1:
data = data * scale_factor
if not band_names:
band_names = data.band.values.tolist()
if band_names[0] == 1:
band_names = list(data.gw.wavelengths[sensor]._fields)
coeff_dict = self.coefficients[sensor][to]
alphas = np.array(
[coeff_dict['alphas'][bd] for bd in band_names], dtype='float64'
)
betas = np.array(
[coeff_dict['betas'][bd] for bd in band_names], dtype='float64'
)
alphas = xr.DataArray(
data=alphas, coords={'band': band_names}, dims='band'
)
betas = xr.DataArray(
data=betas, coords={'band': band_names}, dims='band'
)
# Apply the linear bandpass adjustment
data = alphas + betas * data
if scale_factor != 1:
data = data / scale_factor
data = data.fillna(dst_nodata)
data.attrs['adjustment'] = '{} to {}'.format(sensor, to)
data.attrs['alphas'] = alphas.data.tolist()
data.attrs['betas'] = betas.data.tolist()
data.attrs = attrs
return data
[docs]class RadTransforms(MetaData):
"""A class for radiometric transformations."""
[docs] def dn_to_sr(
self,
dn,
solar_za,
solar_az,
sensor_za,
sensor_az,
src_nodata=-32768,
dst_nodata=-32768,
sensor=None,
method='srem',
angle_factor=0.01,
meta=None,
interp_method='fast',
**kwargs
):
"""Converts digital numbers to surface reflectance.
Args:
dn (DataArray): The digital number data to calibrate.
solar_za (DataArray): The solar zenith angle.
solar_az (DataArray): The solar azimuth angle.
sensor_za (DataArray): The sensor, or view, zenith angle.
sensor_az (DataArray): The sensor, or view, azimuth angle.
src_nodata (Optional[int or float]): The input 'no data' value.
dst_nodata (Optional[int or float]): The output 'no data' value.
sensor (Optional[str]): The data's sensor.
method (Optional[str]): The correction method to use. Choices are ['srem', '6s'].
angle_factor (Optional[float]): The scale factor for angles.
meta (Optional[namedtuple]): A metadata object with gain and bias coefficients.
interp_method (Optional[str]): The LUT interpolation method if ``method`` = '6s'. Choices are ['fast', 'slow'].
'fast': Uses nearest neighbor lookup with ``scipy.interpolate.NearestNDInterpolator``.
'slow': Uses linear interpolation with ``scipy.interpolate.LinearNDInterpolator``.
kwargs (Optional[dict]): Extra keyword arguments passed to ``radiometry.sixs.SixS().rad_to_sr``.
References:
https://www.usgs.gov/land-resources/nli/landsat/using-usgs-landsat-level-1-data-product
Returns:
``xarray.DataArray``:
Data range: 0-1
Examples:
>>> from geowombat.radiometry import RadTransforms
>>>
>>> sr = RadTransforms()
>>> meta = sr.get_landsat_coefficients('file.MTL')
>>>
>>> # Convert DNs to surface reflectance using Landsat metadata
>>> with gw.open('dn.tif') as ds:
>>> sr_data = sr.dn_to_sr(ds, solar_za, sensor_za, meta=meta)
"""
attrs = dn.attrs.copy()
# Get the data band names and positional indices
band_names = dn.band.values.tolist()
if meta:
if not sensor:
sensor = meta.sensor
# Ensure that the metadata holder has matching bands
for bi in band_names:
if bi not in meta.m_p:
logger.warning(meta.m_p)
logger.warning(band_names)
logger.exception(
' The metadata holder does not have matching bands.'
)
raise ValueError
if bi not in meta.a_p:
logger.warning(meta.a_p)
logger.warning(band_names)
logger.exception(
' The metadata holder does not have matching bands.'
)
raise ValueError
if method == '6s':
# Get the gain and offsets and
# convert the gain and offsets
# to named coordinates.
# m_p = coeffs_to_array(meta.m_p, band_names)
# a_p = coeffs_to_array(meta.a_p, band_names)
m_l = coeffs_to_array(meta.m_l, band_names)
a_l = coeffs_to_array(meta.a_l, band_names)
# Convert DN to TOAR, with sun angle correction
# toar = self.dn_to_toar(dn, m_p, a_p, solar_za=solar_za, angle_factor=angle_factor, sun_angle=True)
# Invert TOAR to DN
# dn = (1.0 / m_p) * toar - a_p / m_p
# Convert DN to Radiance
radiance = self.dn_to_radiance(dn, m_l, a_l)
sr_data = []
sxs = SixS()
for band in band_names:
sr_data.append(
sxs.rad_to_sr(
radiance,
meta.sensor,
band,
solar_za,
meta.date_acquired.timetuple().tm_yday,
angle_factor=angle_factor,
interp_method=interp_method,
**kwargs
)
)
sr_data = xr.concat(sr_data, dim='band')
elif method == 'srem':
# Get the gain and offsets and
# convert the gain and offsets
# to named coordinates.
m_p = coeffs_to_array(meta.m_p, band_names)
a_p = coeffs_to_array(meta.a_p, band_names)
toar = self.dn_to_toar(
dn,
m_p,
a_p,
solar_za=solar_za,
angle_factor=angle_factor,
sun_angle=True,
)
else:
if not sensor:
sensor = dn.gw.sensor
# d = distance between the Earth and Sun in the astronomical unit
# ESUN = mean solar exoatmospheric radiation
GlobalArgs = namedtuple('GlobalArgs', 'pi d esun')
# TODO: set global arguments
global_args = GlobalArgs(pi=math.pi, d=None, esun=None)
radiance = self.dn_to_radiance(dn, None, None)
toar = self.radiance_to_toar(radiance, solar_za, global_args)
if method == 'srem':
sr_data = self.toar_to_sr(
toar,
solar_za,
solar_az,
sensor_za,
sensor_az,
sensor,
src_nodata=src_nodata,
dst_nodata=dst_nodata,
)
attrs['sensor'] = sensor
attrs['nodata'] = dst_nodata
attrs['calibration'] = 'surface reflectance'
attrs['method'] = method
attrs['drange'] = (0, 1)
return sr_data.assign_attrs(**attrs)
@staticmethod
def _linear_transform(data, gain, bias):
return gain * data + bias
[docs] def dn_to_radiance(self, dn, gain, bias):
"""Converts digital numbers to radiance.
Args:
dn (DataArray): The digital number data to calibrate.
gain (DataArray): A gain value.
bias (DataArray): A bias value.
Returns:
``xarray.DataArray``
"""
attrs = dn.attrs.copy()
attrs['calibration'] = 'radiance'
return self._linear_transform(dn, gain, bias).assign_attrs(**attrs)
[docs] def dn_to_toar(
self, dn, gain, bias, solar_za=None, angle_factor=0.01, sun_angle=True
):
"""Converts digital numbers to top-of-atmosphere reflectance.
Args:
dn (DataArray): The digital number data to calibrate.
gain (DataArray | dict): A gain value.
bias (DataArray | dict): A bias value.
solar_za (DataArray): The solar zenith angle.
angle_factor (Optional[float]): The scale factor for angles.
sun_angle (Optional[bool]): Whether to correct for the sun angle.
Returns:
``xarray.DataArray``
"""
if isinstance(gain, dict):
gain = coeffs_to_array(gain, dn.band.values.tolist())
if isinstance(bias, dict):
bias = coeffs_to_array(bias, dn.band.values.tolist())
attrs = dn.attrs.copy()
toar = self._linear_transform(dn, gain, bias)
if sun_angle:
if not isinstance(solar_za, xr.DataArray):
logger.exception(' The solar zenith must be supplied.')
raise NameError
# TOA reflectance with sun angle correction
cos_sza = xr.concat(
[np.cos(np.deg2rad(solar_za * angle_factor))] * len(toar.band),
dim='band',
)
cos_sza.coords['band'] = toar.band.values
toar = toar / cos_sza
attrs['calibration'] = 'top-of-atmosphere reflectance'
return toar.assign_attrs(**attrs)
[docs] @staticmethod
def radiance_to_toar(radiance, solar_za, global_args):
"""Converts radiance to top-of-atmosphere reflectance.
Args:
radiance (DataArray): The radiance data to calibrate.
solar_za (DataArray): The solar zenith angle.
global_args (namedtuple): Global arguments.
Returns:
``xarray.DataArray``
"""
attrs = radiance.attrs.copy()
solar_zenith_angle = solar_za * 0.01
toar_data = (global_args.pi * radiance * global_args.d**2) / (
global_args.esun * da.cos(solar_zenith_angle)
)
attrs['calibration'] = 'top-of-atmosphere reflectance'
toar_data.attrs = attrs
return toar_data
[docs] @staticmethod
def toar_to_rad(toar, meta):
"""Converts top of atmosphere reflectance to top of atmosphere
radiance.
Args:
toar (DataArray): The top of atmosphere reflectance (0-1).
sza (float | DataArray): The solar zenith angle (in degrees).
meta (Optional[namedtuple]): A metadata object with gain and bias coefficients.
Returns:
``xarray.DataArray``
"""
attrs = toar.attrs.copy()
# Get the mean incidence angle for each band
vza = (
meta.grid_vza.sel(band=toar.band.values.tolist())
.where(lambda x: x != 0)
.reduce(np.nanmean, dim=('y', 'x'))
)
band_names = toar.band.values.tolist()
# Julian day with ESA reference date
# https://sentinel.esa.int/web/sentinel/technical-guides/sentinel-2-msi/level-1c/algorithm
julian_day = (
meta.date_acquired.toordinal()
+ dtime.strptime('1950-1-1', '%Y-%m-%d').toordinal()
) # + 1721424.5
# Earth-sun distance
# This could be replaced with 1/U from the ESA metadata. However, since the
# U variable is stored in a separate metadata file it's easier to just
# calculate it here.
d2 = 1.0 / ((1.0 - 0.0167 * np.cos(0.0172 * (julian_day - 2.0))) ** 2)
# Band solar irradiance
solar_irradiances = get_sensor_info(
key='solar_irradiance', sensor=meta.sensor
)
solar_irradiances = {
b: getattr(solar_irradiances, b) for b in band_names
}
solar_irradiances = coeffs_to_array(solar_irradiances, band_names)
# Convert TOAR to radiance
rad = (
((toar * 10000.0) * np.cos(np.deg2rad(vza)) * solar_irradiances)
/ (np.pi * d2)
) * 0.0001
return rad.assign_attrs(**attrs)
[docs] def toar_to_sr(
self,
toar,
solar_za,
solar_az,
sensor_za,
sensor_az,
sensor=None,
src_nodata=-32768,
dst_nodata=-32768,
method='srem',
angle_factor=0.01,
meta=None,
interp_method='fast',
**kwargs
):
"""Converts top-of-atmosphere reflectance to surface reflectance.
Args:
toar (DataArray): The top-of-atmosphere reflectance (0-1).
solar_za (float | DataArray): The solar zenith angle.
solar_az (DataArray): The solar azimuth angle.
sensor_za (DataArray): The sensor zenith angle.
sensor_az (DataArray): The sensor azimuth angle.
sensor (Optional[str]): The data's sensor.
src_nodata (Optional[int or float]): The input 'no data' value.
dst_nodata (Optional[int or float]): The output 'no data' value.
method (Optional[str]): The method to use. Choices are ['srem', '6s'].
Choices:
'srem': A Simplified and Robust Surface Reflectance Estimation Method (SREM)
angle_factor (Optional[float]): The scale factor for angles.
meta (Optional[namedtuple]): A metadata object with gain and bias coefficients.
interp_method (Optional[str]): The LUT interpolation method if ``method`` = '6s'. Choices are ['fast', 'slow'].
'fast': Uses nearest neighbor lookup with ``scipy.interpolate.NearestNDInterpolator``.
'slow': Uses linear interpolation with ``scipy.interpolate.LinearNDInterpolator``.
kwargs (Optional[dict]): Extra keyword arguments passed to ``radiometry.sixs.SixS().toar_to_sr``.
References:
See :cite:`bilal_etal_2019` for the SREM method.
See :cite:`proud_etal_2010` and :cite:`lee_etal_2020` for the 6S method.
Returns:
``xarray.DataArray``:
Data range: 0-1
"""
attrs = toar.attrs.copy()
# Get the data band names and positional indices
band_names = toar.band.values.tolist()
# Set 'no data' as nans
toar = toar.where(toar != src_nodata)
if method == '6s':
if not sensor:
sensor = meta.sensor
# Convert TOAR to radiance
rad = self.toar_to_rad(toar, meta)
sr_data = []
sxs = SixS()
for band in band_names:
sr_data.append(
sxs.rad_to_sr(
rad,
sensor,
band,
solar_za,
meta.date_acquired.timetuple().tm_yday,
src_nodata=src_nodata,
dst_nodata=dst_nodata,
angle_factor=angle_factor,
interp_method=interp_method,
**kwargs
)
)
sr_data = xr.concat(sr_data, dim='band')
else:
# Get the central wavelength (in micrometers)
central_um = toar.gw.central_um[sensor]
band_names = list(toar.gw.wavelengths[sensor]._fields)
band_um = [getattr(central_um, p) * 1000.0 for p in band_names]
um = xr.DataArray(
data=band_um, coords={'band': band_names}, dims='band'
)
# Scale the angles to degrees
sza = solar_za * angle_factor
sza.coords['band'] = [1]
saa = solar_az * angle_factor
saa.coords['band'] = [1]
vza = sensor_za * angle_factor
vza.coords['band'] = [1]
vaa = sensor_az * angle_factor
vaa.coords['band'] = [1]
# Convert to radians
rad_sza = np.deg2rad(sza)
rad_vza = np.deg2rad(vza)
# Cosine(deg2rad(angles)) = angles x (pi / 180)
cos_sza = np.cos(rad_sza)
cos_sza.coords['band'] = [1]
cos_vza = np.cos(rad_vza)
cos_vza.coords['band'] = [1]
sin_sza = np.sin(rad_sza)
sin_sza.coords['band'] = [1]
sin_vza = np.sin(rad_vza)
sin_vza.coords['band'] = [1]
# air mass
m = (1.0 / cos_sza.sel(band=1)) + (1.0 / cos_vza.sel(band=1))
m = m.expand_dims(dim='band')
m = m.assign_coords(band=[1])
m = xr.concat([m] * len(toar.band), dim='band')
m.coords['band'] = toar.band.values
# Rayleigh optical depth
# Hansen, JF and Travis, LD (1974) LIGHT SCATTERING IN PLANETARY ATMOSPHERES
# Eq. 2.30, p. 544
r = (
0.008569
* um**-4
* (1.0 + 0.0113 * um**-2 + 0.0013 * um**-4)
)
# Relative azimuth angle
# TODO: doesn't work if the band coordinate is named
raa = relative_azimuth(saa, vaa)
rad_raa = np.deg2rad(raa)
cos_raa = np.cos(rad_raa)
# scattering angle = the angle between the direction of incident and scattered radiation
# Liu, CH and Liu GR (2009) AEROSOL OPTICAL DEPTH RETRIEVAL FOR SPOT HRV IMAGES, Journal of Marine Science and Technology
# http://stcorp.github.io/harp/doc/html/algorithms/derivations/scattering_angle.html
# cos_sza = cos(pi/180 x sza)
# cos_vza = cos(pi/180 x vza)
# sin_sza = sin(pi/180 x sza)
# sin_vza = sin(pi/180 x vza)
scattering_angle = np.arccos(
-cos_sza * cos_vza - sin_sza * sin_vza * cos_raa
)
cos2_scattering_angle = np.cos(scattering_angle) ** 2
# Rayleigh phase function
rayleigh_a = 0.9587256
rayleigh_b = 1.0 - rayleigh_a
rphase = ((3.0 * rayleigh_a) / (4.0 + rayleigh_b)) * (
1.0 + cos2_scattering_angle
)
# Get the air mass
pr_data = p_r(m, r, rphase, cos_sza, cos_vza)
toar_diff = toar - pr_data
# Total transmission = downward x upward
transmission = t_sv(r, cos_sza) * t_sv(r, cos_vza)
# Atmospheric backscattering ratio
ab_ratio = s_atm(r)
sr_data = (
(toar_diff / (toar_diff * ab_ratio + transmission))
.fillna(src_nodata)
.astype('float64')
)
# Create a 'no data' mask
mask = (
sr_data.where((sr_data != src_nodata) & (toar != src_nodata))
.count(dim='band')
.astype('uint8')
)
# Mask 'no data' values
sr_data = xr.where(
mask < sr_data.gw.nbands, dst_nodata, sr_data.clip(0, 1)
).transpose('band', 'y', 'x')
attrs['sensor'] = sensor
attrs['calibration'] = 'surface reflectance'
attrs['nodata'] = dst_nodata
attrs['method'] = method
attrs['drange'] = (0, 1)
return sr_data.assign_attrs(**attrs)
[docs]class DOS(AOT, RadTransforms):
[docs] def get_aot(
self,
data,
sza,
meta,
data_values='dn',
angle_factor=0.01,
dn_interp=None,
interp_method='fast',
aot_fallback=0.3,
h2o=2.0,
o3=0.3,
altitude=0.0,
w=None,
n_jobs=1,
):
"""Gets the aerosol optical thickness (AOT) from dark objects.
Args:
data (DataArray): The digital numbers or top of atmosphere reflectance at a coarse resolution.
sza (float | DataArray): The solar zenith angle.
meta (Optional[namedtuple]): A metadata object with gain and bias coefficients.
data_values (Optional[str]): The values of ``data``. Choices are ['dn', 'toar'].
angle_factor (Optional[float]): The scale factor for angles.
dn_interp (Optional[DataArray]): A source ``DataArray`` at the target resolution.
interp_method (Optional[str]): The LUT interpolation method. Choices are ['fast', 'slow'].
'fast': Uses nearest neighbor lookup with ``scipy.interpolate.NearestNDInterpolator``.
'slow': Uses linear interpolation with ``scipy.interpolate.LinearNDInterpolator``.
aot_fallback (Optional[float | DataArray]): The aerosol optical thickness fallback if no dark objects
are found (unitless). [0,3].
h2o (Optional[float]): The water vapor (g/m^2). [0,8.5].
o3 (Optional[float]): The ozone (cm-atm). [0,8].
altitude (Optional[float]): The altitude over the sensor acquisition location (km above sea level).
w (Optional[int]): The smoothing window size (in pixels).
n_jobs (Optional[int]): The number of parallel jobs for ``moving_window`` and ``dask.compute``.
Returns:
``xarray.DataArray``:
Data range: 0-3
References:
See :cite:`masek_etal_2006`, :cite:`kaufman_etal_1997`, and :cite:`ouaidrari_vermote_1999`.
"""
if data_values not in ['dn', 'toar']:
logger.exception(" The data values should be 'dn' or 'toar'")
raise NameError
if isinstance(sza, xr.DataArray):
sza = sza.squeeze().data.compute(num_workers=n_jobs)
sza *= angle_factor
band_names = data.band.values.tolist()
doy = meta.date_acquired.timetuple().tm_yday
if data_values == 'dn':
m_p = coeffs_to_array(meta.m_p, band_names)
a_p = coeffs_to_array(meta.a_p, band_names)
m_l = coeffs_to_array(meta.m_l, band_names)
a_l = coeffs_to_array(meta.a_l, band_names)
toar = self.dn_to_toar(data, m_p, a_p, sun_angle=False)
rad = self.dn_to_radiance(data, m_l, a_l)
else:
toar = data
rad = self.toar_to_rad(data, meta)
# Get the SWIR2 band TOAR
swir2_toar = toar.sel(band='swir2')
# Get the blue band Radiance
blue_rad = rad.sel(band='blue')
# Get SWIR2 TOAR dark pixels
swir2_toar_dark = xr.where(
(swir2_toar >= 0.01) & (swir2_toar <= 0.15), swir2_toar, np.nan
)
blue_rad_dark = xr.where(
(swir2_toar >= 0.01) & (swir2_toar <= 0.15), blue_rad, np.nan
)
# Estimate the blue surface reflectance with
# a simple linear transformation (Masek et al., 2006)
blue_p = swir2_toar_dark * 0.33
# Get reflectance and radiance data as numpy arrays
blue_p_data = blue_p.squeeze().data.compute(num_workers=n_jobs)
blue_rad_dark_data = blue_rad_dark.squeeze().data.compute(
num_workers=n_jobs
)
valid_idx = np.where(~np.isnan(blue_p_data))
if valid_idx[0].shape[0] > 0:
aot = self.get_optimized_aot(
blue_rad_dark_data,
blue_p_data,
meta.sensor,
'blue',
interp_method,
sza,
doy,
h2o,
o3,
altitude,
)
mask = np.ones(aot.shape, dtype='uint8')
mask[np.isnan(blue_p_data)] = 0
aot = fillnodata(aot, mask=mask, max_search_distance=100)
if isinstance(dn_interp, xr.DataArray):
aot = self._resize(aot, dn_interp, w, n_jobs)
return ndarray_to_xarray(dn_interp, aot, ['aot'])
else:
return ndarray_to_xarray(data, aot, ['aot'])
else:
if isinstance(dn_interp, xr.DataArray):
return ndarray_to_xarray(
dn_interp,
np.zeros(
(dn_interp.gw.nrows, dn_interp.gw.ncols),
dtype='float64',
)
+ aot_fallback,
['aot'],
)
else:
return ndarray_to_xarray(
data,
np.zeros((data.gw.nrows, data.gw.ncols), dtype='float64')
+ aot_fallback,
['aot'],
)
@staticmethod
def _resize(aot, src_interp, w, n_jobs):
aot = zoom(
aot,
(
src_interp.gw.nrows / aot.shape[0],
src_interp.gw.ncols / aot.shape[1],
),
order=3,
)
if isinstance(w, int):
hw = int(w / 2.0)
aot_padded = np.pad(aot, pad_width=hw, mode='reflect')
aot = moving_window(
np.float64(aot_padded), stat='mean', w=w, n_jobs=n_jobs
)[hw:-hw, hw:-hw]
return aot