вложенные рабочие процессы dask в python? - PullRequest
0 голосов
/ 06 декабря 2018

У меня есть простой рабочий процесс DASK.когда я его распечатываю, это выглядит так:

workflow = {
    'a_task': (<function a_func at 0x7f1dc5ded598>,), 
    'b_task': (<function b_func at 0x7f1dc5ded620>,), 
    'c_task': (<function c_func at 0x7f1ddb07aea0>,), 
    'd_task': (<function d_func at 0x7f1dc5ded6a8>,), 
    'workflow_end_task': (<function aggregate_func at 0x7f1dc5ded730>, 
        'a_task', 'b_task', 'c_task', 'd_task')
}

Оказывается, b_func - это небольшая функция с циклом for, которая выполняет около 1000 итераций и занимает около часа.в основном это выглядит так:

def b_func(args...):
    data = []
    for this in that:
        data.append(...)
    return data

Это цикл for не обязательно должен быть выполнен по порядку.Это можно сделать параллельно.

Итак, вопрос: как мне справиться с этим?Должен ли я преобразовать этот цикл for в рабочий процесс и сделать еще один вызов dask внутри b_func?или я должен вытащить этот процесс и расширить исходный рабочий процесс?

По сути, могу ли я вкладывать рабочие процессы dask или это плохая идея?

Кроме того, вы должны знать, что я использую from dask.distributed import Client и Client.get для распределения рабочего процесса по всему кластеру компьютеров.Я не знаю, усложняет ли это что-то за пределами dask.threaded.get, но, возможно, это что-то меняет.Я предполагаю, что это означает, что одному из сотрудников workers потребуется настроить новый планировщик и рабочих на всех компьютерах кластера, а затем передать им свой рабочий процесс.Может быть, idk.

Кто-нибудь имел дело с этой проблемой раньше?

1 Ответ

0 голосов
/ 10 декабря 2018

Должен ли я преобразовать цикл for в рабочий процесс и сделать еще один вызов dask внутри b_func?или я должен вытащить этот процесс и расширить исходный рабочий процесс?

По сути, я могу вложить рабочие процессы dask или это плохая идея?

В общем случае нет, выне должно быть задач в даске и звоните compute.Однако вы можете сделать это с помощью распределенного планировщика, и все должно работать.Если вы не укажете планировщик при вызове compute в задаче, будет использован текущий планировщик.Недостатком является то, что отправляющее задание (в вашем случае «b_task») будет по-прежнему заблокировано все время, что занимает поток на работнике (менее эффективно).

В вашем случае я бы вместо этого собралвесь график заранее, используя dask.delayed (http://docs.dask.org/en/latest/delayed.html).). Это позволяет вам написать зацикленный нормальный код Python, и dask может построить график для вас. Для получения дополнительной информации см. отложенную документацию.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...