Как получить результаты заданий, когда они завершены, а не после того, как все закончилось в Даске? - PullRequest
0 голосов
/ 05 апреля 2019

У меня есть датафрейм dask, и я хочу вычислить некоторые задачи, которые являются независимыми. Некоторые задачи выполняются быстрее, чем другие, но я получаю результат каждой задачи после выполнения более длинных задач.

Я создал локальный клиент и использую client.compute() для отправки задач. Затем я использую future.result(), чтобы получить результат каждого задания.

Я использую потоки, чтобы запрашивать результаты одновременно и измерять время для каждого результата для вычисления следующим образом:

def get_result(future,i):
    t0 = time.time()
    print("calculating result", i)
    result = future.result()
    print("result {} took {}".format(i, time.time() - t0))

client = Client()
df = dd.read_csv(path_to_csv)

future1 = client.compute(df[df.x > 200])
future2 = client.compute(df[df.x > 500])

threading.Thread(target=get_result, args=[future1,1]).start()
threading.Thread(target=get_result, args=[future2,2]).start()

Я ожидаю, что вывод приведенного выше кода будет выглядеть примерно так:

calculating result 1
calculating result 2
result 2 took 10
result 1 took 46

Поскольку первое задание больше.

Но вместо этого я получил оба одновременно

calculating result 1
calculating result 2
result 2 took 46.3046760559082
result 1 took 46.477620363235474

Я полагаю, это потому, что future2 фактически вычисляется в фоновом режиме и завершается до future1 , но ожидает, пока future1 не будет завершено, чтобы вернуться.

Есть ли способ получить результат future2 в тот момент, когда он заканчивается?

1 Ответ

1 голос
/ 05 апреля 2019

Вам не нужно создавать потоки для использования фьючерсов в асинхронном режиме - они уже изначально асинхронны и отслеживают свое состояние в фоновом режиме. Если вы хотите получить результаты в том порядке, в котором они готовы, вы должны использовать as_completed.

Однако в вашей конкретной ситуации вы можете просто просмотреть панель инструментов (или использовать df.visulalize()), чтобы понять происходящие вычисления. Оба фьючерса зависят от чтения CSV, и эта одна задача потребуется прежде, чем любой из них сможет работать - и, вероятно, занимает подавляющее большинство времени. Dask не знает без сканирования всех данных, какие строки имеют какое значение x.

...