Как поставить в очередь отложенную задержку на каждом работнике, чтобы обеспечить последовательное выполнение процесса? - PullRequest
1 голос
/ 12 апреля 2019

Мне нужен работник, чтобы обрабатывать одну задачу за раз и завершить текущий процесс, прежде чем начинать новую.Я не могу: (1) иметь не более одной задачи, выполняемой в каждый момент времени на каждом работнике, (2) заставить работника завершить процедуру перед запуском новой;атомарные транзакции.

Я использую клиент dask.distributed в кластере с 40 узлами;4 ядра и 15 ГБ оперативной памяти каждое.У конвейера, который я обрабатываю, задача объемом около 8-10 ГБ, поэтому выполнение двух задач на работе приведет к сбою приложения.

Я попытался назначить рабочему ресурсу и распределению задачи значения dask-worker scheduler-ip:port --nprocs 1 --resources process=1 и futures = [client.submit(func, f, resources={'process': 1}) for f in futures].но это было безуспешно.

Мой код выглядит следующим образом:

import dask
from dask.distributed import Client


@dask.delayed
def load():
  ...


@dask.delayed
def foo():
  ...


@dask.delayed
def save():
  ...

client = Client(scheduler-ip:port)

# Process file from a given path
paths = ['list', 'of', 'path']

results = []
for path in paths:
  img = load(path)

  for _ in range(n):
    img = foo(img)

  results.append(save(output-filename))

client.scatter(results)
futures = client.compute(results)

def identity(x):
  return x
client.scatter(futures)
futures = [client.submit(same, f, resources={'process': 1}) for f in futures]

client.gather(futures)

На данный момент у меня есть два случая:

1- Я запускаю все свои входные данныеи приложения завершаются с MemoryError

2 - я запускаю подвыборку, однако она выполняется следующим образом:

load (img-1) -> load (img-2) -> foo (img-1) -> загрузить (img-3) -> ...-> сохранить (img-1) -> сохранить (img-2) -> ...

TLDR: этоВот что я хочу сделать для каждого работника:

load (img-1) -> foo (img-1) -> save (img-1) -> load (img-7) -> ...

1 Ответ

0 голосов
/ 15 апреля 2019

Самым простым здесь, вероятно, будет запуск ваших работников только с одного потока

dask-worker ... --nthreads 1

Тогда этот работник будет запускать только одну вещь за один раз

...