Source code for geowombat.core.conversion

import logging
import multiprocessing as multi
import os
import typing as T
from pathlib import Path

import dask.array as da
import geopandas as gpd
import numpy as np
import pandas as pd
import pyproj
import xarray as xr
from affine import Affine
from rasterio.coords import BoundingBox
from rasterio.crs import CRS
from rasterio.dtypes import get_minimum_dtype
from rasterio.features import rasterize, shapes
from rasterio.warp import aligned_target
from shapely.geometry import MultiPolygon, Polygon

from ..backends.rasterio_ import check_crs
from ..backends.xarray_ import _check_config_globals
from ..config import config
from ..handler import add_handler
from .util import lazy_wombat, sample_feature

logger = logging.getLogger(__name__)
logger = add_handler(logger)


def _iter_func(a):
    return a


[docs]class Converters(object):
[docs] def bounds_to_coords( self, bounds: T.Union[T.Sequence[float], BoundingBox], dst_crs: T.Union[str, object, xr.DataArray], ) -> T.Tuple[float, float, float, float]: """Converts bounds from longitude and latitude to native map coordinates. Args: bounds (``tuple`` | ``rasterio.coords.BoundingBox``): The lat/lon bounds to transform. dst_crs (str, object, or DataArray): The CRS to transform to. It can be provided as a string, a CRS instance (e.g., ``pyproj.crs.CRS``), or a ``geowombat.DataArray``. Returns: ``tuple``: (left, bottom, right, top) """ left, bottom, right, top = bounds left, bottom = self.lonlat_to_xy(left, bottom, dst_crs) right, top = self.lonlat_to_xy(left, top, dst_crs) return left, bottom, right, top
[docs] @staticmethod def lonlat_to_xy( lon: float, lat: float, dst_crs: T.Union[str, object, xr.DataArray] ) -> T.Tuple[float, float]: """Converts from longitude and latitude to native map coordinates. Args: lon (float): The longitude to convert. lat (float): The latitude to convert. dst_crs (str, object, or DataArray): The CRS to transform to. It can be provided as a string, a CRS instance (e.g., ``pyproj.crs.CRS``), or a ``geowombat.DataArray``. Returns: ``tuple``: (x, y) Example: >>> import geowombat as gw >>> from geowombat.core import lonlat_to_xy >>> >>> lon, lat = -55.56822206, -25.46214220 >>> >>> with gw.open('image.tif') as src: >>> x, y = lonlat_to_xy(lon, lat, src) """ if isinstance(dst_crs, xr.DataArray): dst_crs = dst_crs.crs return pyproj.Proj(dst_crs)(lon, lat)
[docs] @staticmethod def xy_to_lonlat( x: float, y: float, dst_crs: T.Union[str, object, xr.DataArray] ) -> T.Tuple[float, float]: """Converts from native map coordinates to longitude and latitude. Args: x (float): The x coordinate to convert. y (float): The y coordinate to convert. dst_crs (str, object, or DataArray): The CRS to transform to. It can be provided as a string, a CRS instance (e.g., ``pyproj.crs.CRS``), or a ``geowombat.DataArray``. Returns: ``tuple``: (longitude, latitude) Example: >>> import geowombat as gw >>> from geowombat.core import xy_to_lonlat >>> >>> x, y = 643944.6956113526, 7183104.984484519 >>> >>> with gw.open('image.tif') as src: >>> lon, lat = xy_to_lonlat(x, y, src) """ if isinstance(dst_crs, xr.DataArray): dst_crs = dst_crs.crs return pyproj.Proj(dst_crs)(x, y, inverse=True)
[docs] @staticmethod def indices_to_coords( col_index: int, row_index: int, transform: T.Union[Affine, xr.DataArray, T.Sequence[float]], ) -> T.Tuple[float, float]: """Converts array indices to map coordinates. Args: col_index (float or 1d array): The column index. row_index (float or 1d array): The row index. transform (Affine, DataArray, or tuple): The affine transform. Returns: ``tuple``: (x, y) Example: >>> import geowombat as gw >>> from geowombat.core import indices_to_coords >>> >>> with gw.open('image.tif') as src: >>> x, y = indices_to_coords(j, i, src) """ if not isinstance(transform, Affine): if isinstance(transform, tuple): transform = Affine(*transform) elif isinstance(transform, xr.DataArray): transform = transform.gw.affine else: logger.exception( ' The transform must be an instance of affine.Affine, an xarray.DataArray, or a tuple' ) raise TypeError return transform * (col_index, row_index)
[docs] @staticmethod def coords_to_indices( x: float, y: float, transform: T.Union[Affine, xr.DataArray, T.Sequence[float]], ) -> T.Tuple[int, int]: """Converts map coordinates to array indices. Args: x (float or 1d array): The x coordinates. y (float or 1d array): The y coordinates. transform (object): The affine transform. Returns: ``tuple``: (col_index, row_index) Example: >>> import geowombat as gw >>> from geowombat.core import coords_to_indices >>> >>> with gw.open('image.tif') as src: >>> j, i = coords_to_indices(x, y, src) """ if not isinstance(transform, Affine): if isinstance(transform, tuple): transform = Affine(*transform) elif isinstance(transform, xr.DataArray): transform = transform.gw.affine else: logger.exception( ' The transform must be an instance of affine.Affine, an xarray.DataArray, or a tuple' ) raise TypeError col_index, row_index = ~transform * (x, y) return np.int64(col_index), np.int64(row_index)
[docs] @staticmethod def dask_to_xarray( data: xr.DataArray, dask_data: da.Array, band_names: T.Sequence[T.Any] ) -> xr.DataArray: """Converts a Dask array to an Xarray DataArray. Args: data (DataArray): The DataArray with attribute information. dask_data (Dask Array): The Dask array to convert. band_names (1d array-like): The output band names. Returns: ``xarray.DataArray`` """ if len(dask_data.shape) == 2: dask_data = dask_data.reshape( 1, dask_data.shape[0], dask_data.shape[1] ) return xr.DataArray( dask_data, dims=('band', 'y', 'x'), coords={'band': band_names, 'y': data.y, 'x': data.x}, attrs=data.attrs, )
[docs] @staticmethod def ndarray_to_xarray( data: xr.DataArray, numpy_data: np.ndarray, band_names: T.Sequence[T.Any], row_chunks: T.Optional[int] = None, col_chunks: T.Optional[int] = None, y: T.Optional[float] = None, x: T.Optional[float] = None, attrs: T.Optional[dict] = None, ) -> xr.DataArray: """Converts a NumPy array to an Xarray DataArray. Args: data (DataArray): The DataArray with attribute information. numpy_data (ndarray): The ndarray to convert. band_names (1d array-like): The output band names. row_chunks (Optional[int]): Overrides ``data.gw.row_chunks``. col_chunks (Optional[int]): Overrides ``data.gw.col_chunks``. y (Optional[1d array]): Overrides ``data.y``. x (Optional[1d array]): Overrides ``data.x``. attrs (Optional[dict]): Overrides ``data.attrs``. Returns: ``xarray.DataArray`` """ if len(numpy_data.shape) == 2: numpy_data = numpy_data[np.newaxis, :, :] data_row_chunks = ( row_chunks if isinstance(row_chunks, int) else data.gw.row_chunks ) data_col_chunks = ( col_chunks if isinstance(col_chunks, int) else data.gw.col_chunks ) data_y = y if isinstance(y, np.ndarray) else data.y data_x = x if isinstance(x, np.ndarray) else data.x data_attrs = attrs if isinstance(attrs, dict) else data.attrs return xr.DataArray( da.from_array( numpy_data, chunks=(1, data_row_chunks, data_col_chunks) ), dims=('band', 'y', 'x'), coords={'band': band_names, 'y': data_y, 'x': data_x}, attrs=data_attrs, )
[docs] @staticmethod def xarray_to_xdataset( data_array: xr.DataArray, band_names: T.Sequence[T.Any], time_names: T.Sequence[T.Any], ycoords: T.Optional[T.Sequence[float]] = None, xcoords: T.Optional[T.Sequence[float]] = None, attrs: T.Optional[dict] = None, ) -> xr.Dataset: """Converts an Xarray DataArray to a Xarray Dataset. Args: data_array (DataArray) band_names (list) time_names (list) ycoords (1d array-like) xcoords (1d array-like) attrs (dict) Returns: ``xarray.Dataset`` """ if len(data_array.shape) == 2: data_array = data_array.expand_dims('band') if len(data_array.shape) == 4: n_bands = data_array.shape[1] else: n_bands = data_array.shape[0] if not band_names: if n_bands == 1: band_names = ['1'] else: band_names = list(map(str, range(1, n_bands + 1))) if time_names: return xr.Dataset( {'bands': (['date', 'band', 'y', 'x'], data_array)}, coords={ 'date': time_names, 'band': band_names, 'y': ('y', ycoords), 'x': ('x', xcoords), }, attrs=attrs, ) else: return xr.Dataset( {'bands': (['band', 'y', 'x'], data_array.data)}, coords={ 'band': band_names, 'y': ('y', data_array.y), 'x': ('x', data_array.x), }, attrs=data_array.attrs, )
[docs] def prepare_points( self, data, aoi, frac=1.0, min_frac_area=None, all_touched=False, id_column='id', mask=None, n_jobs=8, verbose=0, **kwargs ): if isinstance(aoi, gpd.GeoDataFrame): df = aoi else: if isinstance(aoi, str): if not os.path.isfile(aoi): logger.exception(' The AOI file does not exist.') raise OSError df = gpd.read_file(aoi) else: logger.exception( ' The AOI must be a vector file or a GeoDataFrame.' ) raise TypeError if id_column not in df.columns.tolist(): df.loc[:, id_column] = df.index.values df_crs = check_crs(df.crs).to_wkt() data_crs = check_crs(data.crs).to_wkt() # Re-project the data to match the image CRS if data_crs != df_crs: df = df.to_crs(data_crs) if verbose > 0: logger.info(' Checking geometry validity ...') # Ensure all geometry is valid df = df[df['geometry'].apply(lambda x_: x_ is not None)] if verbose > 0: logger.info(' Checking geometry extent ...') # Remove data outside of the image bounds if isinstance(df.iloc[0].geometry, (Polygon, MultiPolygon)): df = gpd.overlay( df, gpd.GeoDataFrame( data=[0], geometry=[data.gw.geometry], crs=df_crs ), how='intersection', ).drop(columns=[0]) else: # Clip points to the image bounds df = df[df.geometry.intersects(data.gw.geometry)] if isinstance(mask, (Polygon, MultiPolygon, gpd.GeoDataFrame)): if isinstance(mask, gpd.GeoDataFrame): if CRS.from_dict(mask.crs).to_wkt() != df_crs: mask = mask.to_crs(df_crs) if verbose > 0: logger.info(' Clipping geometry ...') df = df[df.within(mask)] if df.empty: logger.exception( ' No geometry intersects the user-provided mask.' ) raise LookupError if not df.empty: # Convert polygons to points if isinstance(df.iloc[0].geometry, (Polygon, MultiPolygon)): if verbose > 0: logger.info(' Converting polygons to points ...') df = self.polygons_to_points( data, df, frac=frac, min_frac_area=min_frac_area, all_touched=all_touched, id_column=id_column, n_jobs=n_jobs, **kwargs ) if not df.empty: # Ensure a unique index df.index = list(range(0, df.shape[0])) return df
[docs] @staticmethod def polygons_to_points( data: xr.DataArray, df: gpd.GeoDataFrame, frac: T.Optional[float] = 1.0, min_frac_area: T.Optional[T.Union[float, int]] = None, all_touched: T.Optional[bool] = False, id_column: T.Optional[str] = 'id', n_jobs: T.Optional[int] = 1, **kwargs ): """Converts polygons to points. Args: data (DataArray or Dataset): The ``xarray.DataArray`` or ``xarray.Dataset``. df (GeoDataFrame): The ``geopandas.GeoDataFrame`` containing the geometry to rasterize. frac (Optional[float]): A fractional subset of points to extract in each feature. min_frac_area (Optional[int | float]): A minimum polygon area to use ``frac``. Otherwise, use all samples within a polygon. all_touched (Optional[bool]): The ``all_touched`` argument is passed to ``rasterio.features.rasterize``. id_column (Optional[str]): The 'id' column. n_jobs (Optional[int]): The number of features to rasterize in parallel. kwargs (Optional[dict]): Keyword arguments passed to ``multiprocessing.Pool().imap``. Returns: ``geopandas.GeoDataFrame`` """ meta = data.gw.meta dataframes = [] df_columns = df.columns.tolist() with multi.Pool(processes=n_jobs) as pool: for i in pool.imap(_iter_func, range(0, len(df.index)), **kwargs): point_df = sample_feature( df.iloc[i], id_column, df_columns, data.crs, data.res, all_touched, meta, frac, min_frac_area, ) if not point_df.empty: dataframes.append(point_df) if dataframes: dataframes = pd.concat(dataframes, axis=0) # Make the points unique dataframes = dataframes.assign( point=np.arange(0, dataframes.shape[0]) ) return dataframes else: return gpd.GeoDataFrame(data=[])
[docs] @staticmethod def array_to_polygon( data: xr.DataArray, mask: T.Union[str, np.ndarray, object] = None, connectivity: T.Optional[int] = 4, num_workers: T.Optional[int] = 1, ) -> gpd.GeoDataFrame: """Converts an ``xarray.DataArray` to a ``geopandas.GeoDataFrame`` Args: data (DataArray): The ``xarray.DataArray`` to convert. mask (Optional[str, numpy ndarray, or rasterio Band object]): Must evaluate to bool (rasterio.bool_ or rasterio.uint8). Values of False or 0 will be excluded from feature generation. Note well that this is the inverse sense from Numpy's, where a mask value of True indicates invalid data in an array. If source is a Numpy masked array and mask is None, the source's mask will be inverted and used in place of mask. If ``mask`` is equal to 'source', then ``data`` is used as the mask. connectivity (Optional[int]): Use 4 or 8 pixel connectivity for grouping pixels into features. num_workers (Optional[int]): The number of parallel workers to send to :func:`dask.compute`. Returns: ``geopandas.GeoDataFrame`` Example: >>> import geowombat as gw >>> >>> with gw.open('image.tif') as src: >>> >>> # Convert the input image to a GeoDataFrame >>> df = gw.array_to_polygon( >>> src, >>> mask='source', >>> num_workers=8 >>> ) """ if not hasattr(data.gw, 'transform'): logger.exception(" The data should have a 'transform' object.") raise AttributeError if not hasattr(data, 'crs'): logger.exception(" The data should have a 'crs' object.") raise AttributeError if isinstance(mask, str): if mask == 'source': mask = data.astype('uint8').data.compute( num_workers=num_workers ) poly_objects = shapes( data.data.compute(num_workers=num_workers), mask=mask, connectivity=connectivity, transform=data.gw.transform, ) poly_data = [ (Polygon(p[0]['coordinates'][0]), p[1]) for p in poly_objects ] if poly_data: poly_geom = list(list(zip(*poly_data))[0]) poly_values = list(list(zip(*poly_data))[1]) out_df = gpd.GeoDataFrame( data=poly_values, columns=['value'], geometry=poly_geom, crs=data.crs, ) else: out_df = gpd.GeoDataFrame([], crs=data.crs) return out_df
[docs] @lazy_wombat def polygon_to_array( self, polygon: T.Union[gpd.GeoDataFrame, str, Path], col: T.Optional[str] = None, data: T.Optional[xr.DataArray] = None, cellx: T.Optional[float] = None, celly: T.Optional[float] = None, band_name: T.Optional[T.Sequence[T.Any]] = None, row_chunks: T.Optional[int] = 512, col_chunks: T.Optional[int] = 512, src_res: T.Optional[T.Sequence[float]] = None, fill: T.Optional[int] = 0, default_value: T.Optional[int] = 1, all_touched: T.Optional[bool] = True, dtype: T.Optional[str] = 'uint8', sindex: T.Optional[object] = None, tap: T.Optional[bool] = False, bounds_by: T.Optional[str] = 'intersection', ) -> xr.DataArray: """Converts a polygon geometry to an ``xarray.DataArray``. Args: polygon (GeoDataFrame | str): The ``geopandas.DataFrame`` or file with polygon geometry. col (Optional[str]): The column in ``polygon`` you want to assign values from. If not set, creates a binary raster. data (Optional[DataArray]): An ``xarray.DataArray`` to use as a reference for rasterizing. cellx (Optional[float]): The output cell x size. celly (Optional[float]): The output cell y size. band_name (Optional[list]): The ``xarray.DataArray`` band name. row_chunks (Optional[int]): The ``dask`` row chunk size. col_chunks (Optional[int]): The ``dask`` column chunk size. src_res (Optional[tuple]: A source resolution to align to. fill (Optional[int]): Used as fill value for all areas not covered by input geometries to ``rasterio.features.rasterize``. default_value (Optional[int]): Used as value for all geometries, if not provided in shapes to ``rasterio.features.rasterize``. all_touched (Optional[bool]): If True, all pixels touched by geometries will be burned in. If false, only pixels whose center is within the polygon or that are selected by Bresenham’s line algorithm will be burned in. The ``all_touched`` value for :func:`rasterio.features.rasterize`. dtype (Optional[str | numpy data type]): The output data type for :func:`rasterio.features.rasterize`. sindex (Optional[object]): An instanced of ``geopandas.GeoDataFrame.sindex``. tap (Optional[bool]): Whether to target align pixels. bounds_by (Optional[str]): How to concatenate the output extent. Choices are ['intersection', 'union', 'reference']. * reference: Use the bounds of the reference image * intersection: Use the intersection (i.e., minimum extent) of all the image bounds * union: Use the union (i.e., maximum extent) of all the image bounds Returns: ``xarray.DataArray`` Example: >>> import geowombat as gw >>> import geopandas as gpd >>> >>> df = gpd.read_file('polygons.gpkg') >>> >>> # 100x100 cell size >>> data = gw.polygon_to_array(df, 100.0, 100.0) >>> >>> # Align to an existing image >>> with gw.open('image.tif') as src: >>> data = gw.polygon_to_array(df, data=src) """ if not band_name: band_name = [1] if isinstance(polygon, gpd.GeoDataFrame): dataframe = polygon else: if os.path.isfile(polygon): dataframe = gpd.read_file(polygon) else: logger.exception(' The polygon file does not exist.') raise OSError ref_kwargs = { 'bounds': None, 'crs': None, 'res': None, 'tap': tap, 'tac': None, } if config['with_config'] and not isinstance(data, xr.DataArray): ref_kwargs = _check_config_globals( data.filename if isinstance(data, xr.DataArray) else None, bounds_by, ref_kwargs, ) if isinstance(data, xr.DataArray): if dataframe.crs != data.crs: # Transform the geometry dataframe = dataframe.to_crs(data.crs) if not sindex: # Get the R-tree spatial index sindex = dataframe.sindex # Get intersecting features int_idx = sorted( list( sindex.intersection( tuple(data.gw.geodataframe.total_bounds.flatten()) ) ) ) if not int_idx: return self.dask_to_xarray( data, da.zeros( (1, data.gw.nrows, data.gw.ncols), chunks=(1, data.gw.row_chunks, data.gw.col_chunks), dtype=data.dtype.name, ), band_names=band_name, ) # Subset to the intersecting features dataframe = dataframe.iloc[int_idx] # Clip the geometry dataframe = gpd.clip(dataframe, data.gw.geodataframe) if dataframe.empty: return self.dask_to_xarray( data, da.zeros( (1, data.gw.nrows, data.gw.ncols), chunks=(1, data.gw.row_chunks, data.gw.col_chunks), dtype=data.dtype.name, ), band_names=band_name, ) cellx = data.gw.cellx celly = data.gw.celly row_chunks = data.gw.row_chunks col_chunks = data.gw.col_chunks src_res = None if ref_kwargs['bounds']: left, bottom, right, top = ref_kwargs['bounds'] if 'res' in ref_kwargs and ref_kwargs['res'] is not None: if isinstance(ref_kwargs['res'], tuple) or isinstance( ref_kwargs['res'], list ): cellx, celly = ref_kwargs['res'] elif isinstance(ref_kwargs['res'], int) or isinstance( ref_kwargs['res'], float ): cellx = ref_kwargs['res'] celly = ref_kwargs['res'] else: logger.exception( 'The reference resolution must be a tuple, int, or float. Is type %s' % (type(ref_kwargs['res'])) ) raise TypeError else: left, bottom, right, top = data.gw.bounds else: if ref_kwargs['bounds']: left, bottom, right, top = ref_kwargs['bounds'] if 'res' in ref_kwargs and ref_kwargs['res'] is not None: if isinstance(ref_kwargs['res'], tuple) or isinstance( ref_kwargs['res'], list ): cellx, celly = ref_kwargs['res'] elif isinstance(ref_kwargs['res'], int) or isinstance( ref_kwargs['res'], float ): cellx = ref_kwargs['res'] celly = ref_kwargs['res'] else: logger.exception( 'The reference resolution must be a tuple, int, or float. Is type %s' % (type(ref_kwargs['res'])) ) raise TypeError else: ( left, bottom, right, top, ) = dataframe.total_bounds.flatten().tolist() dst_height = int((top - bottom) / abs(celly)) dst_width = int((right - left) / abs(cellx)) dst_transform = Affine(cellx, 0.0, left, 0.0, -celly, top) if src_res: dst_transform = aligned_target( dst_transform, dst_width, dst_height, src_res )[0] left = dst_transform[2] top = dst_transform[5] dst_transform = Affine(cellx, 0.0, left, 0.0, -celly, top) if col: shapes = ( (geom, value) for geom, value in zip(dataframe.geometry, dataframe[col]) ) # TODO: throw error if dataframe[col] is character dtype = get_minimum_dtype(dataframe[col]) else: shapes = dataframe.geometry.values varray = rasterize( shapes, out_shape=(dst_height, dst_width), transform=dst_transform, fill=fill, default_value=default_value, all_touched=all_touched, dtype=dtype, ) cellxh = abs(cellx) / 2.0 cellyh = abs(celly) / 2.0 if isinstance(data, xr.DataArray): # Ensure the coordinates align xcoords = data.x.values ycoords = data.y.values else: xcoords = np.arange( left + cellxh, left + cellxh + dst_width * abs(cellx), cellx ) ycoords = np.arange( top - cellyh, top - cellyh - dst_height * abs(celly), -celly ) if xcoords.shape[0] > dst_width: xcoords = xcoords[:dst_width] if ycoords.shape[0] > dst_height: ycoords = ycoords[:dst_height] attrs = { 'transform': dst_transform[:6], 'crs': dataframe.crs, 'res': (cellx, celly), 'is_tiled': 1, } return xr.DataArray( data=da.from_array( varray[np.newaxis, :, :], chunks=(1, row_chunks, col_chunks) ), coords={'band': band_name, 'y': ycoords, 'x': xcoords}, dims=('band', 'y', 'x'), attrs=attrs, )