Задача
Я хочу вычислить статистику "легко собрать", используя Dask.Скорость - это моя главная задача и цель, и поэтому я собираюсь бросить широкий круг проблем.В идеале я хотел бы закончить описанную проблему менее чем за час.Я рассчитываю использовать 100-1000 рабочих.В настоящее время я тестирую тесты на больших компьютерах (160 ядер, 4 ТБ ОЗУ), но планирую в ближайшее время перейти на Kubernetes.
Настройка
У меня есть некоторые данные в фрейме данных (pandas, dask, csv, parquet и т. д.) У меня также есть много подмножеств этих данных (с фильтрами произвольных столбцов), для которых я хотел бы рассчитать статистику.
Размер DataFrame :от 5 ГБ до 5 ТБ данных.(100 миллионов строк, 1000 столбцов).Ожидается 50-100 ТБ в будущем.
Размер статистики : около 5000 уникальных фильтров и от 1 до 500 статов на уникальный фильтр.(Статистика 5k-5M)
Пример игрушки ниже:
requested_statistics = [
{'filters': [{'col': 'col_1', 'op': 'lt', 'value': 0.8},
{'col': 'col_38', 'op': 'lt', 'value': 0.4},
{'col': 'col_224', 'op': 'gt', 'value': 0.12333}],
'output': {'col': 'col_3', 'op': 'sum'},
'name': 'stat_1'},
{'filters': [{'col': 'col_551', 'op': 'lt', 'value': 0.8},
{'col': 'col_112', 'op': 'gt', 'value': '2018-01-13'},
{'col': 'col_1', 'op': 'lt', 'value': 0.8}],
'output': {'col': 'col_2', 'op': 'avg'},
'name': 'stat_2'}
]
Я могу написать простой парсер, который работает на dask или pandas:
def filter_index(df, filter):
filter_ops = {'lt': lambda x, y: x < y, 'gt': lambda x, y: x > y, 'eq': lambda x, y: x == y}
return filter_ops[filter['op']](df[filter['col']], filter['value'])
def get_indexer(df, filters):
if len(filters) == 1:
return filter_index(df, filters[0])
return np.logical_and(filter_index(df, filters[0]), get_indexer(df, filters[1:]))
def get_statistic(df, statistic):
indexer = get_indexer(df, statistic['filters'])
agg_ops = {'sum': np.sum, 'avg': np.mean, 'unique_count': lambda x: x.unique().size}
return agg_ops[statistic['output']['op']](df[statistic['output']['col']][indexer])
all_stats = {x['name']: get_statistic(df, x) for x in requested_statistics}
Некоторые оптимизацииЯ пытался.
1) Просто положитесь на dask: future_stats = client.compute(all_stats)
.
Это не сработало, потому что время вычислений для оптимизации графа (или просто сериализации в планировщик) слишком великодолго.В мелкомасштабных тестах это работает нормально, но когда я масштабирую n-разделы, это, кажется, масштабируется намного хуже, чем O (N) во времени.
2) Запустите вычисление для каждой статистики (client.compute(stat, sync=True)
или client.compute(stat).result()
).
Это добавляет слишком много накладных расходов на общение с планировщиком, и для статистики ~ 100 000, которую я пытаюсь вычислить, потребуется слишком много времени.
3) Кэширование (с помощью persist) промежуточные результаты (индексаторы), чтобы я мог их использовать повторно.
Учитывая, что фильтры иногда могут совместно использовать индексаторы, я добавил кеширование в поля filter_index
и get_indexer
.
Специально, создайте хеш и indexer = client.persist(indexer)
, возвращая постоянный индексатор при будущих вызовах.Для get_indexer
я также добавил проверку combinations
, которая пытается увидеть, существует ли какое-либо подмножество фильтров в кэше.Я также оптимизировал порядок, в котором я вызываю статистику, чтобы оптимально потребовалось не более 1 нового изменяющегося индексатора на следующий набор.(например, выполнить все операции, которые совместно используют одни и те же фильтры одновременно, а затем перейти к следующему).
К сожалению, это требует огромных объемов памяти для сохранения всех логических масок.
Я еще не пробовал свернуть кэш (поскольку вычисления выполняются, cache.pop(index_id)
, как только вычисления больше не будут требовать его сохранения), но это мой следующий шаг.
Ключевые проблемы вhand
Решение (3), перечисленное выше, является тем, что я реализовал в настоящее время, но оно все еще работает не так хорошо, как я надеюсь.
Стоимость памяти очень высокавысокий (эффективно создающий полный новый столбец для каждого уникального фильтра)
сериализация планировщика / графика кажется дорогой
, глядя на htop
в большинстве случаев показывает, что только dask-scheduler
работает на 100%, а рабочие в основном бездействуют.
Вопросы
1) Какие еще подходы я мог быпринять или есть какие-либо явные промахи в моемПодход, перечисленный выше?
2) Я рассмотрел df.query(string)
, но, поскольку он работает на всем фрейме данных, кажется, что он будет неэффективным (много дублирования данных).Это правда, или есть некоторые победы с использованием встроенного синтаксического синтаксического анализатора (я заметил, что графы dask были меньше для этого, но не был уверен, стоит ли это того).
3)планировщик и однопоточный (?) создатель графа dask кажутся узким местом, есть ли какой-нибудь четкий путь для их распараллеливания?
4) Когда я смотрю на распределенный наблюдатель состояния боке, я часто замечаючто он также зависает во время этих вычислений, затрудняет отладку и заставляет меня любопытно, действительно ли использование веб-сервера ухудшает производительность планировщика?Это правда?
5) В логах я получаю много Event loop was unresponsive in Worker for Xs.
предупреждений.Могу ли я что-нибудь сделать, чтобы помочь сбалансировать работу или переписать задачи, которые распределяются на работника или сделать планировщик более отзывчивым?
6) Из-за желания уменьшить сложность графа dask,У меня repartition(npartitions=num_workers*2)
, но я не уверен, что это хорошая эвристика или что мне следует использовать?
Вот пример задач, которыми управляет планировщик (это для ~ 25 уникальных фильтров,каждый с ~ 50 статистикой, всего ~ 1000 статистических данных.
https://i.imgur.com/hRzmXHP.png
Спасибо за любую помощь или советы относительно того, как можно оптимизировать это.