У меня есть пользовательская группа DAG, например:
dag = {'load': (load, 'myfile.txt'),
'heavy_comp': (heavy_comp, 'load'),
'simple_comp_1': (sc_1, 'heavy_comp'),
'simple_comp_2': (sc_2, 'heavy_comp'),
'simple_comp_3': (sc_3, 'heavy_comp')}
И я рассчитываю вычислить ключи simple_comp_1
, simple_comp_2
и simple_comp_3
, которые я выполняю следующим образом,
import dask
from dask.distributed import Client
from dask_yarn import YarnCluster
task_1 = dask.get(dag, 'simple_comp_1')
task_2 = dask.get(dag, 'simple_comp_2')
task_3 = dask.get(dag, 'simple_comp_3')
tasks = [task_1, task_2, task_3]
cluster = YarnCluster()
cluster.scale(3)
client = Client(cluster)
dask.compute(tasks)
cluster.shutdown()
Кажется, что без кэширования вычисление этих 3 ключей приведет к вычислению heavy_comp
также 3 раза. И поскольку это сложное вычисление, я попытался реализовать оппортунистическое кэширование из здесь следующим образом:
from dask.cache import Cache
cache = Cache(2e9)
cache.register()
Однако, когда я попытался распечатать результаты кэширования, я ничего не получил:
>>> cache.cache.data
[]
>>> cache.cache.heap.heap
{}
>>> cache.cache.nbytes
{}
Я даже пытался увеличить размер кэша до 6 ГБ, но безрезультатно. Я делаю что-то неправильно? Как я могу заставить Dask кешировать результат ключа heavy_comp
?