Как правильно распараллелить код generi c с помощью Numba + Dask - PullRequest
1 голос
/ 11 февраля 2020

Я новичок в использовании Dask и Numba для ускорения кода, и я надеялся, что это может быть ценным вопросом для пользователей, чтобы получить ответы на передовые методики распараллеливания кода. Я сделал обобщенный c тестовый пример pandas фрейма данных с 3 столбцами.

Функция generi c будет реализована на 3 векторах в кадре, который представляет вид преобразования это можно сделать при анализе данных: первые два столбца возводятся в квадрат, добавляются, а затем берется квадрат root, а затем вычисляется логическое значение, сравнивая результат с 3-м столбцом.

Я реализую 4 теста случаи: (a) a pandas применяются, (b) Dask, (c) Numba и (d) Dask и Numba вместе.

Numba работает Великий. Все мои проблемы с Dask. Вот проблемы, которые у меня возникают:

  1. Dask, независимо от того, какого размера я делаю векторы, медленнее. Возможно, я не совсем понимаю, как и когда вычислять определенные части фрейма данных, или как правильно распараллелить его. Это медленнее, чем обычное применение.
  2. Как правильно использовать Dask для распараллеливания? Я нарисовал его как 4 раздела и у меня есть 2-х ядерный процессор, но как вы на самом деле решаете, как его отформатировать?
# Practice parallelizing
from dask import dataframe as dd
from numba import jit
import pandas as pd
import numpy as np
import time

# df is going to be the regular dataframe
df = pd.DataFrame(np.random.random(size=(1000000,3))*100,columns=['col1','col2','col3'])

# ddf is the dask dataframe
ddf = dd.from_pandas(df,npartitions=4)

# Check the distance regular (probably wouldn't write like this but doing for symmetry)
def check_dist(col1,col2,col3):
    dist = np.sqrt(col1**2+col2**2)
    check = dist < col3
    return check

# Jit
@jit(nopython=True)
def check_dist_fast(col1,col2,col3):
    dist = np.sqrt(col1**2+col2**2)
    check = dist < col3
    return check

#####################################
# Regular Python Apply
#####################################
t0 = time.time()
df['col4'] = df.apply(lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
t1 = time.time()-t0
print("Regular pandas took",t1)
df = df.drop('col4',axis=1)

#####################################
# Dask Apply
#####################################
t0 = time.time()
ddf['col4'] = ddf.map_partitions(lambda d: d.apply(
                                    lambda x: check_dist(x.col1,x.col2,x.col3),axis=1)
                                ).compute()
t1 = time.time()-t0
print("Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)


#####################################
# Numba Pandas
#####################################
t0 = time.time()
df['col4'] = check_dist_fast(df.col1.to_numpy(),df.col2.to_numpy(),df.col3.to_numpy())
t1 = time.time()-t0
print("Numba pandas took",t1)
df = df.drop('col4',axis=1)


#####################################
# Numba + Jit Pandas
#####################################
t0 = time.time()
t0 = time.time()

ddf['col4'] = ddf.map_partitions(lambda d: d.apply(lambda x:
                    check_dist_fast(x.col1,x.col2,x.col3),axis=1)).compute()
t1 = time.time()-t0
print("Numba Dask pandas took",t1)
ddf = ddf.drop('col4',axis=1)

Наконец, какие еще лучшие практики следует знать из. Идея состоит в том, чтобы отправить это в какой-то кластер со многими узлами.

Время:

  • Обычное pandas Потребовалось 150.6191689968109
  • Даск pandas Требуется 153.70575094223022
  • Numba pandas взял 0.710655927658081
  • Numba Dask pandas взял 139.57402181625366

1 Ответ

1 голос
/ 13 февраля 2020

Я думаю, что dask настолько медленный, потому что вы вычисляете ряды, используя:

ddf.map_partitions(
    lambda d: d.apply(lambda x: check_dist(x.col1,x.col2,x.col3), axis=1)
    ).compute()

, а затем присваиваете его новому столбцу, таким образом, dask не может распараллелить процесс. Следующий код делает то же самое, но выполняется за 0,06 секунды:

#####################################
# Dask Assign
#####################################
t0 = time.time()
ddf = ddf.assign(col4=lambda x: check_dist(x.col1,x.col2,x.col3))
ddf.compute()
t1 = time.time()-t0
print("Dask using Assign took",t1)
ddf = ddf.drop('col4',axis=1)

Я бы посоветовал взглянуть на раздел с рекомендациями в dask docs.

Надеюсь, это поможет!

...