Caches and Storage

Caches

You can control how the results of a particular task type are formatted for caching by specifying an instance of one of the following Cache classes as the cache argument of the labtech.task decorator:
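
For example, a minimal sketch of attaching a cache to a task (the Experiment task is an illustrative placeholder following the task-definition style from the labtech README; PickleCache is already the default, so passing it explicitly only demonstrates the syntax):

import labtech
from labtech.cache import PickleCache

@labtech.task(cache=PickleCache())
class Experiment:
    seed: int

    def run(self) -> int:
        # The value returned here is what the cache will store.
        return self.seed * 2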

labtech.cache.PickleCache

Bases: BaseCache

Default cache that stores results as pickled Python objects.

NOTE: As pickle is not secure, you should only load pickle cache results that you trust.

Source code in labtech/cache.py
class PickleCache(BaseCache):
    """Default cache that stores results as
    [pickled](https://docs.python.org/3/library/pickle.html) Python
    objects.

    NOTE: As pickle is not secure, you should only load pickle cache
    results that you trust.

    """

    KEY_PREFIX = 'pickle__'
    RESULT_FILENAME = 'data.pickle'

    def __init__(self, *, serializer: Optional[Serializer] = None,
                 pickle_protocol: int = pickle.HIGHEST_PROTOCOL):
        super().__init__(serializer=serializer)
        self.pickle_protocol = pickle_protocol

    def save_result(self, storage: Storage, task: Task[ResultT], result: ResultT):
        data_file = storage.file_handle(task.cache_key, self.RESULT_FILENAME, mode='wb')
        with data_file:
            pickle.dump(result, data_file, protocol=self.pickle_protocol)

    def load_result(self, storage: Storage, task: Task[ResultT]) -> ResultT:
        data_file = storage.file_handle(task.cache_key, self.RESULT_FILENAME, mode='rb')
        with data_file:
            return pickle.load(data_file)
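
The pickle protocol defaults to pickle.HIGHEST_PROTOCOL but can be lowered if cached results need to be readable by older Python versions. A small sketch (protocol 4 is chosen purely for illustration):

from labtech.cache import PickleCache

# Write result pickles with protocol 4 instead of pickle.HIGHEST_PROTOCOL.
cache = PickleCache(pickle_protocol=4)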

labtech.cache.NullCache

Bases: Cache

Cache that never stores results in the storage provider.

Source code in labtech/cache.py
class NullCache(Cache):
    """Cache that never stores results in the storage provider."""

    def cache_key(self, task: Task) -> str:
        return 'null'

    def is_cached(self, storage: Storage, task: Task) -> bool:
        return False

    def save(self, storage: Storage, task: Task[ResultT], result: TaskResult[ResultT]):
        pass

    def load_task(self, storage: Storage, task_type: Type[TaskT], key: str) -> TaskT:
        raise TaskNotFound

    def load_result_with_meta(self, storage: Storage, task: Task[ResultT]) -> TaskResult[ResultT]:
        raise CacheError('Loading a result from a NullCache is not supported.')

    def load_cache_timestamp(self, storage: Storage, task: Task) -> Any:
        raise CacheError('Loading a cache_timestamp from a NullCache is not supported.')

    def delete(self, storage: Storage, task: Task):
        pass
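
NullCache is useful for tasks whose results are cheap to recompute or too large to be worth persisting. A minimal sketch (the Preprocess task is an illustrative placeholder):

import labtech
from labtech.cache import NullCache

@labtech.task(cache=NullCache())
class Preprocess:
    limit: int

    def run(self) -> list[int]:
        # The result is only kept in memory for the current run and is
        # never written to the storage provider.
        return list(range(self.limit))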

Custom Caches

You can define your own type of Cache with its own format or behaviour by inheriting from BaseCache (a sketch of a custom JSON cache follows the BaseCache reference below):

labtech.cache.BaseCache

Bases: Cache

Base class for defining a Cache that will store results in a storage provider.

Source code in labtech/cache.py
class BaseCache(Cache):
    """Base class for defining a Cache that will store results in a
    storage provider."""

    KEY_PREFIX = ''
    """Prefix for all files created by this Cache type - should be
    different for each Cache type to avoid conflicts."""

    METADATA_FILENAME = 'metadata.json'

    def __init__(self, *, serializer: Optional[Serializer] = None):
        self.serializer = serializer or Serializer()

    def cache_key(self, task: Task) -> str:
        serialized_str = json.dumps(self.serializer.serialize_task(task)).encode('utf-8')
        # Use sha1, as it is the same hash as git, produces short
        # hashes, and security concerns with sha1 are not relevant to
        # our use case.
        hashed = hashlib.sha1(serialized_str).hexdigest()
        return f'{self.KEY_PREFIX}{task.__class__.__qualname__}__{hashed}'

    def is_cached(self, storage: Storage, task: Task) -> bool:
        return storage.exists(task.cache_key)

    def save(self, storage: Storage, task: Task[ResultT], task_result: TaskResult[ResultT]):
        start_timestamp = None
        if task_result.meta.start is not None:
            start_timestamp = task_result.meta.start.isoformat()

        duration_seconds = None
        if task_result.meta.duration is not None:
            duration_seconds = task_result.meta.duration.total_seconds()

        metadata = {
            'labtech_version': labtech_version,
            'cache': self.__class__.__qualname__,
            'cache_key': task.cache_key,
            'task': self.serializer.serialize_task(task),
            'start_timestamp': start_timestamp,
            'duration_seconds': duration_seconds,
        }
        metadata_file = storage.file_handle(task.cache_key, self.METADATA_FILENAME, mode='w')
        with metadata_file:
            json.dump(metadata, metadata_file, indent=2)
        self.save_result(storage, task, task_result.value)

    def load_metadata(self, storage: Storage, task_type: Type[Task], key: str) -> dict[str, Any]:
        if not key.startswith(f'{self.KEY_PREFIX}{task_type.__qualname__}'):
            raise TaskNotFound
        with storage.file_handle(key, self.METADATA_FILENAME, mode='r') as metadata_file:
            metadata = json.load(metadata_file)
        if metadata.get('cache') != self.__class__.__qualname__:
            raise TaskNotFound
        return metadata

    def build_result_meta(self, metadata: dict[str, Any]) -> ResultMeta:
        start = None
        if 'start_timestamp' in metadata:
            start = datetime.fromisoformat(metadata['start_timestamp'])

        duration = None
        if 'duration_seconds' in metadata:
            duration = timedelta(seconds=metadata['duration_seconds'])

        return ResultMeta(
            start=start,
            duration=duration,
        )

    def load_task(self, storage: Storage, task_type: Type[TaskT], key: str) -> TaskT:
        metadata = self.load_metadata(storage, task_type, key)
        result_meta = self.build_result_meta(metadata)
        task = self.serializer.deserialize_task(metadata['task'], result_meta=result_meta)
        if not isinstance(task, task_type):
            raise TaskNotFound
        return task

    def load_result_with_meta(self, storage: Storage, task: Task[ResultT]) -> TaskResult[ResultT]:
        result = self.load_result(storage, task)
        metadata = self.load_metadata(storage, type(task), task.cache_key)
        return TaskResult(
            value=result,
            meta=self.build_result_meta(metadata),
        )

    def delete(self, storage: Storage, task: Task):
        storage.delete(task.cache_key)

    @abstractmethod
    def load_result(self, storage: Storage, task: Task[ResultT]) -> ResultT:
        """Loads the result for the given task from the storage provider.

        Args:
            storage: Storage provider to load the result from
            task: task instance to load the result for

        """

    @abstractmethod
    def save_result(self, storage: Storage, task: Task[ResultT], result: ResultT):
        """Saves the given task result into the storage provider.

        Args:
            storage: Storage provider to save the result into
            task: task instance the result belongs to
            result: result to save

        """
KEY_PREFIX = '' (class attribute)

Prefix for all files created by this Cache type; it should be different for each Cache type to avoid conflicts.

save_result(storage: Storage, task: Task[ResultT], result: ResultT) abstractmethod

Saves the given task result into the storage provider.

Parameters:

  • storage (Storage) –

    Storage provider to save the result into

  • task (Task[ResultT]) –

    task instance the result belongs to

  • result (ResultT) –

    result to save

Source code in labtech/cache.py
@abstractmethod
def save_result(self, storage: Storage, task: Task[ResultT], result: ResultT):
    """Saves the given task result into the storage provider.

    Args:
        storage: Storage provider to save the result into
        task: task instance the result belongs to
        result: result to save

    """
load_result(storage: Storage, task: Task[ResultT]) -> ResultT abstractmethod

Loads the result for the given task from the storage provider.

Parameters:

  • storage (Storage) –

    Storage provider to load the result from

  • task (Task[ResultT]) –

    task instance to load the result for

Source code in labtech/cache.py
@abstractmethod
def load_result(self, storage: Storage, task: Task[ResultT]) -> ResultT:
    """Loads the result for the given task from the storage provider.

    Args:
        storage: Storage provider to load the result from
        task: task instance to load the result for

    """

Storage

You can set the storage location for caching task results by specifying an instance of one of the following Storage classes as the storage argument of your Lab:
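
For example, a minimal sketch following the Lab construction shown in the labtech README ('experiment_storage' is an arbitrary example path):

import labtech
from labtech.storage import LocalStorage

# Cached results are written under ./experiment_storage/,
# one subdirectory per task cache key.
lab = labtech.Lab(storage=LocalStorage('experiment_storage'))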

labtech.storage.LocalStorage

Bases: Storage

Storage provider that stores cached results in a local filesystem directory.

Source code in labtech/storage.py
class LocalStorage(Storage):
    """Storage provider that stores cached results in a local filesystem
    directory."""

    def __init__(self, storage_dir: Union[str, Path], *, with_gitignore: bool = True):
        """
        Args:
            storage_dir: Path to the directory where cached results will be
                stored. The directory will be created if it does not already
                exist.
            with_gitignore: If `True`, a `.gitignore` file will be created
                inside the storage directory to ignore the entire storage
                directory. If an existing `.gitignore` file exists, it will be
                replaced.
        """
        if isinstance(storage_dir, str):
            storage_dir = Path(storage_dir)
        self._storage_path = storage_dir.resolve()
        if not self._storage_path.exists():
            self._storage_path.mkdir()

        if with_gitignore:
            gitignore_path = self._storage_path / '.gitignore'
            with gitignore_path.open('w') as gitignore_file:
                gitignore_file.write('*\n')

    def _key_path(self, key: str) -> Path:
        if not key:
            raise StorageError("Key cannot be empty")

        disallowed_key_chars = ['.', '/', '\\', os.path.sep, os.path.altsep]
        for char in disallowed_key_chars:
            if char is not None and char in key: # altsep can be None
                raise StorageError(f"Key '{key}' must not contain the forbidden character '{char}'")

        key_path = (self._storage_path / key).resolve()
        if key_path.parent != self._storage_path:
            raise StorageError((f"Key '{key}' should only reference a directory directly "
                                f"under the storage directory '{self._storage_path}'"))
        return key_path

    def find_keys(self) -> Sequence[str]:
        return sorted([
            key_path.name for key_path in self._storage_path.iterdir()
            if key_path.is_dir()
        ])

    def exists(self, key: str) -> bool:
        key_path = self._key_path(key)
        return key_path.exists()

    def file_handle(self, key: str, filename: str, *, mode: str = 'r') -> IO:
        key_path = self._key_path(key)
        try:
            key_path.mkdir()
        except FileExistsError:
            pass
        file_path = (key_path / filename).resolve()
        if file_path.parent != key_path:
            raise StorageError((f"Filename '{filename}' should only reference a directory directly "
                                f"under the storage key directory '{key_path}'"))
        return file_path.open(mode=mode)

    def delete(self, key: str):
        key_path = self._key_path(key)
        if key_path.exists():
            shutil.rmtree(key_path)

__init__(storage_dir: Union[str, Path], *, with_gitignore: bool = True)

Parameters:

  • storage_dir (Union[str, Path]) –

    Path to the directory where cached results will be stored. The directory will be created if it does not already exist.

  • with_gitignore (bool, default: True) –

    If True, a .gitignore file will be created inside the storage directory to ignore the entire storage directory. If a .gitignore file already exists, it will be replaced.

Source code in labtech/storage.py
def __init__(self, storage_dir: Union[str, Path], *, with_gitignore: bool = True):
    """
    Args:
        storage_dir: Path to the directory where cached results will be
            stored. The directory will be created if it does not already
            exist.
        with_gitignore: If `True`, a `.gitignore` file will be created
            inside the storage directory to ignore the entire storage
            directory. If an existing `.gitignore` file exists, it will be
            replaced.
    """
    if isinstance(storage_dir, str):
        storage_dir = Path(storage_dir)
    self._storage_path = storage_dir.resolve()
    if not self._storage_path.exists():
        self._storage_path.mkdir()

    if with_gitignore:
        gitignore_path = self._storage_path / '.gitignore'
        with gitignore_path.open('w') as gitignore_file:
            gitignore_file.write('*\n')

labtech.storage.NullStorage

Bases: Storage

Storage provider that does not store cached results.

Source code in labtech/storage.py
class NullStorage(Storage):
    """Storage provider that does not store cached results."""

    def find_keys(self) -> Sequence[str]:
        return []

    def exists(self, key: str) -> bool:
        return False

    def file_handle(self, key: str, filename: str, *, mode: str = 'r') -> IO:
        return open(os.devnull, mode=mode)

    def delete(self, key: str):
        pass
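
NullStorage effectively disables persistent caching for the whole Lab, which can be convenient in tests or quick interactive runs. A minimal sketch (assuming the Lab constructor shown in the labtech README):

import labtech
from labtech.storage import NullStorage

# Nothing is persisted between runs, so every task is recomputed.
lab = labtech.Lab(storage=NullStorage())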

Custom Storage

To store cached results with an alternative storage provider (such as a storage bucket in the cloud), you can define your own type of Storage by inheriting from Storage (a sketch of a custom in-memory storage provider follows the Storage reference below):

labtech.storage.Storage

Bases: ABC

Storage provider for persisting cached task results.

Source code in labtech/types.py
class Storage(ABC):
    """Storage provider for persisting cached task results."""

    @abstractmethod
    def find_keys(self) -> Sequence[str]:
        """Returns the keys of all currently cached task results."""

    @abstractmethod
    def exists(self, key: str) -> bool:
        """Returns `True` if the given task `key` is present in the storage
        cache."""

    @abstractmethod
    def file_handle(self, key: str, filename: str, *, mode: str = 'r') -> IO:
        """Opens and returns a File-like object for a single file within the
        storage cache.

        Args:
            key: The task key of the cached result containing the file.
            filename: The name of the file to open.
            mode: The file mode to open the file with.

        """

    @abstractmethod
    def delete(self, key: str) -> None:
        """Deletes the cached result for the task with the given `key`."""
find_keys() -> Sequence[str] abstractmethod

Returns the keys of all currently cached task results.

Source code in labtech/types.py
@abstractmethod
def find_keys(self) -> Sequence[str]:
    """Returns the keys of all currently cached task results."""
exists(key: str) -> bool abstractmethod

Returns True if the given task key is present in the storage cache.

Source code in labtech/types.py
@abstractmethod
def exists(self, key: str) -> bool:
    """Returns `True` if the given task `key` is present in the storage
    cache."""
file_handle(key: str, filename: str, *, mode: str = 'r') -> IO abstractmethod

Opens and returns a File-like object for a single file within the storage cache.

Parameters:

  • key (str) –

    The task key of the cached result containing the file.

  • filename (str) –

    The name of the file to open.

  • mode (str, default: 'r') –

    The file mode to open the file with.

Source code in labtech/types.py
@abstractmethod
def file_handle(self, key: str, filename: str, *, mode: str = 'r') -> IO:
    """Opens and returns a File-like object for a single file within the
    storage cache.

    Args:
        key: The task key of the cached result containing the file.
        filename: The name of the file to open.
        mode: The file mode to open the file with.

    """
delete(key: str) -> None abstractmethod

Deletes the cached result for the task with the given key.

Source code in labtech/types.py
@abstractmethod
def delete(self, key: str) -> None:
    """Deletes the cached result for the task with the given `key`."""