Python многопроцессорная связь с экземплярами SocketServer - PullRequest
1 голос
/ 07 января 2012

У меня есть набор процессов, назовем их A, B и C, которые должны взаимодействовать друг с другом.А должен общаться с В и С;B должен общаться с A и C;и C должен взаимодействовать с A и B. A, B и C могут быть расположены на разных машинах или на одной машине.

Моя мысль заключалась в том, чтобы общаться через сокеты и использовать «localhost», если они все на одном компьютере (например, A на порту 11111, B на порту 22222 и т. Д.).Таким образом, нелокальный процесс будет рассматриваться как локальный процесс.Чтобы сделать это, я подумал, что настрою экземпляр SocketServer для каждого из A, B и C, и каждый из них будет знать адреса двух других.Когда бы ни требовалось установить связь, например, от А до В, А открывал сокет для В и записывал данные.Затем постоянно работающий сервер B считывает данные и сохраняет их в списке для последующего использования при необходимости.

Проблема, с которой я сталкиваюсь, заключается в том, что хранимая информация не передается между finish_requestметод (который обрабатывает прослушивание) и метод __call__ (который обрабатывает разговор).(Серверный класс вызывается, потому что мне это нужно для чего-то другого. Я не верю, что это имеет отношение к проблеме.)

Мой вопрос, будет ли это работать так, как я себе представлял?Будут ли multiprocessing, threading и socketserver хорошо играть вместе на одной машине?Я не заинтересован в использовании других механизмов для взаимодействия между процессами (например, Queue или Pipe).У меня есть рабочее решение с теми.Я хочу знать, возможен ли такой подход, даже если он менее эффективен.И, если это так, что я делаю неправильно, что мешает ему работать?

Ниже приведен минимальный пример, иллюстрирующий проблему:

import uuid
import sys
import socket
import time
import threading
import collections
import SocketServer
import multiprocessing

class NetworkMigrator(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    def __init__(self, server_address, client_addresses, max_migrants=1):
        SocketServer.TCPServer.__init__(self, server_address, None)
        self.client_addresses = client_addresses
        self.migrants = collections.deque(maxlen=max_migrants)
        self.allow_reuse_address = True
        t = threading.Thread(target=self.serve_forever)
        t.daemon = True
        t.start()

    def finish_request(self, request, client_address):
        try:
            rbufsize = -1
            wbufsize = 0
            rfile = request.makefile('rb', rbufsize)
            wfile = request.makefile('wb', wbufsize)

            data = rfile.readline().strip()
            self.migrants.append(data)
            print("finish_request::  From: %d  To: %d  MID: %d  Size: %d -- %s" % (client_address[1], 
                                                                                   self.server_address[1], 
                                                                                   id(self.migrants), 
                                                                                   len(self.migrants), 
                                                                                   data))

            if not wfile.closed:
                wfile.flush()
            wfile.close()
            rfile.close()        
        finally:
            sys.exc_traceback = None

    def __call__(self, random, population, args):
        client_address = random.choice(self.client_addresses)
        migrant_index = random.randint(0, len(population) - 1)
        data = population[migrant_index]
        data = uuid.uuid4().hex
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(client_address)
            sock.send(data + '\n')
        finally:
            sock.close()
        print("      __call__::  From: %d  To: %d  MID: %d  Size: %d -- %s" % (self.server_address[1], 
                                                                               client_address[1], 
                                                                               id(self.migrants), 
                                                                               len(self.migrants), 
                                                                               data))
        if len(self.migrants) > 0:
            migrant = self.migrants.popleft()
            population[migrant_index] = migrant
        return population


def run_it(migrator, rand, pop):
    for i in range(10):
        pop = migrator(r, pop, {})
        print("        run_it::  Port: %d  MID: %d  Size: %d" % (migrator.server_address[1], 
                                                                 id(migrator.migrants), 
                                                                 len(migrator.migrants)))
        time.sleep(1)


if __name__ == '__main__':
    import random
    r = random.Random()
    a = ('localhost', 11111)
    b = ('localhost', 22222)
    c = ('localhost', 33333)
    am = NetworkMigrator(a, [b, c], max_migrants=11)
    bm = NetworkMigrator(b, [a, c], max_migrants=22)
    cm = NetworkMigrator(c, [a, b], max_migrants=33)

    fun = [am, bm, cm]
    pop = [["larry", "moe", "curly"], ["red", "green", "blue"], ["small", "medium", "large"]]
    jobs = []
    for f, p in zip(fun, pop):
        pro = multiprocessing.Process(target=run_it, args=(f, r, p))
        jobs.append(pro)
        pro.start()
    for j in jobs:
        j.join()
    am.shutdown()
    bm.shutdown()
    cm.shutdown()

Просмотр результатов этого примера, будет три типа печати:

        run_it::  Port: 11111  MID: 3071227860  Size: 0
      __call__::  From: 11111  To: 22222  MID: 3071227860  Size: 0 -- e00e0891e0714f99b86e9ad743731a00
finish_request::  From: 60782  To: 22222  MID: 3071227972  Size: 10 -- e00e0891e0714f99b86e9ad743731a00

"MID" - это id, если в этом случае используется migrants deque.«От» и «Кому» - порты, отправляющие / получающие передачу.И я просто устанавливаю данные как случайную шестнадцатеричную строку прямо сейчас, чтобы я мог отслеживать отдельные передачи.

Я не понимаю, почему, даже с тем же MID, в какой-то момент он скажет, чтоего размер не равен нулю, а затем он скажет, что его размер равен 0. Я чувствую, что это должно происходить из-за того, что вызовы многопоточные.Если эти строки используются вместо последних 2 for циклов, система работает так, как я ожидал:

for _ in range(10):
    for f, p in zip(fun, pop):
        f(r, p, {})
        time.sleep(1)

Так что же происходит с многопроцессорной версией, которая ее ломает?

1 Ответ

1 голос
/ 23 февраля 2012

Когда мы создаем 3 новых объекта NetworkMigrator, запускаются 3 новых потока, каждый из которых прослушивает новые соединения TCP.Позже мы запускаем 3 новых процесса для функции run_it.Всего у нас 4 процесса, первый из которых содержит 4 потока (1 основной + 3 сервера).Теперь проблема в том, что другие 3 процесса не будут иметь доступа к изменениям, внесенным объектами прослушивающими потоками сервера.Это потому, что процессы не разделяют память по умолчанию.

Итак, если вы запустите 3 новых потока вместо процессов, вы заметите разницу:

pro = threading.Thread(target=run_it,args=(f,r,p))

Есть еще одна небольшая проблема.Это разделение между потоками также не совсем безопасно.Лучше всего использовать блокировки всякий раз, когда мы меняем состояние объектов.Лучше всего сделать что-то подобное ниже в методах finish_request и call .

lock = Lock()
...
lock.acquire()    
self.migrants.append(data)
lock.release()

Если вы недовольны многопоточностью и хотите многопроцессорность, вы можете использовать прокси-объекты, как описано здесь: http://docs.python.org/library/multiprocessing.html#proxy-objects

Что касается идентификатора объекта, то это не является неожиданным.Новые процессы передаются в состояниях объектов (включая идентификатор объекта) в этот момент времени.Новый процесс продолжает сохранять эти идентификаторы объектов, но мы говорим здесь о двух совершенно разных пространствах памяти, поскольку они являются разными процессами.Таким образом, любые изменения, сделанные основным процессом, не будут отражены в созданных подпроцессах.

...