Dask: дистанционное разделение данных - PullRequest
0 голосов
/ 21 октября 2019

Для набора данных, который загружается задачей и на который ссылается клиент в будущем, есть ли способ удаленного разделения данных на коллекцию фьючерсов?

Данные могут быть легко разделены, если ониперемещается обратно к клиенту ( аналогичный ответ ):

from distributed import Client


def load(a, b):
    return list(range(a, b))


def operator(x):
    return x * x


if __name__ == '__main__':
    client = Client()
    fut_data = client.submit(load, 0, 100)
    data = fut_data.result()
    fut_results = client.map(operator, data)
    results = client.gather(fut_results)

Чтобы избежать перемещения данных обратно к клиенту, можно использовать Queue для хранения фьючерсов, указывающих на разделы( аналогично ):

from distributed import Client, get_client
from dask.distributed import Queue


def load_and_queue(a, b):
    data = load(a, b)
    for f in get_client().scatter(data):
        queue.put(f)

if __name__ == '__main__':
    client = Client()
    queue = Queue()
    fire_and_forget(client.submit(load_and_queue, 0, 100))
    fut_results = [client.submit(operator, queue.get()) for _ in range(0, 100)]
    results = client.gather(fut_results)

Это не идеально, так как требует блокировки клиентского потока до тех пор, пока не будет истощен Queue и, как правило, необходимо знать, сколько элементов содержится в data что мы просто знаем в этом случае.

Другой альтернативой может быть запуск задач из задачи загрузки:

def load_task_for_task(a, b):
    data = load(a, b)
    task_client = get_client()
    secede()
    fut_results = task_client.map(operator, task_client.scatter(data))
    return task_client.gather(fut_results)

if __name__ == '__main__':
    client = Client()
    fut_results = client.submit(do_everything_remotely, 0, 100)
    results = fut_results.get()

Наконец, можно сделать нечто подобное тому, что содержится вдокументация здесь , которая соответствует отрыву раздела от большей части данных:

def item_i(data, i):
    return data[i]

if __name__ == '__main__':
    client = Client()
    fut_data = client.submit(load, 0, 100)
    n = client.submit(len, fut_data).result()
    fut_part = [client.submit(item_i, fut_data, i) for i in range(0, n)]
    fut_results = client.map(operator, fut_part)
    results = client.gather(fut_results)

Ни один из этих подходов не кажется особенно естественным. Есть ли лучший способ сделать это?

...