Выключение диспетчера «AttributeError: объект ForkAwareLocal» не имеет атрибута «connection» при использовании пространства имен и разделяемой памяти - PullRequest
0 голосов
/ 04 февраля 2020

Я пытаюсь:

  1. разделить фрейм данных между процессами
  2. обновить общий анализ на основе вычислений, выполненных (но не изменяющих) этот фрейм данных

Я использую multiprocessing.Manager() для создания dict в общей памяти (для хранения результатов) и Namespace для хранения / совместного использования моего фрейма данных, с которого я хочу прочитать.

import multiprocessing

import pandas as pd
import numpy as np


def add_empty_dfs_to_shared_dict(shared_dict, key):
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    n = 100
    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

    for p in jobs:
        p.join()

    print(shared_dict[1])

При выполнении вышеизложенного он корректно записывает в shared_dict, поскольку мой оператор печати выполняется с некоторыми данными. Я также получаю сообщение об ошибке менеджера:

Process Process-88:
Traceback (most recent call last):
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 788, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/henrysorsky/Library/Preferences/PyCharm2019.2/scratches/scratch_13.py", line 34, in edit_df_in_shared_dict
    row_to_insert = namespace.df.loc[ind]
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 1099, in __getattr__
    return callmethod('__getattribute__', (key,))
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 792, in _callmethod
    self._connect()
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/managers.py", line 779, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/Users/henrysorsky/.pyenv/versions/3.7.3/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
    s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused

Я понимаю, что это исходит от менеджера и, похоже, из-за его неправильного завершения работы. Единственная похожая проблема, которую я могу найти в Интернете:

Список общего доступа между процессами в python server

предлагает объединить все дочерние процессы, что я уже делаю.

1 Ответ

0 голосов
/ 04 февраля 2020

Таким образом, после полного ночного сна я понял, что на самом деле чтение кадра данных в общей памяти вызывало проблемы, и что примерно в 20-м дочернем процессе некоторые из них терпели неудачу при этом чтении. Я добавил максимальное количество процессов для одновременного запуска, и это решило его.

Для всех, кто интересуется, код, который я использовал:

import multiprocessing

import pandas as pd
import numpy as np

def add_empty_dfs_to_shared_dict(shared_dict, key):
    shared_dict[key] = pd.DataFrame()


def edit_df_in_shared_dict(shared_dict, namespace, ind):
    row_to_insert = namespace.df.loc[ind]
    df = shared_dict[ind]
    df[ind] = row_to_insert
    shared_dict[ind] = df


if __name__ == '__main__':
    # region define inputs

    max_jobs_running = 4
    n = 100

    # endregion

    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    namespace = manager.Namespace()

    dataframe_to_be_shared = pd.DataFrame({
        'player_id': list(range(n)),
        'data': np.random.random(n),
    }).set_index('player_id')

    namespace.df = dataframe_to_be_shared

    for i in range(n):
        add_empty_dfs_to_shared_dict(shared_dict, i)

    jobs = []
    jobs_running = 0
    for i in range(n):
        p = multiprocessing.Process(
            target=edit_df_in_shared_dict,
            args=(shared_dict, namespace, i)
        )
        jobs.append(p)
        p.start()

        jobs_running += 1

        if jobs_running >= max_jobs_running:
            while jobs_running >= max_jobs_running:
                jobs_running = 0
                for p in jobs:
                    jobs_running += p.is_alive()

    for p in jobs:
        p.join()

    for key, value in shared_dict.items():
        print(f"key: {key}")
        print(f"value: {value}")
        print("-" * 50)

Это, вероятно, будет лучше обрабатываться Queue и Pool, а не мое хакерское исправление.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...