Labs and Tasks

In this document you'll find API reference documentation for configuring Labs and tasks. For a tutorial-style explanation, see the README.

labtech.Lab

Primary interface for configuring, running, and getting results of tasks.

A Lab can be used to run tasks with run_tasks().

Previously cached tasks can be retrieved with cached_tasks(), and can then have their results loaded with run_tasks() or be removed from the cache storage with uncache_tasks().
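
For orientation, here is a minimal sketch of that workflow (the task type and storage path are illustrative; the labtech.task decorator is documented further down this page):

import labtech

@labtech.task
class Experiment:
    seed: int

    def run(self):
        return self.seed * 2

lab = labtech.Lab(storage='storage/my_lab')
results = lab.run_tasks([Experiment(seed=seed) for seed in range(3)])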

Source code in labtech/lab.py
class Lab:
    """Primary interface for configuring, running, and getting results of tasks.

    A Lab can be used to run tasks with [`run_tasks()`][labtech.Lab.run_tasks].

    Previously cached tasks can be retrieved with
    [`cached_tasks()`][labtech.Lab.cached_tasks], and can then have
    their results loaded with [`run_tasks()`][labtech.Lab.run_tasks]
    or be removed from the cache storage with
    [`uncache_tasks()`][labtech.Lab.uncache_tasks].

    """

    def __init__(self, *,
                 storage: Union[str, Path, None, Storage],
                 continue_on_failure: bool = True,
                 max_workers: Optional[int] = None,
                 notebook: Optional[bool] = None,
                 context: Optional[LabContext] = None,
                 runner_backend: Optional[str | RunnerBackend] = None):
        """
        Args:
            storage: Where task results should be cached to. A string or
                [`Path`](https://docs.python.org/3/library/pathlib.html#pathlib.Path)
                will be interpreted as the path to a local directory, `None`
                will result in no caching. Any [Storage][labtech.types.Storage]
                instance may also be specified.
            continue_on_failure: If `True`, exceptions raised by tasks will be
                logged, but execution of other tasks will continue.
            max_workers: The maximum number of parallel worker processes for
                running tasks. A sensible default will be determined by the
                runner_backend (`'fork'` and `'spawn'` use the number of
                processors on the machine given by `os.cpu_count()`, while
                `'thread'` uses `os.cpu_count() + 4`).
            notebook: Determines whether to use notebook-friendly graphical
                progress bars. When set to `None` (the default), labtech will
                detect whether the code is being run from an IPython notebook.
            context: A dictionary of additional variables to make available to
                tasks. The context will not be cached, so the values should not
                affect results (e.g. parallelism factors) or should be kept
                constant between runs (e.g. datasets).
            runner_backend: Controls how tasks are run in parallel. It can
                optionally be set to one of the following options:

                * `'fork'`: Uses the
                  [`ForkRunnerBackend`][labtech.runners.ForkRunnerBackend]
                  to run each task in a forked subprocess. Memory use
                  is reduced by sharing the context and dependency task
                  results between tasks with memory inherited from the
                  parent process. The default on platforms that support
                  forked Python subprocesses when `max_workers > 1`: Linux
                  and other POSIX systems, but not macOS or Windows.
                * `'spawn'`: Uses the
                  [`SpawnRunnerBackend`][labtech.runners.SpawnRunnerBackend]
                  to run each task in a spawned subprocess. The
                  context and dependency task results are
                  copied/duplicated into the memory of each
                  subprocess. The default on macOS and Windows when
                  `max_workers > 1`.
                * `'thread'`: Uses the
                  [`ThreadRunnerBackend`][labtech.runners.ThreadRunnerBackend]
                  to run each task in a separate Python thread. Because
                  [Python threads do not execute in parallel](https://docs.python.org/3/glossary.html#term-global-interpreter-lock),
                  this runner is best suited for running tasks that are
                  constrained by non-blocking IO operations (e.g. web
                  requests), or for running a single worker with live task
                  monitoring. Memory use is reduced by sharing the same
                  in-memory context and dependency task results across
                  threads. The default when `max_workers = 1`.
                * `'serial'`: Uses the
                  [`SerialRunnerBackend`][labtech.runners.SerialRunnerBackend]
                  to run each task serially in the main process and thread.
                  The task monitor will only be updated in between tasks. Mainly
                  useful when troubleshooting issues running tasks on different
                  threads and processes.
                * Any instance of a
                  [`RunnerBackend`][labtech.types.RunnerBackend],
                  allowing for custom task management implementations.

                For details on the differences between `'fork'` and
                `'spawn'` backends, see [the Python documentation on
                `multiprocessing` start methods](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods).

        """
        if isinstance(storage, str) or isinstance(storage, Path):
            storage = LocalStorage(storage)
        elif storage is None:
            storage = NullStorage()
        self._storage = storage
        self.continue_on_failure = continue_on_failure
        self.max_workers = max_workers
        self.notebook = is_ipython() if notebook is None else notebook
        if context is None:
            context = {}
        self.context = context
        if runner_backend is None:
            start_methods = get_all_start_methods()
            if self.max_workers == 1:
                runner_backend = ThreadRunnerBackend()
            elif 'fork' in start_methods:
                runner_backend = ForkRunnerBackend()
            elif 'spawn' in start_methods:
                runner_backend = SpawnRunnerBackend()
            else:
                raise LabError(('Default \'fork\' and \'spawn\' multiprocessing runner '
                                'backends are not supported on your system. '
                                'Please specify a system-compatible runner_backend.'))
        elif isinstance(runner_backend, str):
            if runner_backend == 'fork':
                runner_backend = ForkRunnerBackend()
            elif runner_backend == 'spawn':
                runner_backend = SpawnRunnerBackend()
            elif runner_backend == 'serial':
                runner_backend = SerialRunnerBackend()
            elif runner_backend == 'thread':
                runner_backend = ThreadRunnerBackend()
            else:
                raise LabError(f'Unrecognised runner_backend: {runner_backend}')
        self.runner_backend = runner_backend

    def run_tasks(self, tasks: Sequence[TaskT], *,
                  bust_cache: bool = False,
                  disable_progress: bool = False,
                  disable_top: bool = False,
                  top_format: str = '$name $status since $start_time CPU: $cpu MEM: $rss',
                  top_sort: str = 'start_time',
                  top_n: int = 10) -> dict[TaskT, Any]:
        """Run the given tasks with as much process parallelism as possible.
        Loads task results from the cache storage where possible and
        caches results of executed tasks.

        Any attribute of a task that is itself a task object is
        considered a "nested task", and will be executed or loaded so
        that its result is made available to its parent task. If the
        same task is nested inside multiple task objects, it will only
        be executed/loaded once.

        As well as returning the results, each task's result will be
        assigned to a `result` attribute on the task itself.

        Args:
            tasks: The tasks to execute. Each should be an instance of a class
                decorated with [`labtech.task`][labtech.task].
            bust_cache: If `True`, no task results will be loaded from the
                cache storage; all tasks will be re-executed.
            disable_progress: If `True`, do not display a tqdm progress bar
                tracking task execution.
            disable_top: If `True`, do not display the list of top active tasks.
            top_format: Format for each top active task. Follows the format
                rules for
                [template strings](https://docs.python.org/3/library/string.html#template-strings)
                and may include any of the following attributes for
                substitution:

                * `name`: The task's name displayed in logs.
                * `pid`: The task's primary process id.
                * `status`: Whether the task is being run or loaded from cache.
                * `start_time`: The time the task's primary process started.
                * `children`: The number of child tasks of the task's primary
                  process.
                * `threads`: The number of active CPU threads for the task
                  (including across any child processes).
                * `cpu`: The CPU percentage (1 core = 100%) being used by the
                  task (including across any child processes).
                * `rss`: The resident set size (RSS) memory percentage being
                  used by the task (including across any child processes). RSS
                  is the primary measure of memory usage.
                * `vms`: The virtual memory size (VMS) percentage being used by
                  the task (including across any child processes).
            top_sort: Sort order for the top active tasks. Can be any of the
                attributes available for use in `top_format`. If the string is
                preceded by a `-`, the sort order will be reversed. Defaults to
                showing oldest tasks first.
            top_n: The maximum number of top active tasks to display.

        Returns:
            A dictionary mapping each of the provided tasks to its
                corresponding result.

        """
        check_tasks(tasks)

        for task in tasks:
            if task.code_version != task.current_code_version:
                raise LabError(
                    (f'`{repr(task)}` cannot be run, as it has code_version={task.code_version!r} '
                     f'while the current implementation of {task.__class__.__name__} has '
                     f'code_version={task.current_code_version!r}. You should construct new '
                     f'{task.__class__.__name__} tasks to run instead of running tasks loaded from cache.')
                )


        coordinator = TaskCoordinator(
            self,
            bust_cache=bust_cache,
            disable_progress=disable_progress,
            disable_top=disable_top,
            top_format=top_format,
            top_sort=top_sort,
            top_n=top_n,
        )
        results = coordinator.run(tasks)

        failed_tasks = {task for task in tasks if task not in results}
        if failed_tasks:
            raise LabError(f'Failed to complete {len(failed_tasks)} submitted task(s)')

        # Return results in the same order as tasks
        return {task: results[task] for task in tasks}

    def run_task(self, task: Task[ResultT], **kwargs) -> ResultT:
        """Run a single task and return its result. Supports the same keyword
        arguments as `run_tasks`.

        NOTE: If you have many tasks to run, you should use
        `run_tasks` instead to parallelise their execution.

        Returns:
            The result of the given task.

        """
        results = self.run_tasks([task], **kwargs)
        return results[task]

    def cached_tasks(self, task_types: Sequence[Type[TaskT]]) -> Sequence[TaskT]:
        """Returns all task instances present in the Lab's cache storage for
        the given `task_types`, each of which should be a task class
        decorated with [`labtech.task`][labtech.task].

        Does not load task results from the cache storage, but they
        can be loaded by calling
        [`run_tasks()`][labtech.Lab.run_tasks] with the returned task
        instances.

        """
        check_task_types(task_types)
        keys = self._storage.find_keys()
        tasks = []
        for key in keys:
            for task_type in task_types:
                try:
                    task = task_type._lt.cache.load_task(self._storage, task_type, key)
                except TaskNotFound:
                    pass
                else:
                    tasks.append(task)
                    break
        return tasks

    def is_cached(self, task: Task) -> bool:
        """Checks if a result is present for given task in the Lab's cache
        storage."""
        check_tasks([task])
        return task._lt.cache.is_cached(self._storage, task)

    def uncache_tasks(self, tasks: Sequence[Task]):
        """Removes cached results for the given tasks from the Lab's cache
        storage."""
        check_tasks(tasks)
        for task in tasks:
            if self.is_cached(task):
                task._lt.cache.delete(self._storage, task)

__init__(*, storage: Union[str, Path, None, Storage], continue_on_failure: bool = True, max_workers: Optional[int] = None, notebook: Optional[bool] = None, context: Optional[LabContext] = None, runner_backend: Optional[str | RunnerBackend] = None)

Parameters:

  • storage (Union[str, Path, None, Storage]) –

    Where task results should be cached to. A string or Path will be interpreted as the path to a local directory, None will result in no caching. Any Storage instance may also be specified.

  • continue_on_failure (bool, default: True ) –

    If True, exceptions raised by tasks will be logged, but execution of other tasks will continue.

  • max_workers (Optional[int], default: None ) –

    The maximum number of parallel worker processes for running tasks. A sensible default will be determined by the runner_backend ('fork' and 'spawn' use the number of processors on the machine given by os.cpu_count(), while 'thread' uses os.cpu_count() + 4).

  • notebook (Optional[bool], default: None ) –

    Determines whether to use notebook-friendly graphical progress bars. When set to None (the default), labtech will detect whether the code is being run from an IPython notebook.

  • context (Optional[LabContext], default: None ) –

    A dictionary of additional variables to make available to tasks. The context will not be cached, so the values should not affect results (e.g. parallelism factors) or should be kept constant between runs (e.g. datasets).

  • runner_backend (Optional[str | RunnerBackend], default: None ) –

    Controls how tasks are run in parallel. It can optionally be set to one of the following options:

    • 'fork': Uses the ForkRunnerBackend to run each task in a forked subprocess. Memory use is reduced by sharing the context and dependency task results between tasks with memory inherited from the parent process. The default on platforms that support forked Python subprocesses when max_workers > 1: Linux and other POSIX systems, but not macOS or Windows.
    • 'spawn': Uses the SpawnRunnerBackend to run each task in a spawned subprocess. The context and dependency task results are copied/duplicated into the memory of each subprocess. The default on macOS and Windows when max_workers > 1.
    • 'thread': Uses the ThreadRunnerBackend to run each task in a separate Python thread. Because Python threads do not execute in parallel, this runner is best suited for running tasks that are constrained by non-blocking IO operations (e.g. web requests), or for running a single worker with live task monitoring. Memory use is reduced by sharing the same in-memory context and dependency task results across threads. The default when max_workers = 1.
    • 'serial': Uses the SerialRunnerBackend to run each task serially in the main process and thread. The task monitor will only be updated in between tasks. Mainly useful when troubleshooting issues running tasks on different threads and processes.
    • Any instance of a RunnerBackend, allowing for custom task management implementations.

    For details on the differences between 'fork' and 'spawn' backends, see the Python documentation on multiprocessing start methods.
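
For example, a Lab that caches to a local directory and runs up to four tasks on separate threads (a sketch; the storage path is illustrative):

import labtech

lab = labtech.Lab(
    storage='storage/my_lab',
    max_workers=4,
    runner_backend='thread',
)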

Source code in labtech/lab.py
def __init__(self, *,
             storage: Union[str, Path, None, Storage],
             continue_on_failure: bool = True,
             max_workers: Optional[int] = None,
             notebook: Optional[bool] = None,
             context: Optional[LabContext] = None,
             runner_backend: Optional[str | RunnerBackend] = None):
    """
    Args:
        storage: Where task results should be cached to. A string or
            [`Path`](https://docs.python.org/3/library/pathlib.html#pathlib.Path)
            will be interpreted as the path to a local directory, `None`
            will result in no caching. Any [Storage][labtech.types.Storage]
            instance may also be specified.
        continue_on_failure: If `True`, exceptions raised by tasks will be
            logged, but execution of other tasks will continue.
        max_workers: The maximum number of parallel worker processes for
            running tasks. A sensible default will be determined by the
            runner_backend (`'fork'` and `'spawn'` use the number of
            processors on the machine given by `os.cpu_count()`, while
            `'thread'` uses `os.cpu_count() + 4`).
        notebook: Determines whether to use notebook-friendly graphical
            progress bars. When set to `None` (the default), labtech will
            detect whether the code is being run from an IPython notebook.
        context: A dictionary of additional variables to make available to
            tasks. The context will not be cached, so the values should not
            affect results (e.g. parallelism factors) or should be kept
            constant between runs (e.g. datasets).
        runner_backend: Controls how tasks are run in parallel. It can
            optionally be set to one of the following options:

            * `'fork'`: Uses the
              [`ForkRunnerBackend`][labtech.runners.ForkRunnerBackend]
              to run each task in a forked subprocess. Memory use
              is reduced by sharing the context and dependency task
              results between tasks with memory inherited from the
              parent process. The default on platforms that support
              forked Python subprocesses when `max_workers > 1`: Linux
              and other POSIX systems, but not macOS or Windows.
            * `'spawn'`: Uses the
              [`SpawnRunnerBackend`][labtech.runners.SpawnRunnerBackend]
              to run each task in a spawned subprocess. The
              context and dependency task results are
              copied/duplicated into the memory of each
              subprocess. The default on macOS and Windows when
              `max_workers > 1`.
            * `'thread'`: Uses the
              [`ThreadRunnerBackend`][labtech.runners.ThreadRunnerBackend]
              to run each task in a separate Python thread. Because
              [Python threads do not execute in parallel](https://docs.python.org/3/glossary.html#term-global-interpreter-lock),
              this runner is best suited for running tasks that are
              constrained by non-blocking IO operations (e.g. web
              requests), or for running a single worker with live task
              monitoring. Memory use is reduced by sharing the same
              in-memory context and dependency task results across
              threads. The default when `max_workers = 1`.
            * `'serial'`: Uses the
              [`SerialRunnerBackend`][labtech.runners.SerialRunnerBackend]
              to run each task serially in the main process and thread.
              The task monitor will only be updated in between tasks. Mainly
              useful when troubleshooting issues running tasks on different
              threads and processes.
            * Any instance of a
              [`RunnerBackend`][labtech.types.RunnerBackend],
              allowing for custom task management implementations.

            For details on the differences between `'fork'` and
            `'spawn'` backends, see [the Python documentation on
            `multiprocessing` start methods](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods).

    """
    if isinstance(storage, str) or isinstance(storage, Path):
        storage = LocalStorage(storage)
    elif storage is None:
        storage = NullStorage()
    self._storage = storage
    self.continue_on_failure = continue_on_failure
    self.max_workers = max_workers
    self.notebook = is_ipython() if notebook is None else notebook
    if context is None:
        context = {}
    self.context = context
    if runner_backend is None:
        start_methods = get_all_start_methods()
        if self.max_workers == 1:
            runner_backend = ThreadRunnerBackend()
        elif 'fork' in start_methods:
            runner_backend = ForkRunnerBackend()
        elif 'spawn' in start_methods:
            runner_backend = SpawnRunnerBackend()
        else:
            raise LabError(('Default \'fork\' and \'spawn\' multiprocessing runner '
                            'backends are not supported on your system. '
                            'Please specify a system-compatible runner_backend.'))
    elif isinstance(runner_backend, str):
        if runner_backend == 'fork':
            runner_backend = ForkRunnerBackend()
        elif runner_backend == 'spawn':
            runner_backend = SpawnRunnerBackend()
        elif runner_backend == 'serial':
            runner_backend = SerialRunnerBackend()
        elif runner_backend == 'thread':
            runner_backend = ThreadRunnerBackend()
        else:
            raise LabError(f'Unrecognised runner_backend: {runner_backend}')
    self.runner_backend = runner_backend

run_tasks(tasks: Sequence[TaskT], *, bust_cache: bool = False, disable_progress: bool = False, disable_top: bool = False, top_format: str = '$name $status since $start_time CPU: $cpu MEM: $rss', top_sort: str = 'start_time', top_n: int = 10) -> dict[TaskT, Any]

Run the given tasks with as much process parallelism as possible. Loads task results from the cache storage where possible and caches results of executed tasks.

Any attribute of a task that is itself a task object is considered a "nested task", and will be executed or loaded so that its result is made available to its parent task. If the same task is nested inside multiple task objects, it will only be executed/loaded once.

As well as returning the results, each task's result will be assigned to a result attribute on the task itself.

Parameters:

  • tasks (Sequence[TaskT]) –

    The tasks to execute. Each should be an instance of a class decorated with labtech.task.

  • bust_cache (bool, default: False ) –

    If True, no task results will be loaded from the cache storage; all tasks will be re-executed.

  • disable_progress (bool, default: False ) –

    If True, do not display a tqdm progress bar tracking task execution.

  • disable_top (bool, default: False ) –

    If True, do not display the list of top active tasks.

  • top_format (str, default: '$name $status since $start_time CPU: $cpu MEM: $rss' ) –

    Format for each top active task. Follows the format rules for template strings and may include any of the following attributes for substitution:

    • name: The task's name displayed in logs.
    • pid: The task's primary process id.
    • status: Whether the task is being run or loaded from cache.
    • start_time: The time the task's primary process started.
    • children: The number of child tasks of the task's primary process.
    • threads: The number of active CPU threads for the task (including across any child processes).
    • cpu: The CPU percentage (1 core = 100%) being used by the task (including across any child processes).
    • rss: The resident set size (RSS) memory percentage being used by the task (including across any child processes). RSS is the primary measure of memory usage.
    • vms: The virtual memory size (VMS) percentage being used by the task (including across any child processes).
  • top_sort (str, default: 'start_time' ) –

    Sort order for the top active tasks. Can be any of the attributes available for use in top_format. If the string is preceded by a -, the sort order will be reversed. Defaults to showing oldest tasks first.

  • top_n (int, default: 10 ) –

    The maximum number of top active tasks to display.

Returns:

  • dict[TaskT, Any]

    A dictionary mapping each of the provided tasks to its corresponding result.
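
For example, to force re-execution of a list of tasks while sorting the task monitor by CPU usage (a sketch; experiments is assumed to be a list of task instances):

results = lab.run_tasks(
    experiments,
    bust_cache=True,
    top_sort='-cpu',
)
for task, result in results.items():
    print(task, result)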

Source code in labtech/lab.py
def run_tasks(self, tasks: Sequence[TaskT], *,
              bust_cache: bool = False,
              disable_progress: bool = False,
              disable_top: bool = False,
              top_format: str = '$name $status since $start_time CPU: $cpu MEM: $rss',
              top_sort: str = 'start_time',
              top_n: int = 10) -> dict[TaskT, Any]:
    """Run the given tasks with as much process parallelism as possible.
    Loads task results from the cache storage where possible and
    caches results of executed tasks.

    Any attribute of a task that is itself a task object is
    considered a "nested task", and will be executed or loaded so
    that its result is made available to its parent task. If the
    same task is nested inside multiple task objects, it will only
    be executed/loaded once.

    As well as returning the results, each task's result will be
    assigned to a `result` attribute on the task itself.

    Args:
        tasks: The tasks to execute. Each should be an instance of a class
            decorated with [`labtech.task`][labtech.task].
        bust_cache: If `True`, no task results will be loaded from the
            cache storage; all tasks will be re-executed.
        disable_progress: If `True`, do not display a tqdm progress bar
            tracking task execution.
        disable_top: If `True`, do not display the list of top active tasks.
        top_format: Format for each top active task. Follows the format
            rules for
            [template strings](https://docs.python.org/3/library/string.html#template-strings)
            and may include any of the following attributes for
            substitution:

            * `name`: The task's name displayed in logs.
            * `pid`: The task's primary process id.
            * `status`: Whether the task is being run or loaded from cache.
            * `start_time`: The time the task's primary process started.
            * `children`: The number of child tasks of the task's primary
              process.
            * `threads`: The number of active CPU threads for the task
              (including across any child processes).
            * `cpu`: The CPU percentage (1 core = 100%) being used by the
              task (including across any child processes).
            * `rss`: The resident set size (RSS) memory percentage being
              used by the task (including across any child processes). RSS
              is the primary measure of memory usage.
            * `vms`: The virtual memory size (VMS) percentage being used by
              the task (including across any child processes).
        top_sort: Sort order for the top active tasks. Can be any of the
            attributes available for use in `top_format`. If the string is
            preceded by a `-`, the sort order will be reversed. Defaults to
            showing oldest tasks first.
        top_n: The maximum number of top active tasks to display.

    Returns:
        A dictionary mapping each of the provided tasks to its
            corresponding result.

    """
    check_tasks(tasks)

    for task in tasks:
        if task.code_version != task.current_code_version:
            raise LabError(
                (f'`{repr(task)}` cannot be run, as it has code_version={task.code_version!r} '
                 f'while the current implementation of {task.__class__.__name__} has '
                 f'code_version={task.current_code_version!r}. You should construct new '
                 f'{task.__class__.__name__} tasks to run instead of running tasks loaded from cache.')
            )


    coordinator = TaskCoordinator(
        self,
        bust_cache=bust_cache,
        disable_progress=disable_progress,
        disable_top=disable_top,
        top_format=top_format,
        top_sort=top_sort,
        top_n=top_n,
    )
    results = coordinator.run(tasks)

    failed_tasks = {task for task in tasks if task not in results}
    if failed_tasks:
        raise LabError(f'Failed to complete {len(failed_tasks)} submitted task(s)')

    # Return results in the same order as tasks
    return {task: results[task] for task in tasks}

run_task(task: Task[ResultT], **kwargs) -> ResultT

Run a single task and return its result. Supports the same keyword arguments as run_tasks.

NOTE: If you have many tasks to run, you should use run_tasks instead to parallelise their execution.

Returns:

  • ResultT

    The result of the given task.
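
For example (a sketch reusing the illustrative Experiment task type from above):

result = lab.run_task(Experiment(seed=1))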

Source code in labtech/lab.py
def run_task(self, task: Task[ResultT], **kwargs) -> ResultT:
    """Run a single task and return its result. Supports the same keyword
    arguments as `run_tasks`.

    NOTE: If you have many tasks to run, you should use
    `run_tasks` instead to parallelise their execution.

    Returns:
        The result of the given task.

    """
    results = self.run_tasks([task], **kwargs)
    return results[task]

cached_tasks(task_types: Sequence[Type[TaskT]]) -> Sequence[TaskT]

Returns all task instances present in the Lab's cache storage for the given task_types, each of which should be a task class decorated with labtech.task.

Does not load task results from the cache storage, but they can be loaded by calling run_tasks() with the returned task instances.
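
For example, to reload the results of every cached Experiment (a sketch assuming the illustrative Experiment task type from above):

cached_experiments = lab.cached_tasks([Experiment])
results = lab.run_tasks(cached_experiments)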

Source code in labtech/lab.py
def cached_tasks(self, task_types: Sequence[Type[TaskT]]) -> Sequence[TaskT]:
    """Returns all task instances present in the Lab's cache storage for
    the given `task_types`, each of which should be a task class
    decorated with [`labtech.task`][labtech.task].

    Does not load task results from the cache storage, but they
    can be loaded by calling
    [`run_tasks()`][labtech.Lab.run_tasks] with the returned task
    instances.

    """
    check_task_types(task_types)
    keys = self._storage.find_keys()
    tasks = []
    for key in keys:
        for task_type in task_types:
            try:
                task = task_type._lt.cache.load_task(self._storage, task_type, key)
            except TaskNotFound:
                pass
            else:
                tasks.append(task)
                break
    return tasks

is_cached(task: Task) -> bool

Checks if a result is present for the given task in the Lab's cache storage.

Source code in labtech/lab.py
def is_cached(self, task: Task) -> bool:
    """Checks if a result is present for given task in the Lab's cache
    storage."""
    check_tasks([task])
    return task._lt.cache.is_cached(self._storage, task)

uncache_tasks(tasks: Sequence[Task])

Removes cached results for the given tasks from the Lab's cache storage.
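
For example, to clear all cached Experiment results (a sketch building on the cached_tasks() example above):

lab.uncache_tasks(lab.cached_tasks([Experiment]))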

Source code in labtech/lab.py
def uncache_tasks(self, tasks: Sequence[Task]):
    """Removes cached results for the given tasks from the Lab's cache
    storage."""
    check_tasks(tasks)
    for task in tasks:
        if self.is_cached(task):
            task._lt.cache.delete(self._storage, task)

labtech.task(*args, code_version: Optional[str] = None, cache: Union[CacheDefault, None, Cache] = CACHE_DEFAULT, max_parallel: Optional[int] = None, mlflow_run: bool = False)

Class decorator for defining task type classes.

Task types are frozen dataclasses, and attribute definitions should capture all parameters of the task type. Parameter attributes can be any of the following types:

  • Simple scalar types: str, bool, float, int, None
  • Any member of an Enum type. Referring to members of an Enum can be used to parameterise a task with a value that does not have one of the types above (e.g. a Pandas/Numpy dataset).
  • Task types: A task parameter is a "nested task" that will be executed before its parent so that it may make use of the nested result.
  • Collections of any of these types: list, tuple, dict, frozendict
    • Dictionaries may only contain string keys.
    • Note: Mutable list and dict collections will be converted to immutable tuple and frozendict collections.

The task type is expected to define a run() method that takes no arguments (other than self). The run() method should execute the task as parameterised by the task's attributes and return the result of the task.

Example usage:

@labtech.task
class Experiment:
    seed: int
    multiplier: int

    def run(self):
        return self.seed * self.multiplier

experiment = Experiment(seed=1, multiplier=2)

You can also provide arguments to the decorator to control caching, cache-busting versioning, parallelism, and mlflow tracking:

@labtech.task(cache=None, code_version='v1', max_parallel=1, mlflow_run=True)
class Experiment:
    ...

    def run(self):
        ...

If a post_init(self) method is defined, it will be called after the task object is initialised (analogously to the __post_init__ method of a dataclass). Because task types are frozen dataclasses, attributes can only be assigned to the task with object.__setattr__(self, attribute_name, attribute_value).
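
A sketch of a post_init that derives an extra attribute (the total attribute is hypothetical):

@labtech.task
class Experiment:
    seed: int

    def post_init(self):
        # Task types are frozen dataclasses, so assign via object.__setattr__
        object.__setattr__(self, 'total', self.seed + 1)

    def run(self):
        return self.total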

If a filter_context(self, context: LabContext) -> LabContext method is defined, it will be called to transform the context provided to each task. This can be useful for selecting subsets of large contexts in order to reduce data transferred to non-forked subprocesses or other kinds of processes in parallel processing frameworks. The filtering may take into account the values of the task's attributes. If filter_context() is not defined, the full context will be provided to each task.

A runner_options(self) -> dict[str, Any] method may be defined to provide a dictionary of options to further control the behaviour of specific types of runner backend - refer to the documentation of each runner backend for supported options. The implementation may make use of the task's parameter values.
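
A sketch of filter_context, assuming a lab context that holds several datasets keyed by name and a task parameterised by dataset_key:

@labtech.task
class Experiment:
    dataset_key: str

    def filter_context(self, context):
        # Provide only the dataset this task needs to the worker.
        return {'dataset': context['datasets'][self.dataset_key]}

    def run(self):
        return len(self.context['dataset'])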

Parameters:

  • cache (Union[CacheDefault, None, Cache], default: CACHE_DEFAULT ) –

    The Cache that controls how task results are formatted for caching. Can be set to an instance of any Cache class, or None to disable caching of this type of task. Defaults to a PickleCache.

  • code_version (Optional[str], default: None ) –

    Optional identifier for the version of the task's implementation. Task results will only be loaded from the cache where they have a matching code_version, so you should change the code_version whenever the definition of the task's run method or any code it depends on changes in a way that will impact the result. Any string value can be used, e.g. 'v1', '2025-04-22', etc.

  • max_parallel (Optional[int], default: None ) –

    The maximum number of instances of this task type that are allowed to run simultaneously in separate sub-processes. Useful to set if running too many instances of this particular task simultaneously will exhaust system memory or processing resources.

  • mlflow_run (bool, default: False ) –

    If True, the execution of each instance of this task type will be wrapped with mlflow.start_run(), the run will be tagged with labtech_task_type equal to the task class name, and all parameters will be logged with mlflow.log_param(). You can make additional mlflow logging calls from the task's run() method.

Source code in labtech/tasks.py
def task(*args,
         code_version: Optional[str] = None,
         cache: Union[CacheDefault, None, Cache] = CACHE_DEFAULT,
         max_parallel: Optional[int] = None,
         mlflow_run: bool = False):
    """Class decorator for defining task type classes.

    Task types are frozen [`dataclasses`], and attribute definitions
    should capture all parameters of the task type. Parameter
    attributes can be any of the following types:

    * Simple scalar types: `str`, `bool`, `float`, `int`, `None`
    * Any member of an `Enum` type. Referring to members of an `Enum` can be
      used to parameterise a task with a value that does not have one of the
      types above (e.g. a Pandas/Numpy dataset).
    * Task types: A task parameter is a "nested task" that will be executed
      before its parent so that it may make use of the nested result.
    * Collections of any of these types: `list`, `tuple`,
      `dict`, [`frozendict`](https://pypi.org/project/frozendict/)
      * Dictionaries may only contain string keys.
      * Note: Mutable `list` and `dict` collections will be converted to
        immutable `tuple` and [`frozendict`](https://pypi.org/project/frozendict/)
        collections.

    The task type is expected to define a `run()` method that takes no
    arguments (other than `self`). The `run()` method should execute
    the task as parameterised by the task's attributes and return the
    result of the task.

    Example usage:

    ```python
    @labtech.task
    class Experiment:
        seed: int
        multiplier: int

        def run(self):
            return self.seed * self.multiplier

    experiment = Experiment(seed=1, multiplier=2)
    ```

    You can also provide arguments to the decorator to control
    caching, cache-busting versioning, parallelism, and
    [mlflow](https://mlflow.org/docs/latest/tracking.html#quickstart)
    tracking:

    ```python
    @labtech.task(cache=None, code_version='v1', max_parallel=1, mlflow_run=True)
    class Experiment:
        ...

        def run(self):
            ...
    ```

    If a `post_init(self)` method is defined, it will be called after
    the task object is initialised (analogously to the `__post_init__`
    method of a dataclass). Because task types are frozen dataclasses,
    attributes can only be assigned to the task with
    `object.__setattr__(self, attribute_name, attribute_value)`.

    If a `filter_context(self, context: LabContext) -> LabContext`
    method is defined, it will be called to transform the context
    provided to each task. This can be useful for selecting subsets of
    large contexts in order to reduce data transferred to non-forked
    subprocesses or other kinds of processes in parallel processing
    frameworks. The filtering may take into account the values of the
    task's attributes. If `filter_context()` is not defined, the full
    context will be provided to each task.

    A `runner_options(self) -> dict[str, Any]` method may be defined
    to provide a dictionary of options to further control the
    behaviour of specific types of runner backend - refer to the
    documentation of each runner backend for supported options. The
    implementation may make use of the task's parameter values.

    Args:
        cache: The Cache that controls how task results are formatted for
            caching. Can be set to an instance of any
            [`Cache`](caching.md#caches) class, or `None` to disable caching
            of this type of task. Defaults to a
            [`PickleCache`][labtech.cache.PickleCache].
        code_version: Optional identifier for the version of the task's
            implementation. Task results will only be loaded from the
            cache where they have a matching code_version, so you
            should change the code_version whenever the definition of
            the task's `run` method or any code it depends on changes
            in a way that will impact the result. Any string value can
            be used, e.g. 'v1', '2025-04-22', etc.
        max_parallel: The maximum number of instances of this task type that
            are allowed to run simultaneously in separate sub-processes. Useful
            to set if running too many instances of this particular task
            simultaneously will exhaust system memory or processing resources.
        mlflow_run: If True, the execution of each instance of this task type
            will be wrapped with `mlflow.start_run()`, the run will be tagged with
            `labtech_task_type` equal to the task class name, and all parameters
            will be logged with `mlflow.log_param()`. You can make additional
            mlflow logging calls from the task's `run()` method.

    """

    def decorator(cls):
        nonlocal cache

        if not is_task_type(cls):
            for reserved_attr in _RESERVED_ATTRS:
                if hasattr(cls, reserved_attr):
                    raise AttributeError(f"Task type already defines reserved attribute '{reserved_attr}'.")

        post_init = getattr(cls, 'post_init', None)
        cls.__post_init__ = _task_post_init

        cls = dataclass(frozen=True, eq=True, order=True)(cls)

        run_func = getattr(cls, 'run', None)
        if not callable(run_func):
            raise NotImplementedError(f"Task type '{cls.__name__}' must define a 'run' method")

        if cache is CACHE_DEFAULT:
            cache = PickleCache()
        elif cache is None:
            cache = NullCache()

        cls._lt = TaskInfo(
            cache=cast(Cache, cache),
            orig_post_init=post_init,
            max_parallel=max_parallel,
            mlflow_run=mlflow_run,
            current_code_version=code_version,
        )
        cls.__getstate__ = _task__getstate__
        cls.__setstate__ = _task__setstate__
        cls._set_results_map = _task_set_results_map
        cls._set_result_meta = _task_set_result_meta
        cls._set_code_version = _task_set_code_version
        cls.current_code_version = property(_task_current_code_version)
        cls.cache_key = property(_task_cache_key)
        cls.result = property(_task_result)
        cls.set_context = _task_set_context
        if not hasattr(cls, 'filter_context'):
            cls.filter_context = _task_filter_context_default
        if not hasattr(cls, 'runner_options'):
            cls.runner_options = _task_runner_options_default
        return cls

    if len(args) > 0 and isclass(args[0]):
        return decorator(args[0], *args[1:])
    else:
        return decorator

Any task type decorated with labtech.task will provide the following attributes and methods:

labtech.types.Task dataclass

Bases: Protocol, Generic[CovariantResultT]

Interface provided by any class that is decorated by labtech.task.

Source code in labtech/types.py
@dataclass
class Task(Protocol, Generic[CovariantResultT]):
    """Interface provided by any class that is decorated by
    [`labtech.task`][labtech.task]."""
    _lt: TaskInfo
    _is_task: Literal[True]
    _results_map: Optional[ResultsMap]
    _cache_key: Optional[str]
    context: Optional[LabContext]
    """Context variables from the Lab that can be accessed when the task is running."""
    result_meta: Optional[ResultMeta]
    """Metadata about the execution of the task."""
    code_version: Optional[str]
    """Identifier for the version of task's implementation. If this task was
    loaded from cache, it may have a different value to that currently specified
    in the decorator."""

    def _set_results_map(self, results_map: ResultsMap):
        pass

    def _set_result_meta(self, result_meta: ResultMeta):
        pass

    @property
    def current_code_version(self) -> Optional[str]:
        """Identifier for the current version of task's implementation
        as specified in the [`labtech.task`][labtech.task] decorator."""

    @property
    def cache_key(self) -> str:
        """The key that uniquely identifies the location for this task
        within cache storage."""

    @property
    def result(self) -> CovariantResultT:
        """The result executed/loaded for this task. If no result is
        available in memory, accessing this property raises a `TaskError`."""

    def set_context(self, context: LabContext):
        """Set the context that is made available to the task while it is
        running."""

    def runner_options(self) -> dict[str, Any]:
        """User-overridable method to a dictionary of options to
        further control the behaviour of specific types of runner
        backend - refer to the documentation of each runner backend
        for supported options. The implementation may make use of the
        task's parameter values.

        """

    def filter_context(self, context: LabContext) -> LabContext:
        """User-overridable method to filter/transform the context to
        be provided to the task. The default implementation provides
        the full context to the task. The filtering may take into
        account the values of the task's attributes.

        This can be useful for selecting subsets of large contexts in
        order to reduce data transferred to non-forked subprocesses or
        other kinds of processes in parallel processing frameworks.

        """

    def run(self):
        """User-provided method that executes the task parameterised by the
        attributes of the task.

        Usually executed by [`Lab.run_tasks()`][labtech.Lab.run_tasks]
        instead of being called directly.

        """

context: Optional[LabContext] instance-attribute

Context variables from the Lab that can be accessed when the task is running.

result_meta: Optional[ResultMeta] instance-attribute

Metadata about the execution of the task.

code_version: Optional[str] instance-attribute

Identifier for the version of the task's implementation. If this task was loaded from cache, it may have a different value to that currently specified in the decorator.

current_code_version: Optional[str] property

Identifier for the current version of the task's implementation as specified in the labtech.task decorator.

cache_key: str property

The key that uniquely identifies the location for this task within cache storage.

result: CovariantResultT property

The result executed/loaded for this task. If no result is available in memory, accessing this property raises a TaskError.

set_context(context: LabContext)

Set the context that is made available to the task while it is running.

Source code in labtech/types.py
def set_context(self, context: LabContext):
    """Set the context that is made available to the task while it is
    running."""

runner_options() -> dict[str, Any]

User-overridable method that returns a dictionary of options to further control the behaviour of specific types of runner backend; refer to the documentation of each runner backend for supported options. The implementation may make use of the task's parameter values.

Source code in labtech/types.py
def runner_options(self) -> dict[str, Any]:
    """User-overridable method to a dictionary of options to
    further control the behaviour of specific types of runner
    backend - refer to the documentation of each runner backend
    for supported options. The implementation may make use of the
    task's parameter values.

    """

filter_context(context: LabContext) -> LabContext

User-overridable method to filter/transform the context to be provided to the task. The default implementation provides the full context to the task. The filtering may take into account the values of the task's attributes.

This can be useful for selecting subsets of large contexts in order to reduce data transferred to non-forked subprocesses or other kinds of processes in parallel processing frameworks.

Source code in labtech/types.py
def filter_context(self, context: LabContext) -> LabContext:
    """User-overridable method to filter/transform the context to
    be provided to the task. The default implementation provides
    the full context to the task. The filtering may take into
    account the values of the task's attributes.

    This can be useful for selecting subsets of large contexts in
    order to reduce data transferred to non-forked subprocesses or
    other kinds of processes in parallel processing frameworks.

    """

run()

User-provided method that executes the task parameterised by the attributes of the task.

Usually executed by Lab.run_tasks() instead of being called directly.

Source code in labtech/types.py
def run(self):
    """User-provided method that executes the task parameterised by the
    attributes of the task.

    Usually executed by [`Lab.run_tasks()`][labtech.Lab.run_tasks]
    instead of being called directly.

    """

labtech.types.ResultT = TypeVar('ResultT') module-attribute

Type variable for the result returned by the run method of a Task.

labtech.types.ResultMeta dataclass

Metadata about the execution of a task. If the task is loaded from cache, the metadata is also loaded from the cache.

Source code in labtech/types.py
@dataclass(frozen=True)
class ResultMeta:
    """Metadata about the execution of a task. If the task is loaded from
    cache, the metadata is also loaded from the cache."""
    start: Optional[datetime]
    """The timestamp when the task's execution began."""
    duration: Optional[timedelta]
    """The time that the task took to execute."""

start: Optional[datetime] instance-attribute

The timestamp when the task's execution began.

duration: Optional[timedelta] instance-attribute

The time that the task took to execute.
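
For example, inspecting the metadata after a run (a sketch reusing the illustrative Experiment task type from earlier):

task = Experiment(seed=1)
lab.run_task(task)
print(task.result_meta.start, task.result_meta.duration)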

labtech.types.LabContext = dict[str, Any] module-attribute

Type alias for the dictionary of context variables that a Lab makes available to tasks.

labtech.is_task_type(cls)

Returns True if the given cls is a class decorated with labtech.task.

Source code in labtech/types.py
def is_task_type(cls):
    """Returns `True` if the given `cls` is a class decorated with
    [`labtech.task`][labtech.task]."""
    return isclass(cls) and isinstance(getattr(cls, '_lt', None), TaskInfo)

labtech.is_task(obj)

Returns True if the given obj is an instance of a task class.

Source code in labtech/types.py
def is_task(obj):
    """Returns `True` if the given `obj` is an instance of a task class."""
    return is_task_type(type(obj)) and hasattr(obj, '_is_task')
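
For example (a sketch reusing the illustrative Experiment task type from earlier):

import labtech

assert labtech.is_task_type(Experiment)
assert labtech.is_task(Experiment(seed=1))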

labtech.logger = get_logger() module-attribute

logging.Logger object that labtech logs events to during task execution.

Can be used to customize logging and to write additional logs from task run() methods:

import logging
from labtech import logger

# Change verbosity of logging
logger.setLevel(logging.ERROR)

# Logging methods to call from inside your task's run() method:
logger.info('Useful info from task: ...')
logger.warning('Warning from task: ...')
logger.error('Error from task: ...')