Почему DASK не выполняется параллельно - PullRequest
0 голосов
/ 10 июня 2019

Может ли кто-нибудь указать, что я сделал неправильно с последующей реализацией dask, поскольку, похоже, он не использует многоядерные системы.

[Обновлено с воспроизводимым кодом]

Код, который использует dask :

bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))
def calculate_feature_stats(bookingID):
    curr_book_data = book_data
    row = list()
    row.append(bookingID)
    row.append(curr_book_data.min())
    row.append(curr_book_data.max())
    row.append(curr_book_data.std())
    row.append(curr_book_data.mean())

    return row


calculate_feature_stats = dask.delayed(calculate_feature_stats)


rows = []


for bookid in bookingID.tolist():
    row = calculate_feature_stats(bookid)
    rows.append(row)

  start = time.time()
  rows = dask.persist(*rows)
  end = time.time()
  print(end - start)  # Execution time = 16s in my machine

Код с нормальной реализацией без задачи :

bookingID = np.arange(1,10000)
book_data = pd.DataFrame(np.random.rand(1000))

def calculate_feature_stats_normal(bookingID):
    curr_book_data = book_data
    row = list()
    row.append(bookingID)
    row.append(curr_book_data.min())
    row.append(curr_book_data.max())
    row.append(curr_book_data.std())
    row.append(curr_book_data.mean())
   return row


rows = []
start = time.time()
for bookid in bookingID.tolist():
    row = calculate_feature_stats_normal(bookid)
    rows.append(row)
end = time.time()
print(end - start)  # Execution time = 4s in my machine

Так что, если не на самом деле быстрее, как это возможно?

1 Ответ

1 голос
/ 13 июня 2019

Ответ

Расширенный комментарий.Вы должны учитывать, что при использовании dask накладные расходы составляют около 1 мс (см. doc ), поэтому, если ваши вычисления короче, чем dask, это не стоит проблем.

Переход к конкретному вопросуЯ могу вспомнить два возможных сценария реального мира: 1. Большой кадр данных со столбцом с именем bookingID и другим value 2. Другой файл для каждого идентификатора бронирования

Во втором случае вы можете играть из этого ответ в то время как для первого случая вы можете действовать следующим образом:

import dask.dataframe as dd
import numpy as np
import pandas as pd



# create dummy df
df = []
for i in range(10_000):
    df.append(pd.DataFrame({"id":i,
                            "value":np.random.rand(1000)}))
df = pd.concat(df, ignore_index=True)
df = df.sample(frac=1).reset_index(drop=True)
df.to_parquet("df.parq")

Панды

%%time
df = pd.read_parquet("df.parq")
out = df.groupby("id").agg({"value":{"min", "max", "std", "mean"}})
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)

CPU times: user 1.65 s, sys: 316 ms, total: 1.96 s
Wall time: 1.08 s

Даска

%%time
df = dd.read_parquet("df.parq")
out = df.groupby("id").agg({"value":["min", "max", "std", "mean"]}).compute()
out.columns = [col[1] for col in out.columns]
out = out.reset_index(drop=True)

CPU times: user 4.94 s, sys: 427 ms, total: 5.36 s
Wall time: 3.94 s

Заключительные мысли

В этой ситуации dask начинает иметь смысл, если df не помещается в памяти.

...