GeoTask

class geowombat.tasks.GeoTask(inputs, outputs, tasks, clean=None, config_args=None, open_args=None, func_args=None, out_args=None, log_file=None)

Bases: BaseGeoTask, GraphBuilder

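A geo-task scheduler. It builds a graph of tasks in which each task applies a function to input files or to the outputs of other tasks.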

Parameters:
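  • inputs (dict) – The input for each task, keyed by task name: a file path, a tuple of file paths, or the name (or tuple of names) of other tasks whose outputs are used.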

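  • outputs (dict) – The output of each task, keyed by task name. In the examples, values prefixed with mem| are kept in memory as DataArrays and other values are output file names.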

  • tasks (tuple) – The (task name, function) pairs to execute.

  • clean (Optional[dict]) – Currently not implemented.

  • config_args (Optional[dict]) – The arguments for geowombat.config.update.

  • open_args (Optional[dict]) – The arguments for geowombat.open.

  • func_args (Optional[dict]) – Keyword arguments for each function in tasks, keyed by task name.

  • out_args (Optional[dict]) – The arguments for geowombat.to_raster.

  • log_file (Optional[str]) – A file to write the log to.
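The inputs mapping implicitly defines a dependency graph: a value that names another task means that task must run first. As a rough sketch, Python's standard graphlib module can derive an order in which every task runs after the tasks it reads from. The execution_order helper is hypothetical and not part of geowombat, and it assumes file inputs are plain strings that never match a task name.

```python
from graphlib import TopologicalSorter


def execution_order(inputs, tasks):
    """Order task names so each task follows the tasks it reads from.

    Hypothetical helper for illustration; not part of geowombat.
    """
    task_names = {name for name, _ in tasks}
    graph = {}
    for name in task_names:
        src = inputs[name]
        refs = src if isinstance(src, tuple) else (src,)
        # Only strings that name another task are dependencies; file paths are not
        graph[name] = {r for r in refs if isinstance(r, str) and r in task_names}
    return list(TopologicalSorter(graph).static_order())


# Same structure as the example below, with placeholder file names
inputs = {'a': 'scene.tif', 'b': 'scene.tif', 'c': ('b3.tif', 'b4.tif'),
          'd': 'c', 'e': ('a', 'b', 'd')}
# Functions are placeholders here; only the task names matter
tasks = (('a', None), ('b', None), ('c', None), ('d', None), ('e', None))
print(execution_order(inputs, tasks))
```

Any valid order places c before d, and a, b, and d before e; the relative order of independent tasks may vary.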

Examples

>>> import geowombat as gw
>>> import xarray as xr
>>> from geowombat.data import l8_224078_20200518_B3, l8_224078_20200518_B4, l8_224078_20200518
>>> from geowombat.tasks import GeoTask
>>>
>>> # Tasks a and b each take one input file
>>> # Task c takes two input files
>>> # Task d takes the output of task c
>>> # Task e takes the outputs of tasks a, b, and d
>>> inputs = {'a': l8_224078_20200518,
...           'b': l8_224078_20200518,
...           'c': (l8_224078_20200518_B3, l8_224078_20200518_B4),
...           'd': 'c',
...           'e': ('a', 'b', 'd')}
>>>
>>> # The output of each task
>>> # All outputs are kept in memory as DataArrays
>>> outputs = {'a': 'mem|r1', 'b': 'mem|r2', 'c': 'mem|r3', 'd': 'mem|mean', 'e': 'mem|stack'}
>>>
>>> # Tasks a and b compute a normalized difference with gw.norm_diff
>>> # Task c concatenates two images
>>> # Task d takes the mean of c
>>> # Task e concatenates a, b, and d
>>> tasks = (('a', gw.norm_diff),
...          ('b', gw.norm_diff),
...          ('c', xr.concat),
...          ('d', xr.DataArray.mean),
...          ('e', xr.concat))
>>>
>>> # Tasks a and b take band name arguments
>>> # Tasks c, d, and e take the coordinate dimension name as an argument
>>> func_args = {'a': {'b1': 'green', 'b2': 'red'},
...              'b': {'b1': 'blue', 'b2': 'green'},
...              'c': {'dim': 'band'},
...              'd': {'dim': 'band'},
...              'e': {'dim': 'band'}}
>>> open_args = {'chunks': 512}
>>> config_args = {'sensor': 'bgr', 'nodata': 0, 'scale_factor': 0.0001}
>>>
>>> # Set up a task
>>> task_mean = GeoTask(inputs, outputs, tasks, config_args=config_args, open_args=open_args, func_args=func_args)
>>>
>>> # Visualize the task
>>> task_mean.visualize()
>>>
>>> # Create a task that takes the output of task e and writes its mean to file
>>> task_write = GeoTask({'f': 'e'}, {'f': 'mean.tif'}, (('f', xr.DataArray.mean),),
...                      config_args=config_args,
...                      func_args={'f': {'dim': 'band'}},
...                      open_args=open_args,
...                      out_args={'compress': 'lzw', 'overwrite': True})
>>>
>>> # Combine the two tasks into one pipeline
>>> new_task = task_mean + task_write
>>>
>>> new_task.visualize()
>>>
>>> # Execute the pipeline; task f writes mean.tif
>>> new_task.submit()
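The example mixes two kinds of output values: mem|-prefixed labels for in-memory results and a plain file name (mean.tif) for a written result. A minimal sketch of separating the two, assuming the mem|<label> convention seen in the example; split_outputs is a hypothetical helper, not a geowombat function.

```python
def split_outputs(outputs):
    """Separate in-memory outputs from file outputs.

    Hypothetical helper; assumes the 'mem|<label>' convention from the example.
    """
    in_memory, to_file = {}, {}
    for task_id, target in outputs.items():
        if target.startswith('mem|'):
            # Keep only the label after the 'mem|' prefix
            in_memory[task_id] = target.split('|', 1)[1]
        else:
            # Anything else is treated as an output file name
            to_file[task_id] = target
    return in_memory, to_file


print(split_outputs({'a': 'mem|r1', 'e': 'mem|stack', 'f': 'mean.tif'}))
# ({'a': 'r1', 'e': 'stack'}, {'f': 'mean.tif'})
```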

Methods

execute(task_id, task, src, task_results, ...)

Executes an individual task.

submit()

Submits a pipeline task.

copy

visualize


Methods Documentation

execute(task_id, task, src, task_results, attrs, **kwargs)

Executes an individual task.

Parameters:
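  • task_id (str) – The task name.

  • task (func) – The function to execute.

  • src (DataArray | list) – The task input: a single DataArray, or a list for tasks with several inputs.

  • task_results (dict)

  • attrs (dict)

  • kwargs (Optional[dict]) – Keyword arguments passed to task.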

submit()

Submits a pipeline task.
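As a rough, pure-Python analogue of what running a pipeline involves, the sketch below calls each function on its inputs with its keyword arguments and stores the result so later tasks can read it. It is not geowombat's implementation: run_pipeline is hypothetical, it assumes tasks is already in dependency order (as in the example), and it uses plain numbers in place of DataArrays.

```python
def run_pipeline(inputs, tasks, func_args=None):
    """Toy pipeline runner (hypothetical; not part of geowombat)."""
    func_args = func_args or {}
    task_results = {}
    for task_id, func in tasks:
        src = inputs[task_id]
        if isinstance(src, tuple):
            # Several inputs are passed together as one list, as xr.concat receives them
            data = [task_results.get(s, s) for s in src]
        else:
            # The name of an earlier task resolves to its result; anything else is raw input
            data = task_results.get(src, src)
        task_results[task_id] = func(data, **func_args.get(task_id, {}))
    return task_results


def scale(x, factor):
    return x * factor


def mean(values):
    return sum(values) / len(values)


# Same dependency structure as the example, with numbers instead of images
inputs = {'a': 4, 'b': 6, 'c': (1, 2), 'd': 'c', 'e': ('a', 'b', 'd')}
tasks = (('a', scale), ('b', scale), ('c', list), ('d', mean), ('e', list))
func_args = {'a': {'factor': 2}, 'b': {'factor': 3}}

results = run_pipeline(inputs, tasks, func_args)
print(results['e'])  # [8, 18, 1.5]
```

Resolving task names through task_results is what lets task e consume the outputs of a, b, and d without re-reading any input.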
