Dask DataFrame присоединяются медленно, как панды - PullRequest
0 голосов
/ 14 февраля 2019

У меня есть 2 фрейма данных, один из которых называется аниме ~ 10 тыс. Строк данных, а другой называется анимелистами ~ 30 млн. Строк данных, и я хочу присоединиться к ним.Я сравнил его с пандами, и он быстрее всего на 7%, что не так много, и мне интересно, может ли это быть быстрее, если у меня 16 ядер.

У меня есть панды Dataframes, где я устанавливаю индексы

animes = animes.set_index('anime_id')
animelists = animelists.set_index('anime_id')

Данные выглядят следующим образом (я пропустил другие столбцы), аниме:

anime_id | genres
-------- | ------
11013    | Comedy, Supernatural, Romance, Shounen
2104     | Comedy, Parody, Romance, School, Shounen
5262     | Comedy, Magic, School, Shoujo

и анимелисты:

anime_id | username | my_score
21       | karthiga | 9
59       | karthiga | 7
74       | karthiga | 7

, а затем я создал Dask Dataframes изthis

animes_dd = dd.from_pandas(animes, npartitions=8)
animelists_dd = dd.from_pandas(animelists, npartitions=8)

Я хочу эффективно объединить отдельные жанры аниме с анимелистами, чтобы запрашивать оценки по жанрам.У меня есть код для этого здесь, в pandas:

genres_arr = animes['genres'].str.replace(' ', '').str.split(',', expand=True).stack().reset_index(drop=True, level=1).to_frame(name='genre')
genres_arr = genres_arr[genres_arr['genre'] != '']
resulting_df = animelists.merge(genres_arr, how='inner', left_index=True, right_index=True)
# this takes 1min 37s

и тот же код в dask:

genres_arr_dd = animes_dd['genres'].map_partitions(lambda x: x.str.replace(' ', '').str.split(',', expand=True).stack().reset_index(drop=True, level=1)).to_frame(name='genre')
genres_arr_dd = genres_arr_dd[genres_arr_dd['genre'] != '']
resulting_dd = animelists_dd.merge(genres_arr_dd, how='inner', left_index=True, right_index=True).compute()
# this takes 1min 30s

(результирующий фрейм данных имеет ~ 140M строк)

IsЕсть ли способ ускорить его?Я следовал официальному руководству по производительности , я выполняю объединения в индексированных столбцах и имею 8 разделов на каждом Dask Dataframe, поэтому он должен быть подготовлен для эффективного многопроцессорного объединения.

Что здесь не так икак мне увеличить скорость?

Когда я запускал код в ноутбуке jupyter, я наблюдал за загрузкой процессора на ядро, и он был очень низким, и некоторое время только одно ядро ​​было активным и работалона 100%.Кажется, что он не распараллеливает хорошо.

1 Ответ

0 голосов
/ 14 февраля 2019

Это было повторено в другом месте, поэтому я буду держать это очень кратко.

  • from_pandas-> compute означает, что вы округляете все данные;Вы хотите загружать в рабочих (например, dd.read_csv) и агрегировать в рабочих, а не перемещать целые наборы данных в

  • и обратно, выбор планировщика очень важен.Если ваш системный монитор говорит, что вы используете один ЦП, вы, вероятно, ограничены GIL и должны попробовать распределенный планировщик с соответствующим сочетанием процессов и потоков.Он также предоставит вам больше информации о том, что происходит, на панели инструментов:

  • Панды быстрые, а когда данные небольшие, дополнительные издержки dask, хотя и небольшие, могут перевеситьлюбой параллелизм вы получаете.

...