Почему запуск compute () на отфильтрованном фрейме данных Dask занимает так много времени? - PullRequest
0 голосов
/ 16 марта 2020

Я читаю данные, используя это: ddf1 = dd.read_sql_table('mytable', conn_string, index_col='id', npartitions=8)

Конечно, это происходит мгновенно из-за ленивых вычислений. Эта таблица имеет несколько сотен миллионов строк.

Далее я хочу отфильтровать этот кадр данных Dask:

ddf2 = ddf1.query('some_col == "converted"')

Наконец, я хочу преобразовать это в Pandas датафрейм. Результат должен составлять всего около 8000 строк:

ddf3 = ddf2.compute()

Однако это занимает очень много времени (~ 1 час). Могу ли я получить какой-либо совет о том, как существенно ускорить это? Я попытался использовать .compute(scheduler='threads'), изменив количество разделов, но пока ни один не работал. Что я делаю не так?

1 Ответ

1 голос
/ 16 марта 2020

Во-первых, вы можете использовать синтаксис выражения sqlalchemy для кодирования вашего предложения фильтра в запросе и выполнять фильтрацию на стороне сервера. Если передача данных является вашим узким местом, то это лучшее решение для вас, особенно если индексируется столбец фильтра.

В зависимости от вашей базы данных БД, sqlalchemy, вероятно, не освобождает GIL, поэтому ваши разделы не могут работать параллельно в потоки. Все, что вы получаете, это конфликт между потоками и дополнительные издержки. Вы должны использовать распределенный планировщик с процессами.

Конечно, пожалуйста посмотрите на использование вашего процессора и памяти; с помощью распределенного планировщика у вас также есть доступ к панели диагностики c. Вы также должны быть обеспокоены тем, насколько большим будет каждый раздел в памяти.

...