Task Runner Backends
You can control how tasks are executed in parallel by specifying an instance of one of the following Runner Backend classes for the runner_backend argument of your Lab:
labtech.runners.ForkRunnerBackend
Bases: RunnerBackend
Runner Backend that runs each task in a forked subprocess.
The context and dependency task results are shared in-memory between each subprocess.
labtech.runners.SpawnRunnerBackend
Bases: RunnerBackend
Runner Backend that runs each task in a spawned subprocess.
The required context and dependency task results are copied into the memory of each subprocess.
labtech.runners.ThreadRunnerBackend
Bases: RunnerBackend
Runner Backend that runs tasks asynchronously in separate threads.
Memory use is reduced by sharing the same in-memory context and dependency task results across threads.
labtech.runners.SerialRunnerBackend
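The memory trade-off between the thread and spawn backends can be illustrated with the standard library alone. This is an analogy, not labtech code: worker threads see the very same context object, while a spawned subprocess starts from a fresh interpreter and can only receive data by pickling, which yields an independent copy. `CONTEXT`, `read_context_id`, and the two helper functions are hypothetical names.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

# Hypothetical large context shared by all tasks.
CONTEXT = {"dataset": list(range(100_000))}


def read_context_id() -> int:
    # Runs inside a worker thread and reports which object it sees.
    return id(CONTEXT)


def thread_sees_same_object() -> bool:
    # Threads share the parent's memory, so the worker sees CONTEXT itself
    # (the sharing ThreadRunnerBackend relies on).
    with ThreadPoolExecutor(max_workers=2) as pool:
        return pool.submit(read_context_id).result() == id(CONTEXT)


def spawned_copy_is_distinct() -> bool:
    # Data sent to a spawned subprocess travels by pickling, which produces
    # an equal but independent copy (the duplication SpawnRunnerBackend incurs).
    copy = pickle.loads(pickle.dumps(CONTEXT))
    return copy is not CONTEXT and copy == CONTEXT


print(thread_sees_same_object())   # True
print(spawned_copy_is_distinct())  # True
```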
Distributed Task Runner Backends
Labtech offers support for running tasks across multiple machines. See: Multi-Machine Clusters
Custom Task Runner Backends
You can define your own Runner Backend to execute tasks with a different form of parallelism or distributed computing platform by implementing the RunnerBackend abstract base class:
labtech.types.RunnerBackend
Bases: ABC
Factory class to construct Runner objects.
build_runner(*, context: LabContext, storage: Storage, max_workers: Optional[int]) -> Runner
abstractmethod
Return a Runner prepared with the given configuration.
Parameters:
- context (LabContext) – Additional variables made available to tasks that aren't considered when saving to/loading from the cache.
- storage (Storage) – Where task results should be cached.
- max_workers (Optional[int]) – The maximum number of parallel worker processes for running tasks.
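A minimal sketch of the factory pattern, assuming nothing beyond the method names documented on this page. To keep it runnable without labtech installed, `MiniRunnerBackend`, `MiniRunner`, `InlineRunnerBackend`, and `InlineRunner` are hypothetical stand-ins, tasks are modelled as zero-argument callables, and `wait` yields the task's return value where labtech yields `ResultMeta`. A real implementation would subclass `labtech.types.RunnerBackend` and `labtech.types.Runner` and run tasks as `submit_task` describes.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Iterator, Optional

# Hypothetical stand-in: a "task" here is just a zero-argument callable.
Task = Callable[[], Any]


class MiniRunner(ABC):
    """Hypothetical subset of the documented Runner interface."""

    @abstractmethod
    def submit_task(self, task: Task, task_name: str, use_cache: bool) -> None: ...

    @abstractmethod
    def wait(self, *, timeout_seconds: Optional[float]) -> Iterator[tuple[Task, Any]]: ...

    @abstractmethod
    def pending_task_count(self) -> int: ...


class MiniRunnerBackend(ABC):
    """Hypothetical stand-in for RunnerBackend: a factory for runners."""

    @abstractmethod
    def build_runner(
        self, *, context: dict, storage: Any, max_workers: Optional[int]
    ) -> MiniRunner: ...


class InlineRunner(MiniRunner):
    """Queues tasks on submit and runs them one by one in the caller's thread during wait()."""

    def __init__(self, context: dict, storage: Any) -> None:
        self.context = context
        self.storage = storage
        self._queue: list[tuple[Task, str]] = []

    def submit_task(self, task: Task, task_name: str, use_cache: bool) -> None:
        # use_cache and storage are ignored in this sketch.
        self._queue.append((task, task_name))

    def wait(self, *, timeout_seconds: Optional[float]) -> Iterator[tuple[Task, Any]]:
        # timeout_seconds is ignored: each task runs to completion inline.
        while self._queue:
            task, _task_name = self._queue.pop(0)
            try:
                outcome: Any = task()
            except BaseException as exc:  # Report failures instead of raising.
                outcome = exc
            yield task, outcome

    def pending_task_count(self) -> int:
        return len(self._queue)


class InlineRunnerBackend(MiniRunnerBackend):
    def build_runner(
        self, *, context: dict, storage: Any, max_workers: Optional[int]
    ) -> MiniRunner:
        # max_workers is irrelevant because tasks run one at a time.
        return InlineRunner(context=context, storage=storage)


runner = InlineRunnerBackend().build_runner(context={}, storage=None, max_workers=None)
runner.submit_task(lambda: 6 * 7, task_name="answer", use_cache=False)
runner.submit_task(lambda: 1 / 0, task_name="broken", use_cache=False)
outcomes = [outcome for _task, outcome in runner.wait(timeout_seconds=None)]
print(outcomes[0])                  # 42
print(type(outcomes[1]).__name__)   # ZeroDivisionError
print(runner.pending_task_count())  # 0
```

Keeping construction in a separate backend object lets the caller supply context, storage, and max_workers at run time, while the backend instance itself only holds configuration.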
labtech.types.Runner
Bases: ABC
Manages the execution of Tasks, typically by delegating to a parallel processing framework.
submit_task(task: Task, task_name: str, use_cache: bool) -> None
abstractmethod
Submit the given task object to be run and have its result cached.
It is up to the Runner to decide when to start running the task (i.e. when resources become available).
The implementation of this method should run the task by effectively calling:
for dependency_task in get_direct_dependencies(task, all_identities=True):
    # Where results_map is expected to contain the TaskResult for
    # each dependency_task.
    dependency_task._set_results_map(results_map)

current_process = multiprocessing.current_process()
orig_process_name = current_process.name
try:
    # If the thread name or similar is set instead of the process
    # name, then the Runner should update the handler of the global
    # labtech.utils.logger to include that instead of the process name.
    current_process.name = task_name
    return labtech.runners.base.run_or_load_task(
        task=task,
        use_cache=use_cache,
        filtered_context=task.filter_context(self.context),
        storage=self.storage,
    )
finally:
    current_process.name = orig_process_name
Parameters:
- task (Task) – The task to execute.
- task_name (str) – Name to use when referring to the task in logs.
- use_cache (bool) – If True, the task's result should be fetched from the cache if it is available (fetching should still be done in a delegated process).
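The rename-and-restore step in the pseudocode above can be factored into a context manager so that a Runner implementation cannot forget the `finally` clause. A sketch using only `multiprocessing.current_process()`; the helper name `task_process_name` is hypothetical and not part of labtech.

```python
import multiprocessing
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def task_process_name(task_name: str) -> Iterator[None]:
    """Temporarily rename the current process so log records show task_name."""
    current_process = multiprocessing.current_process()
    orig_process_name = current_process.name
    current_process.name = task_name
    try:
        yield
    finally:
        # Restore the original name even if the task raised.
        current_process.name = orig_process_name


with task_process_name("train_model"):
    print(multiprocessing.current_process().name)  # train_model
print(multiprocessing.current_process().name)      # MainProcess (when run in the main process)
```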
wait(*, timeout_seconds: Optional[float]) -> Iterator[tuple[Task, ResultMeta | BaseException]]
abstractmethod
Wait up to timeout_seconds or until at least one of the submitted tasks is done, then return an iterator over the tasks that are in a done state.
Each task is returned as a pair where the first value is the task itself, and the second value is either:
- For a successfully completed task: Metadata of the result.
- For a task that fails with any BaseException descendant: The exception that was raised.
Cancelled tasks are never returned.
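For a runner built on `concurrent.futures`, this contract maps closely onto `concurrent.futures.wait` with `return_when=FIRST_COMPLETED`. In this sketch the hypothetical `wait_for_done` helper takes a `futures_to_tasks` dict mapping each future to its task, the future's return value stands in for the `ResultMeta` a real runner would yield, and cancelled futures are skipped as required.

```python
import concurrent.futures
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor
from typing import Any, Iterator, Optional


def wait_for_done(
    futures_to_tasks: dict[Future, Any], timeout_seconds: Optional[float]
) -> Iterator[tuple[Any, Any]]:
    # Block until at least one future finishes or the timeout elapses.
    done, _not_done = concurrent.futures.wait(
        list(futures_to_tasks), timeout=timeout_seconds, return_when=FIRST_COMPLETED
    )
    for future in done:
        task = futures_to_tasks.pop(future)
        if future.cancelled():
            continue  # Cancelled tasks are never returned.
        exc = future.exception()
        # Yield the raised exception for failures, otherwise the result.
        yield task, (exc if exc is not None else future.result())


with ThreadPoolExecutor(max_workers=2) as pool:
    pending = {pool.submit(pow, 2, 10): "pow_task", pool.submit(int, "x"): "bad_task"}
    results = {}
    while pending:
        for task, outcome in wait_for_done(pending, timeout_seconds=5):
            results[task] = outcome

print(results["pow_task"])                 # 1024
print(type(results["bad_task"]).__name__)  # ValueError
```

Because the helper removes each returned future from the mapping, the caller's loop naturally ends once every task has been reported exactly once.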
cancel() -> None
abstractmethod
Cancel all submitted tasks that have not yet been started.
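With `concurrent.futures`, this behaviour can be approximated with `Future.cancel()`, which only succeeds for work that has not started; running tasks are left alone, which is why stopping them is a separate operation. The sketch uses a single worker and an event handshake so the ordering is deterministic; `cancel_unstarted` and `blocking_task` are hypothetical names.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def cancel_unstarted(futures: list[Future]) -> int:
    # Future.cancel() returns False for futures that are running or finished.
    return sum(1 for future in futures if future.cancel())


started = threading.Event()
release = threading.Event()


def blocking_task() -> str:
    started.set()   # Signal that this task is now running.
    release.wait()  # Hold the only worker until released.
    return "finished"


with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(blocking_task)
    started.wait()  # Ensure the first task has actually started.
    queued = [pool.submit(abs, -n) for n in range(3)]
    cancelled = cancel_unstarted([running, *queued])
    release.set()   # Let the running task complete.

print(cancelled)         # 3
print(running.result())  # finished
```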
stop() -> None
abstractmethod
Stop all currently running tasks.
close() -> None
abstractmethod
Clean up any resources used by the Runner after all tasks are finished, cancelled, or stopped.
pending_task_count() -> int
abstractmethod
Returns the number of tasks that have been submitted but not yet cancelled or returned from a call to wait().
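One way to keep this count consistent with the definition is to add a task to a set on submission and discard it when it is cancelled or yielded from wait(). A hypothetical bookkeeping class, independent of any execution mechanism; tasks are keyed by name here purely for simplicity, which assumes names are unique.

```python
class PendingTracker:
    """Hypothetical helper tracking tasks submitted but not yet cancelled or returned."""

    def __init__(self) -> None:
        self._pending: set[str] = set()

    def on_submit(self, task_name: str) -> None:
        self._pending.add(task_name)

    def on_cancel(self, task_name: str) -> None:
        self._pending.discard(task_name)

    def on_returned_from_wait(self, task_name: str) -> None:
        self._pending.discard(task_name)

    def pending_task_count(self) -> int:
        return len(self._pending)


tracker = PendingTracker()
for name in ["a", "b", "c"]:
    tracker.on_submit(name)
tracker.on_cancel("a")
tracker.on_returned_from_wait("b")
print(tracker.pending_task_count())  # 1
```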
get_result(task: Task) -> TaskResult
abstractmethod
Returns the in-memory result for a task that was successfully run by this Runner. Raises a KeyError for a task with no in-memory result.
remove_results(tasks: Sequence[Task]) -> None
abstractmethod
Removes the in-memory results for tasks that were successfully run by this Runner. Ignores tasks that have no in-memory result.
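Both get_result and remove_results map naturally onto a dictionary keyed by task: plain indexing gives the documented KeyError, and `dict.pop` with a default gives the ignore-if-missing behaviour. A sketch with a hypothetical `ResultStore` class; tasks are plain strings and results arbitrary objects instead of labtech `Task` and `TaskResult` values.

```python
from typing import Any, Hashable, Sequence


class ResultStore:
    """Hypothetical in-memory result store for a Runner."""

    def __init__(self) -> None:
        self._results: dict[Hashable, Any] = {}

    def record(self, task: Hashable, result: Any) -> None:
        # Called when a task completes successfully.
        self._results[task] = result

    def get_result(self, task: Hashable) -> Any:
        # Indexing raises KeyError when no in-memory result exists.
        return self._results[task]

    def remove_results(self, tasks: Sequence[Hashable]) -> None:
        for task in tasks:
            # pop() with a default silently ignores tasks without a result.
            self._results.pop(task, None)


store = ResultStore()
store.record("task_a", 42)
print(store.get_result("task_a"))  # 42
store.remove_results(["task_a", "never_run"])  # No error for "never_run".
try:
    store.get_result("task_a")
except KeyError:
    print("no in-memory result")
```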
get_task_infos() -> list[TaskMonitorInfo]
abstractmethod
Returns a snapshot of monitoring information about each task that is currently running.
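Because monitoring code may read this while tasks start and finish on other threads, returning a copy taken under a lock keeps the snapshot consistent. The `TaskInfo` dataclass and `MonitoredRunner` class below are hypothetical stand-ins; the fields of labtech's TaskMonitorInfo are not described in this reference, so `name` and `start_time` are assumptions.

```python
import threading
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskInfo:
    # Hypothetical stand-in for TaskMonitorInfo.
    name: str
    start_time: float


class MonitoredRunner:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._running: dict[str, TaskInfo] = {}

    def mark_started(self, name: str) -> None:
        with self._lock:
            self._running[name] = TaskInfo(name=name, start_time=time.time())

    def mark_finished(self, name: str) -> None:
        with self._lock:
            self._running.pop(name, None)

    def get_task_infos(self) -> list[TaskInfo]:
        # Copy under the lock so callers get a stable snapshot.
        with self._lock:
            return list(self._running.values())


runner = MonitoredRunner()
runner.mark_started("fit")
runner.mark_started("score")
runner.mark_finished("fit")
print([info.name for info in runner.get_task_infos()])  # ['score']
```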