Совместное использование python объектов (например, Pandas Dataframe) между независимо выполняющимися python скриптами - PullRequest
2 голосов
/ 21 января 2020

это мой первый вопрос здесь, и я надеюсь, что я не открываю вопрос, очень похожий на уже существующий. Если это так, пожалуйста, извините!

Итак, у меня возникла небольшая проблема:

Я хотел бы запустить независимые python сценарии в параллельный, который может обращаться к тем же python объектам, в моем случае к Pandas Dataframe. Моя идея состоит в том, что один скрипт в основном постоянно работает и подписывается на поток данных (в данном случае это данные, которые передаются через веб-сокет), который затем добавляется в общий Dataframe. Второй сценарий должен запускаться независимо от первого и при этом иметь доступ к кадру данных, который постоянно обновляется первым сценарием. Во втором сценарии я хочу выполнить различные виды анализа в заранее заданные интервалы времени или выполнить другие относительно трудоемкие операции с действующими данными.

Я уже пытался выполнить все операции из одного сценария, но я продолжал иметь отсоединения от веб-сокета. В будущем появятся также несколько сценариев, которые должны получать доступ к общим данным в режиме реального времени.

Вместо сохранения CSV-файла или маринада после каждого обновления в сценарии 1, я бы предпочел решение, в котором оба сценария в основном разделяют та же память. Мне также нужен только один из сценариев для написания и добавления в Dataframe, другой только для чтения из него. Модуль многопроцессорной обработки, похоже, обладает некоторыми интересными функциями, которые могут помочь, но я до сих пор не мог разобраться в этом. Я также посмотрел на глобальные переменные, но в данном случае это тоже не совсем правильно.

То, что я представляю, выглядит примерно так (я знаю, что код совершенно неправильный, и это только для иллюстрации):

Первый сценарий должен продолжать назначать новые данные из потока данных для фрейм данных и общий доступ к этому объекту.

from share_data import share

shared_df = pd.DataFrame()

for data from datastream:
        shared_df.append(data)
        share(shared_df)

Затем второй сценарий сможет выполнить следующее:

from share_data import get

df = get(shared_df)

Возможно ли это вообще или у вас есть какие-либо идеи о том, как совершитель sh это по-простому? Или у вас есть предложения, какие пакеты могут быть полезны для использования?

С уважением, Оле

1 Ответ

3 голосов
/ 21 января 2020

У вас уже есть правильное представление о том, что вы можете сделать, чтобы использовать свои данные.

Наилучшее решение зависит от ваших реальных потребностей, поэтому я постараюсь охватить возможности рабочим примером.

Что вы хотите

Если я полностью понимаю вашу потребность, вы хотите

  • непрерывно обновлять DataFrame (из веб-сокета)
  • во время выполнения некоторых вычисления в одном и том же DataFrame
  • , поддерживающие актуальность DataFrame для вычислительных работников,
  • одно вычисление требует значительных ресурсов процессора
  • другое - нет.

Что вам нужно

Как вы сказали, вам потребуется способ запуска различных потоков или процессов, чтобы обеспечить выполнение вычислений.

Как насчет потоков

Самый простой способ достичь желаемого - использовать библиотеку threading . Поскольку потоки могут совместно использовать память, и у вас есть только один работник, фактически обновляющий DataFrame, довольно просто предложить способ управления данными в актуальном состоянии:

import time
from dataclasses import dataclass

import pandas
from threading import Thread


@dataclass
class DataFrameHolder:
    """This dataclass holds a reference to the current DF in memory.
    This is necessary if you do operations without in-place modification of
    the DataFrame, since you will need replace the whole object.
    """
    dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])

    def update(self, data):
        self.dataframe = self.dataframe.append(data, ignore_index=True)


class StreamLoader:
    """This class is our worker communicating with the websocket"""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        super().__init__()
        self.df_holder = df_holder

    def update_df(self):
        # read from websocket and update your DF.
        data = {
            'A': [1, 2, 3],
            'B': [4, 5, 6],
        }
        self.df_holder.update(data)

    def run(self):
        # limit loop for the showcase
        for _ in range(5):
            self.update_df()
            print("[1] Updated DF %s" % str(self.df_holder.dataframe))
            time.sleep(3)


class LightComputation:
    """This class is a random computation worker"""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        super().__init__()
        self.df_holder = df_holder

    def compute(self):
        print("[2] Current DF %s" % str(self.df_holder.dataframe))

    def run(self):
        # limit loop for the showcase
        for _ in range(5):
            self.compute()
            time.sleep(5)


def main():
    # We create a DataFrameHolder to keep our DataFrame available.
    df_holder = DataFrameHolder()

    # We create and start our update worker
    stream = StreamLoader(df_holder)
    stream_process = Thread(target=stream.run)
    stream_process.start()

    # We create and start our computation worker
    compute = LightComputation(df_holder)
    compute_process = Thread(target=compute.run)
    compute_process.start()

    # We join our Threads, i.e. we wait for them to finish before continuing
    stream_process.join()
    compute_process.join()


if __name__ == "__main__":
    main()

Обратите внимание, что мы используем класс для хранения ссылка на текущий DataFrame, поскольку некоторые операции, такие как append, не обязательно выполняются на месте, поэтому, если мы напрямую отправим ссылку на объект DataFrame, модификация будет потеряна на работнике. Здесь объект DataFrameHolder будет оставаться в памяти в том же месте, поэтому рабочий может по-прежнему иметь доступ к обновленному фрейму данных.

Процессы могут быть более мощными

Теперь, если вам требуется большая вычислительная мощность, процессы может быть более полезным, поскольку они позволяют изолировать вашего работника от другого ядра. Чтобы запустить процесс вместо потока в python, вы можете использовать многопроцессорную библиотеку . API обоих объектов одинаков, и вам нужно будет только изменить конструкторы следующим образом

from threading import Thread
# I create a thread
compute_process = Thread(target=compute.run)


from multiprocessing import Process
# I create a process that I can use the same way
compute_process = Process(target=compute.run)

Теперь, если вы попытались изменить значения в приведенном выше сценарии, вы увидите, что DataFrame не обновляется правильно.

Для этого вам потребуется дополнительная работа, поскольку два процесса не разделяют память, и у вас есть несколько способов связи между ними (https://en.wikipedia.org/wiki/Inter-process_communication)

Ссылка @SimonCrane весьма интересна по этим вопросам и демонстрирует использование разделяемой памяти между двумя процессами с использованием multiprocessing.manager .

Вот версия с рабочим, использующим отдельный процесс с общей памятью:

import logging
import multiprocessing
import time
from dataclasses import dataclass
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from threading import Thread

import pandas


@dataclass
class DataFrameHolder:
    """This dataclass holds a reference to the current DF in memory.
    This is necessary if you do operations without in-place modification of
    the DataFrame, since you will need replace the whole object.
    """
    dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])

    def update(self, data):
        self.dataframe = self.dataframe.append(data, ignore_index=True)

    def retrieve(self):
        return self.dataframe


class DataFrameManager(BaseManager):
    """This dataclass handles shared DataFrameHolder.
    See https://docs.python.org/3/library/multiprocessing.html#examples
    """
    # You can also use a socket file '/tmp/shared_df'
    MANAGER_ADDRESS = ('localhost', 6000)
    MANAGER_AUTH = b"auth"

    def __init__(self) -> None:
        super().__init__(self.MANAGER_ADDRESS, self.MANAGER_AUTH)
        self.dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])

    @classmethod
    def register_dataframe(cls):
        BaseManager.register("DataFrameHolder", DataFrameHolder)


class DFWorker:
    """Abstract class initializing a worker depending on a DataFrameHolder."""

    def __init__(self, df_holder: DataFrameHolder) -> None:
        super().__init__()
        self.df_holder = df_holder


class StreamLoader(DFWorker):
    """This class is our worker communicating with the websocket"""

    def update_df(self):
        # read from websocket and update your DF.
        data = {
            'A': [1, 2, 3],
            'B': [4, 5, 6],
        }
        self.df_holder.update(data)

    def run(self):
        # limit loop for the showcase
        for _ in range(4):
            self.update_df()
            print("[1] Updated DF\n%s" % str(self.df_holder.retrieve()))
            time.sleep(3)


class LightComputation(DFWorker):
    """This class is a random computation worker"""

    def compute(self):
        print("[2] Current DF\n%s" % str(self.df_holder.retrieve()))

    def run(self):
        # limit loop for the showcase
        for _ in range(4):
            self.compute()
            time.sleep(5)


def main():
    logger = multiprocessing.log_to_stderr()
    logger.setLevel(logging.INFO)

    # Register our DataFrameHolder type in the DataFrameManager.
    DataFrameManager.register_dataframe()
    manager = DataFrameManager()
    manager.start()
    # We create a managed DataFrameHolder to keep our DataFrame available.
    df_holder = manager.DataFrameHolder()

    # We create and start our update worker
    stream = StreamLoader(df_holder)
    stream_process = Thread(target=stream.run)
    stream_process.start()

    # We create and start our computation worker
    compute = LightComputation(df_holder)
    compute_process = Process(target=compute.run)
    compute_process.start()

    # The managed dataframe is updated in every Thread/Process
    time.sleep(5)
    print("[0] Main process DF\n%s" % df_holder.retrieve())

    # We join our Threads, i.e. we wait for them to finish before continuing
    stream_process.join()
    compute_process.join()


if __name__ == "__main__":
    main()

Как видите, различия между многопоточностью и обработкой весьма незначительны.

С помощью нескольких настроек вы можете начать отсюда для подключения к тот же менеджер, если вы хотите использовать другой файл для обработки вашего процессора интенсивной обработки.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...