# Multi-Machine Clusters

Labtech offers built-in support for distributing your tasks across a multi-machine Ray cluster with the `RayRunnerBackend`.
- Ray manages distributing tasks to available nodes across your cluster, and its built-in object store is used to share the lab context and task results across nodes.
- Ray's locality-aware scheduling will prefer scheduling tasks to nodes where the results of dependency tasks are already available.
- For task results that are NumPy arrays, Ray uses zero-copy deserialization to share the in-memory array between all workers on the same node (see the example below).
You can also use distributed computation platforms other than Ray with Labtech by implementing a custom task runner backend, but this documentation will focus on Labtech's built-in Ray support.
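For instance, a task whose result is a large NumPy array only needs to be deserialized once on each node, and every worker process on that node can then read the same in-memory copy. The following is a minimal sketch of such a task; the `SimulationTask` name, its parameters, and the randomly generated data are purely illustrative:

```python
import labtech
import numpy as np


@labtech.task
class SimulationTask:
    seed: int
    size: int

    def run(self) -> np.ndarray:
        # A large array result like this is stored once in Ray's object
        # store and shared zero-copy by all workers on the same node.
        rng = np.random.default_rng(self.seed)
        return rng.normal(size=(self.size, self.size))
```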
> **Tip:** Given the inherent complexities that come with managing a cluster of machines, you should always consider whether the scale of your tasks really requires it: public cloud providers offer "spot" (i.e. discounted but evictable) virtual machines with dozens of CPU cores and hundreds of GB of memory for less than a dollar an hour, and these may be a perfect fit for easily scaling your tasks.
## Running Labtech tasks on a Ray cluster
Follow these steps to get Labtech tasks running across a Ray cluster. Because Ray makes it easy to run a local cluster, you can even try Labtech with Ray without setting up a multi-machine cluster.
### Installing Ray

`ray` is an optional dependency of Labtech, so you must explicitly install it with pip (or your preferred Python package manager). It is recommended that you install `ray[default]` on the machine you intend to start the cluster from so that you can enable Ray's built-in dashboard:
```sh
pip install "ray[default]"
```
### Using distributed storage
Because each Labtech task is responsible for caching its result to persistent storage, you must use a storage backend that can be accessed from any node in the cluster. For example, you could use an NFS share or a cloud object storage provider (e.g. Amazon S3 or Azure Blob Storage).
To learn how to configure Labtech to use a non-local storage backend, see: How can I cache task results somewhere other than my filesystem?
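As a sketch of the general pattern (the full S3 configuration appears in the code example below), a storage backend for Azure Blob Storage could subclass `FsspecStorage` and construct an `adlfs` filesystem. The use of `adlfs`, the account details, and the container name here are illustrative assumptions rather than a tested configuration:

```python
import labtech
from adlfs import AzureBlobFileSystem
from labtech.storage import FsspecStorage


class AzureStorage(FsspecStorage):
    """Hypothetical storage backend for an Azure Blob Storage container."""

    def fs_constructor(self):
        return AzureBlobFileSystem(
            # Placeholder credentials for illustration only:
            account_name='mystorageaccount',
            account_key='my-account-key',
        )


# The storage path is the container name plus a prefix for the cache:
lab = labtech.Lab(storage=AzureStorage('my-labtech-container/lab_cache'))
```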
In the following example, we will run a LocalStack instance to emulate an Amazon S3 object storage bucket. For testing, you can run your own LocalStack S3 bucket named `labtech-dev-bucket` with Docker Compose by creating the following `docker-compose.yml` and running `docker compose up localstack`:
```yaml
# docker-compose.yml
services:
  localstack:
    image: localstack/localstack:4.3
    ports:
      - "127.0.0.1:4566:4566"            # LocalStack Gateway
      - "127.0.0.1:4510-4559:4510-4559"  # external services port range
    volumes:
      - "./.localstack:/var/lib/localstack"
    post_start:
      - command: awslocal s3api create-bucket --bucket labtech-dev-bucket
```
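Once the container is up, you can optionally sanity-check that the bucket was created by listing it with `s3fs`, using the same placeholder credentials that the Lab will use later. This assumes LocalStack is reachable on `localhost:4566`:

```python
from s3fs import S3FileSystem

# Connect to the LocalStack S3 endpoint with dummy credentials:
fs = S3FileSystem(
    endpoint_url='http://localhost:4566',
    key='anything',
    secret='anything',
)

# A freshly created bucket should list as empty:
print(fs.ls('labtech-dev-bucket'))
```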
### Code example
The following code demonstrates how to configure a Lab to run tasks across a Ray cluster:
```python
# Ray defaults to de-duplicating similar log messages. To show all log
# messages from tasks, the RAY_DEDUP_LOGS environment variable must be
# set to zero **before** importing ray and labtech.runners.ray. See:
# https://docs.ray.io/en/latest/ray-observability/user-guides/configure-logging.html#log-deduplication
import os
os.environ['RAY_DEDUP_LOGS'] = '0'

import labtech
import ray
from labtech.storage import FsspecStorage
from labtech.runners.ray import RayRunnerBackend
from s3fs import S3FileSystem

# Connect to a Ray cluster:
# * If no cluster is running locally, `ray.init()` will start one.
# * If you've started a local cluster with `ray start --head --port 6379`,
#   `ray.init()` will connect to it.
#   See: https://docs.ray.io/en/latest/ray-core/starting-ray.html
# * You can specify the address of a remote cluster,
#   e.g. `ray.init(address='ray://123.45.67.89:10001')`
ray.init()


# Define a custom Storage backend for our localstack S3 bucket
# using s3fs (which implements the fsspec interface)
class S3fsStorage(FsspecStorage):

    def fs_constructor(self):
        return S3FileSystem(
            # Use localstack endpoint:
            endpoint_url='http://localhost:4566',
            key='anything',
            secret='anything',
        )


@labtech.task
class Experiment:
    seed: int

    def run(self):
        labtech.logger.info(f'Running with seed {self.seed}')
        return self.seed


experiments = [Experiment(seed=seed) for seed in range(10)]

# Configure a Lab with remote storage and a Ray runner backend:
lab = labtech.Lab(
    # labtech-dev-bucket is the name of our localstack bucket:
    storage=S3fsStorage('labtech-dev-bucket/lab_cache'),
    runner_backend=RayRunnerBackend(),
)

results = lab.run_tasks(experiments, bust_cache=True)
print(results)

# Shutdown the connection to the Ray cluster:
ray.shutdown()
```
### Ray remote function options

Ray allows you to configure a number of options that control how a task will be executed. These can be configured for Labtech tasks by defining a `runner_options()` method on a task type that returns the Ray remote options nested under the `'ray'` and `'remote_options'` keys.
For example, you can configure the minimum memory and CPU cores that must be available to a Ray worker that is executing a task:
```python
@task
class Experiment:
    ...

    def runner_options(self):
        # Require 2 CPU cores and 2G of memory for each task of this type.
        return {
            'ray': {
                'remote_options': {
                    'num_cpus': 2,
                    'memory': (2 * (1024 ** 3)),
                },
            }
        }

    def run(self):
        ...
```
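Because `runner_options()` is an ordinary method, the options it returns can also depend on the task's parameter values. The sketch below scales the memory request with a hypothetical `n_samples` parameter; the parameter name and the sizing heuristic are illustrative only:

```python
import labtech


@labtech.task
class BigExperiment:
    n_samples: int

    def runner_options(self):
        # Illustrative heuristic: reserve 1 CPU core, plus 1GB of memory
        # per million samples (with a minimum of 1GB).
        memory_bytes = max(1, self.n_samples // 1_000_000) * (1024 ** 3)
        return {
            'ray': {
                'remote_options': {
                    'num_cpus': 1,
                    'memory': memory_bytes,
                },
            }
        }

    def run(self):
        ...
```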
## Syncing Python environments across a cluster
One of the challenges of running tasks across a distributed cluster is ensuring that the Python execution environment is identical in each worker process running on each node. You should employ the following mechanisms provided by Ray to ensure that your tasks execute identically wherever they are run.
### Worker initialisation

You can use Ray's `worker_process_setup_hook` to execute one-off setup code before any tasks are run in a Ray worker process. For example, you can pass `worker_process_setup_hook` into `ray.init()` to configure mlflow in each worker:
```python
import mlflow
import ray


def worker_setup():
    # Initialise mlflow on each worker to use a centralised
    # mlflow tracking server:
    mlflow.set_tracking_uri('http://my-mlflow-host:8080')
    mlflow.set_experiment('example_ray_experiment')


ray.init(
    runtime_env={
        'worker_process_setup_hook': worker_setup,
    },
)
```
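With the setup hook above in place, every worker process points at the same tracking server, so a task's `run()` method can log to mlflow directly. The following sketch is one way you might combine the two; the task name, parameter, and logged metric are placeholders:

```python
import mlflow

import labtech


@labtech.task
class TrackedExperiment:
    seed: int

    def run(self):
        # The tracking URI and experiment name were already configured by
        # the worker_process_setup_hook, so we only need to start a run:
        with mlflow.start_run(run_name=f'seed-{self.seed}'):
            mlflow.log_param('seed', self.seed)
            score = self.seed * 0.1  # placeholder "result"
            mlflow.log_metric('score', score)
        return score
```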
### File and package dependencies
There are two broad approaches to ensuring the necessary source code, data files, and Python packages are installed on each node in your Ray cluster, as discussed in Ray's documentation on Environment Dependencies:
- Pre-install all dependencies onto each node in your cluster. Ray's cluster launcher has options to support this.
- Specify a runtime environment for Ray to install on-demand whenever a node runs a task (see the sketch after this list).
    - When using Labtech for experimentation, your code and dependencies may be changing frequently, so the flexibility of a runtime environment may be a better fit.
    - You can specify a runtime environment for all tasks with `ray.init(runtime_env={...})` or for specific tasks with the `runtime_env` remote function option.
    - You can use a runtime environment to upload local source code and data files to each node, install Python package dependencies, and set environment variables for worker processes.
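For example, a runtime environment passed to `ray.init()` could upload the current project directory and install extra packages on every node that runs a task. The directory, package list, and environment variable below are placeholders:

```python
import ray

ray.init(
    runtime_env={
        # Upload the local project directory to every node:
        'working_dir': '.',
        # Install extra Python packages on each node:
        'pip': ['scikit-learn', 's3fs'],
        # Set environment variables in every worker process:
        'env_vars': {'EXAMPLE_SETTING': '1'},
    },
)
```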
## Fault tolerance
Because a variety of failures can occur in a distributed system, Ray has several mechanisms for fault tolerance.
You should ensure that your tasks can be safely executed multiple times, as Ray may re-execute tasks under certain circumstances. You can control how many times Ray will re-execute tasks in some of these circumstances through remote function options (see the sketch after the following list):
- If your task's `run()` method raises an exception, Ray will not re-execute the task unless you set `retry_exceptions` to `True` or to a list of exception types.
- If the worker running a task dies or a stored object is lost due to node failure, then Ray will re-execute the task up to `max_retries` times (which you can disable by setting `max_retries=0`).
    - If a task is re-executed after a stored object is lost, Ray will re-run the task instead of loading its result from the cache.
- If Ray's memory monitor terminates a task to avoid running out of memory, then the task will be re-executed irrespective of the `max_retries` setting. You can disable this behaviour by disabling Ray's built-in memory monitor.
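For example, the retry behaviour described above can be tuned per task type through `runner_options()`. This sketch is illustrative; the exception type listed under `retry_exceptions` is just an example of a transient failure you might choose to retry:

```python
import labtech


@labtech.task
class FlakyExperiment:
    seed: int

    def runner_options(self):
        return {
            'ray': {
                'remote_options': {
                    # Re-execute up to 3 times if the worker dies or a
                    # stored object is lost:
                    'max_retries': 3,
                    # Also re-execute if run() raises one of these exceptions:
                    'retry_exceptions': [ConnectionError],
                },
            }
        }

    def run(self):
        ...
```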
## Other considerations
- A Lab's `max_workers` must be set to `None` when using the `RayRunnerBackend`. This is because Ray concurrency is not limited by a maximum number of tasks but by specifying the resource requirements of each task.
- Log messages from tasks are not displayed directly under the Labtech progress bars but are instead available in Ray's worker logs, which are, by default, available from:
    - The standard output stream of your Python script running Labtech
    - Under the cell that executes `ray.init()` in a Jupyter notebook
    - The Ray Dashboard, which aggregates all worker logs
- Labtech's task monitor is currently unable to report on the CPU usage, memory usage, and other process metrics for tasks run on Ray clusters.
    - Instead, you can refer to the resource utilisation of cluster nodes from the Ray Dashboard or enable the Ray Metrics View (which requires configuring Prometheus and Grafana).
## API Reference
### `labtech.runners.ray.RayRunnerBackend`

Bases: `RunnerBackend`
Runner Backend that runs each task on a Ray cluster.
Ray's shared-memory object store is used to distribute context and results between nodes, and Ray will allocate tasks to cluster nodes where large memory dependencies are already loaded.
Ray remote options may be provided for a task by defining a `runner_options()` method on its Task type that returns a dictionary of options under the `'ray'` and `'remote_options'` keys (the implementation of which may be based on task parameter values):
```python
@task
class Experiment:
    ...

    def runner_options(self):
        # Require 2 CPU cores and 2G of memory for each task of this type.
        return {
            'ray': {
                'remote_options': {
                    'num_cpus': 2,
                    'memory': (2 * (1024 ** 3)),
                },
            }
        }

    def run(self):
        ...
```
#### `__init__(monitor_interval_seconds: float = 1, monitor_timeout_seconds: int = 5) -> None`
Parameters:

- `monitor_interval_seconds` (`float`, default: `1`): Determines frequency of requests to Ray for task states.
- `monitor_timeout_seconds` (`int`, default: `5`): Maximum time to wait for a request to Ray for task states.
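For example, you could poll Ray for task states less frequently if the default one-second interval adds noticeable overhead; the values below are arbitrary:

```python
from labtech.runners.ray import RayRunnerBackend

# Poll Ray for task states every 5 seconds, waiting up to 10 seconds
# for each state request:
runner_backend = RayRunnerBackend(
    monitor_interval_seconds=5,
    monitor_timeout_seconds=10,
)
```

The resulting backend is passed to `labtech.Lab` via its `runner_backend` argument, as shown in the code example above.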