Как распараллелить groupby () в dask? - PullRequest
4 голосов
/ 09 апреля 2019

Я пытался:

df.groupby('name').agg('count').compute(num_workers=1)
df.groupby('name').agg('count').compute(num_workers=4)

Они занимают одно и то же время, почему не работает num_workers?

Спасибо

1 Ответ

0 голосов
/ 08 июля 2019

По умолчанию Dask будет работать с многопоточными задачами, что означает, что он использует один процессор на вашем компьютере.(Обратите внимание, что использование dask, тем не менее, интересно, если у вас есть данные, которые не помещаются в памяти) *

Если вы хотите использовать несколько процессоров для вычисления вашей операции, вы должны использовать другой планировщик:

from dask import dataframe as dd
from dask.distributed import LocalCluster, Client

df = dd.read_csv("data.csv")

def group(num_workers): 
    start = time.time() 
    res = df.groupby("name").agg("count").compute(num_workers=num_workers) 
    end = time.time() 
    return res, end-start

print(group(4))

clust = LocalCluster()
clt = Client(clust, set_as_default=True) 
print(group(4)) 

Здесь я создаю локальный кластер, используя 4 параллельных процесса (потому что у меня есть четырехъядерный процессор), а затем устанавливаю клиента планирования по умолчанию, который будет использовать этот локальный кластер для выполнения операций Dask.При использовании файла CSV с двумя столбцами объемом 1,5 Гб стандартная групповая работа на моем ноутбуке занимает около 35 секунд, тогда как многопроцессорная - всего около 22 секунд.

...