Обновление DataFrame в различных python процессах в реальном времени - PullRequest
8 голосов
/ 16 марта 2020

Допустим, у вас есть процесс Python, который собирает данные в режиме реального времени со скоростью около 500 строк в секунду (это может быть дополнительно распараллелено для сокращения примерно до 50 пс) из системы очередей и добавления его к DataFrame:

rq = MyRedisQueue(..)
df = pd.DataFrame()
while 1:
    recv = rq.get(block=True)
    # some converting
    df.append(recv, ignore_index = True)

Теперь вопрос: Как использовать процессоры на основе этих данных? Итак, я полностью осознаю ограничения GIL и посмотрел в диспетчер многопроцессорной обработки пространство имен , и здесь , но, похоже, есть некоторые недостатки в отношении задержки на удерживаемом по центру кадре данных . Прежде чем углубиться в это, я также попытался pool.map, который я чем распознал , чтобы применить pickle между процессами, что слишком медленно и имеет слишком много накладных расходов.

Итак, после всего этого я, наконец, задаюсь вопросом, как (если) вставка в 500 строк в секунду (или даже 50 строк в секунду) может быть передана различным процессам с некоторым временем ЦП, оставшимся для применения статистики и эвристики к данным в дочернем элементе процессы?

Может быть, было бы лучше внедрить пользовательский сокет tcp или систему очередей между двумя процессами? Или есть какие-то реализации в pandas или других библиотеках , чтобы действительно разрешить быстрый доступ к одному большому фрейму данных в родительском процессе ? Я люблю pandas!

Ответы [ 3 ]

4 голосов
/ 03 апреля 2020

Перед тем, как мы начнем, я должен сказать, что вы мало рассказали нам о своем коде, но помните об этом, чтобы каждую секунду передавать только эти 50/500 новых строк в дочерний процесс и пытаться создать этот большой * 1001. * в дочернем процессе.

Я работаю над проектом точно так же, как вы. Python получил много реализаций IP C, таких как Pipe и Queue, как вы знаете. Shared Memory решение может быть проблематичным c во многих случаях AFAIK python официальная документация предупреждено об использовании общей памяти.

По моему опыту, лучший способ преобразования данных между только двумя процессами - Pipe, поэтому вы можете выбрать DataFrame и отправить его в другую конечную точку соединения. Я настоятельно рекомендую вам избегать TCP сокетов (AF_INET) в вашем случае.

Pandas DataFrame не может быть преобразован в другой процесс без травления и расслоения. поэтому я также рекомендую передавать необработанные данные в виде встроенных типов, таких как dict вместо DataFrame. Это может ускорить процесс засолки и распахивания, а также уменьшит объем памяти.

0 голосов
/ 03 апреля 2020

Простым решением было бы разделить процесс на две разные стадии. Используйте Asyncio для получения данных неблокирующим способом и выполнения ваших преобразований в них. Второй этап потребует Asyncio Queue для построения DataFrame. Это предполагает, что вам не нужен DataFrame, доступный для другого процесса, пока вы получаете данные из очереди Redis.

Вот пример построения модели производитель / потребитель с помощью Asyncio

0 голосов
/ 31 марта 2020

Распараллеливание в pandas, вероятно, лучше обрабатывать другим движком.

Взгляните на проект Koalas от Databricks или DataFrame Dask .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...