Distributed processing#
One of the key features of geowombat is the ability to write dask tasks to file in a concurrent workflow. Below are several examples illustrating this process.
Import GeoWombat and Dask
In [1]: import geowombat as gw
In [2]: from geowombat.data import l8_224078_20200518, l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4
Dask diagnostics
In [3]: from dask.diagnostics import Profiler, ResourceProfiler, CacheProfiler
In [4]: from dask.diagnostics import visualize
Use dask to compute with concurrent workers#
Note
These examples illustrate dask concurrency over an xarray.DataArray task. An example showing how to write results to file concurrently is shown at the bottom of the page.
Chunk sizes of 64x64 require many reads for a simple calculation.
with Profiler() as prof, \
        ResourceProfiler(dt=0.25) as rprof, \
        CacheProfiler() as cprof:

    # Set the sensor name
    with gw.config.update(sensor='bgr'):
        with gw.open(l8_224078_20200518, chunks=64) as src:

            # Normalized difference index
            ndi = gw.norm_diff(src, 'green', 'red', nodata=0, scale_factor=0.0001)

            # Send the task to dask
            results = ndi.data.compute(num_workers=4)

prof.visualize()
Chunk sizes of 256x256 reduce the number of file reads.
with Profiler() as prof, \
        ResourceProfiler(dt=0.25) as rprof, \
        CacheProfiler() as cprof:

    # Set the sensor name
    with gw.config.update(sensor='bgr'):
        with gw.open(l8_224078_20200518, chunks=256) as src:

            # Normalized difference index
            ndi = gw.norm_diff(src, 'green', 'red', nodata=0, scale_factor=0.0001)

            # Send the task to dask
            results = ndi.data.compute(num_workers=4)

prof.visualize()
Increase the number of parallel workers.
Note
The appropriate choice of chunk size is challenging and takes some practice. Start by reading Dask Best Practices. We find, however, that with some experimentation you can find a good chunk size for common tasks. One simple approach is to choose a chunk size that fills around 75-95% of memory on your system. Accidentally exceeding 100% of memory leads to significant slow-downs.
If you decide to manually calculate how large chunks should be to utilize all resources, keep in mind that “Dask will often have as many chunks in memory as twice the number of active threads”. Orientation of chunks is also critical, especially if dealing with multiple bands or a time series of images. Chunks in this case should have three or four dimensions ([bands, y, x] or [time, bands, y, x]). So, a five-period image stack with a single band might have a chunk size of [5, 1, 256, 256]. Proper orientation will reduce the need to read the same data more than once. A rough estimate of the per-chunk memory footprint is sketched below.
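As a rough guide, the memory footprint of a single chunk can be estimated from its shape and data type. The sketch below is plain arithmetic, assuming the float64 working dtype used elsewhere on this page, the example [time, bands, y, x] chunk shape from the note above, and 8 active threads.

import numpy as np

# Assumed example: a five-period, single-band stack read as float64
chunk_shape = (5, 1, 256, 256)
bytes_per_value = np.dtype('float64').itemsize

chunk_bytes = np.prod(chunk_shape) * bytes_per_value
print(f'{chunk_bytes / 1e6:.1f} MB per chunk')  # ~2.6 MB

# With 8 active threads and Dask often holding roughly twice as many
# chunks in memory as active threads, expect on the order of:
print(f'{2 * 8 * chunk_bytes / 1e6:.1f} MB resident')  # ~42 MB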
with Profiler() as prof, \
        ResourceProfiler(dt=0.25) as rprof, \
        CacheProfiler() as cprof:

    # Set the sensor name
    with gw.config.update(sensor='bgr'):
        with gw.open(l8_224078_20200518, chunks=256) as src:

            # Normalized difference index
            ndi = gw.norm_diff(src, 'green', 'red', nodata=0, scale_factor=0.0001)

            # Send the task to dask
            results = ndi.data.compute(num_workers=8)

prof.visualize()
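The visualize helper imported at the top of the page can also render all three profiler results in a single report, which makes it easier to compare CPU, memory, and cache behavior across chunk sizes. A minimal sketch:

# Combine the task, resource, and cache profiles into one report
visualize([prof, rprof, cprof])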
Increase the complexity of the parallel task#
Open bands as separate files#
In [5]: chunks = 256
In [6]: with Profiler() as prof, \
   ...:         ResourceProfiler(dt=0.25) as rprof, \
   ...:         CacheProfiler() as cprof:
   ...:     with gw.open(l8_224078_20200518_B2, band_names=['blue'], chunks=chunks) as src_b2, \
   ...:             gw.open(l8_224078_20200518_B3, band_names=['green'], chunks=chunks) as src_b3, \
   ...:             gw.open(l8_224078_20200518_B4, band_names=['red'], chunks=chunks) as src_b4:
   ...:         t2 = src_b2.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)
   ...:         t3 = src_b3.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)
   ...:         t4 = src_b4.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)
   ...:         task = (
   ...:             t2.sel(band='blue') * t3.sel(band='green') * t4.sel(band='red')
   ...:         ).expand_dims(dim='band').assign_coords({'band': ['results']})
   ...:         print(task)
   ...:         results = task.data.compute(num_workers=8)
   ...:
<xarray.DataArray (band: 1, y: 1860, x: 2041)> Size: 30MB
dask.array<broadcast_to, shape=(1, 1860, 2041), dtype=float64, chunksize=(1, 256, 256), chunktype=numpy.ndarray>
Coordinates:
* x (x) float64 16kB 7.174e+05 7.174e+05 ... 7.785e+05 7.786e+05
* y (y) float64 15kB -2.777e+06 -2.777e+06 ... -2.833e+06 -2.833e+06
* band (band) <U7 28B 'results'
prof.visualize()
Open bands as a stacked array#
In [7]: chunks = 256
In [8]: with Profiler() as prof, \
   ...:         ResourceProfiler(dt=0.25) as rprof, \
   ...:         CacheProfiler() as cprof:
   ...:     with gw.config.update(sensor='bgr'):
   ...:         with gw.open(
   ...:             [
   ...:                 l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4
   ...:             ],
   ...:             stack_dim='band',
   ...:             chunks=chunks
   ...:         ) as src:
   ...:             attrs = src.attrs.copy()
   ...:             t = src.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)
   ...:             task = (
   ...:                 t.sel(band='blue') * t.sel(band='green') * t.sel(band='red')
   ...:             ).expand_dims(dim='band').assign_coords({'band': ['results']})
   ...:             task.attrs = attrs
   ...:             print(task)
   ...:             results = task.data.compute(num_workers=8)
   ...:
<xarray.DataArray (band: 1, y: 1860, x: 2041)> Size: 30MB
dask.array<broadcast_to, shape=(1, 1860, 2041), dtype=float64, chunksize=(1, 256, 256), chunktype=numpy.ndarray>
Coordinates:
* x (x) float64 16kB 7.174e+05 7.174e+05 ... 7.785e+05 7.786e+05
* y (y) float64 15kB -2.777e+06 -2.777e+06 ... -2.833e+06 -2.833e+06
* band (band) <U7 28B 'results'
Attributes: (12/14)
transform: (30.0, 0.0, 717345.0, 0.0, -30.0, -2776995.0)
crs: 32621
res: (30.0, 30.0)
is_tiled: 1
nodatavals: (nan,)
_FillValue: nan
... ...
AREA_OR_POINT: Point
filename: ['LC08_L1TP_224078_20200518_20200518_01_RT_B2.TIF', ...
resampling: nearest
sensor: blue, green, and red
_data_are_separate: 1
_data_are_stacked: 1
prof.visualize()
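Whether the bands are opened as separate files or as a stacked array, the computation is the same (blue x green x red), so the two approaches should yield identical arrays. A quick check, assuming the arrays returned by the two examples above were kept under the hypothetical names results_separate and results_stacked:

import numpy as np

# The two approaches read the data differently but compute the same product,
# so their outputs should match element-wise (nodata values are NaN).
print(np.allclose(results_separate, results_stacked, equal_nan=True))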
Use GeoWombat to write a task to file#
In the previous examples, the call to dask compute() lets dask manage the task distribution. When writing results to file with geowombat.save(), individual chunks are managed by Dask. Writing results to file in a parallel environment can be performed on a laptop or on a distributed compute system. With the former, a call to geowombat.save() is all that is needed. On a distributed compute system, one might instead use a distributed client to manage the task concurrency.
The code block below is a simple example that would use 8 threads within 1 process to write the task to a GeoTIFF.
with gw.config.update(sensor='bgr'):
    with gw.open(
        [l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4],
        stack_dim='band',
        chunks=chunks
    ) as src:
        attrs = src.attrs.copy()

        # Mask 'no data' values and scale the data
        t = src.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)

        task = (
            t.sel(band='blue') * t.sel(band='green') * t.sel(band='red')
        ).expand_dims(dim='band').assign_coords({'band': ['results']})

        task.attrs = attrs

        # The previous example using dask compute returns
        # the results as a numpy array.
        # results = task.data.compute(num_workers=8)

        # Use geowombat to write the task to file where
        # chunks are processed concurrently.
        task.gw.save('results.tif', num_workers=8, compress='lzw')
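Once the write finishes, the output can be reopened with geowombat to confirm the result (a minimal check; 'results.tif' is the file written above):

# Reopen the written GeoTIFF and inspect the DataArray
with gw.open('results.tif') as res:
    print(res)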
The same task might be executed on a distributed system in the following way.
from geowombat.backends.dask_ import Cluster

cluster = Cluster(
    n_workers=4,
    threads_per_worker=2,
    scheduler_port=0,
    processes=False
)

cluster.start()

with gw.config.update(sensor='bgr'):
    with gw.open(
        [l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4],
        stack_dim='band',
        chunks=chunks
    ) as src:
        attrs = src.attrs.copy()

        # Mask 'no data' values and scale the data
        t = src.gw.set_nodata(0, 65535, (0, 1), 'float64', scale_factor=0.0001)

        task = (
            t.sel(band='blue') * t.sel(band='green') * t.sel(band='red')
        ).expand_dims(dim='band').assign_coords({'band': ['results']})

        task.attrs = attrs

        # The previous example using dask compute returns
        # the results as a numpy array.
        # results = task.data.compute(num_workers=8)

        # Use geowombat to write the task to file where
        # chunks are processed concurrently.
        #
        # The results will be written under a distributed cluster environment.
        task.gw.save('results.tif', compress='lzw')

cluster.stop()
One could also do the following.
from dask.distributed import Client, LocalCluster

with LocalCluster(
    n_workers=4,
    threads_per_worker=2,
    scheduler_port=0,
    processes=False
) as cluster:

    with Client(cluster) as client:

        with gw.config.update(sensor='bgr'):
            with gw.open(
                [l8_224078_20200518_B2, l8_224078_20200518_B3, l8_224078_20200518_B4],
                stack_dim='band',
                chunks=chunks
            ) as src:
                ...
                task.gw.save('results.tif', compress='lzw')
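With a dask.distributed Client as in the block above, the scheduler also serves a diagnostic dashboard that can be used instead of the local profilers to monitor a long-running write. The address comes from dask.distributed (not geowombat), for example:

with LocalCluster(n_workers=4, threads_per_worker=2, processes=False) as cluster:
    with Client(cluster) as client:
        # Open this address in a browser to watch task progress and memory use
        print(client.dashboard_link)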
Use GeoWombat to gather block-level results in parallel#
If you wish to retrieve values for each block without writing the blocks themselves to file, use geowombat.core.parallel.ParallelTask. In the example below, a custom function (user_func) is processed in parallel over each raster chunk/block.
import itertools
import geowombat as gw
from geowombat.core.parallel import ParallelTask


def user_func(*args):
    """
    Block-level function to be executed in parallel. The first argument is the block data,
    the second argument is the block id, and the third argument is the number of parallel
    worker threads for dask.compute().
    """
    # Gather function arguments
    data, window_id, num_workers = list(itertools.chain(*args))

    # Send the computation to Dask
    return data.data.sum().compute(scheduler='threads', num_workers=num_workers)


# Process 8 windows in parallel using threads
# Process 1 dask chunk in parallel using threads
# 8 total workers are needed
with gw.open('image.tif', chunks=512) as src:

    # Each block is a 512x512 dask array
    # with chunks of 512x512
    pt = ParallelTask(
        src,
        scheduler='threads',
        n_workers=8
    )

    # There is only 1 chunk per block, so no
    # point in using multiple threads here
    res = pt.map(user_func, 1)
In the example above, geowombat.core.parallel.ParallelTask reads row and column chunks of src.gw.row_chunks and src.gw.col_chunks size (which are set with geowombat.open()). Let’s say we open a raster with chunks of 512x512. In the above example, the data.data.sum().compute(scheduler='threads', num_workers=num_workers) dask computation only has 1 chunk to process because the chunk size is the same as the size of the blocks passed to user_func. We can specify a larger block size to read in parallel (the dask chunk size will remain the same) with row_chunks and col_chunks.
# Process 8 windows in parallel using threads
# Process 4 dask chunks in parallel using threads
# 32 total workers are needed
with gw.open('image.tif', chunks=512) as src:

    # Each block is a 1024x1024 dask array
    # with chunks of 512x512
    pt = ParallelTask(
        src,
        row_chunks=1024,
        col_chunks=1024,
        scheduler='threads',
        n_workers=8
    )

    # Now, each block has 4 chunks, so we can use dask
    # to process them in parallel
    res = pt.map(user_func, 4)
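Since user_func returns one value per window (the partial sum of that block), the list returned by pt.map can be reduced afterward into an image-wide statistic. A small follow-up, assuming res holds those per-window sums:

# Combine the per-window partial sums into the sum over the full image
total = sum(res)
print(total)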