Task Runner Backends

You can control how tasks are executed in parallel by specifying an instance of one of the following Runner Backend classes for the runner_backend argument of your Lab:

labtech.runners.ForkRunnerBackend

Bases: RunnerBackend

Runner Backend that runs each task in a forked subprocess.

The context and dependency task results are shared in-memory between each subprocess.

labtech.runners.SpawnRunnerBackend

Bases: RunnerBackend

Runner Backend that runs each task in a spawned subprocess.

The required context and dependency task results are copied/duplicated into the memory of each subprocess.

labtech.runners.ThreadRunnerBackend

Bases: RunnerBackend

Runner Backend that runs tasks asynchronously in separate threads.

Memory use is reduced by sharing the same in-memory context and dependency task results across threads.

labtech.runners.SerialRunnerBackend

Bases: RunnerBackend

Runner Backend that runs each task serially in the main process and thread.
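
The memory behaviour described for the thread and spawn backends above can be illustrated with the standard library alone. The following is a minimal sketch, not labtech code: `context` is a hypothetical stand-in for a Lab context, and the pickle round trip only approximates what copying data into a separate subprocess involves.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for a Lab context holding a large shared object.
context = {"dataset": list(range(1000))}


def context_id_in_thread(_: int) -> int:
    # Every thread resolves the same module-level object, so no copy is made.
    return id(context)


with ThreadPoolExecutor(max_workers=2) as executor:
    thread_ids = set(executor.map(context_id_in_thread, range(4)))

# Threads: all workers saw the one in-memory context.
shared_across_threads = thread_ids == {id(context)}

# Separate processes: data that is serialised and sent to a subprocess
# arrives as an equal but distinct object, which costs extra memory.
copied_context = pickle.loads(pickle.dumps(context))
copy_is_distinct = copied_context is not context and copied_context == context
```

Sharing keeps memory use low but leaves tasks exposed to each other's mutations of the shared object, whereas a copy isolates tasks at the cost of duplicating the data.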

Distributed Task Runner Backends

Labtech offers support for running tasks across multiple machines. See: Multi-Machine Clusters

Custom Task Runner Backends

You can define your own Runner Backend to execute tasks with a different form of parallelism or distributed computing platform by defining an implementation of the RunnerBackend abstract base class:

labtech.types.RunnerBackend

Bases: ABC

Factory class to construct Runner objects.

Source code in labtech/types.py
class RunnerBackend(ABC):
    """Factory class to construct [Runner][labtech.types.Runner] objects."""

    @abstractmethod
    def build_runner(self, *, context: LabContext, storage: Storage, max_workers: Optional[int]) -> Runner:
        """Return a Runner prepared with the given configuration.

        Args:
            context: Additional variables made available to tasks that aren't
                considered when saving to/loading from the cache.
            storage: Where task results should be cached to.
            max_workers: The maximum number of parallel worker processes for
                running tasks.
        """
build_runner(*, context: LabContext, storage: Storage, max_workers: Optional[int]) -> Runner abstractmethod

Return a Runner prepared with the given configuration.

Parameters:

  • context (LabContext) –

    Additional variables made available to tasks that aren't considered when saving to/loading from the cache.

  • storage (Storage) –

    Where task results should be cached to.

  • max_workers (Optional[int]) –

    The maximum number of parallel worker processes for running tasks.
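
As a rough illustration of the factory role described above, the sketch below defines stand-in `Runner` and `RunnerBackend` base classes so that it runs without labtech installed. Only the keyword-only `build_runner` signature is taken from this page; `SketchRunner`, `SketchRunnerBackend`, and the reduced single-method `Runner` are hypothetical names introduced for the example.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Optional


class Runner(ABC):
    """Stand-in for labtech.types.Runner, reduced to a single method."""

    @abstractmethod
    def close(self) -> None: ...


class RunnerBackend(ABC):
    """Stand-in mirroring the build_runner signature documented above."""

    @abstractmethod
    def build_runner(self, *, context: dict[str, Any], storage: Any,
                     max_workers: Optional[int]) -> Runner: ...


@dataclass
class SketchRunner(Runner):
    # The runner keeps the configuration handed over by the backend.
    context: dict[str, Any]
    storage: Any
    max_workers: Optional[int]
    closed: bool = False

    def close(self) -> None:
        self.closed = True


class SketchRunnerBackend(RunnerBackend):
    def build_runner(self, *, context, storage, max_workers):
        # The backend holds no per-run state; it only constructs runners.
        return SketchRunner(context=context, storage=storage, max_workers=max_workers)


runner = SketchRunnerBackend().build_runner(
    context={"seed": 1}, storage="storage/", max_workers=None,
)
```

Because the arguments are keyword-only, calling `build_runner` positionally raises a `TypeError`, which keeps call sites explicit about which value is the context and which is the storage.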

labtech.types.Runner

Bases: ABC

Manages the execution of Tasks, typically by delegating to a parallel processing framework.

Source code in labtech/types.py
class Runner(ABC):
    """Manages the execution of [Tasks][labtech.types.Task], typically
    by delegating to a parallel processing framework."""

    @abstractmethod
    def submit_task(self, task: Task, task_name: str, use_cache: bool) -> None:
        """Submit the given task object to be run and have its result cached.

        It is up to the Runner to decide when to start running the
        task (i.e. when resources become available).

        The implementation of this method should run the task by
        effectively calling:

        ```
        for dependency_task in get_direct_dependencies(task, all_identities=True):
            # Where results_map is expected to contain the TaskResult for
            # each dependency_task.
            dependency_task._set_results_map(results_map)

        current_process = multiprocessing.current_process()
        orig_process_name = current_process.name
        try:
            # If the thread name or similar is set instead of the process
            # name, then the Runner should update the handler of the global
            # labtech.utils.logger to include that instead of the process name.
            current_process.name = task_name
            return labtech.runners.base.run_or_load_task(
                task=task,
                use_cache=use_cache,
                filtered_context=task.filter_context(self.context),
                storage=self.storage,
            )
        finally:
            current_process.name = orig_process_name
        ```

        Args:
            task: The task to execute.
            task_name: Name to use when referring to the task in logs.
            use_cache: If True, the task's result should be fetched from the
                cache if it is available (fetching should still be done in a
                delegated process).

        """

    @abstractmethod
    def wait(self, *, timeout_seconds: Optional[float]) -> Iterator[tuple[Task, ResultMeta | BaseException]]:
        """Wait up to timeout_seconds or until at least one of the
        submitted tasks is done, then return an iterator over the
        tasks that are in a done state.

        Each task is returned as a pair where the first value is the
        task itself, and the second value is either:

        * For a successfully completed task: Metadata of the result.
        * For a task that fails with any BaseException descendant: The exception
          that was raised.

        Cancelled tasks are never returned.

        """

    @abstractmethod
    def cancel(self) -> None:
        """Cancel all submitted tasks that have not yet been started."""

    @abstractmethod
    def stop(self) -> None:
        """Stop all currently running tasks."""

    @abstractmethod
    def close(self) -> None:
        """Clean up any resources used by the Runner after all tasks
        are finished, cancelled, or stopped."""

    @abstractmethod
    def pending_task_count(self) -> int:
        """Returns the number of tasks that have been submitted but
        not yet cancelled or returned from a call to wait()."""

    @abstractmethod
    def get_result(self, task: Task) -> TaskResult:
        """Returns the in-memory result for a task that was
        successfully run by this Runner. Raises a KeyError for a
        task with no in-memory result."""

    @abstractmethod
    def remove_results(self, tasks: Sequence[Task]) -> None:
        """Removes the in-memory results for tasks that were
        successfully run by this Runner. Ignores tasks that have no
        in-memory result."""

    @abstractmethod
    def get_task_infos(self) -> list[TaskMonitorInfo]:
        """Returns a snapshot of monitoring information about each
        task that is currently running."""
submit_task(task: Task, task_name: str, use_cache: bool) -> None abstractmethod

Submit the given task object to be run and have its result cached.

It is up to the Runner to decide when to start running the task (i.e. when resources become available).

The implementation of this method should run the task by effectively calling:

for dependency_task in get_direct_dependencies(task, all_identities=True):
    # Where results_map is expected to contain the TaskResult for
    # each dependency_task.
    dependency_task._set_results_map(results_map)

current_process = multiprocessing.current_process()
orig_process_name = current_process.name
try:
    # If the thread name or similar is set instead of the process
    # name, then the Runner should update the handler of the global
    # labtech.utils.logger to include that instead of the process name.
    current_process.name = task_name
    return labtech.runners.base.run_or_load_task(
        task=task,
        use_cache=use_cache,
        filtered_context=task.filter_context(self.context),
        storage=self.storage,
    )
finally:
    current_process.name = orig_process_name

Parameters:

  • task (Task) –

    The task to execute.

  • task_name (str) –

    Name to use when referring to the task in logs.

  • use_cache (bool) –

    If True, the task's result should be fetched from the cache if it is available (fetching should still be done in a delegated process).
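
The process-renaming part of the pattern above can be tried in isolation. This is a minimal sketch using only the standard library: `run_with_process_name` and `report_name` are hypothetical helpers, and the `run` callable stands in for the call to `run_or_load_task`, which is not reproduced here.

```python
import multiprocessing
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_process_name(task_name: str, run: Callable[[], T]) -> T:
    """Call run() while the current process is temporarily named task_name."""
    current_process = multiprocessing.current_process()
    orig_process_name = current_process.name
    try:
        current_process.name = task_name
        return run()
    finally:
        # Restore the name even if run() raises, so that a reused worker
        # does not keep logging under a stale task name.
        current_process.name = orig_process_name


def report_name() -> str:
    return multiprocessing.current_process().name


original_name = multiprocessing.current_process().name
name_during_run = run_with_process_name("MyTask[seed=1]", report_name)
name_after_run = multiprocessing.current_process().name
```

The `try`/`finally` matters most for worker pools, where the same process goes on to run many tasks one after another.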

wait(*, timeout_seconds: Optional[float]) -> Iterator[tuple[Task, ResultMeta | BaseException]] abstractmethod

Wait up to timeout_seconds or until at least one of the submitted tasks is done, then return an iterator over the tasks that are in a done state.

Each task is returned as a pair where the first value is the task itself, and the second value is either:

  • For a successfully completed task: Metadata of the result.
  • For a task that fails with any BaseException descendant: The exception that was raised.

Cancelled tasks are never returned.
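
One way to satisfy this contract is to build on `concurrent.futures.wait` with `FIRST_COMPLETED`. The sketch below is an assumption-laden toy, not labtech's implementation: `WaitSketch` is a hypothetical class, tasks are identified by plain strings, and the raw return value stands in for the result metadata.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Iterator, Optional


class WaitSketch:
    """Toy runner showing only the submit/wait half of the contract."""

    def __init__(self, max_workers: Optional[int] = None) -> None:
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._futures: dict[Future, str] = {}

    def submit(self, name: str, fn: Callable[[], object]) -> None:
        self._futures[self._executor.submit(fn)] = name

    def wait(self, *, timeout_seconds: Optional[float]) -> Iterator[tuple[str, object]]:
        # Block until at least one future is done or the timeout elapses.
        done, _ = wait(list(self._futures), timeout=timeout_seconds,
                       return_when=FIRST_COMPLETED)
        for future in done:
            name = self._futures.pop(future)
            if future.cancelled():
                continue  # Cancelled tasks are never returned.
            error = future.exception()
            # Pair each task with either its outcome or the raised exception.
            yield (name, error if error is not None else future.result())

    def close(self) -> None:
        self._executor.shutdown()


def failing_task() -> None:
    raise ValueError("task failed")


sketch = WaitSketch(max_workers=2)
sketch.submit("ok", lambda: 42)
sketch.submit("bad", failing_task)

outcomes: dict[str, object] = {}
while len(outcomes) < 2:
    outcomes.update(sketch.wait(timeout_seconds=5))
sketch.close()
```

Returning the exception as a value, rather than raising it, lets the caller decide whether a single failed task should abort the remaining tasks.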

cancel() -> None abstractmethod

Cancel all submitted tasks that have not yet been started.
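
The distinction between tasks that have started and tasks that have not can be seen with `concurrent.futures`, where `Future.cancel()` only succeeds for work that is still queued. This is a standard-library sketch under the assumption of a single worker thread; it does not show how labtech's own runners implement cancellation.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()
release = threading.Event()


def long_task() -> str:
    started.set()             # Signal that this task is now running.
    release.wait(timeout=5)   # Keep the only worker busy until released.
    return "finished"


with ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(long_task)
    queued = [executor.submit(lambda: "never runs") for _ in range(2)]
    started.wait(timeout=5)

    # cancel() returns False for the running task and True for queued ones.
    cancelled_flags = [future.cancel() for future in [running, *queued]]

    # Tasks still pending are those submitted but not cancelled.
    pending = [f for f in [running, *queued] if not f.cancelled()]

    release.set()
    running_result = running.result(timeout=5)
```

The task that was already running is unaffected by cancellation and completes normally, which is why a separate `stop()` operation is needed for tasks in progress.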

stop() -> None abstractmethod

Stop all currently running tasks.

close() -> None abstractmethod

Clean up any resources used by the Runner after all tasks are finished, cancelled, or stopped.

pending_task_count() -> int abstractmethod

Returns the number of tasks that have been submitted but not yet cancelled or returned from a call to wait().

get_result(task: Task) -> TaskResult abstractmethod

Returns the in-memory result for a task that was successfully run by this Runner. Raises a KeyError for a task with no in-memory result.

remove_results(tasks: Sequence[Task]) -> None abstractmethod

Removes the in-memory results for tasks that were successfully run by this Runner. Ignores tasks that have no in-memory result.
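
The asymmetry between `get_result` (which raises) and `remove_results` (which ignores missing entries) maps naturally onto a dictionary. The following is a minimal sketch with hypothetical names: `ResultStoreSketch` and its `record` method are illustrative only, and tasks are represented by plain hashable values.

```python
from collections.abc import Hashable, Sequence
from typing import Any


class ResultStoreSketch:
    """Toy in-memory result store keyed by task."""

    def __init__(self) -> None:
        self._results: dict[Hashable, Any] = {}

    def record(self, task: Hashable, result: Any) -> None:
        self._results[task] = result

    def get_result(self, task: Hashable) -> Any:
        # Indexing raises KeyError when the task has no in-memory result.
        return self._results[task]

    def remove_results(self, tasks: Sequence[Hashable]) -> None:
        for task in tasks:
            # pop() with a default silently skips tasks that are absent.
            self._results.pop(task, None)


store = ResultStoreSketch()
store.record("task-a", 1.5)
store.record("task-b", 2.5)
store.remove_results(["task-a", "task-missing"])
```

After the call, `store.get_result("task-b")` still succeeds, while `store.get_result("task-a")` raises a `KeyError`.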

get_task_infos() -> list[TaskMonitorInfo] abstractmethod

Returns a snapshot of monitoring information about each task that is currently running.
