Мне нужен работник, чтобы обрабатывать одну задачу за раз и завершить текущий процесс, прежде чем начинать новую.Я не могу: (1) иметь не более одной задачи, выполняемой в каждый момент времени на каждом работнике, (2) заставить работника завершить процедуру перед запуском новой;атомарные транзакции.
Я использую клиент dask.distributed в кластере с 40 узлами;4 ядра и 15 ГБ оперативной памяти каждое.У конвейера, который я обрабатываю, задача объемом около 8-10 ГБ, поэтому выполнение двух задач на работе приведет к сбою приложения.
Я попытался назначить рабочему ресурсу и распределению задачи значения dask-worker scheduler-ip:port --nprocs 1 --resources process=1
и futures = [client.submit(func, f, resources={'process': 1}) for f in futures]
.но это было безуспешно.
Мой код выглядит следующим образом:
import dask
from dask.distributed import Client
@dask.delayed
def load():
...
@dask.delayed
def foo():
...
@dask.delayed
def save():
...
client = Client(scheduler-ip:port)
# Process file from a given path
paths = ['list', 'of', 'path']
results = []
for path in paths:
img = load(path)
for _ in range(n):
img = foo(img)
results.append(save(output-filename))
client.scatter(results)
futures = client.compute(results)
def identity(x):
return x
client.scatter(futures)
futures = [client.submit(same, f, resources={'process': 1}) for f in futures]
client.gather(futures)
На данный момент у меня есть два случая:
1- Я запускаю все свои входные данныеи приложения завершаются с MemoryError
2 - я запускаю подвыборку, однако она выполняется следующим образом:
load (img-1) -> load (img-2) -> foo (img-1) -> загрузить (img-3) -> ...-> сохранить (img-1) -> сохранить (img-2) -> ...
TLDR: этоВот что я хочу сделать для каждого работника:
load (img-1) -> foo (img-1) -> save (img-1) -> load (img-7) -> ...