Я пытаюсь:
- разделить фрейм данных между процессами
- обновить общий анализ на основе вычислений, выполненных (но не изменяющих) этот фрейм данных
Я использую multiprocessing.Manager()
для создания dict
в общей памяти (для хранения результатов) и Namespace
для хранения / совместного использования моего фрейма данных, с которого я хочу прочитать.
import multiprocessing
import pandas as pd
import numpy as np
def add_empty_dfs_to_shared_dict(shared_dict, key):
shared_dict[key] = pd.DataFrame()
def edit_df_in_shared_dict(shared_dict, namespace, ind):
row_to_insert = namespace.df.loc[ind]
df = shared_dict[ind]
df[ind] = row_to_insert
shared_dict[ind] = df
if __name__ == '__main__':
manager = multiprocessing.Manager()
shared_dict = manager.dict()
namespace = manager.Namespace()
n = 100
dataframe_to_be_shared = pd.DataFrame({
'player_id': list(range(n)),
'data': np.random.random(n),
}).set_index('player_id')
namespace.df = dataframe_to_be_shared
for i in range(n):
add_empty_dfs_to_shared_dict(shared_dict, i)
jobs = []
for i in range(n):
p = multiprocessing.Process(
target=edit_df_in_shared_dict,
args=(shared_dict, namespace, i)
)
jobs.append(p)
p.start()
for p in jobs:
p.join()
print(shared_dict[1])
При выполнении вышеизложенного он корректно записывает в shared_dict
, поскольку мой оператор печати выполняется с некоторыми данными. Я также получаю сообщение об ошибке менеджера:
Process Process-88:
Traceback (most recent call last):
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 788, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Users/henrysorsky/Library/Preferences/PyCharm2019.2/scratches/scratch_13.py", line 34, in edit_df_in_shared_dict
row_to_insert = namespace.df.loc[ind]
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 1099, in __getattr__
return callmethod('__getattribute__', (key,))
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 792, in _callmethod
self._connect()
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 779, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 492, in Client
c = SocketClient(address)
File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused
Я понимаю, что это исходит от менеджера и, похоже, из-за его неправильного завершения работы. Единственная похожая проблема, которую я могу найти в Интернете:
Список общего доступа между процессами в python server
предлагает объединить все дочерние процессы, что я уже делаю.