Source code for geowombat.util.web

import concurrent.futures
import fnmatch
import logging
import os
import shutil
import subprocess
import tarfile
import time
from collections import namedtuple
from datetime import datetime
from pathlib import Path

import geopandas as gpd
import numpy as np
import pandas as pd
import psutil
import xarray as xr
from osgeo import gdal
from shapely.geometry import Polygon

import geowombat as gw

from ..backends.gdal_ import warp
from ..core import ndarray_to_xarray
from ..core.properties import get_sensor_info
from ..handler import add_handler
from ..radiometry import (
    BRDF,
    DOS,
    LinearAdjustments,
    QAMasker,
    RadTransforms,
    SixS,
    landsat_pixel_angles,
    sentinel_pixel_angles,
)
from ..radiometry.angles import estimate_cloud_shadows

try:
    import requests

    REQUESTS_INSTALLED = True
except ImportError:
    REQUESTS_INSTALLED = False

try:
    from s2cloudless import S2PixelCloudDetector

    S2CLOUDLESS_INSTALLED = True
except ImportError:
    S2CLOUDLESS_INSTALLED = False

logger = logging.getLogger(__name__)
logger = add_handler(logger)


RESAMPLING_DICT = dict(
    bilinear=gdal.GRA_Bilinear,
    cubic=gdal.GRA_Cubic,
    nearest=gdal.GRA_NearestNeighbour,
)
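

def _example_resampling_lookup():
    """A minimal usage sketch (not part of the original module).

    Shows how ``RESAMPLING_DICT`` maps the string resampling names used
    throughout this module to GDAL constants when warping. The input and
    output file names are hypothetical.
    """
    gdal.Warp(
        'warped_example.tif',
        'input_example.tif',
        resampleAlg=RESAMPLING_DICT['bilinear'],  # i.e., gdal.GRA_Bilinear
        xRes=30.0,
        yRes=30.0,
    )
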

OrbitDates = namedtuple('OrbitDates', 'start end')
FileInfo = namedtuple('FileInfo', 'name key')
GoogleFileInfo = namedtuple('GoogleFileInfo', 'url url_file meta angles')


def _rmdir(pathdir):

    """Removes a directory path."""

    if pathdir.is_dir():

        for child in pathdir.iterdir():

            if child.is_file():

                try:
                    child.unlink()
                except Exception:
                    pass

        try:
            pathdir.rmdir()
        except Exception:

            try:
                shutil.rmtree(str(pathdir))
            except Exception:
                pass


def _delayed_read(fn):

    """Waits for a file to exist before reading and returning its lines."""

    attempt = 0
    max_attempts = 10

    while True:

        if Path(fn).is_file():
            break
        else:
            time.sleep(2)

        attempt += 1

        if attempt >= max_attempts:
            break

    with open(str(fn), mode='r') as tx:
        lines = tx.readlines()

    return lines


def _update_status_file(fn, log_name):

    """Appends ``log_name`` to a status file, waiting until no other process has the file open."""

    attempt = 0
    max_attempts = 10

    while True:

        wait_on_file = False

        # Check if the file is open by another process
        for proc in psutil.process_iter():

            try:

                for item in proc.open_files():

                    if item.path == str(fn):
                        wait_on_file = True
                        break

            except Exception:
                pass

            if wait_on_file:
                break

        if wait_on_file:
            time.sleep(2)
        else:
            break

        attempt += 1

        if attempt >= max_attempts:
            break

    with open(str(fn), mode='r') as tx:

        lines = tx.readlines()

        if lines:
            lines = list(set(lines))

        if log_name + '\n' not in lines:
            lines.append(log_name + '\n')

    fn.unlink()

    with open(str(fn), mode='w') as tx:
        tx.writelines(lines)


def _clean_and_update(
    outdir_angles,
    finfo_dict,
    meta_name,
    check_angles=True,
    check_downloads=True,
    load_bands_names=None,
):

    """Removes temporary angle outputs, downloaded files, and converted band files."""

    if check_angles:
        _rmdir(outdir_angles)

    if check_downloads:

        for k, v in finfo_dict.items():

            if Path(v.name).is_file():

                try:
                    Path(v.name).unlink()
                except Exception:
                    logger.warning('  Could not delete {}.'.format(v.name))

            else:
                logger.warning(
                    '  The {} file does not exist to delete.'.format(v.name)
                )

    # if update_status:
    #     _update_status_file(status, meta_name)

    if load_bands_names:

        for loaded_band in load_bands_names:

            if Path(loaded_band).is_file():

                try:
                    Path(loaded_band).unlink()
                except Exception:
                    logger.warning(
                        '  Could not delete {}.'.format(loaded_band)
                    )


def _assign_attrs(data, attrs, bands_out):

    if bands_out:
        data = data.sel(band=bands_out)

    data = data.transpose('band', 'y', 'x')
    data.attrs = attrs

    return data


def _parse_google_filename(
    filename, landsat_parts, sentinel_parts, public_url
):

    """Parses a file name into its public GCP URLs (only Landsat Collection 1 is currently handled)."""

    file_info = GoogleFileInfo(url=None, url_file=None, meta=None, angles=None)

    f_base, f_ext = os.path.splitext(filename)

    fn_parts = f_base.split('_')

    if fn_parts[0].lower() in landsat_parts:

        # Collection 1
        url_ = '{PUBLIC}-landsat/{SENSOR}/01/{PATH}/{ROW}/{FDIR}'.format(
            PUBLIC=public_url,
            SENSOR=fn_parts[0],
            PATH=fn_parts[2][:3],
            ROW=fn_parts[2][3:],
            FDIR='_'.join(fn_parts[:-1]),
        )

        url_filename = '{URL}/{FN}'.format(URL=url_, FN=filename)
        url_meta = '{URL}/{FN}_MTL.txt'.format(
            URL=url_, FN='_'.join(fn_parts[:-1])
        )
        url_angles = '{URL}/{FN}_ANG.txt'.format(
            URL=url_, FN='_'.join(fn_parts[:-1])
        )

        file_info = GoogleFileInfo(
            url=url_, url_file=url_filename, meta=url_meta, angles=url_angles
        )

    return file_info
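

def _example_parse_google_filename():
    """A minimal sketch (not part of the original module) of how
    ``_parse_google_filename`` maps a Landsat Collection 1 file name onto its
    public Google Cloud Storage URLs. The band file name below is hypothetical.
    """
    finfo = _parse_google_filename(
        'LC08_L1TP_042034_20171225_20180103_01_T1_B4.TIF',
        landsat_parts=['lt05', 'le07', 'lc08'],
        sentinel_parts=['s2a', 's2b'],
        public_url='https://storage.googleapis.com/gcp-public-data',
    )
    # finfo.url    -> .../gcp-public-data-landsat/LC08/01/042/034/LC08_L1TP_042034_20171225_20180103_01_T1
    # finfo.meta   -> <finfo.url>/LC08_L1TP_042034_20171225_20180103_01_T1_MTL.txt
    # finfo.angles -> <finfo.url>/LC08_L1TP_042034_20171225_20180103_01_T1_ANG.txt
    return finfo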


def _download_workers(
    gcp_str, poutdir, outdir, fname, fn, null_items, verbose
):

    """Downloads a single file from the GCP bucket and returns its type key and ``FileInfo``."""

    # Renaming Sentinel data
    rename = False

    # Full path of GCP local download
    down_file = str(poutdir.joinpath(fname))

    if down_file.endswith('_ANG.txt'):
        fbase = fname.replace('_ANG.txt', '')
        key = 'angle'
    elif down_file.endswith('_MTL.txt'):
        fbase = fname.replace('_MTL.txt', '')
        key = 'meta'
    elif down_file.endswith('MTD_TL.xml'):

        fbase = Path(fn).parent.name
        down_file = str(poutdir.joinpath(fbase + '_MTD_TL.xml'))
        key = 'meta'
        rename = True

    elif down_file.endswith('_BQA.TIF'):
        fbase = fname.replace('_BQA.TIF', '')
        key = 'qa'
    else:

        if fname.endswith('.jp2'):

            fbase = Path(fn).parent.parent.name
            key = Path(fn).name.split('.')[0].split('_')[-1]
            down_file = str(poutdir.joinpath(fbase + '_' + key + '.jp2'))
            rename = True

        else:

            fsplit = fname.split('_')
            fbase = '_'.join(fsplit[:-1])
            key = fsplit[-1].split('.')[0]

        # TODO: QA60

    continue_download = True

    if fbase in null_items:
        continue_download = False

    if continue_download:

        ###################
        # Download the file
        ###################

        if not Path(down_file).is_file():

            if fn.lower().startswith('gs://gcp-public-data'):
                com = 'gsutil cp -r {} {}'.format(fn, outdir)
            else:
                com = 'gsutil cp -r {}/{} {}'.format(gcp_str, fn, outdir)

            if verbose > 0:
                logger.info('  Downloading {} ...'.format(fname))

            subprocess.call(com, shell=True)

            if rename:
                os.rename(str(Path(outdir).joinpath(Path(fn).name)), down_file)

        # Store file information
        return key, FileInfo(name=down_file, key=key)

    else:
        return None, None
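

def _example_download_worker_keys():
    """A minimal sketch (not part of the original module) of how
    ``_download_workers`` tags each downloaded file: metadata files are keyed
    'meta', angle files 'angle', the QA band 'qa', and spectral bands by their
    band suffix (e.g., 'B4'). The paths below are hypothetical, and calling
    this would invoke ``gsutil`` if the file is not already local.
    """
    key, finfo = _download_workers(
        gcp_str='gsutil cp -r gs://gcp-public-data-landsat',
        poutdir=Path('downloads'),
        outdir='downloads',
        fname='LC08_L1TP_042034_20171225_20180103_01_T1_MTL.txt',
        fn='gs://gcp-public-data-landsat/LC08/01/042/034/LC08_L1TP_042034_20171225_20180103_01_T1/LC08_L1TP_042034_20171225_20180103_01_T1_MTL.txt',
        null_items=[],
        verbose=0,
    )
    # key == 'meta'; finfo.name is the local copy under 'downloads/'
    return key, finfo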


class DownloadMixin(object):
[docs] def download_gcp( self, sensor, downloads=None, outdir='.', outdir_brdf=None, search_wildcards=None, search_dict=None, n_jobs=1, verbose=0, ): """Downloads a file from Google Cloud platform. Args: sensor (str): The sensor to query. Choices are ['l5', 'l7', 'l8', 's2a', 's2c']. downloads (Optional[str or list]): The file or list of keys to download. If not given, keys will be taken from ``search_dict`` or ``self.search_dict``. outdir (Optional[str | Path]): The output directory. outdir_brdf (Optional[Path]): The output directory. search_wildcards (Optional[list]): A list of search wildcards. search_dict (Optional[dict]): A keyword search dictionary to override ``self.search_dict``. n_jobs (Optional[int]): The number of files to download in parallel. verbose (Optional[int]): The verbosity level. Returns: ``dict`` of ``dicts`` where sub-dictionaries contain a ``namedtuple`` of the downloaded file and tag """ if not search_dict: if not self.search_dict: logger.exception( ' A keyword search dictionary must be provided, either from `self.list_gcp` or the `search_dict` argument.' ) else: search_dict = self.search_dict poutdir = Path(outdir) if outdir != '.': poutdir.mkdir(parents=True, exist_ok=True) if not downloads: downloads = list(search_dict.keys()) if not isinstance(downloads, list): downloads = [downloads] if sensor in ['s2', 's2a', 's2b', 's2c']: gcp_str = 'gsutil cp -r gs://gcp-public-data-sentinel-2' else: gcp_str = 'gsutil cp -r gs://gcp-public-data-landsat' downloaded = {} null_items = [] for search_key in downloads: download_list = self.search_dict[search_key] if search_wildcards: download_list_ = [] for swild in search_wildcards: download_list_ += fnmatch.filter( download_list, '*{}'.format(swild) ) download_list = download_list_ download_list_names = [Path(dfn).name for dfn in download_list] logger.info( ' The download contains {:d} items: {}'.format( len(download_list_names), ','.join(download_list_names) ) ) # Separate each scene if sensor.lower() in ['l5', 'l7', 'l8']: # list of file ids id_list = [ '_'.join(fn.split('_')[:-1]) for fn in download_list_names if fn.endswith('_MTL.txt') ] # list of lists where each sub-list is unique download_list_unique = [ [fn for fn in download_list if sid in Path(fn).name] for sid in id_list ] else: id_list = list( set( [ '_'.join(fn.split('_')[:-1]) for fn in download_list_names ] ) ) download_list_unique = [download_list] for scene_id, sub_download_list in zip( id_list, download_list_unique ): logger.info(' Checking scene {} ...'.format(scene_id)) downloaded_sub = {} # Check if the file has been downloaded if sensor.lower() in ['l5', 'l7', 'l8']: if not scene_id.lower().startswith( self.sensor_collections[sensor.lower()] ): logger.exception( ' The scene id {SCENE_ID} does not match the sensor {SENSOR}.'.format( SCENE_ID=scene_id, SENSOR=sensor ) ) raise NameError # Path of BRDF stack out_brdf = outdir_brdf.joinpath(scene_id + '.tif') else: fn = sub_download_list[0] fname = Path(fn).name if fname.lower().endswith('.jp2'): fbase = Path(fn).parent.parent.name key = Path(fn).name.split('.')[0].split('_')[-1] down_file = str( poutdir.joinpath(fbase + '_' + key + '.jp2') ) brdfp = '_'.join(Path(down_file).name.split('_')[:-1]) out_brdf = outdir_brdf.joinpath(brdfp + '_MTD.tif') else: out_brdf = None if out_brdf: if ( out_brdf.is_file() or Path(str(out_brdf).replace('.tif', '.nc')).is_file() or Path( str(out_brdf).replace('.tif', '.nodata') ).is_file() ): logger.warning( f' The output BRDF file, {str(out_brdf)}, already exists.' 
) _clean_and_update( None, None, None, check_angles=False, check_downloads=False, ) continue else: logger.warning( f' Continuing with the download for {str(out_brdf)}.' ) # Move the metadata file to the front of the # list to avoid unnecessary downloads. if sensor.lower() in ['l5', 'l7', 'l8']: meta_index = [ i for i in range(0, len(sub_download_list)) if sub_download_list[i].endswith('_MTL.txt') ][0] sub_download_list.insert( 0, sub_download_list.pop(meta_index) ) else: # The Sentinel 2 metadata files come in their own list pass download_list_names = [ Path(dfn).name for dfn in sub_download_list ] results = [] with concurrent.futures.ThreadPoolExecutor( max_workers=n_jobs ) as executor: futures = [ executor.submit( _download_workers, gcp_str, poutdir, outdir, fname, fn, null_items, verbose, ) for fname, fn in zip( download_list_names, sub_download_list ) ] for f in concurrent.futures.as_completed(futures): results.append(f.result()) for key, finfo_ in results: if finfo_: downloaded_sub[key] = finfo_ if downloaded_sub: if len(downloaded_sub) < len(sub_download_list): downloaded_names = [ Path(v.name).name for v in list(downloaded_sub.values()) ] missing_items = ','.join( list( set(download_list_names).difference( downloaded_names ) ) ) logger.warning( ' Only {:d} files out of {:d} were downloaded.'.format( len(downloaded_sub), len(sub_download_list) ) ) logger.warning( ' {} are missing.'.format(missing_items) ) downloaded[search_key] = downloaded_sub return downloaded

    def download_aws(self, landsat_id, band_list, outdir='.'):
        """Downloads Landsat 8 data from Amazon AWS.

        Args:
            landsat_id (str): The Landsat id to download.
            band_list (list): The Landsat bands to download.
            outdir (Optional[str]): The output directory.

        Examples:
            >>> from geowombat.util import GeoDownloads
            >>>
            >>> dl = GeoDownloads()
            >>> dl.download_aws('LC08_L1TP_224077_20200518_20200518_01_RT', ['b2', 'b3', 'b4'])
        """
        if not REQUESTS_INSTALLED:
            logger.exception('Requests must be installed.')

        if not isinstance(outdir, Path):
            outdir = Path(outdir)

        parts = landsat_id.split('_')
        path_row = parts[2]
        path = int(path_row[:3])
        row = int(path_row[3:])

        def _download_file(in_file, out_file):
            response = requests.get(in_file)

            with open(out_file, 'wb') as f:
                f.write(response.content)

        mtl_id = '{landsat_id}_MTL.txt'.format(landsat_id=landsat_id)

        url = '{aws_l8_public}/{path:03d}/{row:03d}/{landsat_id}/{mtl_id}'.format(
            aws_l8_public=self.aws_l8_public,
            path=path,
            row=row,
            landsat_id=landsat_id,
            mtl_id=mtl_id,
        )

        mtl_out = outdir / mtl_id
        _download_file(url, str(mtl_out))

        angle_id = '{landsat_id}_ANG.txt'.format(landsat_id=landsat_id)

        url = '{aws_l8_public}/{path:03d}/{row:03d}/{landsat_id}/{angle_id}'.format(
            aws_l8_public=self.aws_l8_public,
            path=path,
            row=row,
            landsat_id=landsat_id,
            angle_id=angle_id,
        )

        angle_out = outdir / angle_id
        _download_file(url, str(angle_out))

        for band in band_list:

            band_id = '{landsat_id}_{band}.TIF'.format(
                landsat_id=landsat_id, band=band.upper()
            )

            url = '{aws_l8_public}/{path:03d}/{row:03d}/{landsat_id}/{band_id}'.format(
                aws_l8_public=self.aws_l8_public,
                path=path,
                row=row,
                landsat_id=landsat_id,
                band_id=band_id,
            )

            band_out = outdir / band_id
            _download_file(url, str(band_out))

    # def download_landsat_range(self, sensors, bands, path_range, row_range, date_range, **kwargs):
    #
    #     """
    #     Downloads Landsat data from iterables
    #
    #     Args:
    #         sensors (str): A list of sensors to download.
    #         bands (str): A list of bands to download.
    #         path_range (iterable): A list of paths.
    #         row_range (iterable): A list of rows.
    #         date_range (iterable): A list of ``datetime`` objects or a list of strings as yyyymmdd.
    #         kwargs (Optional[dict]): Keyword arguments to pass to ``download``.
    #
    #     Examples:
    #         >>> from geowombat.util import GeoDownloads
    #         >>>
    #         >>> dl = GeoDownloads()
    #         >>> dl.download_landsat_range(['lc08'], ['b4'], [42], [34], ['20170616', '20170620'])
    #     """
    #
    #     if (len(date_range) == 2) and not isinstance(date_range[0], datetime):
    #
    #         start_date = date_range[0]
    #         end_date = date_range[1]
    #
    #         sdt = datetime.strptime(start_date, '%Y%m%d')
    #         edt = datetime.strptime(end_date, '%Y%m%d')
    #
    #         date_range = pd.date_range(start=sdt, end=edt).to_pydatetime().tolist()
    #
    #     for sensor in sensors:
    #         for band in bands:
    #             for path in path_range:
    #                 for row in row_range:
    #                     for dt in date_range:
    #
    #                         str_date = '{:d}{:02d}{:02d}'.format(dt.year, dt.month, dt.day)
    #
    #                         # TODO: check if L1TP is used for all sensors
    #                         # TODO: fixed DATE2
    #                         filename = '{SENSOR}_L1TP_{PATH:03d}{ROW:03d}_{DATE}_{DATE2}_01_T1_{BAND}.TIF'.format(
    #                             SENSOR=sensor.upper(),
    #                             PATH=path,
    #                             ROW=row,
    #                             DATE=str_date,
    #                             DATE2=None,
    #                             BAND=band)
    #
    #                         self.download(filename, **kwargs)


class CloudPathMixin(object):
    @staticmethod
    def get_landsat_urls(scene_id, bands=None, cloud='gcp'):
        """Gets Google Cloud Platform COG urls for Landsat.

        Args:
            scene_id (str): The Landsat scene id.
            bands (Optional[list]): The list of band names.
            cloud (Optional[str]): The cloud storage to get the URL from. For now, only 'gcp' is supported.

        Returns:
            ``tuple`` of band URLs and metadata URL as strings

        Example:
            >>> import os
            >>> import geowombat as gw
            >>> from geowombat.util import GeoDownloads
            >>>
            >>> os.environ['CURL_CA_BUNDLE'] = '/etc/ssl/certs/ca-certificates.crt'
            >>>
            >>> gdl = GeoDownloads()
            >>>
            >>> scene_urls, meta_url = gdl.get_landsat_urls('LC08_L1TP_042034_20171225_20180103_01_T1',
            >>>                                             bands=['blue', 'green', 'red'])
            >>>
            >>> with gw.open(scene_urls) as src:
            >>>     print(src)
        """
        gcp_base = 'https://storage.googleapis.com/gcp-public-data-landsat'

        (
            sensor_collection,
            level,
            path_row,
            date_acquire,
            date_other,
            collection,
            tier,
        ) = scene_id.split('_')

        path = path_row[:3]
        row = path_row[3:]

        if bands:
            sensor = f'{sensor_collection[0].lower()}{sensor_collection[3]}'
            # Landsat 7 has the thermal band
            sensor = 'l7th' if sensor == 'l7' else sensor
            wavelengths = get_sensor_info(key='wavelength', sensor=sensor)
            band_pos = [getattr(wavelengths, b) for b in bands]
        else:
            band_pos = [1]

        lid = f'{sensor_collection}/01/{path}/{row}/{scene_id}'

        scene_urls = [
            f'{gcp_base}/{lid}/{scene_id}_B{band_pos}.TIF'
            for band_pos in band_pos
        ]

        meta_url = f'{gcp_base}/{lid}/{scene_id}_MTL.txt'

        return scene_urls, meta_url

    @staticmethod
    def get_sentinel2_urls(safe_id, bands=None, cloud='gcp'):
        """Gets Google Cloud Platform COG urls for Sentinel 2.

        Args:
            safe_id (str): The Sentinel 2 SAFE id.
            bands (Optional[list]): The list of band names.
            cloud (Optional[str]): The cloud storage to get the URL from. For now, only 'gcp' is supported.

        Returns:
            ``tuple`` of band URLs and metadata URL as strings

        Example:
            >>> import os
            >>> import geowombat as gw
            >>> from geowombat.util import GeoDownloads
            >>>
            >>> os.environ['CURL_CA_BUNDLE'] = '/etc/ssl/certs/ca-certificates.crt'
            >>>
            >>> gdl = GeoDownloads()
            >>>
            >>> safe_id = 'S2A_MSIL1C_20180109T135101_N0206_R024_T21HUD_20180109T171608.SAFE/GRANULE/L1C_T21HUD_A013320_20180109T135310'
            >>>
            >>> scene_urls, meta_url = gdl.get_sentinel2_urls(safe_id,
            >>>                                               bands=['blue', 'green', 'red', 'nir'])
            >>>
            >>> with gw.open(scene_urls) as src:
            >>>     print(src)
        """
        gcp_base = 'https://storage.googleapis.com/gcp-public-data-sentinel-2'

        sensor, level, date, __, __, mgrs, __ = safe_id.split('/')[0].split(
            '_'
        )

        utm = mgrs[1:3]
        zone = mgrs[3]
        id_ = mgrs[4:]

        if bands:
            sensor = sensor.lower()
            wavelengths = get_sensor_info(key='wavelength', sensor=sensor)
            band_pos = [getattr(wavelengths, b) for b in bands]
        else:
            band_pos = [1]

        lid = f'{utm}/{zone}/{id_}/{safe_id}/IMG_DATA/{mgrs}_{date}'

        scene_urls = [
            f'{gcp_base}/tiles/{lid}_B{band_pos:02d}.jp2'
            for band_pos in band_pos
        ]

        meta_url = f'{gcp_base}/tiles/{utm}/{zone}/{id_}/{safe_id}/MTD_TL.xml'

        return scene_urls, meta_url


def _is_within_directory(directory: Path, target: Path) -> bool:

    abs_directory = str(directory.absolute())
    abs_target = str(target.absolute())
    prefix = os.path.commonprefix([abs_directory, abs_target])

    return prefix == abs_directory


def _safe_extract(
    tar: object,
    path: Path = Path('.'),
    members=None,
    *,
    numeric_owner: bool = False,
) -> None:

    for member in tar.getmembers():

        member_path = path / member.name

        if not _is_within_directory(path, member_path):
            raise Exception("Attempted Path Traversal in Tar File")

    tar.extractall(path, members, numeric_owner=numeric_owner)
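

def _example_safe_extract_usage():
    """A minimal sketch (not part of the original module) showing how
    ``_safe_extract`` replaces a bare ``TarFile.extractall`` so that archive
    members cannot escape the target directory (path traversal). The archive
    path below is hypothetical.
    """
    with tarfile.open('wrs2_example.tar.gz', mode='r:gz') as tf:
        # Raises an Exception if any member resolves outside 'extracted/'
        _safe_extract(tf, Path('extracted'))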


class GeoDownloads(CloudPathMixin, DownloadMixin):
    def __init__(self):

        self._gcp_search_dict = None
        self.search_dict = None

        self.gcp_public = 'https://storage.googleapis.com/gcp-public-data'
        self.aws_l8_public = 'https://landsat-pds.s3.amazonaws.com/c1/L8'

        self.landsat_parts = ['lt05', 'le07', 'lc08']
        self.sentinel_parts = ['s2a', 's2b']

        s2_dict = dict(
            coastal=1,
            blue=2,
            green=3,
            red=4,
            nir1=5,
            nir2=6,
            nir3=7,
            nir=8,
            rededge=8,
            water=9,
            cirrus=10,
            swir1=11,
            swir2=12,
        )

        self.gcp_dict = dict(
            l5='LT05/01',
            l7='LE07/01',
            l8='LC08/01',
            s2='tiles',
            s2a='tiles',
            s2b='tiles',
            s2c='tiles',
        )

        self.sensor_collections = dict(l5='lt05', l7='le07', l8='lc08')

        self.orbit_dates = dict(
            l5=OrbitDates(
                start=datetime.strptime('1984-3-1', '%Y-%m-%d'),
                end=datetime.strptime('2013-6-5', '%Y-%m-%d'),
            ),
            l7=OrbitDates(
                start=datetime.strptime('1999-4-15', '%Y-%m-%d'),
                end=datetime.strptime('2100-1-1', '%Y-%m-%d'),
            ),
            l8=OrbitDates(
                start=datetime.strptime('2013-2-11', '%Y-%m-%d'),
                end=datetime.strptime('2100-1-1', '%Y-%m-%d'),
            ),
            s2a=OrbitDates(
                start=datetime.strptime('2015-6-23', '%Y-%m-%d'),
                end=datetime.strptime('2100-1-1', '%Y-%m-%d'),
            ),
            s2b=OrbitDates(
                start=datetime.strptime('2017-3-7', '%Y-%m-%d'),
                end=datetime.strptime('2100-1-1', '%Y-%m-%d'),
            ),
        )

        self.associations = dict(
            l5=dict(
                blue=1, green=2, red=3, nir=4, swir1=5, thermal=6, swir2=7
            ),
            l7=dict(
                blue=1,
                green=2,
                red=3,
                nir=4,
                swir1=5,
                thermal=6,
                swir2=7,
                pan=8,
            ),
            l8=dict(
                coastal=1,
                blue=2,
                green=3,
                red=4,
                nir=5,
                swir1=6,
                swir2=7,
                pan=8,
                cirrus=9,
                tirs1=10,
                tirs2=11,
            ),
            s2=s2_dict,
            s2a=s2_dict,
            s2b=s2_dict,
            s2c=s2_dict,
        )
[docs] def download_cube( self, sensors, date_range, bounds, bands, bands_out=None, crs=None, out_bounds=None, outdir='.', resampling='bilinear', ref_res=None, l57_angles_path=None, l8_angles_path=None, subsample=1, write_format='gtiff', write_angle_files=False, mask_qa=False, lqa_mask_items=None, chunks=512, cloud_heights=None, sr_method='srem', earthdata_username=None, earthdata_key_file=None, earthdata_code_file=None, srtm_outdir=None, n_jobs=1, num_workers=1, num_threads=1, **kwargs, ): """Downloads a cube of Landsat and/or Sentinel 2 imagery. Args: sensors (str or list): The sensors, or sensor, to download. date_range (list): The date range, given as [date1, date2], where the date format is yyyy-mm. bounds (GeoDataFrame, list, or tuple): The geometry bounds (in WGS84 lat/lon) that define the cube extent to download. If given as a ``GeoDataFrame``, only the first ``DataFrame`` record will be used. If given as a ``tuple`` or a ``list``, the order should be (left, bottom, right, top). bands (str or list): The bands to download. E.g.: Sentinel s2cloudless bands: bands = ['coastal', 'blue', 'red', 'nir1', 'nir', 'rededge', 'water', 'cirrus', 'swir1', 'swir2'] bands_out (Optional[list]): The bands to write to file. This might be useful after downloading all bands to mask clouds, but are only interested in subset of those bands. crs (Optional[str or object]): The output CRS. If ``bounds`` is a ``GeoDataFrame``, the CRS is taken from the object. out_bounds (Optional[list or tuple]): The output bounds in ``crs``. If not given, the bounds are taken from ``bounds``. outdir (Optional[str]): The output directory. ref_res (Optional[tuple]): A reference cell resolution. resampling (Optional[str]): The resampling method. l57_angles_path (str): The path to the Landsat 5 and 7 angles bin. l8_angles_path (str): The path to the Landsat 8 angles bin. subsample (Optional[int]): The sub-sample factor when calculating the angles. write_format (Optional[bool]): The data format to write. Choices are ['gtiff', 'netcdf']. write_angle_files (Optional[bool]): Whether to write the angles to file. mask_qa (Optional[bool]): Whether to mask data with the QA file. lqa_mask_items (Optional[list]): A list of QA mask items for Landsat. chunks (Optional[int]): The chunk size to read at. cloud_heights (Optional[list]): The cloud heights, in kilometers. sr_method (Optional[str]): The surface reflectance correction method. Choices are ['srem', '6s']. earthdata_username (Optional[str]): The EarthData username. earthdata_key_file (Optional[str]): The EarthData secret key file. earthdata_code_file (Optional[str]): The EarthData secret passcode file. srtm_outdir (Optional[str]): The output SRTM directory. n_jobs (Optional[int]): The number of parallel download workers for ``joblib``. num_workers (Optional[int]): The number of parallel workers for ``dask.compute``. num_threads (Optional[int]): The number of GDAL warp threads. kwargs (Optional[dict]): Keyword arguments passed to ``to_raster``. 
Examples: >>> from geowombat.util import GeoDownloads >>> gdl = GeoDownloads() >>> >>> # Download a Landsat 7 panchromatic cube >>> gdl.download_cube(['l7'], >>> ['2010-01-01', '2010-02-01'], >>> (-91.57, 40.37, -91.46, 40.42), >>> ['pan'], >>> crs="+proj=aea +lat_1=-5 +lat_2=-42 +lat_0=-32 +lon_0=-60 +x_0=0 +y_0=0 +ellps=aust_SA +units=m +no_defs") >>> >>> # Download a Landsat 7, 8 and Sentinel 2 cube of the visible spectrum >>> gdl.download_cube(['l7', 'l8', 's2a'], >>> ['2017-01-01', '2018-01-01'], >>> (-91.57, 40.37, -91.46, 40.42), >>> ['blue', 'green', 'red'], >>> crs={'init': 'epsg:102033'}, >>> readxsize=1024, >>> readysize=1024, >>> n_workers=1, >>> n_threads=8) """ if write_format not in ['gtiff', 'netcdf']: logger.warning( f' Did not recognize {write_format}. Setting the output data format as gtiff.' ) write_format = 'gtiff' if not lqa_mask_items: lqa_mask_items = [ 'fill', 'saturated', 'cloudconf', 'shadowconf', 'cirrusconf', ] if isinstance(sensors, str): sensors = [sensors] angle_kwargs = kwargs.copy() angle_kwargs['nodata'] = -32768 nodataval = kwargs['nodata'] if 'nodata' in kwargs else 65535 angle_infos = {} rt = RadTransforms() br = BRDF() la = LinearAdjustments() dos = DOS() sixs = SixS() main_path = Path(outdir) outdir_tmp = main_path.joinpath('tmp') outdir_brdf = main_path.joinpath('brdf') main_path.mkdir(parents=True, exist_ok=True) outdir_tmp.mkdir(parents=True, exist_ok=True) outdir_brdf.mkdir(parents=True, exist_ok=True) # Logging file # status = Path(outdir).joinpath('status.txt') # # if not status.is_file(): # # with open(str(status), mode='w') as tx: # pass # Get bounds from geometry if isinstance(bounds, tuple) or isinstance(bounds, list): bounds = Polygon( [ (bounds[0], bounds[3]), # upper left (bounds[2], bounds[3]), # upper right (bounds[2], bounds[1]), # lower right (bounds[0], bounds[1]), # lower left (bounds[0], bounds[3]), ] ) # upper left bounds = gpd.GeoDataFrame( [0], geometry=[bounds], crs={'init': 'epsg:4326'} ) bounds_object = bounds.geometry.values[0] if not out_bounds: # Project the bounds out_bounds = bounds.to_crs(crs).bounds.values[0].tolist() # Get WRS file data_bin = os.path.realpath(os.path.dirname(__file__)) data_dir = Path(data_bin).joinpath('../data') shp_dict = {} if ('l5' in sensors) or ('l7' in sensors) or ('l8' in sensors): path_tar = Path(data_dir).joinpath('wrs2.tar.gz') path_shp = Path(data_dir).joinpath('wrs2_descending.shp') wrs = os.path.realpath(path_shp.as_posix()) if not path_shp.is_file(): with tarfile.open( os.path.realpath(path_tar.as_posix()), mode='r:gz' ) as tf: _safe_extract(tf, Path(data_dir)) df_wrs = gpd.read_file(wrs) df_wrs = df_wrs[df_wrs.geometry.intersects(bounds_object)] if df_wrs.empty: logger.warning(' The geometry bounds is empty.') return shp_dict['wrs'] = df_wrs if ('s2a' in sensors) or ('s2b' in sensors) or ('s2c' in sensors): path_tar = Path(data_dir).joinpath('mgrs.tar.gz') path_shp = Path(data_dir).joinpath('sentinel2_grid.shp') mgrs = os.path.realpath(path_shp.as_posix()) if not path_shp.is_file(): with tarfile.open( os.path.realpath(path_tar.as_posix()), mode='r:gz' ) as tf: _safe_extract(tf, Path(data_dir)) df_mgrs = gpd.read_file(mgrs) df_mgrs = df_mgrs[df_mgrs.geometry.intersects(bounds_object)] if df_mgrs.empty: logger.warning(' The geometry bounds is empty.') return shp_dict['mgrs'] = df_mgrs dt1 = datetime.strptime(date_range[0], '%Y-%m') dt2 = datetime.strptime(date_range[1], '%Y-%m') months = list(range(1, 13)) year_months = {} if dt1.month <= dt2.month: month_range = months[ 
months.index(dt1.month) : months.index(dt2.month) + 1 ] else: month_range = ( months[months.index(dt1.month) :] + months[: months.index(dt2.month) + 1] ) if dt1.year == dt2.year: year_months[dt1.year] = month_range else: for y in range(dt1.year, dt2.year + 1): if y == dt1.year: year_months[y] = list(range(dt1.month, 13)) elif y == dt2.year: year_months[y] = list(range(1, dt2.month + 1)) else: year_months[y] = months year = dt1.year while True: if year > dt2.year: break for m in year_months[year]: yearmonth_query = '{:d}{:02d}'.format(year, m) target_date = datetime.strptime(yearmonth_query, '%Y%m') for sensor in sensors: # Avoid unnecessary GCP queries if ( target_date < self.orbit_dates[sensor.lower()].start ) or (target_date > self.orbit_dates[sensor.lower()].end): continue band_associations = self.associations[sensor] if sensor.lower() in ['s2', 's2a', 's2b', 's2c']: locations = [ '{}/{}/{}'.format( dfrow.Name[:2], dfrow.Name[2], dfrow.Name[3:] ) for dfi, dfrow in shp_dict['mgrs'].iterrows() ] else: locations = [ '{:03d}/{:03d}'.format( int(dfrow.PATH), int(dfrow.ROW) ) for dfi, dfrow in shp_dict['wrs'].iterrows() ] for location in locations: if sensor.lower() in ['s2', 's2a', 's2b', 's2c']: query = '{LOCATION}/{LEVEL}*{YM}*.SAFE/GRANULE/*'.format( LOCATION=location, LEVEL=sensor.upper(), YM=yearmonth_query, ) else: query = '{LOCATION}/*{PATHROW}_{YM}*_T1'.format( LOCATION=location, PATHROW=location.replace('/', ''), YM=yearmonth_query, ) # Query and list available files on the GCP self.list_gcp(sensor, query) self.search_dict = self.get_gcp_results if not self.search_dict: logger.warning( ' No results found for {SENSOR} at location {LOC}, year {YEAR:d}, month {MONTH:d}.'.format( SENSOR=sensor, LOC=location, YEAR=year, MONTH=m, ) ) continue # Download data if sensor.lower() in ['s2', 's2a', 's2b', 's2c']: load_bands = [ f'B{band_associations[bd]:02d}' if bd != 'rededge' else f'B{band_associations[bd]:01d}A' for bd in bands ] search_wildcards = ['MTD_TL.xml'] + [ bd + '.jp2' for bd in load_bands ] file_info = self.download_gcp( sensor, outdir=outdir_tmp, outdir_brdf=outdir_brdf, search_wildcards=search_wildcards, n_jobs=n_jobs, verbose=1, ) # Reorganize the dictionary to combine bands and metadata new_dict_ = {} for finfo_key, finfo_dict in file_info.items(): sub_dict_ = {} if 'meta' in finfo_dict: key = finfo_dict['meta'].name sub_dict_['meta'] = finfo_dict['meta'] for ( finfo_key_, finfo_dict_, ) in file_info.items(): if 'meta' not in finfo_dict_: for ( bdkey_, bdinfo_, ) in finfo_dict_.items(): if ( '_'.join( bdinfo_.name.split( '_' )[:-1] ) in key ): sub_dict_[bdkey_] = bdinfo_ new_dict_[finfo_key] = sub_dict_ file_info = new_dict_ else: del_keys = [ k for k, v in self.search_dict.items() if 'gap_mask' in k ] for dk in del_keys: del self.search_dict[dk] load_bands = sorted( [ 'B{:d}'.format(band_associations[bd]) for bd in bands ] ) search_wildcards = [ 'ANG.txt', 'MTL.txt', 'BQA.TIF', ] + [bd + '.TIF' for bd in load_bands] file_info = self.download_gcp( sensor, outdir=outdir_tmp, outdir_brdf=outdir_brdf, search_wildcards=search_wildcards, n_jobs=n_jobs, verbose=1, ) logger.info( ' Finished downloading files for yyyymm query, {}.'.format( yearmonth_query ) ) # Create pixel angle files # TODO: this can be run in parallel for finfo_key, finfo_dict in file_info.items(): # Incomplete dictionary because file was checked, existed, and cleaned if 'meta' not in finfo_dict: logger.warning( ' The metadata does not exist.' 
) _clean_and_update( None, finfo_dict, None, check_angles=False ) continue brdfp = '_'.join( Path(finfo_dict['meta'].name).name.split('_')[ :-1 ] ) out_brdf = outdir_brdf.joinpath(brdfp + '.tif') out_angles = outdir_brdf.joinpath( brdfp + '_angles.tif' ) if sensor in ['s2', 's2a', 's2b', 's2c']: outdir_angles = outdir_tmp.joinpath( 'angles_{}'.format( Path( finfo_dict['meta'].name ).name.replace('_MTD_TL.xml', '') ) ) else: outdir_angles = outdir_tmp.joinpath( 'angles_{}'.format( Path( finfo_dict['meta'].name ).name.replace('_MTL.txt', '') ) ) if not Path(finfo_dict['meta'].name).is_file(): logger.warning( ' The metadata does not exist.' ) _clean_and_update( outdir_angles, finfo_dict, finfo_dict['meta'].name, check_angles=False, ) continue if out_brdf.is_file(): logger.warning( ' The output BRDF file, {}, already exists.'.format( brdfp ) ) _clean_and_update( outdir_angles, finfo_dict, finfo_dict['meta'].name, check_angles=False, ) continue if load_bands[0] not in finfo_dict: logger.warning( ' The download for {} was incomplete.'.format( brdfp ) ) _clean_and_update( outdir_angles, finfo_dict, finfo_dict['meta'].name, check_angles=False, ) continue outdir_angles.mkdir(parents=True, exist_ok=True) ref_file = finfo_dict[load_bands[0]].name logger.info( ' Processing angles for {} ...'.format(brdfp) ) if sensor.lower() in ['s2', 's2a', 's2b', 's2c']: meta = rt.get_sentinel_coefficients( finfo_dict['meta'].name ) angle_info = sentinel_pixel_angles( finfo_dict['meta'].name, ref_file, str(outdir_angles), nodata=-32768, overwrite=False, verbose=1, ) if ( ' '.join(bands) == 'coastal blue green red nir1 nir2 nir3 nir rededge water cirrus swir1 swir2' ): rad_sensor = ( 's2af' if angle_info.sensor == 's2a' else 's2bf' ) elif ( ' '.join(bands) == 'coastal blue red nir1 nir rededge water cirrus swir1 swir2' ): rad_sensor = ( 's2acloudless' if angle_info.sensor == 's2a' else 's2bcloudless' ) elif ( ' '.join(bands) == 'blue green red nir1 nir2 nir3 nir rededge swir1 swir2' ): rad_sensor = angle_info.sensor elif ( ' '.join(bands) == 'blue green red nir swir1 swir2' ): rad_sensor = ( 's2al7' if angle_info.sensor == 's2a' else 's2bl7' ) elif ( ' '.join(bands) == 'nir1 nir2 nir3 rededge swir1 swir2' ): rad_sensor = ( 's2a20' if angle_info.sensor == 's2a' else 's2b20' ) elif ' '.join(bands) == 'blue green red nir': rad_sensor = ( 's2a10' if angle_info.sensor == 's2a' else 's2b10' ) else: rad_sensor = angle_info.sensor bandpass_sensor = angle_info.sensor else: meta = rt.get_landsat_coefficients( finfo_dict['meta'].name ) angle_info = landsat_pixel_angles( finfo_dict['angle'].name, ref_file, str(outdir_angles), meta.sensor, l57_angles_path=l57_angles_path, l8_angles_path=l8_angles_path, subsample=subsample, resampling='bilinear', num_threads=num_workers, verbose=1, ) if (len(bands) == 1) and (bands[0] == 'pan'): rad_sensor = sensor + bands[0] else: if (len(bands) == 6) and ( meta.sensor == 'l8' ): rad_sensor = 'l8l7' elif ( (len(bands) == 7) and (meta.sensor == 'l8') and ('pan' in bands) ): rad_sensor = 'l8l7mspan' elif ( (len(bands) == 7) and (meta.sensor == 'l7') and ('pan' in bands) ): rad_sensor = 'l7mspan' else: rad_sensor = meta.sensor bandpass_sensor = sensor if sensor in ['s2', 's2a', 's2b', 's2c']: logger.info( f' Translating jp2 files to gtiff for {brdfp} ...' 
) load_bands_names = [] # Convert to GeoTiffs to avoid CRS issue with jp2 format for bd in load_bands: # Check if the file exists to avoid duplicate GCP filenames` if Path(finfo_dict[bd].name).is_file(): warp( finfo_dict[bd].name, finfo_dict[bd].name.replace( '.jp2', '.tif' ), overwrite=True, delete_input=True, multithread=True, warpMemoryLimit=256, outputBounds=out_bounds, xRes=ref_res[0], yRes=ref_res[1], resampleAlg=RESAMPLING_DICT[ resampling ], creationOptions=[ 'TILED=YES', 'COMPRESS=LZW', 'BLOCKXSIZE={CHUNKS:d}'.format( CHUNKS=chunks ), 'BLOCKYSIZE={CHUNKS:d}'.format( CHUNKS=chunks ), ], ) load_bands_names.append( finfo_dict[bd].name.replace( '.jp2', '.tif' ) ) else: # Get band names from user try: load_bands_names = [ finfo_dict[bd].name for bd in load_bands ] except Exception: logger.exception( ' Could not get all band name associations.' ) raise NameError logger.info( f' Applying BRDF and SR correction for {brdfp} ...' ) with gw.config.update( sensor=rad_sensor, ref_bounds=out_bounds, ref_crs=crs, ref_res=ref_res if ref_res else load_bands_names[-1], ignore_warnings=True, nasa_earthdata_user=earthdata_username, nasa_earthdata_key=earthdata_key_file, nasa_earthdata_code=earthdata_code_file, ): valid_data = True # Ensure there is data with gw.open( load_bands_names[0], band_names=[1], chunks=chunks, num_threads=num_threads, ) as data: if ( data.sel(band=1) .min() .data.compute(num_workers=num_workers) > 10000 ): valid_data = False if valid_data: if ( data.sel(band=1) .max() .data.compute( num_workers=num_workers ) == 0 ): valid_data = False if valid_data: with gw.open( angle_info.sza, chunks=chunks, resampling='bilinear', ) as sza, gw.open( angle_info.vza, chunks=chunks, resampling='bilinear', ) as vza, gw.open( angle_info.saa, chunks=chunks, resampling='bilinear', ) as saa, gw.open( angle_info.vaa, chunks=chunks, resampling='bilinear', ) as vaa, gw.open( load_bands_names, band_names=bands, stack_dim='band', chunks=chunks, resampling=resampling, num_threads=num_threads, ) as data: attrs = data.attrs.copy() if mask_qa: if sensor.lower() in [ 's2', 's2a', 's2b', 's2c', ]: if S2CLOUDLESS_INSTALLED: cloud_detector = ( S2PixelCloudDetector( threshold=0.4, average_over=1, dilation_size=5, all_bands=False, ) ) # Get the S2Cloudless bands data_cloudless = data.sel( band=[ 'coastal', 'blue', 'red', 'nir1', 'nir', 'rededge', 'water', 'cirrus', 'swir1', 'swir2', ] ) # Scale from 0-10000 to 0-1 and reshape X = ( ( data_cloudless * 0.0001 ) .clip(0, 1) .data.compute( num_workers=num_workers ) .transpose(1, 2, 0)[ np.newaxis, :, :, : ] ) # Predict clouds # Potential classes? Currently, only clear and clouds are returned. # clear=0, clouds=1, shadow=2, snow=3, cirrus=4, water=5 mask = ndarray_to_xarray( data, cloud_detector.get_cloud_masks( X ), ['mask'], ) else: if bands_out: # If there are extra bands, remove them because they # are not supported in the BRDF kernels. data = _assign_attrs( data, attrs, bands_out, ) logger.warning( ' S2Cloudless is not installed, so skipping Sentinel cloud masking.' 
) if sr_method == 'srem': if sensor.lower() in [ 's2', 's2a', 's2b', 's2c', ]: # The S-2 data are in TOAR (0-10000) toar_scaled = ( (data * 0.0001) .astype('float64') .clip(0, 1) .assign_attrs(**attrs) ) # Convert TOAR to surface reflectance sr = rt.toar_to_sr( toar_scaled, sza, saa, vza, vaa, rad_sensor, method='srem', dst_nodata=nodataval, ) else: # Convert DN to surface reflectance sr = rt.dn_to_sr( data, sza, saa, vza, vaa, method='srem', sensor=rad_sensor, meta=meta, src_nodata=nodataval, dst_nodata=nodataval, ) else: if sensor.lower() in [ 's2', 's2a', 's2b', 's2c', ]: # The S-2 data are in TOAR (0-10000) data = ( (data * 0.0001) .astype('float64') .assign_attrs(**attrs) ) data_values = 'toar' else: data_values = 'dn' if ( isinstance( earthdata_username, str ) and isinstance( earthdata_key_file, str ) and isinstance( earthdata_code_file, str ) ): altitude = ( sixs.get_mean_altitude( data, srtm_outdir, n_jobs=n_jobs, ) ) altitude *= 0.0001 else: altitude = 0.0 # Resample to 100m x 100m data_coarse = data.sel( band=['blue', 'swir2'] ).gw.transform_crs( dst_res=500.0, resampling='med' ) aot = dos.get_aot( data_coarse, meta.sza, meta, data_values=data_values, dn_interp=data, angle_factor=1.0, interp_method='fast', aot_fallback=0.3, h2o=2.0, o3=0.3, # global average of total ozone in a vertical column (3 cm) altitude=altitude, w=151, n_jobs=n_jobs, ) if sensor.lower() in [ 's2', 's2a', 's2b', 's2c', ]: sr = rt.toar_to_sr( data, meta.sza, None, None, None, meta=meta, src_nodata=nodataval, dst_nodata=nodataval, angle_factor=1.0, method='6s', interp_method='fast', h2o=2.0, o3=0.3, aot=aot, altitude=altitude, n_jobs=n_jobs, ) else: sr = rt.dn_to_sr( data, meta.sza, None, None, None, meta=meta, src_nodata=nodataval, dst_nodata=nodataval, angle_factor=1.0, method='6s', interp_method='fast', h2o=2.0, o3=0.3, aot=aot, altitude=altitude, n_jobs=n_jobs, ) # BRDF normalization sr_brdf = br.norm_brdf( sr, sza, saa, vza, vaa, sensor=rad_sensor, wavelengths=data.band.values.tolist(), out_range=10000.0, src_nodata=nodataval, dst_nodata=nodataval, ) if bandpass_sensor.lower() in [ 'l5', 'l7', 's2', 's2a', 's2b', 's2c', ]: # Linearly adjust to Landsat 8 sr_brdf = la.bandpass( sr_brdf, bandpass_sensor.lower(), to='l8', scale_factor=0.0001, src_nodata=nodataval, dst_nodata=nodataval, ) if mask_qa: if sensor.lower() in [ 's2', 's2a', 's2b', 's2c', ]: if S2CLOUDLESS_INSTALLED: wavel_sub = ( sr_brdf.gw.set_nodata( nodataval, nodataval, out_range=(0, 1), dtype='float64', ) ) # Estimate the cloud shadows mask = estimate_cloud_shadows( wavel_sub, mask, sza, saa, vza, vaa, heights=cloud_heights, num_workers=num_workers, ) # Update the bands with the mask sr_brdf = xr.where( ( mask.sel( band='mask' ) == 0 ) & ( sr_brdf != nodataval ), sr_brdf.clip(0, 10000), nodataval, ).astype('uint16') sr_brdf = _assign_attrs( sr_brdf, attrs, bands_out ) if write_format == 'gtiff': sr_brdf.gw.to_raster( str(out_brdf), **kwargs ) else: sr_brdf.gw.to_netcdf( str(out_brdf), zlib=True, complevel=5, ) else: with gw.open( finfo_dict['qa'].name, band_names=['qa'], ) as qa: if sensor.lower() == 'l8': qa_sensor = 'l8-c1' else: qa_sensor = 'l-c1' mask = QAMasker( qa, qa_sensor, mask_items=lqa_mask_items, confidence_level='maybe', ).to_mask() # Mask non-clear pixels sr_brdf = xr.where( mask.sel(band='mask') < 2, sr_brdf.clip(0, 10000), nodataval, ).astype('uint16') sr_brdf = _assign_attrs( sr_brdf, attrs, bands_out, ) if write_format == 'gtiff': sr_brdf.gw.to_raster( str(out_brdf), **kwargs, ) else: sr_brdf.gw.to_netcdf( str(out_brdf), 
zlib=True, complevel=5, ) else: # Set 'no data' values sr_brdf = sr_brdf.gw.set_nodata( nodataval, nodataval, out_range=(0, 10000), dtype='uint16', ) sr_brdf = _assign_attrs( sr_brdf, attrs, bands_out ) if write_format == 'gtiff': sr_brdf.gw.to_raster( str(out_brdf), **kwargs ) else: sr_brdf.gw.to_netcdf( str(out_brdf).replace( '.tif', '.nc' ), zlib=True, complevel=5, ) if write_angle_files: angle_stack = ( xr.concat( (sza, saa), dim='band' ) .astype('int16') .assign_coords( band=['sza', 'saa'] ) .assign_attrs( **sza.attrs.copy() ) ) if write_format == 'gtiff': angle_stack.gw.to_raster( str(out_angles), **kwargs ) else: angle_stack.gw.to_netcdf( str(out_angles).replace( '.tif', '.nc' ), zlib=True, complevel=5, ) else: logger.warning( ' Not enough data for {} to store on disk.'.format( str(out_brdf) ) ) # Write an empty file for tracking with open( str(out_brdf).replace( '.tif', '.nodata' ), 'w', ) as tx: tx.writelines([]) angle_infos[finfo_key] = angle_info _clean_and_update( outdir_angles, finfo_dict, finfo_dict['meta'].name, load_bands_names=load_bands_names, ) year += 1

    def list_gcp(self, sensor, query):
        """Lists files from Google Cloud Platform.

        Args:
            sensor (str): The sensor to query. Choices are ['l5', 'l7', 'l8', 's2', 's2a', 's2b', 's2c'].
            query (str): The query string.

        Examples:
            >>> dl = GeoDownloads()
            >>>
            >>> # Query from a known directory
            >>> dl.list_gcp('l8', '042/034/LC08_L1TP_042034_20161104_20170219_01_T1/')
            >>>
            >>> # Query a date for Landsat 5
            >>> dl.list_gcp('l5', '042/034/*2016*')
            >>>
            >>> # Query a date for Landsat 7
            >>> dl.list_gcp('l7', '042/034/*2016*')
            >>>
            >>> # Query a date for Landsat 8
            >>> dl.list_gcp('l8', '042/034/*2016*')
            >>>
            >>> # Query Sentinel-2
            >>> dl.list_gcp('s2a', '21/H/UD/*2019*.SAFE/GRANULE/*')

        Returns:
            ``None``, the results are stored in ``self._gcp_search_dict`` and returned by ``get_gcp_results``.
        """
        if sensor not in ['l5', 'l7', 'l8', 's2', 's2a', 's2b', 's2c']:
            logger.exception(
                "  The sensor must be 'l5', 'l7', 'l8', 's2', 's2a', 's2b', or 's2c'."
            )
            raise NameError

        if sensor in ['s2', 's2a', 's2b', 's2c']:
            gcp_str = "gsutil ls -r gs://gcp-public-data-sentinel-2"
        else:
            gcp_str = "gsutil ls -r gs://gcp-public-data-landsat"

        gsutil_str = gcp_str + "/" + self.gcp_dict[sensor] + "/" + query

        try:
            proc = subprocess.run(
                gsutil_str.split(' '),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except Exception:
            logger.exception('gsutil must be installed.')

        output = proc.stdout

        gcp_search_list = [
            outp
            for outp in output.decode('utf-8').split('\n')
            if '$folder$' not in outp
        ]

        self._gcp_search_dict = {}

        if gcp_search_list:
            # Check for length-1 lists with empty strings
            if gcp_search_list[0]:

                if sensor in ['s2', 's2a', 's2b', 's2c']:
                    self._gcp_search_dict = self._prepare_gcp_dict(
                        gcp_search_list, 'gs://gcp-public-data-sentinel-2/'
                    )
                else:
                    self._gcp_search_dict = self._prepare_gcp_dict(
                        gcp_search_list, 'gs://gcp-public-data-landsat/'
                    )

    @property
    def get_gcp_results(self):
        return self._gcp_search_dict.copy()

    @staticmethod
    def _prepare_gcp_dict(search_list, gcp_str):
        """Prepares a list of GCP keys into a dictionary.

        Args:
            search_list (list): The list of GCP keys returned by ``gsutil ls``.
            gcp_str (str): The GCP bucket prefix to strip from each key.

        Returns:
            ``dict``
        """
        df = pd.DataFrame(data=search_list, columns=['url'])

        df['mask'] = df.url.str.strip().str.endswith('/:')

        mask_idx = np.where(df['mask'].values)[0]
        mask_range = mask_idx.shape[0] - 1 if mask_idx.shape[0] > 1 else 1

        url_dict = {}

        for mi in range(0, mask_range):

            m1 = mask_idx[mi]

            if mask_range > 1:
                m2 = mask_idx[mi + 1] - 1
            else:
                m2 = len(search_list)

            key = search_list[m1].replace(gcp_str, '').replace('/:', '')
            values = search_list[m1:m2]
            values = [value for value in values if value]

            url_dict[key] = [
                value for value in values if not value.endswith('/:')
            ]

        return url_dict
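

def _example_prepare_gcp_dict():
    """A minimal sketch (not part of the original module) of how
    ``GeoDownloads._prepare_gcp_dict`` groups a recursive ``gsutil ls -r``
    listing into a dictionary keyed by scene directory. The listing below is a
    shortened, hypothetical example of that output format.
    """
    search_list = [
        'gs://gcp-public-data-landsat/LC08/01/042/034/LC08_L1TP_042034_20171225_20180103_01_T1/:',
        'gs://gcp-public-data-landsat/LC08/01/042/034/LC08_L1TP_042034_20171225_20180103_01_T1/LC08_L1TP_042034_20171225_20180103_01_T1_MTL.txt',
        'gs://gcp-public-data-landsat/LC08/01/042/034/LC08_L1TP_042034_20171225_20180103_01_T1/LC08_L1TP_042034_20171225_20180103_01_T1_B4.TIF',
    ]
    url_dict = GeoDownloads._prepare_gcp_dict(
        search_list, 'gs://gcp-public-data-landsat/'
    )
    # url_dict == {'LC08/01/042/034/LC08_L1TP_042034_20171225_20180103_01_T1':
    #              [<the _MTL.txt and _B4.TIF urls>]}
    return url_dict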