"""interpolated_LUTs.py.
The Interpolated_LUTs class handles loading, downloading and interpolating
of LUTs (look up tables) used by the 6S emulator
Reference:
https://github.com/samsammurphy/6S_emulator/blob/master/bin/interpolated_LUTs.py
"""
import logging
import random
import shutil
import string
from collections import namedtuple
from pathlib import Path
import geopandas as gpd
import joblib
import numpy as np
import xarray as xr
import geowombat as gw
from ..core import ndarray_to_xarray
from ..data import (
LUTDownloader,
NASAEarthdataDownloader,
srtm30m_bounding_boxes,
)
from ..handler import add_handler
logger = logging.getLogger(__name__)
logger = add_handler(logger)
LUTNames = namedtuple('LUTNames', 'name path')
DATA_PATH = Path.home() / '.geowombat/datasets'
def _set_names(sensor_name):
lut_path = DATA_PATH / 'lut'
lut_path.mkdir(parents=True, exist_ok=True)
return LUTNames(name=sensor_name, path=lut_path)
SENSOR_LOOKUP = {
'l5': _set_names('l5'),
'l7': _set_names('l7'),
'l8': _set_names('l8'),
's2a': _set_names('s2a'),
's2b': _set_names('s2b'),
}
def _random_id(string_length):
"""Generates a random string of letters and digits."""
letters_digits = string.ascii_letters + string.digits
return ''.join(random.choice(letters_digits) for i in range(string_length))
[docs]class Altitude(object):
[docs] @staticmethod
def get_mean_altitude(
data,
out_dir,
username=None,
key_file=None,
code_file=None,
chunks=512,
n_jobs=1,
delete_downloaded=False,
):
if not username:
username = data.gw.nasa_earthdata_user
if not key_file:
key_file = data.gw.nasa_earthdata_key
if not code_file:
code_file = data.gw.nasa_earthdata_code
if not username or not key_file or not code_file:
logger.exception(
' The NASA EarthData username, secret key file, and secret code file must be provided to download SRTM data.'
)
raise AttributeError
if not Path(out_dir).is_dir():
Path(out_dir).mkdir(parents=True, exist_ok=True)
srtm_grid_path_temp = (
Path(out_dir) / f'srtm30m_bounding_boxes_{_random_id(9)}.gpkg'
)
shutil.copy(str(srtm30m_bounding_boxes), str(srtm_grid_path_temp))
srtm_df = gpd.read_file(srtm_grid_path_temp)
srtm_grid_path_temp.unlink()
srtm_df_int = srtm_df[
srtm_df.geometry.intersects(
data.gw.geodataframe.to_crs(epsg=4326).geometry.values[0]
)
]
nedd = NASAEarthdataDownloader(username, key_file, code_file)
hgt_files = []
zip_paths = []
for dfn in srtm_df_int.dataFile.values.tolist():
zip_file = f"{out_dir}/NASADEM_HGT_{dfn.split('.')[0].lower()}.zip"
nedd.download_srtm(dfn.split('.')[0].lower(), zip_file)
src_zip = f"zip+file://{zip_file}!/{Path(zip_file).stem.split('_')[-1]}.hgt"
hgt_files.append(Path(zip_file))
zip_paths.append(src_zip)
if len(zip_paths) == 1:
zip_paths = zip_paths[0]
mosaic = False
else:
mosaic = True
with gw.open(zip_paths, mosaic=mosaic, chunks=chunks) as src:
mean_elev = (
src.transpose('band', 'y', 'x')
.mean()
.data.compute(num_workers=n_jobs)
)
if delete_downloaded:
for fn in hgt_files:
fn.unlink()
return mean_elev
[docs]class SixSMixin(object):
@staticmethod
def _load(sensor, wavelength, interp_method, from_toar=False):
if from_toar:
raise NotImplementedError(
'Lookup tables from top of atmosphere reflectance are not supported.'
)
# lut_path = SENSOR_LOOKUP[sensor].path / f'{sensor}_{wavelength}_from_toar.lut'
else:
lut_path = (
SENSOR_LOOKUP[sensor].path / f'{sensor}_{wavelength}.lut'
)
if not lut_path.is_file():
logger.info(
f' Downloading {lut_path.name} into {SENSOR_LOOKUP[sensor].path}.'
)
lutd = LUTDownloader()
lutd.download(
f'https://s3geowombat.s3.amazonaws.com/{sensor}_{wavelength}.lut',
str(lut_path),
safe_download=False,
)
lut_ = joblib.load(str(lut_path))
return lut_[interp_method]
@staticmethod
def _rad_to_sr_from_coeffs(rad, xa, xb, xc):
"""Transforms radiance to surface reflectance using 6S coefficients.
Args:
rad (float | DataArray): The radiance.
xa (float | DataArray): The inverse of the transmittance.
xb (float | DataArray): The scattering term of the atmosphere.
xc (float | DataArray): The spherical albedo (atmospheric reflectance for isotropic light).
Returns:
``float`` | ``xarray.DataArray``
References:
https://py6s.readthedocs.io/en/latest/installation.html
y = xa * (measured radiance) - xb
acr = y / (1. + xc * y)
"""
y = xa * rad - xb
return y / (1.0 + xc * y)
[docs]class SixS(Altitude, SixSMixin):
"""A class to handle loading, downloading and interpolating of LUTs (look
up tables) used by the 6S emulator.
Args:
sensor (str): The sensor to adjust.
rad_scale (Optional[float]): The radiance scale factor. Scaled values should be in the range [0,1000].
angle_factor (Optional[float]): The angle scale factor.
Example:
>>> sixs = SixS('l5', verbose=1)
>>>
>>> with gw.config.update(sensor='l7'):
>>> with gw.open('image.tif') as src, gw.open('solar_za') as sza:
>>> sixs.rad_to_sr(src, 'blue', sza, doy, h2o=1.0, o3=0.4, aot=0.3)
"""
@staticmethod
def _toar_to_sr_from_coeffs(toar, t_g, p_alpha, s, t_s, t_v):
"""Transforms top of atmosphere reflectance to surface reflectance
using 6S coefficients.
Args:
toar (float | DataArray): The top of atmosphere reflectance.
t_g (float): The total gaseous transmission of the atmosphere.
p_alpha (float): The atmospheric reflectance.
s (float): The spherical albedo of the atmosphere.
t_s (float): The atmospheric transmittance from sun to target.
t_v (float): The atmospheric transmittance from target to satellite.
Returns:
``float`` | ``xarray.DataArray``
"""
sr_s = ((toar / t_g) - p_alpha) / (t_s * t_v)
return sr_s / (1.0 + s * sr_s)
[docs] @staticmethod
def prepare_coeff(band_data, coeffs, cindex):
return ndarray_to_xarray(band_data, coeffs[:, :, cindex], ['coeff'])
@staticmethod
def _mask_nodata(data, other_data, src_nodata, dst_nodata):
# Create a 'no data' mask
mask = (
data.where((data != src_nodata) & (other_data != src_nodata))
.count(dim='band')
.astype('uint8')
)
# Mask 'no data' values
return xr.where(
mask < data.gw.nbands, dst_nodata, data.clip(0, 1)
).transpose('band', 'y', 'x')
[docs] def toar_to_sr(
self,
data,
sensor,
wavelength,
sza,
doy,
src_nodata=-32768,
dst_nodata=-32768,
angle_factor=0.01,
interp_method='fast',
h2o=1.0,
o3=0.4,
aot=0.3,
altitude=0.0,
n_jobs=1,
):
"""Converts top of atmosphere reflectance to surface reflectance using
6S outputs.
Args:
data (DataArray): The top of atmosphere reflectance.
sensor (str): The sensor name.
wavelength (str): The band wavelength to process.
sza (float | DataArray): The solar zenith angle.
doy (int): The day of year.
src_nodata (Optional[int or float]): The input 'no data' value.
dst_nodata (Optional[int or float]): The output 'no data' value.
angle_factor (Optional[float]): The scale factor for angles.
interp_method (Optional[str]): The LUT interpolation method. Choices are ['fast', 'slow'].
'fast': Uses nearest neighbor lookup with ``scipy.interpolate.NearestNDInterpolator``.
'slow': Uses linear interpolation with ``scipy.interpolate.LinearNDInterpolator``.
h2o (Optional[float]): The water vapor (g/m^2). [0,8.5].
o3 (Optional[float]): The ozone (cm-atm). [0,8].
aot (Optional[float | DataArray]): The aerosol optical thickness (unitless). [0,3].
altitude (Optional[float]): The altitude over the sensor acquisition location.
n_jobs (Optional[int]): The number of parallel jobs for ``dask.compute``.
6S model outputs:
t_g (float): The total gaseous transmission of the atmosphere.
s.run() --> s.outputs.total_gaseous_transmittance
p_alpha (float): The atmospheric reflectance.
s.run() --> s.outputs.atmospheric_intrinsic_reflectance
s (float): The spherical albedo of the atmosphere.
s.run() --> s.outputs.spherical_albedo
t_s (float): The atmospheric transmittance from sun to target.
s.run() --> s.outputs.transmittance_total_scattering.downward
t_v (float): The atmospheric transmittance from target to satellite.
s.run() --> s.outputs.transmittance_total_scattering.upward
"""
attrs = data.attrs.copy()
# Load the LUT
lut = self._load(sensor, wavelength, interp_method, from_toar=True)
band_data = data.sel(band=wavelength)
if isinstance(sza, xr.DataArray):
sza = (
sza.squeeze()
.astype('float64')
.data.compute(num_workers=n_jobs)
)
sza *= angle_factor
if not isinstance(aot, xr.DataArray):
aot = xr.zeros_like(data[0]).squeeze() + aot
# t_g, p_alpha, s, t_s, t_v
coeffs = lut(sza, h2o, o3, aot, altitude)
elliptical_orbit_correction = (
0.03275104 * np.cos(doy / 59.66638337) + 0.96804905
)
coeffs *= elliptical_orbit_correction
t_g = self.prepare_coeff(band_data, coeffs, 0)
p_alpha = self.prepare_coeff(band_data, coeffs, 1)
s = self.prepare_coeff(band_data, coeffs, 2)
t_s = self.prepare_coeff(band_data, coeffs, 3)
t_v = self.prepare_coeff(band_data, coeffs, 4)
sr = (
self._toar_to_sr_from_coeffs(
band_data,
t_g.sel(band='coeff'),
p_alpha.sel(band='coeff'),
s.sel(band='coeff'),
t_s.sel(band='coeff'),
t_v.sel(band='coeff'),
)
.fillna(src_nodata)
.expand_dims(dim='band')
.assign_coords(coords={'band': [wavelength]})
.astype('float64')
)
sr = self._mask_nodata(sr, band_data, src_nodata, dst_nodata)
attrs['sensor'] = sensor
attrs['nodata'] = dst_nodata
attrs['calibration'] = 'surface reflectance'
attrs['method'] = '6s radiative transfer model'
attrs['drange'] = (0, 1)
return sr.assign_attrs(**attrs)
[docs] def rad_to_sr(
self,
data,
sensor,
wavelength,
sza,
doy,
src_nodata=-32768,
dst_nodata=-32768,
angle_factor=0.01,
interp_method='fast',
h2o=1.0,
o3=0.4,
aot=0.3,
altitude=0.0,
n_jobs=1,
):
"""Converts radiance to surface reflectance using a 6S radiative
transfer model lookup table.
Args:
data (DataArray): The data to correct, in radiance.
sensor (str): The sensor name.
wavelength (str): The band wavelength to process.
sza (float | DataArray): The solar zenith angle.
doy (int): The day of year.
src_nodata (Optional[int or float]): The input 'no data' value.
dst_nodata (Optional[int or float]): The output 'no data' value.
angle_factor (Optional[float]): The scale factor for angles.
interp_method (Optional[str]): The LUT interpolation method. Choices are ['fast', 'slow'].
'fast': Uses nearest neighbor lookup with ``scipy.interpolate.NearestNDInterpolator``.
'slow': Uses linear interpolation with ``scipy.interpolate.LinearNDInterpolator``.
h2o (Optional[float]): The water vapor (g/m^2). [0,8.5].
o3 (Optional[float]): The ozone (cm-atm). [0,8].
aot (Optional[float | DataArray]): The aerosol optical thickness (unitless). [0,3].
altitude (Optional[float]): The altitude over the sensor acquisition location.
n_jobs (Optional[int]): The number of parallel jobs for ``dask.compute``.
Returns:
``xarray.DataArray``:
Data range: 0-1
"""
# Load the LUT
lut = self._load(sensor, wavelength, interp_method)
attrs = data.attrs.copy()
band_data = data.sel(band=wavelength)
if isinstance(sza, xr.DataArray):
sza = (
sza.squeeze()
.astype('float64')
.data.compute(num_workers=n_jobs)
)
if isinstance(aot, xr.DataArray):
aot = (
aot.squeeze()
.astype('float64')
.data.compute(num_workers=n_jobs)
)
else:
aot = (
np.zeros((data.gw.nrows, data.gw.ncols), dtype='float64') + aot
)
sza *= angle_factor
coeffs = lut(sza, h2o, o3, aot, altitude)
elliptical_orbit_correction = (
0.03275104 * np.cos(doy / 59.66638337) + 0.96804905
)
coeffs *= elliptical_orbit_correction
xa = self.prepare_coeff(band_data, coeffs, 0)
xb = self.prepare_coeff(band_data, coeffs, 1)
xc = self.prepare_coeff(band_data, coeffs, 2)
sr = (
self._rad_to_sr_from_coeffs(
band_data,
xa.sel(band='coeff'),
xb.sel(band='coeff'),
xc.sel(band='coeff'),
)
.fillna(src_nodata)
.expand_dims(dim='band')
.assign_coords(coords={'band': [wavelength]})
.astype('float64')
)
sr = self._mask_nodata(sr, band_data, src_nodata, dst_nodata)
attrs['sensor'] = sensor
attrs['nodata'] = dst_nodata
attrs['calibration'] = 'surface reflectance'
attrs['method'] = '6s radiative transfer model'
attrs['drange'] = (0, 1)
return sr.assign_attrs(**attrs)
[docs]class AOT(SixSMixin):
[docs] def get_optimized_aot(
self,
blue_rad_dark,
blue_p_dark,
sensor,
wavelength,
interp_method,
sza,
doy,
h2o,
o3,
altitude,
max_aot=0.5,
):
"""Gets the optimal aerosol optical thickness.
Args:
blue_rad_dark (DataArray)
blue_p_dark (DataArray)
sensor (str)
wavelength (str)
interp_method (str)
sza (float): The solar zenith angle (in degrees).
doy (int): The day of year.
h2o (float): The water vapor (g/m^2). [0,8.5].
o3 (float): The ozone (cm-atm). [0,8].
altitude (float)
max_aot (float)
"""
# Load the LUT
lut = self._load(sensor, wavelength, interp_method)
min_score = np.zeros(blue_rad_dark.shape, dtype='float64') + 1e9
aot = np.zeros(blue_rad_dark.shape, dtype='float64')
elliptical_orbit_correction = (
0.03275104 * np.cos(doy / 59.66638337) + 0.96804905
)
for aot_iter in np.arange(0.01, max_aot + 0.01, 0.01):
xa, xb, xc = lut(sza, h2o, o3, aot_iter, altitude)
xa *= elliptical_orbit_correction
xb *= elliptical_orbit_correction
xc *= elliptical_orbit_correction
res = self._rad_to_sr_from_coeffs(blue_rad_dark, xa, xb, xc)
score = np.abs(res - blue_p_dark)
aot = np.where(score < min_score, aot_iter, aot)
min_score = np.where(score < min_score, score, min_score)
return aot.clip(0, max_aot)