Ошибка тайм-аута python socket.connect в многопоточном или многопроцессорном режиме - PullRequest
7 голосов
/ 17 ноября 2010

Как и в следующем примере, я хотел бы общаться со многими ПК в определенном диапазоне IP-адресов.

My PC ---+------> Client A PC
         +------> Client B PC
         +------> Client C PC
         .................
         +------> Client Z PC

Поскольку клиентов слишком много для связи, я попробовал это с помощью многопоточности.socket.connect () постоянно выдает ошибку тайм-аута.Если я попробую его в однопоточном режиме, проблем не возникнет.

Я погуглил и обнаружил следующее:

Python Interpreter блокирует многопоточные DNS-запросы?

говорит, что на некоторой платформе модуль сокетов может быть небезопасным для потоков.

Поэтому я изменил свой код на многопроцессорную обработку.Тем не менее он по-прежнему выдает ту же ошибку.

В следующем примере кода test_single () заканчивается нормально.test_mp () и test_mt () оба делают ошибку тайм-аута.

Вы когда-нибудь испытывали такое ненормальное поведение?Среда тестирования - Windows XP SP3, python 2.5.4.Также пробовал на python 2.6.6 и 2.7.0, та же ошибка.

import multiprocessing
import Queue
import socket
import threading

PROCESS_NUM = 5
PORT = 8888

def search_proc(ip):
    try:
        csock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        csock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        csock.settimeout(5.0)
        csock.connect((ip, PORT))
        csock.shutdown(socket.SHUT_RDWR)
        csock.close()
        return ip, "ok"
    except socket.error, msg:
        return ip, "fail", msg

def mp_connect(ip_range):
    pool = multiprocessing.Pool( PROCESS_NUM )
    for output in pool.imap_unordered(search_proc, ip_range):
        print output

def test_mp():
    ip_range = []
    for i in range(256):
        ip_range.append("192.168.123.%d"%(i,))

    mp_connect(ip_range)

def test_mt():
    def search_thread(ip_queue):
        while True:
            ip = ip_queue.get()
            print search_proc(ip)
            ip_queue.task_done()
    ip_queue = Queue.Queue()

    for i in range(256):
        ip_queue.put("192.168.123.%d"%(i,))

    for i in range(PROCESS_NUM):
        th = threading.Thread(target=search_thread, args=(ip_queue,))
        th.setDaemon(True)
        th.start()

    ip_queue.join()

def test_single():
    ip_range = []
    for i in range(256):
        print search_proc("192.168.123.%d"%(i,))

if __name__ == "__main__":
    multiprocessing.freeze_support()
    test_mp()
    #test_single()
    #test_mt()

1 Ответ

5 голосов
/ 17 января 2011

Дэвид Бизли (David Beazley) провел большое исследование в области Python GIL и того, как это влияет на IO и многопоточность.Вы можете найти информацию о его исследованиях здесь , здесь .

...