Многопроцессорность и сокеты в Python - PullRequest
4 голосов
/ 17 декабря 2011

Я пытаюсь совместить многопроцессорное и сокетное программирование, но я застрял на этом этапе. Проблема в том, что я получаю эту ошибку:

  File "multiprocesssockserv.py", line 11, in worker
    clientsocket = socket.fromfd(clientfileno, socket.AF_INET, socket.SOCK_STREAM)
error: [Errno 9] Bad file descriptor

Полный код, вызвавший ошибку, выглядит следующим образом:

import multiprocessing as mp
import logging
import socket

logger = mp.log_to_stderr(logging.WARN)

def worker(queue):
    while True:
        clientfileno = queue.get()
        print clientfileno
        clientsocket = socket.fromfd(clientfileno, socket.AF_INET, socket.SOCK_STREAM)
        clientsocket.recv()
        clientsocket.send("Hello World")
        clientsocket.close()

if __name__ == '__main__':
    num_workers = 5
    socket_queue = mp.Queue()
    workers = [mp.Process(target=worker, args=(socket_queue,)) for i in
            range(num_workers)]

    for p in workers:
        p.daemon = True
        p.start()

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.bind(('',9090))
    serversocket.listen(5)
    while True:
        client, address = serversocket.accept()
        socket_queue.put(client.fileno())

edit: я использую socket.fromfd, потому что не могу поместить сокеты в очередь :) Мне нужен способ получить доступ к одним и тем же сокетам из разных процессов. В этом суть моей проблемы.

Ответы [ 3 ]

7 голосов
/ 17 декабря 2011

Поработав над этим некоторое время, я решил подойти к этой проблеме под другим углом, и мне кажется, что мне подходит следующий метод.

import multiprocessing as mp
import logging
import socket
import time

logger = mp.log_to_stderr(logging.DEBUG)

def worker(socket):
    while True:
        client, address = socket.accept()
        logger.debug("{u} connected".format(u=address))
        client.send("OK")
        client.close()
if __name__ == '__main__':
    num_workers = 5

    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    serversocket.bind(('',9090))
    serversocket.listen(5)

    workers = [mp.Process(target=worker, args=(serversocket,)) for i in
            range(num_workers)]

    for p in workers:
        p.daemon = True
        p.start()

    while True:
        try:
            time.sleep(10)
        except:
            break
3 голосов
/ 03 августа 2012

Я не эксперт, поэтому не могу дать реального объяснения, но если вы хотите использовать очереди, вам нужно уменьшить дескриптор, а затем воссоздать его:

в вашем основном :

client, address = serversocket.accept()
client_handle = multiprocessing.reduction.reduce_handle(client.fileno())
socket_queue.put(client_handle)

и у вашего работника:

clientHandle = queue.get()
file_descriptor = multiprocessing.reduction.rebuild_handle(client_handle)
clientsocket = socket.fromfd(file_descriptor, socket.AF_INET, socket.SOCK_STREAM)

также

import multiprocessing.reduction

Это будет работать с вашим исходным кодом. Однако в настоящее время у меня возникают проблемы с закрытием сокетов в рабочих процессах после их создания, как я описал.

0 голосов
/ 05 февраля 2013

Вот некоторый рабочий код для вышеупомянутого - https://gist.github.com/sunilmallya/4662837 сервер сокетов multiprocessing.reduction с родительской обработкой, передающей соединения клиенту после приема соединений

...