Для набора данных, который загружается задачей и на который ссылается клиент в будущем, есть ли способ удаленного разделения данных на коллекцию фьючерсов?
Данные могут быть легко разделены, если ониперемещается обратно к клиенту ( аналогичный ответ ):
from distributed import Client
def load(a, b):
return list(range(a, b))
def operator(x):
return x * x
if __name__ == '__main__':
client = Client()
fut_data = client.submit(load, 0, 100)
data = fut_data.result()
fut_results = client.map(operator, data)
results = client.gather(fut_results)
Чтобы избежать перемещения данных обратно к клиенту, можно использовать Queue
для хранения фьючерсов, указывающих на разделы( аналогично ):
from distributed import Client, get_client
from dask.distributed import Queue
def load_and_queue(a, b):
data = load(a, b)
for f in get_client().scatter(data):
queue.put(f)
if __name__ == '__main__':
client = Client()
queue = Queue()
fire_and_forget(client.submit(load_and_queue, 0, 100))
fut_results = [client.submit(operator, queue.get()) for _ in range(0, 100)]
results = client.gather(fut_results)
Это не идеально, так как требует блокировки клиентского потока до тех пор, пока не будет истощен Queue
и, как правило, необходимо знать, сколько элементов содержится в data
что мы просто знаем в этом случае.
Другой альтернативой может быть запуск задач из задачи загрузки:
def load_task_for_task(a, b):
data = load(a, b)
task_client = get_client()
secede()
fut_results = task_client.map(operator, task_client.scatter(data))
return task_client.gather(fut_results)
if __name__ == '__main__':
client = Client()
fut_results = client.submit(do_everything_remotely, 0, 100)
results = fut_results.get()
Наконец, можно сделать нечто подобное тому, что содержится вдокументация здесь , которая соответствует отрыву раздела от большей части данных:
def item_i(data, i):
return data[i]
if __name__ == '__main__':
client = Client()
fut_data = client.submit(load, 0, 100)
n = client.submit(len, fut_data).result()
fut_part = [client.submit(item_i, fut_data, i) for i in range(0, n)]
fut_results = client.map(operator, fut_part)
results = client.gather(fut_results)
Ни один из этих подходов не кажется особенно естественным. Есть ли лучший способ сделать это?