Labs and Tasks

In this document you'll find API reference documentation for configuring Labs and tasks. For a tutorial-style explanation, see the README.

labtech.Lab

Primary interface for configuring, running, and getting results of tasks.

A Lab can be used to run tasks with run_tasks().

Previously cached tasks can be retrieved with cached_tasks(), and can then have their results loaded with run_tasks() or be removed from the cache storage with uncache_tasks().

Source code in labtech/lab.py
class Lab:
    """Primary interface for configuring, running, and getting results of tasks.

    A Lab can be used to run tasks with [`run_tasks()`][labtech.Lab.run_tasks].

    Previously cached tasks can be retrieved with
    [`cached_tasks()`][labtech.Lab.cached_tasks], and can then have
    their results loaded with [`run_tasks()`][labtech.Lab.run_tasks]
    or be removed from the cache storage with
    [`uncache_tasks()`][labtech.Lab.uncache_tasks].

    """

    def __init__(self, *,
                 storage: Union[str, Path, None, Storage],
                 continue_on_failure: bool = True,
                 max_workers: Optional[int] = None,
                 notebook: Optional[bool] = None,
                 context: Optional[dict[str, Any]] = None):
        """
        Args:
            storage: Where task results should be cached to. A string or
                [`Path`](https://docs.python.org/3/library/pathlib.html#pathlib.Path)
                will be interpreted as the path to a local directory, `None`
                will result in no caching. Any [Storage][labtech.types.Storage]
                instance may also be specified.
            continue_on_failure: If `True`, exceptions raised by tasks will be
                logged, but execution of other tasks will continue.
            max_workers: The maximum number of parallel worker processes for
                running tasks. Uses the same default as
                `concurrent.futures.ProcessPoolExecutor`: the number of
                processors on the machine. When `max_workers=1`, all tasks will
                be run in the main process, without multi-processing.
            notebook: Determines whether to use notebook-friendly graphical
                progress bars. When set to `None` (the default), labtech will
                detect whether the code is being run from an IPython notebook.
            context: A dictionary of additional variables to make available to
                tasks. The context will not be cached, so the values should not
                affect results (e.g. parallelism factors) or should be kept
                constant between runs (e.g. datasets).
        """
        if isinstance(storage, str) or isinstance(storage, Path):
            storage = LocalStorage(storage)
        elif storage is None:
            storage = NullStorage()
        self._storage = storage
        self.continue_on_failure = continue_on_failure
        self.max_workers = max_workers
        self.notebook = is_ipython() if notebook is None else notebook
        if context is None:
            context = {}
        self.context = context

    def run_tasks(self, tasks: Sequence[TaskT], *,
                  bust_cache: bool = False,
                  keep_nested_results: bool = False,
                  disable_progress: bool = False,
                  disable_top: bool = False,
                  top_format: str = '$name $status since $start_time CPU: $cpu MEM: $rss',
                  top_sort: str = 'start_time',
                  top_n: int = 10) -> Dict[TaskT, Any]:
        """Run the given tasks with as much process parallelism as possible.
        Loads task results from the cache storage where possible and
        caches results of executed tasks.

        Any attribute of a task that is itself a task object is
        considered a "nested task", and will be executed or loaded so
    that its result is made available to its parent task. If the
        same task is nested inside multiple task objects, it will only
        be executed/loaded once.

        As well as returning the results, each task's result will be
        assigned to a `result` attribute on the task itself (including
        nested tasks when `keep_nested_results` is `True`).

        Args:
            tasks: The tasks to execute. Each should be an instance of a class
                decorated with [`labtech.task`][labtech.task].
            bust_cache: If `True`, no task results will be loaded from the
                cache storage; all tasks will be re-executed.
            keep_nested_results: If `False`, results of nested tasks that were
                executed or loaded in order to complete the provided tasks will
                be cleared from memory once they are no longer needed.
            disable_progress: If `True`, do not display a tqdm progress bar
                tracking task execution.
            disable_top: If `True`, do not display the list of top active tasks.
            top_format: Format for each top active task. Follows the format
                rules for
                [template strings](https://docs.python.org/3/library/string.html#template-strings)
                and may include any of the following attributes for
                substitution:

                * `name`: The task's name displayed in logs.
                * `pid`: The task's primary process id.
                * `status`: Whether the task is being run or loaded from cache.
                * `start_time`: The time the task's primary process started.
                * `children`: The number of child tasks of the task's primary
                  process.
                * `threads`: The number of active CPU threads for the task
                  (including across any child processes).
                * `cpu`: The CPU percentage (1 core = 100%) being used by the
                  task (including across any child processes).
                * `rss`: The resident set size (RSS) memory percentage being
                  used by the task (including across any child processes). RSS
                  is the primary measure of memory usage.
                * `vms`: The virtual memory size (VMS) percentage being used by
                  the task (including across any child processes).
            top_sort: Sort order for the top active tasks. Can be any of the
                attributes available for use in `top_format`. If the string is
                preceded by a `-`, the sort order will be reversed. Defaults to
                showing oldest tasks first.
            top_n: The maximum number of top active tasks to display.

        Returns:
            A dictionary mapping each of the provided tasks to its
                corresponding result.

        """
        check_tasks(tasks)
        runner = TaskRunner(self,
                            bust_cache=bust_cache,
                            keep_nested_results=keep_nested_results,
                            disable_progress=disable_progress,
                            disable_top=disable_top,
                            top_format=top_format,
                            top_sort=top_sort,
                            top_n=top_n)
        results = runner.run(tasks)
        # Return results in the same order as tasks
        return {task: results[task] for task in tasks}

    def run_task(self, task: Task[ResultT], **kwargs) -> ResultT:
        """Run a single task and return its result. Supports the same keyword
        arguments as `run_tasks`.

        NOTE: If you have many tasks to run, you should use
        `run_tasks` instead to parallelise their execution.

        Returns:
            The result of the given task.

        """
        results = self.run_tasks([task], **kwargs)
        return results[task]

    def cached_tasks(self, task_types: Sequence[Type[TaskT]]) -> Sequence[TaskT]:
        """Returns all task instances present in the Lab's cache storage for
        the given `task_types`, each of which should be a task class
        decorated with [`labtech.task`][labtech.task].

        Does not load task results from the cache storage, but they
        can be loaded by calling
        [`run_tasks()`][labtech.Lab.run_tasks] with the returned task
        instances.

        """
        check_task_types(task_types)
        keys = self._storage.find_keys()
        tasks = []
        for key in keys:
            for task_type in task_types:
                try:
                    task = task_type._lt.cache.load_task(self._storage, task_type, key)
                except TaskNotFound:
                    pass
                else:
                    tasks.append(task)
                    break
        return tasks

    def is_cached(self, task: Task) -> bool:
        """Checks if a result is present for given task in the Lab's cache
        storage."""
        check_tasks([task])
        return task._lt.cache.is_cached(self._storage, task)

    def uncache_tasks(self, tasks: Sequence[Task]):
        """Removes cached results for the given tasks from the Lab's cache
        storage."""
        check_tasks(tasks)
        for task in tasks:
            if self.is_cached(task):
                task._lt.cache.delete(self._storage, task)

__init__(*, storage: Union[str, Path, None, Storage], continue_on_failure: bool = True, max_workers: Optional[int] = None, notebook: Optional[bool] = None, context: Optional[dict[str, Any]] = None)

Parameters:

  • storage (Union[str, Path, None, Storage]) –

    Where task results should be cached to. A string or Path will be interpreted as the path to a local directory, None will result in no caching. Any Storage instance may also be specified.

  • continue_on_failure (bool, default: True ) –

    If True, exceptions raised by tasks will be logged, but execution of other tasks will continue.

  • max_workers (Optional[int], default: None ) –

    The maximum number of parallel worker processes for running tasks. Uses the same default as concurrent.futures.ProcessPoolExecutor: the number of processors on the machine. When max_workers=1, all tasks will be run in the main process, without multi-processing.

  • notebook (Optional[bool], default: None ) –

    Determines whether to use notebook-friendly graphical progress bars. When set to None (the default), labtech will detect whether the code is being run from an IPython notebook.

  • context (Optional[dict[str, Any]], default: None ) –

    A dictionary of additional variables to make available to tasks. The context will not be cached, so the values should not affect results (e.g. parallelism factors) or should be kept constant between runs (e.g. datasets).

Source code in labtech/lab.py
def __init__(self, *,
             storage: Union[str, Path, None, Storage],
             continue_on_failure: bool = True,
             max_workers: Optional[int] = None,
             notebook: Optional[bool] = None,
             context: Optional[dict[str, Any]] = None):
    """
    Args:
        storage: Where task results should be cached to. A string or
            [`Path`](https://docs.python.org/3/library/pathlib.html#pathlib.Path)
            will be interpreted as the path to a local directory, `None`
            will result in no caching. Any [Storage][labtech.types.Storage]
            instance may also be specified.
        continue_on_failure: If `True`, exceptions raised by tasks will be
            logged, but execution of other tasks will continue.
        max_workers: The maximum number of parallel worker processes for
            running tasks. Uses the same default as
            `concurrent.futures.ProcessPoolExecutor`: the number of
            processors on the machine. When `max_workers=1`, all tasks will
            be run in the main process, without multi-processing.
        notebook: Determines whether to use notebook-friendly graphical
            progress bars. When set to `None` (the default), labtech will
            detect whether the code is being run from an IPython notebook.
        context: A dictionary of additional variables to make available to
            tasks. The context will not be cached, so the values should not
            affect results (e.g. parallelism factors) or should be kept
            constant between runs (e.g. datasets).
    """
    if isinstance(storage, str) or isinstance(storage, Path):
        storage = LocalStorage(storage)
    elif storage is None:
        storage = NullStorage()
    self._storage = storage
    self.continue_on_failure = continue_on_failure
    self.max_workers = max_workers
    self.notebook = is_ipython() if notebook is None else notebook
    if context is None:
        context = {}
    self.context = context

run_tasks(tasks: Sequence[TaskT], *, bust_cache: bool = False, keep_nested_results: bool = False, disable_progress: bool = False, disable_top: bool = False, top_format: str = '$name $status since $start_time CPU: $cpu MEM: $rss', top_sort: str = 'start_time', top_n: int = 10) -> Dict[TaskT, Any]

Run the given tasks with as much process parallelism as possible. Loads task results from the cache storage where possible and caches results of executed tasks.

Any attribute of a task that is itself a task object is considered a "nested task", and will be executed or loaded so that its result is made available to its parent task. If the same task is nested inside multiple task objects, it will only be executed/loaded once.

As well as returning the results, each task's result will be assigned to a result attribute on the task itself (including nested tasks when keep_nested_results is True).

Parameters:

  • tasks (Sequence[TaskT]) –

    The tasks to execute. Each should be an instance of a class decorated with labtech.task.

  • bust_cache (bool, default: False ) –

    If True, no task results will be loaded from the cache storage; all tasks will be re-executed.

  • keep_nested_results (bool, default: False ) –

    If False, results of nested tasks that were executed or loaded in order to complete the provided tasks will be cleared from memory once they are no longer needed.

  • disable_progress (bool, default: False ) –

    If True, do not display a tqdm progress bar tracking task execution.

  • disable_top (bool, default: False ) –

    If True, do not display the list of top active tasks.

  • top_format (str, default: '$name $status since $start_time CPU: $cpu MEM: $rss' ) –

    Format for each top active task. Follows the format rules for template strings and may include any of the following attributes for substitution:

    • name: The task's name displayed in logs.
    • pid: The task's primary process id.
    • status: Whether the task is being run or loaded from cache.
    • start_time: The time the task's primary process started.
    • children: The number of child tasks of the task's primary process.
    • threads: The number of active CPU threads for the task (including across any child processes).
    • cpu: The CPU percentage (1 core = 100%) being used by the task (including across any child processes).
    • rss: The resident set size (RSS) memory percentage being used by the task (including across any child processes). RSS is the primary measure of memory usage.
    • vms: The virtual memory size (VMS) percentage being used by the task (including across any child processes).
  • top_sort (str, default: 'start_time' ) –

    Sort order for the top active tasks. Can be any of the attributes available for use in top_format. If the string is preceded by a -, the sort order will be reversed. Defaults to showing oldest tasks first.

  • top_n (int, default: 10 ) –

    The maximum number of top active tasks to display.

Returns:

  • Dict[TaskT, Any]

    A dictionary mapping each of the provided tasks to its corresponding result.

Source code in labtech/lab.py
def run_tasks(self, tasks: Sequence[TaskT], *,
              bust_cache: bool = False,
              keep_nested_results: bool = False,
              disable_progress: bool = False,
              disable_top: bool = False,
              top_format: str = '$name $status since $start_time CPU: $cpu MEM: $rss',
              top_sort: str = 'start_time',
              top_n: int = 10) -> Dict[TaskT, Any]:
    """Run the given tasks with as much process parallelism as possible.
    Loads task results from the cache storage where possible and
    caches results of executed tasks.

    Any attribute of a task that is itself a task object is
    considered a "nested task", and will be executed or loaded so
    that its result is made available to its parent task. If the
    same task is nested inside multiple task objects, it will only
    be executed/loaded once.

    As well as returning the results, each task's result will be
    assigned to a `result` attribute on the task itself (including
    nested tasks when `keep_nested_results` is `True`).

    Args:
        tasks: The tasks to execute. Each should be an instance of a class
            decorated with [`labtech.task`][labtech.task].
        bust_cache: If `True`, no task results will be loaded from the
            cache storage; all tasks will be re-executed.
        keep_nested_results: If `False`, results of nested tasks that were
            executed or loaded in order to complete the provided tasks will
            be cleared from memory once they are no longer needed.
        disable_progress: If `True`, do not display a tqdm progress bar
            tracking task execution.
        disable_top: If `True`, do not display the list of top active tasks.
        top_format: Format for each top active task. Follows the format
            rules for
            [template strings](https://docs.python.org/3/library/string.html#template-strings)
            and may include any of the following attributes for
            substitution:

            * `name`: The task's name displayed in logs.
            * `pid`: The task's primary process id.
            * `status`: Whether the task is being run or loaded from cache.
            * `start_time`: The time the task's primary process started.
            * `children`: The number of child tasks of the task's primary
              process.
            * `threads`: The number of active CPU threads for the task
              (including across any child processes).
            * `cpu`: The CPU percentage (1 core = 100%) being used by the
              task (including across any child processes).
            * `rss`: The resident set size (RSS) memory percentage being
              used by the task (including across any child processes). RSS
              is the primary measure of memory usage.
            * `vms`: The virtual memory size (VMS) percentage being used by
              the task (including across any child processes).
        top_sort: Sort order for the top active tasks. Can be any of the
            attributes available for use in `top_format`. If the string is
            preceded by a `-`, the sort order will be reversed. Defaults to
            showing oldest tasks first.
        top_n: The maximum number of top active tasks to display.

    Returns:
        A dictionary mapping each of the provided tasks to its
            corresponding result.

    """
    check_tasks(tasks)
    runner = TaskRunner(self,
                        bust_cache=bust_cache,
                        keep_nested_results=keep_nested_results,
                        disable_progress=disable_progress,
                        disable_top=disable_top,
                        top_format=top_format,
                        top_sort=top_sort,
                        top_n=top_n)
    results = runner.run(tasks)
    # Return results in the same order as tasks
    return {task: results[task] for task in tasks}

run_task(task: Task[ResultT], **kwargs) -> ResultT

Run a single task and return its result. Supports the same keyword arguments as run_tasks.

NOTE: If you have many tasks to run, you should use run_tasks instead to parallelise their execution.

Returns:

  • ResultT

    The result of the given task.

Source code in labtech/lab.py
def run_task(self, task: Task[ResultT], **kwargs) -> ResultT:
    """Run a single task and return its result. Supports the same keyword
    arguments as `run_tasks`.

    NOTE: If you have many tasks to run, you should use
    `run_tasks` instead to parallelise their execution.

    Returns:
        The result of the given task.

    """
    results = self.run_tasks([task], **kwargs)
    return results[task]

cached_tasks(task_types: Sequence[Type[TaskT]]) -> Sequence[TaskT]

Returns all task instances present in the Lab's cache storage for the given task_types, each of which should be a task class decorated with labtech.task.

Does not load task results from the cache storage, but they can be loaded by calling run_tasks() with the returned task instances.

Source code in labtech/lab.py
def cached_tasks(self, task_types: Sequence[Type[TaskT]]) -> Sequence[TaskT]:
    """Returns all task instances present in the Lab's cache storage for
    the given `task_types`, each of which should be a task class
    decorated with [`labtech.task`][labtech.task].

    Does not load task results from the cache storage, but they
    can be loaded by calling
    [`run_tasks()`][labtech.Lab.run_tasks] with the returned task
    instances.

    """
    check_task_types(task_types)
    keys = self._storage.find_keys()
    tasks = []
    for key in keys:
        for task_type in task_types:
            try:
                task = task_type._lt.cache.load_task(self._storage, task_type, key)
            except TaskNotFound:
                pass
            else:
                tasks.append(task)
                break
    return tasks

is_cached(task: Task) -> bool

Checks if a result is present for the given task in the Lab's cache storage.

Source code in labtech/lab.py
def is_cached(self, task: Task) -> bool:
    """Checks if a result is present for given task in the Lab's cache
    storage."""
    check_tasks([task])
    return task._lt.cache.is_cached(self._storage, task)

uncache_tasks(tasks: Sequence[Task])

Removes cached results for the given tasks from the Lab's cache storage.

Source code in labtech/lab.py
def uncache_tasks(self, tasks: Sequence[Task]):
    """Removes cached results for the given tasks from the Lab's cache
    storage."""
    check_tasks(tasks)
    for task in tasks:
        if self.is_cached(task):
            task._lt.cache.delete(self._storage, task)

labtech.task(*args, cache: Union[CacheDefault, None, Cache] = CACHE_DEFAULT, max_parallel: Optional[int] = None, mlflow_run: bool = False)

Class decorator for defining task type classes.

Task types are frozen dataclasses, and attribute definitions should capture all parameters of the task type. Parameter attributes can be any of the following types:

  • Simple scalar types: str, bool, float, int, None
  • Any member of an Enum type. Referring to members of an Enum can be used to parameterise a task with a value that does not have one of the types above (e.g. a Pandas/Numpy dataset).
  • Task types: A task parameter is a "nested task" that will be executed before its parent so that it may make use of the nested result.
  • Collections of any of these types: list, tuple, dict, frozendict
    • Dictionaries may only contain string keys.
    • Note: Mutable list and dict collections will be converted to immutable tuple and frozendict collections.

The task type is expected to define a run() method that takes no arguments (other than self). The run() method should execute the task as parameterised by the task's attributes and return the result of the task.

Example usage:

@labtech.task
class Experiment:
    seed: int
    multiplier: int

    def run(self):
        return self.seed * self.multiplier

experiment = Experiment(seed=1, multiplier=2)

You can also provide arguments to the decorator to control caching, parallelism, and mlflow tracking:

@labtech.task(cache=None, max_parallel=1, mlflow_run=True)
class Experiment:
    ...

    def run(self):
        ...

If a post_init(self) method is defined, it will be called after the task object is initialised (analogously to the __post_init__ method of a dataclass). Because task types are frozen dataclasses, attributes can only be assigned to the task with object.__setattr__(self, attribute_name, attribute_value).

Parameters:

  • cache (Union[CacheDefault, None, Cache], default: CACHE_DEFAULT ) –

    The Cache that controls how task results are formatted for caching. Can be set to an instance of any Cache class, or None to disable caching of this type of task. Defaults to a PickleCache.

  • max_parallel (Optional[int], default: None ) –

    The maximum number of instances of this task type that are allowed to run simultaneously in separate sub-processes. Useful to set if running too many instances of this particular task simultaneously will exhaust system memory or processing resources. When max_parallel=1, all tasks will be run in the main process, without multi-processing.

  • mlflow_run (bool, default: False ) –

    If True, the execution of each instance of this task type will be wrapped with mlflow.start_run(), the run will be tagged with labtech_task_type equal to the task class name, and all parameters will be logged with mlflow.log_param(). You can make additional mlflow logging calls from the task's run() method.

Source code in labtech/tasks.py
def task(*args,
         cache: Union[CacheDefault, None, Cache] = CACHE_DEFAULT,
         max_parallel: Optional[int] = None,
         mlflow_run: bool = False):
    """Class decorator for defining task type classes.

    Task types are frozen [`dataclasses`], and attribute definitions
    should capture all parameters of the task type. Parameter
    attributes can be any of the following types:

    * Simple scalar types: `str`, `bool`, `float`, `int`, `None`
    * Any member of an `Enum` type. Referring to members of an `Enum` can be
      used to parameterise a task with a value that does not have one of the
      types above (e.g. a Pandas/Numpy dataset).
    * Task types: A task parameter is a "nested task" that will be executed
      before its parent so that it may make use of the nested result.
    * Collections of any of these types: `list`, `tuple`,
      `dict`, [`frozendict`](https://pypi.org/project/frozendict/)
      * Dictionaries may only contain string keys.
      * Note: Mutable `list` and `dict` collections will be converted to
        immutable `tuple` and [`frozendict`](https://pypi.org/project/frozendict/)
        collections.

    The task type is expected to define a `run()` method that takes no
    arguments (other than `self`). The `run()` method should execute
    the task as parameterised by the task's attributes and return the
    result of the task.

    Example usage:

    ```python
    @labtech.task
    class Experiment:
        seed: int
        multiplier: int

        def run(self):
            return self.seed * self.multiplier

    experiment = Experiment(seed=1, multiplier=2)
    ```

    You can also provide arguments to the decorator to control caching,
    parallelism, and [mlflow](https://mlflow.org/docs/latest/tracking.html#quickstart)
    tracking:

    ```python
    @labtech.task(cache=None, max_parallel=1, mlflow_run=True)
    class Experiment:
        ...

        def run(self):
            ...
    ```

    If a `post_init(self)` method is defined, it will be called after
    the task object is initialised (analogously to the `__post_init__`
    method of a dataclass). Because task types are frozen dataclasses,
    attributes can only be assigned to the task with
    `object.__setattr__(self, attribute_name, attribute_value)`.

    Args:
        cache: The Cache that controls how task results are formatted for
            caching. Can be set to an instance of any
            [`Cache`](caching.md#caches) class, or `None` to disable caching
            of this type of task. Defaults to a
            [`PickleCache`][labtech.cache.PickleCache].
        max_parallel: The maximum number of instances of this task type that
            are allowed to run simultaneously in separate sub-processes. Useful
            to set if running too many instances of this particular task
            simultaneously will exhaust system memory or processing resources.
            When `max_parallel=1`, all tasks will be run in the main process,
            without multi-processing.
        mlflow_run: If True, the execution of each instance of this task type
            will be wrapped with `mlflow.start_run()`, the run will be tagged
            with `labtech_task_type` equal to the task class name, and all
            parameters will be logged with `mlflow.log_param()`. You can make
            additional mlflow logging calls from the task's `run()` method.

    """

    def decorator(cls):
        nonlocal cache

        if not is_task_type(cls):
            for reserved_attr in _RESERVED_ATTRS:
                if hasattr(cls, reserved_attr):
                    raise AttributeError(f"Task type already defines reserved attribute '{reserved_attr}'.")

        post_init = getattr(cls, 'post_init', None)
        cls.__post_init__ = _task_post_init

        cls = dataclass(frozen=True, eq=True, order=True)(cls)

        run_func = getattr(cls, 'run', None)
        if not callable(run_func):
            raise NotImplementedError(f"Task type '{cls.__name__}' must define a 'run' method")

        if cache is CACHE_DEFAULT:
            cache = PickleCache()
        elif cache is None:
            cache = NullCache()

        cls._lt = TaskInfo(
            cache=cast(Cache, cache),
            orig_post_init=post_init,
            max_parallel=max_parallel,
            mlflow_run=mlflow_run,
        )
        cls.__getstate__ = _task__getstate__
        cls.__setstate__ = _task__setstate__
        cls._set_results_map = _task_set_results_map
        cls._set_result_meta = _task_set_result_meta
        cls.result = property(_task_result)
        cls.set_context = _task_set_context
        return cls

    if len(args) > 0 and isclass(args[0]):
        return decorator(args[0], *args[1:])
    else:
        return decorator

Any task type decorated with labtech.task will provide the following attributes and methods:

labtech.types.Task dataclass

Bases: Protocol, Generic[CovariantResultT]

Interface provided by any class that is decorated by labtech.task.

Source code in labtech/types.py
@dataclass
class Task(Protocol, Generic[CovariantResultT]):
    """Interface provided by any class that is decorated by
    [`labtech.task`][labtech.task]."""
    _lt: TaskInfo
    _is_task: Literal[True]
    _results_map: Optional[ResultsMap]
    cache_key: str
    """The key that uniquely identifies the location for this task within cache storage."""
    context: Optional[dict[str, Any]]
    """Context variables from the Lab that can be accessed when the task is running."""
    result_meta: Optional[ResultMeta]
    """Metadata about the execution of the task."""

    def _set_results_map(self, results_map: ResultsMap):
        pass

    def _set_result_meta(self, result_meta: ResultMeta):
        pass

    @property
    def result(self) -> CovariantResultT:
        """Returns the result executed/loaded for this task. If no result is
        available in memory, accessing this property raises a `TaskError`."""

    def set_context(self, context: dict[str, Any]):
        """Set the context that is made available to the task while it is
        running."""

    def run(self):
        """User-provided method that executes the task parameterised by the
        attributes of the task.

        Usually executed by [`Lab.run_tasks()`][labtech.Lab.run_tasks]
        instead of being called directly.

        """

cache_key: str instance-attribute

The key that uniquely identifies the location for this task within cache storage.

context: Optional[dict[str, Any]] instance-attribute

Context variables from the Lab that can be accessed when the task is running.

result_meta: Optional[ResultMeta] instance-attribute

Metadata about the execution of the task.

result: CovariantResultT property

Returns the result executed/loaded for this task. If no result is available in memory, accessing this property raises a TaskError.

set_context(context: dict[str, Any])

Set the context that is made available to the task while it is running.

Source code in labtech/types.py
def set_context(self, context: dict[str, Any]):
    """Set the context that is made available to the task while it is
    running."""

run()

User-provided method that executes the task parameterised by the attributes of the task.

Usually executed by Lab.run_tasks() instead of being called directly.

Source code in labtech/types.py
def run(self):
    """User-provided method that executes the task parameterised by the
    attributes of the task.

    Usually executed by [`Lab.run_tasks()`][labtech.Lab.run_tasks]
    instead of being called directly.

    """

labtech.types.ResultT = TypeVar('ResultT') module-attribute

Type variable for result returned by the run method of a Task.

labtech.types.ResultMeta dataclass

Metadata about the execution of a task. If the task is loaded from cache, the metadata is also loaded from the cache.

Source code in labtech/types.py
@dataclass(frozen=True)
class ResultMeta:
    """Metadata about the execution of a task. If the task is loaded from
    cache, the metadata is also loaded from the cache."""
    start: Optional[datetime]
    """The timestamp when the task's execution began."""
    duration: Optional[timedelta]
    """The time that the task took to execute."""

start: Optional[datetime] instance-attribute

The timestamp when the task's execution began.

duration: Optional[timedelta] instance-attribute

The time that the task took to execute.

labtech.is_task_type(cls)

Returns True if the given cls is a class decorated with labtech.task.

Source code in labtech/types.py
def is_task_type(cls):
    """Returns `True` if the given `cls` is a class decorated with
    [`labtech.task`][labtech.task]."""
    return isclass(cls) and isinstance(getattr(cls, '_lt', None), TaskInfo)

labtech.is_task(obj)

Returns True if the given obj is an instance of a task class.

Source code in labtech/types.py
def is_task(obj):
    """Returns `True` if the given `obj` is an instance of a task class."""
    return is_task_type(type(obj)) and hasattr(obj, '_is_task')

labtech.logger = get_logger() module-attribute

logging.Logger object that labtech logs events to during task execution.

Can be used to customize logging and to write additional logs from task run() methods:

import logging
from labtech import logger

# Change verbosity of logging
logger.setLevel(logging.ERROR)

# Logging methods to call from inside your task's run() method:
logger.info('Useful info from task: ...')
logger.warning('Warning from task: ...')
logger.error('Error from task: ...')