Distributed processing#

One of the key features of geowombat is the ability to write dask tasks to file in a concurrent workflow. Below are several examples illustrating this process.

Import GeoWombat and Dask

In [1]: import geowombat as gw

In [2]: from geowombat.data import l8_224078_20200518, l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4

Dask diagnostics

In [3]: from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler

In [4]: from dask.diagnostics import visualize

Use dask to compute with concurrent workers#

Note

These examples illustrate the dask concurrency over an xarray.DataArray` task. An example showing how to write results to file concurrently are shown at the bottom of the page.

Chunk sizes of 64x64 require many reads for a simple calculation.

with Profiler() as prof, \
    ResourceProfiler(dt=0.25) as rprof, \
        CacheProfiler() as cprof:

    # Set the sensor name
    with gw.config.update(sensor='bgr'):

        with gw.open(l8_224078_20200518, chunks=64) as src:

            # Normalized difference index
            ndi = gw.norm_diff(src, 'green', 'red', nodata=0, scale_factor=0.0001)

            # Send the task to dask
            results = ndi.data.compute(num_workers=4)
prof.visualize()

Chunk sizes of 256x256 reduce the number of file reads.

with Profiler() as prof, \
    ResourceProfiler(dt=0.25) as rprof, \
        CacheProfiler() as cprof:

    # Set the sensor name
    with gw.config.update(sensor='bgr'):

        with gw.open(l8_224078_20200518, chunks=256) as src:

            # Normalized difference index
            ndi = gw.norm_diff(src, 'green', 'red', nodata=0, scale_factor=0.0001)

            # Send the task to dask
            results = ndi.data.compute(num_workers=4)
prof.visualize()

Increase the number of parallel workers

Note

The appropriate choice of chunk size is challenging and takes some practice. Start by reading Dask Best Practices. We find, however, that with some experimentation you can find a good chunk size for common tasks. One simple approach is to choose a chunk size that fills around 75-95% of memory on your system. Accidentally exceeding 100% of memory leads to significant slow-downs.

If you decide to manually calculate how large chunks should be to utilize all resources, keep in mind that “Dask will often have as many chunks in memory as twice the number of active threads” Orientation of chunks is also critical, especially if dealing with multiple bands or a time series of images. Chunks in this case should have three dimensions ([bands, y, x] or [time, bands, y, x]). So, a five-period image stack with a single band might have a chunk size of [5, 1, 256, 256]. Proper orientation will reduce the need to read the same data more than once.

with Profiler() as prof, \
    ResourceProfiler(dt=0.25) as rprof, \
        CacheProfiler() as cprof:

    # Set the sensor name
    with gw.config.update(sensor='bgr'):

        with gw.open(l8_224078_20200518, chunks=256) as src:

            # Normalized difference index
            ndi = gw.norm_diff(src, 'green', 'red', nodata=0, scale_factor=0.0001)

            # Send the task to dask
            results = ndi.data.compute(num_workers=8)
prof.visualize()

Increase the complexity of the parallel task#

Open bands as separate files#

In [5]: chunks = 256

In [6]: with Profiler() as prof, \
   ...:     ResourceProfiler(dt=0.25) as rprof, \
   ...:         CacheProfiler() as cprof:
   ...:     with gw.open(l8_224078_20200518_B2, band_names=['blue'], chunks=chunks) as src_b2, \
   ...:         gw.open(l8_224078_20200518_B3, band_names=['green'], chunks=chunks) as src_b3, \
   ...:             gw.open(l8_224078_20200518_B4, band_names=['red'], chunks=chunks) as src_b4:
   ...:         t2 = src_b2.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)
   ...:         t3 = src_b3.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)
   ...:         t4 = src_b4.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)
   ...:         task = (
   ...:             t2.sel(band='blue') * t3.sel(band='green') * t4.sel(band='red')
   ...:         ).expand_dims(dim='band').assign_coords({'band': ['results']})
   ...:         print(task)
   ...:         results = task.data.compute(num_workers=8)
   ...: 
<xarray.DataArray (band: 1, y: 1860, x: 2041)> Size: 30MB
dask.array<broadcast_to, shape=(1, 1860, 2041), dtype=float64, chunksize=(1, 256, 256), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) float64 16kB 7.174e+05 7.174e+05 ... 7.785e+05 7.786e+05
  * y        (y) float64 15kB -2.777e+06 -2.777e+06 ... -2.833e+06 -2.833e+06
  * band     (band) <U7 28B 'results'
prof.visualize()

Open bands as a stacked array#

In [7]: chunks = 256

In [8]: with Profiler() as prof, \
   ...:     ResourceProfiler(dt=0.25) as rprof, \
   ...:         CacheProfiler() as cprof:
   ...:     with gw.config.update(sensor='bgr'):
   ...:         with gw.open(
   ...:             [
   ...:                 l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4
   ...:             ],
   ...:             stack_dim='band',
   ...:             chunks=chunks
   ...:         ) as src:
   ...:             attrs = src.attrs.copy()
   ...:             t = src.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)
   ...:             task = (
   ...:                 t.sel(band='blue') * t.sel(band='green') * t.sel(band='red')
   ...:             ).expand_dims(dim='band').assign_coords({'band': ['results']})
   ...:             task.attrs = attrs
   ...:             print(task)
   ...:             results = task.data.compute(num_workers=8)
   ...: 
<xarray.DataArray (band: 1, y: 1860, x: 2041)> Size: 30MB
dask.array<broadcast_to, shape=(1, 1860, 2041), dtype=float64, chunksize=(1, 256, 256), chunktype=numpy.ndarray>
Coordinates:
  * x        (x) float64 16kB 7.174e+05 7.174e+05 ... 7.785e+05 7.786e+05
  * y        (y) float64 15kB -2.777e+06 -2.777e+06 ... -2.833e+06 -2.833e+06
  * band     (band) <U7 28B 'results'
Attributes: (12/14)
    transform:           (30.0, 0.0, 717345.0, 0.0, -30.0, -2776995.0)
    crs:                 32621
    res:                 (30.0, 30.0)
    is_tiled:            1
    nodatavals:          (nan,)
    _FillValue:          nan
    ...                  ...
    filename:            ['LC08_L1TP_224078_20200518_20200518_01_RT_B2.TIF', ...
    resampling:          nearest
    AREA_OR_POINT:       Point
    sensor:              blue, green, and red
    _data_are_separate:  1
    _data_are_stacked:   1
prof.visualize()

Use GeoWombat to write a task to file#

In the previous examples, the call to dask compute() lets dask manage the task distribution. When writing results to file with geowombat.save(), individual chunks are managed by Dask. Writing results to file in a parallel environment can be performed on a laptop or a distributed compute system. With the former, a call to geowombat.save() is all that is needed. On a distributed compute system, one might instead use a distributed client to manage the task concurrency.

The code block below is a simple example that would use 8 threads within 1 process to write the task to a GeoTiff.

with gw.config.update(sensor='bgr'):
    with gw.open(
        [l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4],
        stack_dim='band',
        chunks=chunks
    ) as src:

        attrs = src.attrs.copy()

        # Mask 'no data' values and scale the data
        t = src.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)

        task = (
            t.sel(band='blue') * t.sel(band='green') * t.sel(band='red')
        ).expand_dims(dim='band').assign_coords({'band': ['results']})
        task.attrs = attrs

        # The previous example using dask compute returns
        #   the results as a numpy array.
        # results = task.data.compute(num_workers=8)

        # Use geowombat to write the task to file where
        #   chunks are processed concurrently.
        task.gw.save('results.tif', num_workers=8, compress='lzw')

The same task might be executed on a distributed system in the following way.

from geowombat.backends.dask_ import Cluster

cluster = Cluster(
    n_workers=4,
    threads_per_worker=2,
    scheduler_port=0,
    processes=False
)
cluster.start()

with gw.config.update(sensor='bgr'):
    with gw.open(
        [l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4],
        stack_dim='band',
        chunks=chunks
    ) as src:

        attrs = src.attrs.copy()

        # Mask 'no data' values and scale the data
        t = src.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)

        task = (
            t.sel(band='blue') * t.sel(band='green') * t.sel(band='red')
        ).expand_dims(dim='band').assign_coords({'band': ['results']})
        task.attrs = attrs

        # The previous example using dask compute returns
        #   the results as a numpy array.
        # results = task.data.compute(num_workers=8)

        # Use geowombat to write the task to file where
        #   chunks are processed concurrently.
        #
        # The results will be written under a distributed cluster environment.
        task.gw.save('results.tif', compress='lzw')

cluster.stop()

One could also do the following.

from dask.distributed import Client, LocalCluster

with Cluster(
    n_workers=4,
    threads_per_worker=2,
    scheduler_port=0,
    processes=False
) as cluster:
    with Client(cluster) as client:

        with gw.config.update(sensor='bgr'):
            with gw.open(
                [l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4],
                stack_dim='band',
                chunks=chunks
            ) as src:

                ...

                task.gw.save('results.tif', compress='lzw')

Use GeoWombat to gather block-level results in parallel#

If you wish to retrieve values for each block without writing the entire blocks to file, use geowombat.core.parallel.ParallelTask. In the example below, a custom function (user_func) is processed in parallel over each raster chunk/block.

import itertools
import geowombat as gw
from geowombat.core.parallel import ParallelTask

def user_func(*args):

    """
    Block-level function to be executed in parallel. The first argument is the block data,
    the second argument is the block id, and the third argument is the number of parallel
    worker threads for dask.compute().
    """

    # Gather function arguments
    data, window_id, num_workers = list(itertools.chain(*args))

    # Send the computation to Dask
    return data.data.sum().compute(scheduler='threads', num_workers=num_workers)

# Process 8 windows in parallel using threads
# Process 1 dask chunks in parallel using threads
# 8 total workers are needed
with gw.open('image.tif', chunks=512) as src:

    # Each block is a 512x512 dask array
    # with chunks of 512x512
    pt = ParallelTask(
        src,
        scheduler='threads',
        n_workers=8
    t)

    # There is only 1 chunk per block, so no
    # point in using multiple threads here
    res = pt.map(user_func, 1)

In the example above, geowombat.core.parallel.ParallelTask reads row and column chunks of src.gw.row_chunks and src.gw.col_chunks size (which is set with geowombat.open()). Let’s say we open a raster with chunks of 512x512. In the above example, the data.data.sum().compute(scheduler='threads', num_workers=num_workers) dask computation only has 1 chunk to process because the chunk sizes are the same size as the blocks being passed to user_func. We can specify a larger block size to read in parallel (the dask chunk size will remain the same) with row_chunks and col_chunks.

# Process 8 windows in parallel using threads
# Process 4 dask chunks in parallel using threads
# 32 total workers are needed
with gw.open('image.tif', chunks=512) as src:

    # Each block is a 1024x1024 dask array
    # with chunks of 512x512
    pt = ParallelTask(
        src,
        row_chunks=1024,
        col_chunks=1024,
        scheduler='threads',
        n_workers=8
    )

    # Now, each block has 4 chunks, so we can use dask
    # to process them in parallel
    res = pt.map(user_func, 4)