import logging
import dask
import dask.array as da
import numpy as np
import xarray as xr
from osgeo import gdal, gdal_array
from sklearn.linear_model import LinearRegression, TheilSenRegressor
from ..handler import add_handler
try:
import cv2
OPENCV_INSTALLED = True
except ImportError:
OPENCV_INSTALLED = False
logger = logging.getLogger(__name__)
logger = add_handler(logger)
def _resize_elev(elev, proc_dims):
if OPENCV_INSTALLED:
elev = cv2.resize(
elev.astype('float32'), proc_dims, interpolation=cv2.INTER_LINEAR
)
else:
logger.warning(
' OpenCV is not installed, so using scipy.ndimage.zoom to resize the data.'
)
from scipy.ndimage import zoom
elev = zoom(elev.astype('float32'), proc_dims, order=2)
return elev
[docs]def calc_slope(elev, proc_dims=None, w=None, **kwargs):
"""Calculates slope from elevation.
Args:
elev (2d array): The elevation data.
proc_dims (Optional[tuple]): Dimensions to resize to.
w (Optional[int]): The smoothing window size when ``proc_dims`` is given.
kwargs (Optional[dict]): Keyword arguments passed to ``gdal.DEMProcessingOptions``.
Returns:
``numpy.ndarray``
"""
inrows, incols = elev.shape
if proc_dims:
elev = _resize_elev(elev, proc_dims)
ds = gdal_array.OpenArray(elev.astype('float64'))
slope_options = gdal.DEMProcessingOptions(**kwargs)
out_ds = gdal.DEMProcessing('', ds, 'slope', options=slope_options)
dst_array = out_ds.GetRasterBand(1).ReadAsArray()
ds = None
out_ds = None
if proc_dims:
dst_array = cv2.resize(
dst_array.astype('float32'),
(incols, inrows),
interpolation=cv2.INTER_LINEAR,
)
if not w:
w = 15
return np.float64(
cv2.bilateralFilter(np.float32(dst_array), w, 10, 10)
)
else:
return np.float64(dst_array)
[docs]def calc_aspect(elev, proc_dims=None, w=None, **kwargs):
"""Calculates aspect from elevation.
Args:
elev (2d array): The elevation data.
proc_dims (Optional[tuple]): Dimensions to resize to.
w (Optional[int]): The smoothing window size when ``proc_dims`` is given.
kwargs (Optional[dict]): Keyword arguments passed to ``gdal.DEMProcessingOptions``.
Returns:
``numpy.ndarray``
"""
inrows, incols = elev.shape
if proc_dims:
if proc_dims:
elev = _resize_elev(elev, proc_dims)
ds = gdal_array.OpenArray(elev.astype('float64'))
aspect_options = gdal.DEMProcessingOptions(**kwargs)
out_ds = gdal.DEMProcessing('', ds, 'aspect', options=aspect_options)
dst_array = out_ds.GetRasterBand(1).ReadAsArray()
ds = None
out_ds = None
if proc_dims:
dst_array = cv2.resize(
dst_array.astype('float32'),
(incols, inrows),
interpolation=cv2.INTER_LINEAR,
)
if not w:
w = 15
return np.float64(
cv2.bilateralFilter(np.float32(dst_array), w, 10, 10)
)
else:
return np.float64(dst_array)
calc_slope_delayed = dask.delayed(calc_slope)
calc_aspect_delayed = dask.delayed(calc_aspect)
[docs]class Topo(object):
"""A class for topographic normalization."""
@staticmethod
def _regress_a(X, y, robust, n_jobs):
"""Calculates the slope and intercept."""
if robust:
model = TheilSenRegressor(n_jobs=n_jobs)
else:
model = LinearRegression(n_jobs=n_jobs)
model.fit(X, y)
slope_m = model.coef_[0]
intercept_b = model.intercept_
return slope_m, intercept_b
def _method_empirical_rotation(
self,
sr,
il,
cos_z,
nodata_samps,
min_samples,
n_jobs,
robust,
band_coeffs,
band,
):
r"""
Normalizes terrain using the Empirical Rotation method
Args:
sr (Dask Array): The surface reflectance data.
il (Dask Array): The solar illumination.
cos_z (Dask Array): The cosine of the solar zenith angle.
nodata_samps (Dask Array): Samples where 1='no data' and 0='valid data'.
min_samples (Optional[int]): The minimum number of samples required to fit a regression.
n_jobs (Optional[int]): The number of parallel workers for ``LinearRegression.fit`` or
``TheilSenRegressor.fit``.
robust (Optional[bool]): Whether to fit a robust regression.
band_coeffs (dict): Slope and intercept coefficients for each band.
band (int | str): The band.
References:
See :cite:`tan_etal_2010` for the Empirical Rotation method.
Returns:
``dask.array``
"""
nodata = nodata_samps.compute().flatten()
idx = np.where(nodata == 0)[0]
if idx.shape[0] < min_samples:
return sr
X = il.compute().flatten()[idx][:, np.newaxis]
if band_coeffs:
slope_m, intercept_b = band_coeffs[band]
else:
y = sr.compute().flatten()[idx]
slope_m, intercept_b = self._regress_a(X, y, robust, n_jobs)
# https://reader.elsevier.com/reader/sd/pii/S0034425713001673?token=6C93FB2E69ABF5729CE9ECBBDFD9C2D985613156753C822A3D160102D46135E01457EE33500DB4648C6AF636F39D8B62
# Improved forest change detection with terrain illumination corrected Landsat images
sr_a = sr - slope_m * (il - cos_z)
# sr_a = sr - (slope_m * il + intercept_b)
return da.where(nodata_samps == 1, sr, sr_a).clip(0, 1)
def _method_cos(self, sr, il, cos_z, nodata_samps):
r"""
Normalizes terrain using the Cosine method
Args:
sr (Dask Array): The surface reflectance data.
il (Dask Array): The solar illumination.
cos_z (Dask Array): The cosine of the solar zenith angle.
nodata_samps (Dask Array): Samples where 1='no data' and 0='valid data'.
References:
See :cite:`teillet_etal_1982` for the C-correction method.
Returns:
``dask.array``
"""
sr_a = sr * (cos_z / il)
return da.where(nodata_samps == 1, sr, sr_a).clip(0, 1)
def _method_c(
self,
sr,
il,
cos_z,
nodata_samps,
min_samples,
n_jobs,
robust,
band_coeffs,
band,
):
r"""
Normalizes terrain using the C-correction method
Args:
sr (Dask Array): The surface reflectance data.
il (Dask Array): The solar illumination.
cos_z (Dask Array): The cosine of the solar zenith angle.
nodata_samps (Dask Array): Samples where 1='no data' and 0='valid data'.
min_samples (Optional[int]): The minimum number of samples required to fit a regression.
n_jobs (Optional[int]): The number of parallel workers for ``LinearRegression.fit`` or
``TheilSenRegressor.fit``.
robust (Optional[bool]): Whether to fit a robust regression.
band_coeffs (dict): Slope and intercept coefficients for each band.
band (int | str): The band.
References:
See :cite:`teillet_etal_1982` for the C-correction method.
Returns:
``dask.array``
"""
nodata = nodata_samps.compute().flatten()
idx = np.where(nodata == 0)[0]
if idx.shape[0] < min_samples:
return sr
X = il.compute().flatten()[idx][:, np.newaxis]
if band_coeffs:
slope_m, intercept_b = band_coeffs[band]
else:
y = sr.compute().flatten()[idx]
slope_m, intercept_b = self._regress_a(X, y, robust, n_jobs)
c = intercept_b / slope_m
# Get the A-factor
a_factor = (cos_z + c) / (il + c)
a_factor = da.where(da.isnan(a_factor), 1, a_factor)
sr_a = sr * a_factor
return da.where((sr_a > 1) | (nodata_samps == 1), sr, sr_a).clip(0, 1)
[docs] def norm_topo(
self,
data,
elev,
solar_za,
solar_az,
slope=None,
aspect=None,
method='empirical-rotation',
slope_thresh=2,
nodata=0,
elev_nodata=-32768,
scale_factor=1,
angle_scale=0.01,
n_jobs=1,
robust=False,
min_samples=100,
slope_kwargs=None,
aspect_kwargs=None,
band_coeffs=None,
):
"""Applies topographic normalization.
Args:
data (2d or 3d DataArray): The data to normalize, in the range 0-1.
elev (2d DataArray): The elevation data.
solar_za (2d DataArray): The solar zenith angles (degrees).
solar_az (2d DataArray): The solar azimuth angles (degrees).
slope (2d DataArray): The slope data. If not given, slope is calculated from ``elev``.
aspect (2d DataArray): The aspect data. If not given, aspect is calculated from ``elev``.
method (Optional[str]): The method to apply. Choices are ['c', 'empirical-rotation'].
slope_thresh (Optional[float or int]): The slope threshold. Any samples with
values < ``slope_thresh`` are not adjusted.
nodata (Optional[int or float]): The 'no data' value for ``data``.
elev_nodata (Optional[float or int]): The 'no data' value for ``elev``.
scale_factor (Optional[float]): A scale factor to apply to the input data.
angle_scale (Optional[float]): The angle scale factor.
n_jobs (Optional[int]): The number of parallel workers for ``LinearRegression.fit``.
robust (Optional[bool]): Whether to fit a robust regression.
min_samples (Optional[int]): The minimum number of samples required to fit a regression.
slope_kwargs (Optional[dict]): Keyword arguments passed to ``gdal.DEMProcessingOptions``
to calculate the slope.
aspect_kwargs (Optional[dict]): Keyword arguments passed to ``gdal.DEMProcessingOptions``
to calculate the aspect.
band_coeffs (Optional[dict]): Slope and intercept coefficients for each band.
References:
See :cite:`teillet_etal_1982` for the C-correction method.
See :cite:`tan_etal_2010` for the Empirical Rotation method.
Returns:
``xarray.DataArray``
Examples:
>>> import geowombat as gw
>>> from geowombat.radiometry import Topo
>>>
>>> topo = Topo()
>>>
>>> # Example where pixel angles are stored in separate GeoTiff files
>>> with gw.config.update(sensor='l7', scale_factor=0.0001, nodata=0):
>>>
>>> with gw.open('landsat.tif') as src,
>>> gw.open('srtm') as elev,
>>> gw.open('solarz.tif') as solarz,
>>> gw.open('solara.tif') as solara:
>>>
>>> src_norm = topo.norm_topo(src, elev, solarz, solara, n_jobs=-1)
"""
method = method.strip().lower()
if method not in ['c', 'empirical-rotation']:
logger.exception(
" Currently, the only supported methods are 'c' and 'empirical-rotation'."
)
raise NameError
attrs = data.attrs.copy()
if not nodata:
nodata = data.gw.nodata
if scale_factor == 1.0:
scale_factor = data.gw.scale_factor
# Scale the reflectance data
if scale_factor != 1:
data = data * scale_factor
if not slope_kwargs:
slope_kwargs = dict(
format='MEM',
computeEdges=True,
alg='ZevenbergenThorne',
slopeFormat='degree',
)
if not aspect_kwargs:
aspect_kwargs = dict(
format='MEM',
computeEdges=True,
alg='ZevenbergenThorne',
trigonometric=False,
zeroForFlat=True,
)
slope_kwargs['format'] = 'MEM'
slope_kwargs['slopeFormat'] = 'degree'
aspect_kwargs['format'] = 'MEM'
# Force to SRTM resolution
proc_dims = (
int((data.gw.ncols * data.gw.cellx) / 30.0),
int((data.gw.nrows * data.gw.celly) / 30.0),
)
w = int((5 * 30.0) / data.gw.celly)
if w % 2 == 0:
w += 1
if isinstance(slope, xr.DataArray):
slope_deg_fd = slope.squeeze().data
else:
slope_deg = calc_slope_delayed(
elev.squeeze().data, proc_dims=proc_dims, w=w, **slope_kwargs
)
slope_deg_fd = da.from_delayed(
slope_deg, (data.gw.nrows, data.gw.ncols), dtype='float64'
)
if isinstance(aspect, xr.DataArray):
aspect_deg_fd = aspect.squeeze().data
else:
aspect_deg = calc_aspect_delayed(
elev.squeeze().data, proc_dims=proc_dims, w=w, **aspect_kwargs
)
aspect_deg_fd = da.from_delayed(
aspect_deg, (data.gw.nrows, data.gw.ncols), dtype='float64'
)
nodata_samps = da.where(
(elev.data == elev_nodata)
| (data.max(dim='band').data == nodata)
| (slope_deg_fd < slope_thresh),
1,
0,
)
slope_rad = da.deg2rad(slope_deg_fd)
aspect_rad = da.deg2rad(aspect_deg_fd)
# Convert degrees to radians
solar_za = da.deg2rad(solar_za.squeeze().data * angle_scale)
solar_az = da.deg2rad(solar_az.squeeze().data * angle_scale)
cos_z = da.cos(solar_za)
# Calculate the illumination angle
il = da.cos(slope_rad) * cos_z + da.sin(slope_rad) * da.sin(
solar_za
) * da.cos(solar_az - aspect_rad)
sr_adj = list()
for band in data.band.values.tolist():
if method == 'c':
sr_adj.append(
self._method_c(
data.sel(band=band).data,
il,
cos_z,
nodata_samps,
min_samples,
n_jobs,
robust,
band_coeffs,
band,
)
)
else:
sr_adj.append(
self._method_empirical_rotation(
data.sel(band=band).data,
il,
cos_z,
nodata_samps,
min_samples,
n_jobs,
robust,
band_coeffs,
band,
)
)
adj_data = xr.DataArray(
data=da.concatenate(sr_adj).reshape(
(data.gw.nbands, data.gw.nrows, data.gw.ncols)
),
coords={
'band': data.band.values.tolist(),
'y': data.y.values,
'x': data.x.values,
},
dims=('band', 'y', 'x'),
attrs=data.attrs,
)
attrs['calibration'] = 'Topographic-adjusted'
attrs['nodata'] = nodata
attrs['drange'] = (0, 1)
adj_data.attrs = attrs
return adj_data