import logging
from collections import namedtuple
from copy import copy
import dask.array as da
import numpy as np
import xarray as xr
from ..core.util import project_coords
from ..handler import add_handler
from .angles import relative_azimuth
logger = logging.getLogger(__name__)
logger = add_handler(logger)
[docs]class Special(object):
[docs] @staticmethod
def get_distance(tan1, tan2, cos3):
"""Gets distance component of Li kernels."""
temp = tan1 * tan1 + tan2 * tan2 - 2.0 * tan1 * tan2 * cos3
return da.sqrt(da.maximum(temp, 0))
[docs] @staticmethod
def get_overlap(cos1, cos2, tan1, tan2, sin3, distance, hb, m_pi):
"""Applies the HB ratio transformation."""
OverlapInfo = namedtuple('OverlapInfo', 'tvar sint overlap temp')
temp = (1.0 / cos1) + (1.0 / cos2)
cost = da.clip(
hb
* da.sqrt(
distance * distance + tan1 * tan1 * tan2 * tan2 * sin3 * sin3
)
/ temp,
-1,
1,
)
tvar = da.arccos(cost)
sint = da.sin(tvar)
overlap = 1.0 / m_pi * (tvar - sint * cost) * temp
overlap = da.maximum(overlap, 0)
return OverlapInfo(tvar=tvar, sint=sint, overlap=overlap, temp=temp)
[docs]class Angles(object):
[docs] @staticmethod
def get_phaang(cos_vza, cos_sza, sin_vza, sin_sza, cos_raa):
"""Gets the phase angle."""
cos_phase_angle = da.clip(
cos_vza * cos_sza + sin_vza * sin_sza * cos_raa, -1, 1
)
phase_angle = da.arccos(cos_phase_angle)
sin_phase_angle = da.sin(phase_angle)
return cos_phase_angle, phase_angle, sin_phase_angle
[docs] @staticmethod
def get_pangles(tan1, br, nearly_zero):
"""Get the prime angles."""
tanp = br * tan1
tanp = da.where(tanp < 0, 0, tanp)
angp = da.arctan(tanp)
sinp = da.sin(angp)
cosp = da.cos(angp)
# have to make sure c is not 0
cosp = da.where(cosp == 0, nearly_zero, cosp)
return cosp, sinp, tanp
[docs] @staticmethod
def get_angle_info(vza, sza, raa, m_pi):
"""Gets the angle information."""
AngleInfo = namedtuple(
'AngleInfo', 'vza sza raa vza_rad sza_rad raa_rad'
)
# View zenith angle
vza_rad = da.deg2rad(vza)
# Solar zenith angle
sza_rad = da.deg2rad(sza)
# Relative azimuth angle
raa_rad = da.deg2rad(raa)
vza_abs = da.fabs(vza_rad)
sza_abs = da.fabs(sza_rad)
raa_abs = da.where((vza_rad < 0) | (sza_rad < 0), m_pi, raa_rad)
return AngleInfo(
vza=vza,
sza=sza,
raa=raa,
vza_rad=vza_abs,
sza_rad=sza_abs,
raa_rad=raa_abs,
)
[docs]class LiKernel(Special, Angles):
[docs] def get_li(self, kernel_type, li_recip):
# relative azimuth angle
# ensure it is in a [0,2] pi range
phi = da.fabs(
(self.angle_info.raa_rad % (2.0 * self.global_args.m_pi))
)
cos_phi = da.cos(phi)
sin_phi = da.sin(phi)
tanti = da.tan(self.angle_info.sza_rad)
tantv = da.tan(self.angle_info.vza_rad)
cos1, sin1, tan1 = self.get_pangles(
tantv, self.global_args.br, self.global_args.nearly_zero
)
cos2, sin2, tan2 = self.get_pangles(
tanti, self.global_args.br, self.global_args.nearly_zero
)
# sets cos & sin phase angle terms
cos_phaang, phaang, sin_phaang = self.get_phaang(
cos1, cos2, sin1, sin2, cos_phi
)
distance = self.get_distance(tan1, tan2, cos_phi)
overlap_info = self.get_overlap(
cos1,
cos2,
tan1,
tan2,
sin_phi,
distance,
self.global_args.hb,
self.global_args.m_pi,
)
if kernel_type.lower() == 'sparse':
if li_recip:
li = (
overlap_info.overlap
- overlap_info.temp
+ 0.5 * (1.0 + cos_phaang) / cos1 / cos2
)
else:
li = (
overlap_info.overlap
- overlap_info.temp
+ 0.5 * (1.0 + cos_phaang) / cos1
)
else:
if kernel_type.lower() == 'dense':
if li_recip:
li = (1.0 + cos_phaang) / (
cos1
* cos2
* (overlap_info.temp - overlap_info.overlap)
) - 2.0
else:
li = (1.0 + cos_phaang) / (
cos1 * (overlap_info.temp - overlap_info.overlap)
) - 2.0
return li
[docs]class RossKernel(Special, Angles):
[docs] def ross_part(self, angle_info, global_args):
"""Calculates the main part of Ross kernel."""
RossKernelOutputs = namedtuple(
'RossKernelOutputs',
'cos_vza cos_sza sin_vza sin_sza cos_raa ross_element cos_phase_angle phase_angle sin_phase_angle ross',
)
cos_vza = da.cos(angle_info.vza_rad)
cos_sza = da.cos(angle_info.sza_rad)
sin_vza = da.sin(angle_info.vza_rad)
sin_sza = da.sin(angle_info.sza_rad)
cos_raa = da.cos(angle_info.raa_rad)
cos_phase_angle, phase_angle, sin_phase_angle = self.get_phaang(
cos_vza, cos_sza, sin_vza, sin_sza, cos_raa
)
ross_element = (
global_args.m_pi / 2.0 - phase_angle
) * cos_phase_angle + sin_phase_angle
return RossKernelOutputs(
cos_vza=cos_vza,
cos_sza=cos_sza,
sin_vza=sin_vza,
sin_sza=sin_sza,
cos_raa=cos_raa,
ross_element=ross_element,
cos_phase_angle=cos_phase_angle,
phase_angle=phase_angle,
sin_phase_angle=sin_phase_angle,
ross=None,
)
[docs] @staticmethod
def ross_thin(ross_outputs):
RossThinOutputs = namedtuple('RossThinOutputs', 'ross phase_angle')
ross_ = ross_outputs.ross_element / (
ross_outputs.cos_vza * ross_outputs.cos_sza
)
return RossThinOutputs(
ross=ross_, phase_angle=ross_outputs.phase_angle
)
[docs] @staticmethod
def ross_thick(ross_outputs):
RossThickOutputs = namedtuple('RossThickOutputs', 'ross phase_angle')
ross_ = ross_outputs.ross_element / (
ross_outputs.cos_vza + ross_outputs.cos_sza
)
return RossThickOutputs(
ross=ross_, phase_angle=ross_outputs.phase_angle
)
[docs] def get_ross(self, kernel_type):
ross_outputs = self.ross_part(self.angle_info, self.global_args)
if kernel_type.lower() == 'thin':
ross_kernel_outputs = self.ross_thin(ross_outputs)
else:
ross_kernel_outputs = self.ross_thick(ross_outputs)
if self.global_args.hs:
ross = ross_kernel_outputs.ross * (
1.0 + 1.0 / (1.0 + ross_kernel_outputs.phase_angle / 0.25)
)
else:
ross = ross_kernel_outputs.ross - self.global_args.m_pi / 4.0
return ross
[docs]class BRDFKernels(LiKernel, RossKernel):
"""A class for the Li and Ross BRDF kernels.
Args:
vza (dask.array): The view zenith angle.
sza (dask.array): The solar zenith angle.
raa (dask.array): The relative azimuth angle.
li_type (Optional[str]): The Li kernel type. Choices are ['sparse', 'dense'].
ross_type (Optional[str]): The Ross kernel type. Choices are ['thin', 'thick'].
br (Optional[float]): The BR ratio.
hb (Optional[float]): The HB ratio.
"""
def __init__(
self,
vza,
sza,
raa,
li_type='sparse',
ross_type='thick',
li_recip=True,
br=1.0,
hb=2.0,
hs=False,
):
GlobalArgs = namedtuple('GlobalArgs', 'br m_pi hb hs nearly_zero')
self.global_args = GlobalArgs(
br=br, m_pi=np.pi, hb=hb, hs=hs, nearly_zero=1e-20
)
self.angle_info = self.get_angle_info(
vza, sza, raa, self.global_args.m_pi
)
self.li_k = self.get_li(li_type, li_recip)
self.ross_k = self.get_ross(ross_type)
[docs]class GeoVolKernels(object):
[docs] @staticmethod
def get_mean_sza(central_latitude):
"""Returns the mean solar zenith angle (SZA) as a function of the
central latitude.
Args:
central_latitude (float): The central latitude.
Reference:
See :cite:`zhang_etal_2016`
Returns:
``float``
"""
return (
31.0076
+ -0.1272 * central_latitude
+ 0.01187 * (central_latitude**2)
+ 2.40e-05 * (central_latitude**3)
+ -9.48e-07 * (central_latitude**4)
+ -1.95e-09 * (central_latitude**5)
+ 6.15e-11 * (central_latitude**6)
)
[docs] def get_kernels(
self, central_latitude, solar_za, solar_az, sensor_za, sensor_az
):
# Get the geometric scattering kernel.
#
# HLS uses a constant (per location) sun zenith angle (`solar_za`).
# HLS uses 0 for sun azimuth angle (`solar_az`).
# theta_v, theta_s, delta_gamma
kl = BRDFKernels(0.0, self.get_mean_sza(central_latitude), 0.0)
# Copy the geometric scattering
# coefficients so they are
# not overwritten by the
# volume scattering coefficients.
self.geo_norm = copy(kl.li_k)
self.vol_norm = copy(kl.ross_k)
# Get the volume scattering kernel.
#
# theta_v=0 for nadir view zenith angle, theta_s, delta_gamma
kl = BRDFKernels(
sensor_za.data,
solar_za.data,
relative_azimuth(solar_az, sensor_az).data,
)
self.geo_sensor = kl.li_k
self.vol_sensor = kl.ross_k
[docs]class BRDF(GeoVolKernels):
"""A class for Bidirectional Reflectance Distribution Function (BRDF)
normalization."""
def __init__(self):
self.geo_norm = None
self.vol_norm = None
self.geo_sensor = None
self.vol_sensor = None
# Setup the c-factor equation.
#
# `SA` = the sensor array
self.c_equation = 'SA * ((fiso + fvol*vol_norm + fgeo*geo_norm) / (fiso + fvol*vol_sensor + fgeo*geo_sensor))'
# A dictionary of BRDF kernel coefficients
self.coeff_dict = dict(
blue=dict(fiso=0.0774, fgeo=0.0079, fvol=0.0372),
green=dict(fiso=0.1306, fgeo=0.0178, fvol=0.058),
red=dict(fiso=0.169, fgeo=0.0227, fvol=0.0574),
nir=dict(fiso=0.3093, fgeo=0.033, fvol=0.1535),
swir1=dict(fiso=0.343, fgeo=0.0453, fvol=0.1154),
swir2=dict(fiso=0.2658, fgeo=0.0387, fvol=0.0639),
pan=dict(fiso=0.12567, fgeo=0.01613, fvol=0.0509),
)
def _get_coeffs(self, sensor_band):
return self.coeff_dict[sensor_band]
[docs] def norm_brdf(
self,
data,
solar_za,
solar_az,
sensor_za,
sensor_az,
central_latitude=None,
sensor=None,
wavelengths=None,
src_nodata=-32768,
dst_nodata=-32768,
mask=None,
scale_factor=1.0,
out_range=None,
scale_angles=True,
vol_weight=1.0,
):
r"""Applies Nadir Bidirectional Reflectance Distribution Function (BRDF) normalization
using the global c-factor method
Args:
data (2d or 3d DataArray): The data to normalize.
solar_za (2d DataArray): The solar zenith angles (degrees).
solar_az (2d DataArray): The solar azimuth angles (degrees).
sensor_za (2d DataArray): The sensor azimuth angles (degrees).
sensor_az (2d DataArray): The sensor azimuth angles (degrees).
central_latitude (Optional[float or 2d DataArray]): The central latitude.
sensor (Optional[str]): The satellite sensor.
wavelengths (str list): The wavelength(s) to normalize.
src_nodata (Optional[int or float]): The input 'no data' value.
dst_nodata (Optional[int or float]): The output 'no data' value.
mask (Optional[DataArray]): A data mask, where clear values are 0.
scale_factor (Optional[float]): A scale factor to apply to the input data.
out_range (Optional[float]): The out data range. If not given, the output data are return in a 0-1 range.
scale_angles (Optional[bool]): Whether to scale the pixel angle arrays.
vol_weight (Optional[float]): Weight to be applied to the volumetric kernel in the c-factor equation
References:
See :cite:`roy_etal_2016` for the c-factor method.
For further background on BRDF:
:cite:`li_strahler_1992`
:cite:`roujean_etal_1992`
:cite:`schaaf_etal_2002`
Returns:
``xarray.DataArray``
Examples:
>>> import geowombat as gw
>>> from geowombat.radiometry import BRDF
>>>
>>> brdf = BRDF()
>>>
>>> # Example where pixel angles are stored in separate GeoTiff files
>>> with gw.config.update(sensor='l7', scale_factor=0.0001):
>>>
>>> with gw.open('solarz.tif') as solarz,
>>> gw.open('solara.tif') as solara,
>>> gw.open('sensorz.tif') as sensorz,
>>> gw.open('sensora.tif') as sensora:
>>>
>>> with gw.open('landsat.tif') as src:
>>> src_norm = brdf.norm_brdf(src, solarz, solara, sensorz, sensora)
"""
if not wavelengths:
if sensor:
wavelengths = list(data.gw.wavelengths[sensor]._fields)
else:
if not data.gw.sensor:
logger.exception(' The sensor must be supplied.')
wavelengths = list(data.gw.wavelengths[data.gw.sensor]._fields)
if not wavelengths:
logger.exception(' The sensor or wavelength must be supplied.')
if not isinstance(dst_nodata, (int, float)):
dst_nodata = data.gw.nodataval
if not isinstance(central_latitude, np.ndarray):
if not isinstance(central_latitude, xr.DataArray):
if not isinstance(central_latitude, float):
central_latitude = project_coords(
np.array(
[data.x.values[int(data.x.shape[0] / 2)]],
dtype='float64',
),
np.array(
[data.y.values[int(data.y.shape[0] / 2)]],
dtype='float64',
),
data.crs,
{'init': 'epsg:4326'},
)[1][0]
attrs = data.attrs.copy()
# Set 'no data' as nans and scale the reflectance data
data = data.gw.set_nodata(
src_nodata,
np.nan,
out_range=(0, 1),
dtype='float64',
scale_factor=scale_factor,
offset=0,
)
if scale_angles:
# Scale the angle data to degrees
solar_za = solar_za * 0.01
solar_za.coords['band'] = [1]
solar_az = solar_az * 0.01
solar_az.coords['band'] = [1]
sensor_za = sensor_za * 0.01
sensor_za.coords['band'] = [1]
sensor_az = sensor_az * 0.01
sensor_az.coords['band'] = [1]
# Get the Ross and Li coefficients
self.get_kernels(
central_latitude, solar_za, solar_az, sensor_za, sensor_az
)
results = []
for si, wavelength in enumerate(wavelengths):
# Get the band iso, geo, and vol coefficients.
coeffs = self._get_coeffs(wavelength)
# c-factor
c_factor = (
coeffs['fiso']
+ coeffs['fvol'] * self.vol_norm * vol_weight
+ coeffs['fgeo'] * self.geo_norm
) / (
coeffs['fiso']
+ coeffs['fvol'] * self.vol_sensor * vol_weight
+ coeffs['fgeo'] * self.geo_sensor
)
p_norm = data.sel(band=wavelength).data * c_factor
# Apply the adjustment to the current layer.
results.append(p_norm)
data = xr.DataArray(
data=da.concatenate(results),
dims=('band', 'y', 'x'),
coords={'band': data.band.values, 'y': data.y, 'x': data.x},
attrs=data.attrs,
).fillna(src_nodata)
if isinstance(out_range, (int, float, tuple)):
if isinstance(out_range, (int, float)):
range_max = out_range
else:
range_max = out_range[1]
if range_max <= 1:
dtype = 'float64'
elif 1 < range_max <= 255:
dtype = 'uint8'
else:
dtype = 'uint16'
drange = (0, range_max)
data = xr.where(
data == src_nodata,
src_nodata,
(data * range_max).clip(0, range_max),
)
else:
drange = (0, 1)
dtype = 'float64'
# Mask data
if isinstance(mask, xr.DataArray):
data = xr.where(
(mask.sel(band=1) == 1)
| (solar_za.sel(band=1) == -32768 * 0.01)
| (data == src_nodata),
dst_nodata,
data,
)
else:
data = xr.where(
(solar_za.sel(band=1) == -32768 * 0.01) | (data == src_nodata),
dst_nodata,
data,
)
data = data.transpose('band', 'y', 'x').astype(dtype)
attrs['sensor'] = sensor
attrs['calibration'] = 'BRDF-adjusted surface reflectance'
attrs['drange'] = drange
return data.assign_attrs(**attrs).gw.assign_nodata_attrs(dst_nodata)