У вас уже есть правильное представление о том, что вы можете сделать, чтобы использовать свои данные.
Наилучшее решение зависит от ваших реальных потребностей, поэтому я постараюсь охватить возможности рабочим примером.
Что вы хотите
Если я полностью понимаю вашу потребность, вы хотите
- непрерывно обновлять DataFrame (из веб-сокета)
- во время выполнения некоторых вычисления в одном и том же DataFrame
- , поддерживающие актуальность DataFrame для вычислительных работников,
- одно вычисление требует значительных ресурсов процессора
- другое - нет.
Что вам нужно
Как вы сказали, вам потребуется способ запуска различных потоков или процессов, чтобы обеспечить выполнение вычислений.
Как насчет потоков
Самый простой способ достичь желаемого - использовать библиотеку threading . Поскольку потоки могут совместно использовать память, и у вас есть только один работник, фактически обновляющий DataFrame, довольно просто предложить способ управления данными в актуальном состоянии:
import time
from dataclasses import dataclass
import pandas
from threading import Thread
@dataclass
class DataFrameHolder:
"""This dataclass holds a reference to the current DF in memory.
This is necessary if you do operations without in-place modification of
the DataFrame, since you will need replace the whole object.
"""
dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])
def update(self, data):
self.dataframe = self.dataframe.append(data, ignore_index=True)
class StreamLoader:
"""This class is our worker communicating with the websocket"""
def __init__(self, df_holder: DataFrameHolder) -> None:
super().__init__()
self.df_holder = df_holder
def update_df(self):
# read from websocket and update your DF.
data = {
'A': [1, 2, 3],
'B': [4, 5, 6],
}
self.df_holder.update(data)
def run(self):
# limit loop for the showcase
for _ in range(5):
self.update_df()
print("[1] Updated DF %s" % str(self.df_holder.dataframe))
time.sleep(3)
class LightComputation:
"""This class is a random computation worker"""
def __init__(self, df_holder: DataFrameHolder) -> None:
super().__init__()
self.df_holder = df_holder
def compute(self):
print("[2] Current DF %s" % str(self.df_holder.dataframe))
def run(self):
# limit loop for the showcase
for _ in range(5):
self.compute()
time.sleep(5)
def main():
# We create a DataFrameHolder to keep our DataFrame available.
df_holder = DataFrameHolder()
# We create and start our update worker
stream = StreamLoader(df_holder)
stream_process = Thread(target=stream.run)
stream_process.start()
# We create and start our computation worker
compute = LightComputation(df_holder)
compute_process = Thread(target=compute.run)
compute_process.start()
# We join our Threads, i.e. we wait for them to finish before continuing
stream_process.join()
compute_process.join()
if __name__ == "__main__":
main()
Обратите внимание, что мы используем класс для хранения ссылка на текущий DataFrame, поскольку некоторые операции, такие как append
, не обязательно выполняются на месте, поэтому, если мы напрямую отправим ссылку на объект DataFrame, модификация будет потеряна на работнике. Здесь объект DataFrameHolder
будет оставаться в памяти в том же месте, поэтому рабочий может по-прежнему иметь доступ к обновленному фрейму данных.
Процессы могут быть более мощными
Теперь, если вам требуется большая вычислительная мощность, процессы может быть более полезным, поскольку они позволяют изолировать вашего работника от другого ядра. Чтобы запустить процесс вместо потока в python, вы можете использовать многопроцессорную библиотеку . API обоих объектов одинаков, и вам нужно будет только изменить конструкторы следующим образом
from threading import Thread
# I create a thread
compute_process = Thread(target=compute.run)
from multiprocessing import Process
# I create a process that I can use the same way
compute_process = Process(target=compute.run)
Теперь, если вы попытались изменить значения в приведенном выше сценарии, вы увидите, что DataFrame не обновляется правильно.
Для этого вам потребуется дополнительная работа, поскольку два процесса не разделяют память, и у вас есть несколько способов связи между ними (https://en.wikipedia.org/wiki/Inter-process_communication)
Ссылка @SimonCrane весьма интересна по этим вопросам и демонстрирует использование разделяемой памяти между двумя процессами с использованием multiprocessing.manager .
Вот версия с рабочим, использующим отдельный процесс с общей памятью:
import logging
import multiprocessing
import time
from dataclasses import dataclass
from multiprocessing import Process
from multiprocessing.managers import BaseManager
from threading import Thread
import pandas
@dataclass
class DataFrameHolder:
"""This dataclass holds a reference to the current DF in memory.
This is necessary if you do operations without in-place modification of
the DataFrame, since you will need replace the whole object.
"""
dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])
def update(self, data):
self.dataframe = self.dataframe.append(data, ignore_index=True)
def retrieve(self):
return self.dataframe
class DataFrameManager(BaseManager):
"""This dataclass handles shared DataFrameHolder.
See https://docs.python.org/3/library/multiprocessing.html#examples
"""
# You can also use a socket file '/tmp/shared_df'
MANAGER_ADDRESS = ('localhost', 6000)
MANAGER_AUTH = b"auth"
def __init__(self) -> None:
super().__init__(self.MANAGER_ADDRESS, self.MANAGER_AUTH)
self.dataframe: pandas.DataFrame = pandas.DataFrame(columns=['A', 'B'])
@classmethod
def register_dataframe(cls):
BaseManager.register("DataFrameHolder", DataFrameHolder)
class DFWorker:
"""Abstract class initializing a worker depending on a DataFrameHolder."""
def __init__(self, df_holder: DataFrameHolder) -> None:
super().__init__()
self.df_holder = df_holder
class StreamLoader(DFWorker):
"""This class is our worker communicating with the websocket"""
def update_df(self):
# read from websocket and update your DF.
data = {
'A': [1, 2, 3],
'B': [4, 5, 6],
}
self.df_holder.update(data)
def run(self):
# limit loop for the showcase
for _ in range(4):
self.update_df()
print("[1] Updated DF\n%s" % str(self.df_holder.retrieve()))
time.sleep(3)
class LightComputation(DFWorker):
"""This class is a random computation worker"""
def compute(self):
print("[2] Current DF\n%s" % str(self.df_holder.retrieve()))
def run(self):
# limit loop for the showcase
for _ in range(4):
self.compute()
time.sleep(5)
def main():
logger = multiprocessing.log_to_stderr()
logger.setLevel(logging.INFO)
# Register our DataFrameHolder type in the DataFrameManager.
DataFrameManager.register_dataframe()
manager = DataFrameManager()
manager.start()
# We create a managed DataFrameHolder to keep our DataFrame available.
df_holder = manager.DataFrameHolder()
# We create and start our update worker
stream = StreamLoader(df_holder)
stream_process = Thread(target=stream.run)
stream_process.start()
# We create and start our computation worker
compute = LightComputation(df_holder)
compute_process = Process(target=compute.run)
compute_process.start()
# The managed dataframe is updated in every Thread/Process
time.sleep(5)
print("[0] Main process DF\n%s" % df_holder.retrieve())
# We join our Threads, i.e. we wait for them to finish before continuing
stream_process.join()
compute_process.join()
if __name__ == "__main__":
main()
Как видите, различия между многопоточностью и обработкой весьма незначительны.
С помощью нескольких настроек вы можете начать отсюда для подключения к тот же менеджер, если вы хотите использовать другой файл для обработки вашего процессора интенсивной обработки.