Как использовать dask для эффективного расчета многих простых статистических данных - PullRequest
0 голосов
/ 19 декабря 2018

Задача

Я хочу вычислить статистику "легко собрать", используя Dask.Скорость - это моя главная задача и цель, и поэтому я собираюсь бросить широкий круг проблем.В идеале я хотел бы закончить описанную проблему менее чем за час.Я рассчитываю использовать 100-1000 рабочих.В настоящее время я тестирую тесты на больших компьютерах (160 ядер, 4 ТБ ОЗУ), но планирую в ближайшее время перейти на Kubernetes.

Настройка

У меня есть некоторые данные в фрейме данных (pandas, dask, csv, parquet и т. д.) У меня также есть много подмножеств этих данных (с фильтрами произвольных столбцов), для которых я хотел бы рассчитать статистику.

Размер DataFrame :от 5 ГБ до 5 ТБ данных.(100 миллионов строк, 1000 столбцов).Ожидается 50-100 ТБ в будущем.

Размер статистики : около 5000 уникальных фильтров и от 1 до 500 статов на уникальный фильтр.(Статистика 5k-5M)

Пример игрушки ниже:

requested_statistics = [
    {'filters': [{'col': 'col_1', 'op': 'lt', 'value': 0.8},
                 {'col': 'col_38', 'op': 'lt', 'value': 0.4},
                 {'col': 'col_224', 'op': 'gt', 'value': 0.12333}],
     'output': {'col': 'col_3', 'op': 'sum'},
     'name': 'stat_1'},
     {'filters': [{'col': 'col_551', 'op': 'lt', 'value': 0.8},
                  {'col': 'col_112', 'op': 'gt', 'value': '2018-01-13'},
                  {'col': 'col_1', 'op': 'lt', 'value': 0.8}],
      'output': {'col': 'col_2', 'op': 'avg'},
      'name': 'stat_2'}
]

Я могу написать простой парсер, который работает на dask или pandas:

def filter_index(df, filter):
    filter_ops = {'lt': lambda x, y: x < y, 'gt': lambda x, y: x > y, 'eq': lambda x, y: x == y}
    return filter_ops[filter['op']](df[filter['col']], filter['value'])

def get_indexer(df, filters):
    if len(filters) == 1:
        return filter_index(df, filters[0])
    return np.logical_and(filter_index(df, filters[0]), get_indexer(df, filters[1:]))

def get_statistic(df, statistic):
    indexer = get_indexer(df, statistic['filters'])
    agg_ops = {'sum': np.sum, 'avg': np.mean, 'unique_count': lambda x: x.unique().size}
    return agg_ops[statistic['output']['op']](df[statistic['output']['col']][indexer])

all_stats = {x['name']: get_statistic(df, x) for x in requested_statistics}

Некоторые оптимизацииЯ пытался.

1) Просто положитесь на dask: future_stats = client.compute(all_stats).

Это не сработало, потому что время вычислений для оптимизации графа (или просто сериализации в планировщик) слишком великодолго.В мелкомасштабных тестах это работает нормально, но когда я масштабирую n-разделы, это, кажется, масштабируется намного хуже, чем O (N) во времени.

2) Запустите вычисление для каждой статистики (client.compute(stat, sync=True) или client.compute(stat).result()).

Это добавляет слишком много накладных расходов на общение с планировщиком, и для статистики ~ 100 000, которую я пытаюсь вычислить, потребуется слишком много времени.

3) Кэширование (с помощью persist) промежуточные результаты (индексаторы), чтобы я мог их использовать повторно.

Учитывая, что фильтры иногда могут совместно использовать индексаторы, я добавил кеширование в поля filter_index и get_indexer.

Специально, создайте хеш и indexer = client.persist(indexer), возвращая постоянный индексатор при будущих вызовах.Для get_indexer я также добавил проверку combinations, которая пытается увидеть, существует ли какое-либо подмножество фильтров в кэше.Я также оптимизировал порядок, в котором я вызываю статистику, чтобы оптимально потребовалось не более 1 нового изменяющегося индексатора на следующий набор.(например, выполнить все операции, которые совместно используют одни и те же фильтры одновременно, а затем перейти к следующему).

К сожалению, это требует огромных объемов памяти для сохранения всех логических масок.

Я еще не пробовал свернуть кэш (поскольку вычисления выполняются, cache.pop(index_id), как только вычисления больше не будут требовать его сохранения), но это мой следующий шаг.

Ключевые проблемы вhand

Решение (3), перечисленное выше, является тем, что я реализовал в настоящее время, но оно все еще работает не так хорошо, как я надеюсь.

  • Стоимость памяти очень высокавысокий (эффективно создающий полный новый столбец для каждого уникального фильтра)

  • сериализация планировщика / графика кажется дорогой

  • , глядя на htopв большинстве случаев показывает, что только dask-scheduler работает на 100%, а рабочие в основном бездействуют.

Вопросы

1) Какие еще подходы я мог быпринять или есть какие-либо явные промахи в моемПодход, перечисленный выше?

2) Я рассмотрел df.query(string), но, поскольку он работает на всем фрейме данных, кажется, что он будет неэффективным (много дублирования данных).Это правда, или есть некоторые победы с использованием встроенного синтаксического синтаксического анализатора (я заметил, что графы dask были меньше для этого, но не был уверен, стоит ли это того).

3)планировщик и однопоточный (?) создатель графа dask кажутся узким местом, есть ли какой-нибудь четкий путь для их распараллеливания?

4) Когда я смотрю на распределенный наблюдатель состояния боке, я часто замечаючто он также зависает во время этих вычислений, затрудняет отладку и заставляет меня любопытно, действительно ли использование веб-сервера ухудшает производительность планировщика?Это правда?

5) В логах я получаю много Event loop was unresponsive in Worker for Xs. предупреждений.Могу ли я что-нибудь сделать, чтобы помочь сбалансировать работу или переписать задачи, которые распределяются на работника или сделать планировщик более отзывчивым?

6) Из-за желания уменьшить сложность графа dask,У меня repartition(npartitions=num_workers*2), но я не уверен, что это хорошая эвристика или что мне следует использовать?

Вот пример задач, которыми управляет планировщик (это для ~ 25 уникальных фильтров,каждый с ~ 50 статистикой, всего ~ 1000 статистических данных.

https://i.imgur.com/hRzmXHP.png

Спасибо за любую помощь или советы относительно того, как можно оптимизировать это.

1 Ответ

0 голосов
/ 20 декабря 2018

Два общих предложения приходят на ум, но трудно диагностировать такую ​​проблему без практического опыта с ней.Похоже, вы уже смотрите на приборную панель, что приятно слышать.Я сосредоточусь здесь на двух предложениях, чтобы избежать затрат на планирование, так как это то, что вы упомянули специально.

Использование больших разделов

Размер раздела по умолчанию для таких операций, как dd.read_csv, достаточно мал, чтобыработа на потребительских ноутбуках.(Я подозреваю, что они около 128 МБ).Учитывая размер ваших узлов, вы, вероятно, можете увеличить это в 10 раз (или больше) и все будет в порядке.Это сократит ваши затраты на планировщик также в 10 раз.

Используйте высокоуровневое объединение графиков

На 2018-12-20 это все еще находится в ветвях разработки, но dask.dataframe начинает сливатьсяна уровне выражения, а не на уровне задач.Это должно значительно сократить накладные расходы на тысячи статистических данных, возможно, превратив их в одну задачу с точки зрения Даска.

Возможно, вы захотите отследить следующие PR:

Я также призываю вас привести синтетический пример вашего варианта использования в качестве проблемы GitHub, чтобы он мог дать информацию о будущем развитии.Я рекомендую использовать dask.datasets.timeseries() для создания поддельного фрейма данных, а затем что-то простое для генерации большого количества простых статистических данных из этого.(просто гораздо лучше, если это возможно, чтобы сопровождающим не приходилось слишком глубоко погружаться).

...