# Labtech Tutorial
The following tutorial presents a complete example of using labtech to easily add parallelism and caching to machine learning experiments.
You can also run this tutorial as an interactive notebook.
Before we begin, let's install labtech along with some other dependencies we will use in this tutorial:
```python
%pip install labtech mlflow scikit-learn
```
Let's also clear any caches that were created by previous runs of this tutorial:
```python
!rm -rf storage/tutorial/
!mkdir -p storage/tutorial/
```
## Running a single experiment as a labtech task
To get started, we'll take the following simple machine learning experiment code and convert it to be run with labtech.
```python
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import log_loss

digits_X, digits_y = datasets.load_digits(return_X_y=True)
digits_X = StandardScaler().fit_transform(digits_X)

clf = RandomForestClassifier(
    n_estimators=5,
    random_state=42,
)
clf.fit(digits_X, digits_y)

prob_y = clf.predict_proba(digits_X)
print(f'{log_loss(digits_y, prob_y) = :.3}')
```
Throughout this tutorial, we will use labtech to improve and extend this experiment, which currently:
- Loads and scales the digits dataset.
    - This is a benchmark dataset where the goal is to train a classifier that can correctly assign a number between 0 and 9 to the image of a hand-written digit.
    - We will want to extend our experimentation to also include other datasets.
- Trains a random forest classifier on the digits dataset.
    - The classifier is configured with `n_estimators=5` (i.e. a forest of 5 trees) and a fixed `random_state` (to ensure we get the same result every time we run the code).
    - We will want to extend our experimentation to test other `n_estimators` values and classifiers other than a random forest.
- Evaluates the classifier by calculating the logistic loss of probabilities predicted by the classifier for the dataset.
    - Standard evaluation practice would be to calculate loss for a separate test dataset, but we will use a single dataset for both training and testing to simplify this tutorial.
Let's set up this same experiment to be run with labtech, providing us with a foundation that we can extend throughout this tutorial.
First, we'll define a labtech task type that will load the dataset, train the classifier, and return the probabilities predicted for the dataset. Defining a task type for our experiment is as simple as defining a class decorated with `@labtech.task` that defines a `run()` method that performs the experiment and returns its result (the predicted probabilities):
```python
import labtech


@labtech.task
class ClassifierExperiment:

    def run(self):
        digits_X, digits_y = datasets.load_digits(return_X_y=True)
        digits_X = StandardScaler().fit_transform(digits_X)
        clf = RandomForestClassifier(
            n_estimators=5,
            random_state=42,
        )
        clf.fit(digits_X, digits_y)
        prob_y = clf.predict_proba(digits_X)
        return prob_y
```
Next, we create a labtech lab that can be used to execute the experiment. We'll configure the lab to cache results in a folder called `storage/tutorial/classification_lab_1` and to display notebook-friendly progress bars:
```python
lab = labtech.Lab(
    storage='storage/tutorial/classification_lab_1',
    # notebook=True is assumed here to enable the notebook-friendly
    # progress bars mentioned above.
    notebook=True,
)
```
Finally, we create a task instance of `ClassifierExperiment` and call `lab.run_task()` to run it. The output will be the predicted probabilities returned by the task's `run()` method, so we can calculate the loss from them as before:
```python
classifier_experiment = ClassifierExperiment()
prob_y = lab.run_task(classifier_experiment)
print(f'{log_loss(digits_y, prob_y) = :.3}')
```
An immediate benefit of running an experiment this way with labtech is that the result will be cached to disk for future use. Any future calls to run the same experiment (even after restarting Python) will load the result from the cache:
```python
prob_y = lab.run_task(classifier_experiment)
print(f'{log_loss(digits_y, prob_y) = :.3}')
```
Defining the task to return the prediction probabilities instead of just the loss metric gives us flexibility to change the evaluation in the future (e.g. from `log_loss` to another metric) while still being able to re-use the same cached result.
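For example, we could evaluate the same cached probabilities with a different metric (a minimal sketch; the choice of `accuracy_score` here is our own illustration):

```python
from sklearn.metrics import accuracy_score

# Convert the cached probabilities into hard class predictions
# (for the digits dataset, column indices match the class labels).
pred_y = prob_y.argmax(axis=1)
print(f'{accuracy_score(digits_y, pred_y) = :.3}')
```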
We can also ask our lab to return task objects for all previously cached results for a given task type by calling `lab.cached_tasks()`. A given task could then be passed to `lab.run_task()` to load its result (or we could pass a list of tasks to `lab.run_tasks()`, as we will see in the next section of this tutorial).
```python
lab.cached_tasks([
    ClassifierExperiment,
])
```
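For example, we could load the result of a previously cached task without re-running it (a minimal sketch; it assumes at least one `ClassifierExperiment` result is already in the cache and that `cached_tasks()` returns an indexable list):

```python
# cached_tasks() returns task objects reconstructed from the cache;
# passing one to run_task() loads its result from the cache instead
# of re-running the experiment.
cached_experiments = lab.cached_tasks([ClassifierExperiment])
cached_prob_y = lab.run_task(cached_experiments[0])
```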
It is very important that you clear any cached results whenever you make a change that will impact the behaviour of a task - otherwise your cached results may no longer reflect the actual result of the current code.
You can clear the cached results for a list of tasks with `lab.uncache_tasks()`:
```python
lab.uncache_tasks([
    classifier_experiment,
])
```
## Parameterising tasks, and running many tasks in parallel
Let's extend our experimentation to compare the results for classifiers configured with different `n_estimators` values.

To do so, we'll add an `n_estimators` parameter to our `ClassifierExperiment` task type and reference it within the `run()` method as `self.n_estimators`. Task parameters are declared in exactly the same way as `dataclass` fields:
```python
import labtech


@labtech.task
class ClassifierExperiment:
    n_estimators: int

    def run(self):
        digits_X, digits_y = datasets.load_digits(return_X_y=True)
        digits_X = StandardScaler().fit_transform(digits_X)
        clf = RandomForestClassifier(
            n_estimators=self.n_estimators,
            random_state=42,
        )
        clf.fit(digits_X, digits_y)
        prob_y = clf.predict_proba(digits_X)
        return prob_y
```
Now we'll use a list comprehension to construct a list of `ClassifierExperiment` tasks with different `n_estimators` values:
```python
classifier_experiments = [
    ClassifierExperiment(
        n_estimators=n_estimators,
    )
    for n_estimators in range(1, 11)
]
```
We can run a list of tasks with `lab.run_tasks()`, which has the added benefit of leveraging Python's multiprocessing capabilities to run the tasks in parallel - running as many tasks simultaneously as the CPUs of the machine will allow. Also, because we've changed the definition of our `ClassifierExperiment` class, we'll keep caches for the new definition separate by constructing a new lab that uses a different storage directory:
```python
lab = labtech.Lab(storage='storage/tutorial/classification_lab_2')
results = lab.run_tasks(classifier_experiments)
```
`lab.run_tasks()` returns a dictionary mapping each input task to the result it returned, which we can loop over to print loss metrics for each experiment:
```python
for experiment, prob_y in results.items():
    print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')
```
## Maximising concurrency and caching with dependent tasks
Labtech's true power lies in its ability to manage complex networks of dependent tasks - automatically running as many tasks as possible in parallel (even different types of tasks) and re-using cached results wherever possible.
To demonstrate this, let's extend our experimentation with a new post-processing step that will take the probabilities returned by one of our previous `ClassifierExperiment` tasks and assign a probability of `1` to the most likely class for each record (and conversely assign a probability of `0` to all other classes).

To achieve this, we will define a new `MinMaxProbabilityExperiment` task type that accepts a `ClassifierExperiment` as a parameter. Labtech will consider any task in a parameter to be a dependency of the task. Dependency tasks will be run before any of their dependent tasks, allowing us to access the result from the `.result` attribute of the task parameter (i.e. `self.classifier_experiment.result`):
```python
import numpy as np


@labtech.task
class MinMaxProbabilityExperiment:
    classifier_experiment: ClassifierExperiment

    def run(self):
        prob_y = self.classifier_experiment.result
        # Replace the maximum probability in each row with 1,
        # and replace all other probabilities with 0.
        min_max_prob_y = np.zeros(prob_y.shape)
        min_max_prob_y[np.arange(len(prob_y)), prob_y.argmax(axis=1)] = 1
        return min_max_prob_y
```
We can then construct and run a list of `MinMaxProbabilityExperiment` tasks that depend on our previous `ClassifierExperiment` tasks in `classifier_experiments`. Labtech will ensure each of the `classifier_experiments` has been run before its dependent `MinMaxProbabilityExperiment` is run, re-using results depended on by multiple tasks and loading previously cached results wherever possible:
```python
min_max_prob_experiments = [
    MinMaxProbabilityExperiment(
        classifier_experiment=classifier_experiment,
    )
    for classifier_experiment in classifier_experiments
]

results = lab.run_tasks(min_max_prob_experiments)
for experiment, prob_y in results.items():
    print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')
```
By simply specifying task dependencies, you can construct any task structure that can be expressed as a directed acyclic graph (or DAG) and let labtech handle running tasks concurrently, sharing results between dependent tasks, and using caches wherever possible.
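As a minimal sketch of a diamond-shaped DAG (the task names here are hypothetical, and it assumes - as the `ExperimentEvaluationTask` later in this tutorial demonstrates - that a `list` of tasks in a parameter is also treated as a set of dependencies):

```python
@labtech.task
class PrepareTask:

    def run(self):
        return list(range(10))


@labtech.task
class TransformTask:
    prepare_task: PrepareTask
    factor: int

    def run(self):
        return [x * self.factor for x in self.prepare_task.result]


@labtech.task
class CombineTask:
    transform_tasks: list[TransformTask]

    def run(self):
        # Flatten the results of all transform dependencies.
        return [x for task in self.transform_tasks for x in task.result]


# PrepareTask will be run once, its result will be shared by both
# TransformTasks (which can run in parallel), and CombineTask will only
# run after both transforms have finished:
prepare_task = PrepareTask()
combine_task = CombineTask(transform_tasks=[
    TransformTask(prepare_task=prepare_task, factor=2),
    TransformTask(prepare_task=prepare_task, factor=3),
])
result = lab.run_task(combine_task)
```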
## Parameterising tasks with complex objects
Now let's extend our experimentation to compare different classifier models. We'd like to make the classifier itself a parameter to the task, but task parameters can only be JSON-serializable values or dependency tasks. Therefore, we will use dependency tasks to construct and return classifier objects to our experiment tasks. We achieve this in the following code by:
- Defining `RFClassifierTask` and `LRClassifierTask` task types.
    - `RFClassifierTask` returns a random forest classifier parameterised by an `n_estimators` value.
    - `LRClassifierTask` returns a logistic regression classifier.
    - Because constructing a classifier object is inexpensive, we don't need to cache them, so we set `cache=None` in the `@labtech.task` decorator for these task types.
    - For type hinting purposes, we will identify these task types with the `ClassifierTask` Protocol, which will match any task type that returns an sklearn classifier.
- Redefining `ClassifierExperiment` to be parameterised by a `ClassifierTask`.
    - The classifier object to be trained and applied is retrieved from the `ClassifierTask` result with `self.classifier_task.result`.
    - Because one `ClassifierTask` result may be shared by many `ClassifierExperiment` tasks, the `run()` method first creates its own copy of the classifier with `clone()`.
```python
from typing import Protocol

from sklearn.base import clone, ClassifierMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression


class ClassifierTask(Protocol):

    def run(self) -> ClassifierMixin:
        pass


@labtech.task(cache=None)
class RFClassifierTask:
    n_estimators: int

    def run(self) -> ClassifierMixin:
        return RandomForestClassifier(
            n_estimators=self.n_estimators,
            random_state=42,
        )


@labtech.task(cache=None)
class LRClassifierTask:

    def run(self) -> ClassifierMixin:
        return LogisticRegression(
            random_state=42,
        )


@labtech.task
class ClassifierExperiment:
    classifier_task: ClassifierTask

    def run(self):
        digits_X, digits_y = datasets.load_digits(return_X_y=True)
        digits_X = StandardScaler().fit_transform(digits_X)
        clf = clone(self.classifier_task.result)
        clf.fit(digits_X, digits_y)
        prob_y = clf.predict_proba(digits_X)
        return prob_y
```
Now we can generate and run a set of `RFClassifierTask` tasks for various `n_estimators` values, and construct a `ClassifierExperiment` for each of these `RFClassifierTask` tasks as well as an `LRClassifierTask` task:
```python
rf_classifier_tasks = [
    RFClassifierTask(
        n_estimators=n_estimators,
    )
    for n_estimators in range(1, 11)
]

classifier_experiments = [
    ClassifierExperiment(
        classifier_task=classifier_task,
    )
    for classifier_task in [
        LRClassifierTask(),
        *rf_classifier_tasks,
    ]
]

lab = labtech.Lab(storage='storage/tutorial/classification_lab_3')
results = lab.run_tasks(classifier_experiments)
for experiment, prob_y in results.items():
    print(f'{experiment}: {log_loss(digits_y, prob_y) = :.3}')
```
## Providing large objects as context
Sometimes we want to pass large, unchanging objects to our tasks, but don't want to be forced to load them in a dependent task. For example, it would be convenient to load a collection of datasets (on which to run our experiments) outside of any task, allowing us to inspect these datasets before and after the tasks have been run:
```python
iris_X, iris_y = datasets.load_iris(return_X_y=True)
iris_X = StandardScaler().fit_transform(iris_X)

DATASETS = {
    'digits': {'X': digits_X, 'y': digits_y},
    'iris': {'X': iris_X, 'y': iris_y},
}
```
To achieve this, a labtech lab can be provided with a context that is made available to all tasks. In the following code, we:
- Pass a `context` to the `labtech.Lab()` constructor, with a `'DATASETS'` key for the set of `DATASETS` defined above.
- Redefine `ClassifierExperiment` to accept a `dataset_key` parameter and use it to look up a dataset inside the `'DATASETS'` key of the context, which is made available by labtech as `self.context`.
- Alter the task generation and evaluation code to handle multiple datasets.
```python
@labtech.task
class ClassifierExperiment:
    classifier_task: ClassifierTask
    dataset_key: str

    def run(self):
        dataset = self.context['DATASETS'][self.dataset_key]
        X, y = dataset['X'], dataset['y']
        clf = clone(self.classifier_task.result)
        clf.fit(X, y)
        prob_y = clf.predict_proba(X)
        return prob_y


classifier_experiments = [
    ClassifierExperiment(
        classifier_task=classifier_task,
        dataset_key=dataset_key,
    )
    # By including multiple for clauses, we will produce a ClassifierExperiment
    # for every combination of dataset_key and classifier_task
    for dataset_key in DATASETS.keys()
    for classifier_task in [LRClassifierTask(), *rf_classifier_tasks]
]

lab = labtech.Lab(
    storage='storage/tutorial/classification_lab_4',
    context={
        'DATASETS': DATASETS,
    },
)
results = lab.run_tasks(classifier_experiments)
for experiment, prob_y in results.items():
    dataset_y = DATASETS[experiment.dataset_key]["y"]
    print(f'{experiment}: {log_loss(dataset_y, prob_y) = :.3}')
```
The lab context can also be useful for passing parameters to a task that won't affect its result and therefore don't need to be part of the task's formal parameters. For example: log levels and task-internal parallelism settings.
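As a minimal sketch (the `'LOG_LEVEL'` key, the `VerboseExperiment` task, and the storage path are all hypothetical), such a value can be read from `self.context` just like the datasets above:

```python
import logging

logger = logging.getLogger(__name__)


@labtech.task
class VerboseExperiment:

    def run(self):
        # Context values are not part of a task's parameters, so changing
        # 'LOG_LEVEL' later won't invalidate previously cached results.
        logger.setLevel(self.context['LOG_LEVEL'])
        logger.info('Running VerboseExperiment')
        return 42


lab = labtech.Lab(
    storage='storage/tutorial/context_example',
    context={'LOG_LEVEL': logging.INFO},
)
```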
It is important that you do NOT make changes to context values that impact task results after you have started caching experiment results - otherwise your cached results may not reflect your latest context values.
## Bringing it all together and aggregating results
The following code brings all the steps from this tutorial together in one place, with some additional improvements:
- The "experiment-like"
ClassifierExperiment
andMinMaxProbabilityExperiment
task types are now identified by a commonExperimentTask
Protocol (which requires each of those classes to provide adataset_key
attribute or property that is used by the newExperimentEvaluationTask
). - A new, final,
ExperimentEvaluationTask
task that depends on allExperimentTask
tasks is used to compute the loss metric for all experiments.- A final task like this is useful once we have a large number of experiments as it allows us to cache the final evaluation of all tasks, meaning that we only need to load experiment results and re-calculate metrics when experiment parameters have changed or new experiments have been added.
- We enable labtech's integration with
mlflow
by the following additions (see How can I use labtech with mlfow? for details):- We add
mlflow_run=True
to the@labtech.task
decorator ofClassifierExperiment
andMinMaxProbabilityExperiment
, indicating that each task of these types should be recorded as a "run" in mflow. - We name the over-arching mlflow "experiment" with
mlflow.set_experiment('example_labtech_experiment')
before the tasks are run.
- We add
```python
from typing import Protocol

import numpy as np

import labtech
from sklearn.base import clone, ClassifierMixin

# === Prepare Datasets ===

from sklearn import datasets
from sklearn.preprocessing import StandardScaler

digits_X, digits_y = datasets.load_digits(return_X_y=True)
digits_X = StandardScaler().fit_transform(digits_X)

iris_X, iris_y = datasets.load_iris(return_X_y=True)
iris_X = StandardScaler().fit_transform(iris_X)

DATASETS = {
    'digits': {'X': digits_X, 'y': digits_y},
    'iris': {'X': iris_X, 'y': iris_y},
}

# === Classifier Tasks ===

from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression


class ClassifierTask(Protocol):

    def run(self) -> ClassifierMixin:
        pass


@labtech.task(cache=None)
class RFClassifierTask:
    n_estimators: int

    def run(self) -> ClassifierMixin:
        return RandomForestClassifier(
            n_estimators=self.n_estimators,
            random_state=42,
        )


@labtech.task(cache=None)
class LRClassifierTask:

    def run(self) -> ClassifierMixin:
        return LogisticRegression(
            random_state=42,
        )


# === Experiment Tasks ===

class ExperimentTask(Protocol):
    dataset_key: str

    def run(self) -> np.ndarray:
        pass


@labtech.task(mlflow_run=True)
class ClassifierExperiment(ExperimentTask):
    classifier_task: ClassifierTask
    dataset_key: str

    def run(self) -> np.ndarray:
        dataset = self.context['DATASETS'][self.dataset_key]
        X, y = dataset['X'], dataset['y']
        clf = clone(self.classifier_task.result)
        clf.fit(X, y)
        prob_y = clf.predict_proba(X)
        return prob_y


@labtech.task(mlflow_run=True)
class MinMaxProbabilityExperiment(ExperimentTask):
    experiment: ExperimentTask

    @property
    def dataset_key(self):
        return self.experiment.dataset_key

    def run(self) -> np.ndarray:
        prob_y = self.experiment.result
        # Replace the maximum probability in each row with 1,
        # and replace all other probabilities with 0.
        min_max_prob_y = np.zeros(prob_y.shape)
        min_max_prob_y[np.arange(len(prob_y)), prob_y.argmax(axis=1)] = 1
        return min_max_prob_y


# === Results Aggregation ===

from sklearn.metrics import log_loss


@labtech.task
class ExperimentEvaluationTask:
    experiments: list[ExperimentTask]

    def run(self):
        return {
            experiment: {'log_loss': log_loss(
                self.context['DATASETS'][experiment.dataset_key]['y'],
                experiment.result,
            )}
            for experiment in self.experiments
        }


# === Task Construction ===

rf_classifier_tasks = [
    RFClassifierTask(
        n_estimators=n_estimators,
    )
    for n_estimators in range(1, 11)
]

classifier_experiments = [
    ClassifierExperiment(
        classifier_task=classifier_task,
        dataset_key=dataset_key,
    )
    for dataset_key in DATASETS.keys()
    for classifier_task in [LRClassifierTask(), *rf_classifier_tasks]
]

min_max_prob_experiments = [
    MinMaxProbabilityExperiment(
        experiment=classifier_experiment,
    )
    for classifier_experiment in classifier_experiments
]

evaluation_task = ExperimentEvaluationTask(
    experiments=[
        *classifier_experiments,
        *min_max_prob_experiments,
    ]
)

# === Task Execution ===

import mlflow

mlflow.set_experiment('example_labtech_experiment')

lab = labtech.Lab(
    storage='storage/tutorial/classification_lab_final',
    context={
        'DATASETS': DATASETS,
    },
)

evaluation_result = lab.run_task(evaluation_task)
for experiment, result in evaluation_result.items():
    print(f'{experiment}: log_loss = {result["log_loss"]:.3}')
```
## Visualising tasks and dependencies
Finally, we can use labtech to generate a diagram of a list of tasks that shows all of the task types, parameters, and dependencies:
```python
from labtech.diagram import display_task_diagram

display_task_diagram([
    evaluation_task,
], direction='BT')
```
```mermaid
classDiagram
    direction BT

    class ExperimentEvaluationTask
    ExperimentEvaluationTask : list[ExperimentTask] experiments
    ExperimentEvaluationTask : run()

    class ClassifierExperiment
    ClassifierExperiment : ClassifierTask classifier_task
    ClassifierExperiment : str dataset_key
    ClassifierExperiment : run() ndarray

    class MinMaxProbabilityExperiment
    MinMaxProbabilityExperiment : ExperimentTask experiment
    MinMaxProbabilityExperiment : run() ndarray

    class LRClassifierTask
    LRClassifierTask : run() ClassifierMixin

    class RFClassifierTask
    RFClassifierTask : int n_estimators
    RFClassifierTask : run() ClassifierMixin

    ExperimentEvaluationTask <-- "many" ClassifierExperiment: experiments
    ExperimentEvaluationTask <-- "many" MinMaxProbabilityExperiment: experiments
    ClassifierExperiment <-- LRClassifierTask: classifier_task
    ClassifierExperiment <-- RFClassifierTask: classifier_task
    MinMaxProbabilityExperiment <-- ClassifierExperiment: experiment
```
Such diagrams can help you visualise how your experiments are running, and may be useful to include in project documentation.
## Next steps
Congratulations on completing the labtech tutorial! You're now ready to manage complex experiment workflows with ease!
To learn more about labtech, you can dive into the following resources: