Как синхронизировать Python dict с многопроцессорностью - PullRequest
24 голосов
/ 30 марта 2010

Я использую Python 2.6 и многопроцессорный модуль для многопоточности. Теперь я хотел бы иметь синхронизированный dict (где единственная атомарная операция, которая мне действительно нужна, это оператор + = для значения).

Должен ли я обернуть диктовку вызовом multiprocessing.sharedctypes.synchronized ()? Или другой путь?

Ответы [ 4 ]

55 голосов
/ 01 апреля 2010

Введение

Кажется, есть много предложений кресел и никаких рабочих примеров. Ни один из ответов, перечисленных здесь, даже не предлагает использовать многопроцессорность, и это немного разочаровывает и беспокоит. Как любители Python, мы должны поддерживать наши встроенные библиотеки, и хотя параллельная обработка и синхронизация никогда не бывает тривиальным вопросом, я считаю, что это можно сделать тривиальным при правильном проектировании. Это становится чрезвычайно важным в современных многоядерных архитектурах и не может быть подчеркнуто достаточно! Тем не менее, я далеко не удовлетворен многопроцессорной библиотекой, так как она все еще находится в зачаточном состоянии, с довольно большим количеством подводных камней, ошибок и направленностью на функциональное программирование (которое я ненавижу). В настоящее время я все еще предпочитаю модуль Pyro (который значительно опережает свое время) над многопроцессорным, поскольку многопроцессорное ограничение состоит в невозможности совместного использования вновь созданных объектов во время работы сервера. Метод класса «register» объектов менеджера будет фактически регистрировать объект только ДО запуска менеджера (или его сервера). Достаточно болтовни, больше кода:

Server.py

from multiprocessing.managers import SyncManager


class MyManager(SyncManager):
    pass


syncdict = {}
def get_dict():
    return syncdict

if __name__ == "__main__":
    MyManager.register("syncdict", get_dict)
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.start()
    raw_input("Press any key to kill server".center(50, "-"))
    manager.shutdown()

В приведенном выше примере кода Server.py использует многопроцессорный SyncManager, который может предоставлять синхронизированные общие объекты. Этот код не будет работать в интерпретаторе, потому что многопроцессорная библиотека довольно разборчива в том, как найти «вызываемый» для каждого зарегистрированного объекта. Запуск Server.py запустит настроенный SyncManager, который совместно использует словарь синдиката для использования несколькими процессами и может быть подключен к клиентам либо на одном компьютере, либо, если он запущен на IP-адресе, отличном от loopback, на других компьютерах. В этом случае сервер работает по шлейфу (127.0.0.1) на порту 5000. При использовании параметра authkey используются безопасные соединения при манипулировании синдикатом. При нажатии любой клавиши менеджер выключается.

Client.py

from multiprocessing.managers import SyncManager
import sys, time

class MyManager(SyncManager):
    pass

MyManager.register("syncdict")

if __name__ == "__main__":
    manager = MyManager(("127.0.0.1", 5000), authkey="password")
    manager.connect()
    syncdict = manager.syncdict()

    print "dict = %s" % (dir(syncdict))
    key = raw_input("Enter key to update: ")
    inc = float(raw_input("Enter increment: "))
    sleep = float(raw_input("Enter sleep time (sec): "))

    try:
         #if the key doesn't exist create it
         if not syncdict.has_key(key):
             syncdict.update([(key, 0)])
         #increment key value every sleep seconds
         #then print syncdict
         while True:
              syncdict.update([(key, syncdict.get(key) + inc)])
              time.sleep(sleep)
              print "%s" % (syncdict)
    except KeyboardInterrupt:
         print "Killed client"

Клиент также должен создать настроенный SyncManager, регистрирующий «syncdict», на этот раз без передачи вызываемого объекта для получения общего dict. Затем он использует настроенный SycnManager для подключения с использованием IP-адреса обратной связи (127.0.0.1) к порту 5000 и авторизации, устанавливающей безопасное соединение с менеджером, запущенным в Server.py. Он извлекает общий синтаксис dict, вызывая зарегистрированного вызываемого в диспетчере. Он запрашивает у пользователя следующее:

  1. Ключ синдиката для работы на
  2. Величина приращения значения, к которому обращается ключ каждый цикл
  3. Время сна за цикл в секундах

Затем клиент проверяет, существует ли ключ. Если этого не произойдет, это создает ключ на синдикате. Затем клиент входит в «бесконечный» цикл, в котором он обновляет значение ключа с шагом, спит указанное количество и печатает синдикт только для повторения этого процесса, пока не произойдет KeyboardInterrupt (Ctrl + C).

досадные проблемы

  1. Методы регистрации менеджера ДОЛЖНЫ вызываться до запуска менеджера, в противном случае вы получите исключения, даже если вызов dir для менеджера обнаружит, что у него действительно есть метод, который был зарегистрирован.
  2. Все манипуляции с dict должны выполняться с помощью методов, а не назначения dict (syncdict ["blast"] = 2 с треском провалится из-за того, что многопроцессорная обработка разделяет пользовательские объекты)
  3. Использование метода dict в SyncManager устранит досадную проблему № 2, за исключением того, что досадная проблема № 1 не позволяет регистрировать и распространять прокси, возвращенный SyncManager.dict (). (SyncManager.dict () может быть вызван только ПОСЛЕ запуска менеджера, а регистрация будет работать только ДО запуска менеджера, поэтому SyncManager.dict () полезен только при функциональном программировании и передаче прокси в Processes в качестве аргумента, подобного примеры документов делаю)
  4. Сервер И клиент должны зарегистрироваться, хотя интуитивно кажется, что клиент сможет выяснить это только после подключения к менеджеру (добавьте это в список разработчиков многопроцессорных систем пожеланий)

Закрытие

Надеюсь, вам понравился этот довольно подробный и немного трудоемкий ответ, как и мне. У меня были большие проблемы с пониманием того, почему я так много боролся с модулем многопроцессорной обработки, где Pyro делает его быстрым, и теперь, благодаря этому ответу, я ударил ногтем по голове. Я надеюсь, что это полезно для сообщества Python о том, как улучшить многопроцессорный модуль, так как я верю, что он имеет многообещающие возможности, но в зачаточном состоянии он не соответствует тому, что возможно. Несмотря на описанные досадные проблемы, я думаю, что это все еще вполне жизнеспособная альтернатива и довольно простая. Вы также можете использовать SyncManager.dict () и передать его процессам в качестве аргумента, как показано в документах, и, возможно, это будет еще более простое решение, в зависимости от ваших требований это просто неестественно для меня.

4 голосов
/ 30 марта 2010

Я бы посвятил отдельный процесс поддержанию «общего диктата»: просто используйте, например, xmlrpclib , чтобы сделать это крошечное количество кода доступным для других процессов, предоставляя через xmlrpclib функцию, принимающую key, incrementчтобы выполнить инкремент и один, взяв только key и вернув значение, с семантическими деталями (есть ли значение по умолчанию для отсутствующих ключей и т. д., и т. д.) в зависимости от потребностей вашего приложения.

Затем вы можете использоватьлюбой подход, который вам нравится для реализации выделенного процесса shared-dict: от однопоточного сервера с простым диктом в памяти до простой sqlite DB и т. д. и т. д. и т. д. Я предлагаю вам начать с кода «так же просто, как вы».может сойти с рук "(в зависимости от того, нужен ли вам постоянный общий дикт или вам не нужно постоянство), а затем измерить и оптимизировать, как и при необходимости.

4 голосов
/ 30 марта 2010

В ответ на соответствующее решение проблемы одновременной записи. Я провел очень быстрое исследование и обнаружил, что эта статья предлагает решение для блокировки / семафора. (http://effbot.org/zone/thread-synchronization.htm)

Хотя этот пример не является специфичностью словаря, я вполне уверен, что вы можете кодировать объект-оболочку на основе классов, чтобы помочь вам работать со словарями, основанными на этой идее.

Если бы у меня было требование реализовать что-то подобное в поточно-ориентированном виде, я бы, вероятно, использовал решение семафоров Python. (Предполагая, что моя более ранняя методика слияния не сработает.) Я считаю, что семафоры обычно снижают эффективность потоков из-за их природы блокировки.

С сайта:

Семафор - это более продвинутый механизм блокировки. Семафор имеет внутренний счетчик, а не флаг блокировки, и он блокируется только в том случае, если больше чем определенное количество потоков пыталось удержать семафор. В зависимости от того, как инициализируется семафор, это позволяет нескольким потокам одновременно обращаться к одному и тому же фрагменту кода.

semaphore = threading.BoundedSemaphore()
semaphore.acquire() # decrements the counter
... access the shared resource; work with dictionary, add item or whatever.
semaphore.release() # increments the counter
3 голосов
/ 30 марта 2010

Есть ли причина, по которой словарь должен делиться в первую очередь? Могли бы вы позволить каждому потоку поддерживать свой собственный экземпляр словаря и либо объединять в конце обработки потока, либо периодически использовать обратный вызов для объединения копий отдельных словарей потоков вместе?

Я точно не знаю, что вы делаете, поэтому учтите, что мой письменный план может не работать дословно. То, что я предлагаю, - это скорее дизайнерская идея высокого уровня.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...