Проблема с многопоточными приложениями Python и соединениями с сокетами - PullRequest
8 голосов
/ 24 января 2011

Я исследую проблему с приложением Python, работающим на машине с Ubuntu с 4 ГБ ОЗУ. Этот инструмент будет использоваться для аудита серверов (мы предпочитаем использовать наши собственные инструменты). Он использует потоки для подключения ко многим серверам, и многие из соединений TCP терпят неудачу. Однако, если я добавлю задержку в 1 секунду между стартом каждого потока, то большинство соединений будет успешным. Я использовал этот простой сценарий, чтобы выяснить, что может происходить:

#!/usr/bin/python

import sys
import socket
import threading
import time

class Scanner(threading.Thread):
    def __init__(self, host, port):
        threading.Thread.__init__(self)
        self.host = host
        self.port = port
        self.status = ""

    def run(self):
        self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sk.settimeout(20)
        try:
            self.sk.connect((self.host, self.port))
        except Exception, err:
            self.status = str(err)
        else:
            self.status = "connected"
        finally:
            self.sk.close()


def get_hostnames_list(filename):
    return open(filename).read().splitlines()

if (__name__ == "__main__"):
    hostnames_file = sys.argv[1]
    hosts_list = get_hostnames_list(hostnames_file)
    threads = []
    for host in hosts_list:
        #time.sleep(1)
        thread = Scanner(host, 443)
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()
        print "Host: ", thread.host, " : ", thread.status

Если я запускаю это с time.sleep (1), закомментированным, скажем, против 300 хостов, многие из соединений теряют связь с ошибкой тайм-аута, тогда как они не истекают, если я помещаю задержку в одну секунду. I попробовал приложение в другом дистрибутиве Linux, работающем на более мощной машине, и было не так много ошибок подключения? Это связано с ограничением ядра? Могу ли я что-нибудь сделать, чтобы соединение работало без задержки?

UPDATE

Я также пробовал программу, ограничивающую количество потоков, доступных в пуле. Сокращая это до 20, я могу заставить все соединения работать, но он проверяет только 1 хост в секунду. Поэтому, что бы я ни пытался (включив режим сна (1) или ограничив число одновременных потоков), я не могу проверять более 1 хоста каждую секунду.

UPDATE

Я только что нашел этот вопрос , который кажется похожим на то, что я вижу.

UPDATE

Интересно, поможет ли это написание витой строки? Может кто-нибудь показать, как будет выглядеть мой пример, написанный с использованием витой?

Ответы [ 5 ]

5 голосов
/ 31 января 2011

Вы можете попробовать gevent:

from gevent.pool import Pool    
from gevent import monkey; monkey.patch_all() # patches stdlib    
import sys
import logging    
from httplib import HTTPSConnection
from timeit import default_timer as timer    
info = logging.getLogger().info

def connect(hostname):
    info("connecting %s", hostname)
    h = HTTPSConnection(hostname, timeout=2)
    try: h.connect()
    except IOError, e:
        info("error %s reason: %s", hostname, e)
    else:
        info("done %s", hostname)
    finally:
        h.close()

def main():
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")    
    info("getting hostname list")
    hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
    hosts_list = open(hosts_file).read().splitlines()    
    info("spawning jobs")
    pool = Pool(20) # limit number of concurrent connections
    start = timer()
    for _ in pool.imap(connect, hosts_list):
        pass
    info("%d hosts took us %.2g seconds", len(hosts_list), timer() - start)

if __name__=="__main__":
    main()

Может обрабатывать более одного хоста в секунду.

Вывод

2011-01-31 11:08:29,052 getting hostname list
2011-01-31 11:08:29,052 spawning jobs
2011-01-31 11:08:29,053 connecting www.yahoo.com
2011-01-31 11:08:29,053 connecting www.abc.com
2011-01-31 11:08:29,053 connecting www.google.com
2011-01-31 11:08:29,053 connecting stackoverflow.com
2011-01-31 11:08:29,053 connecting facebook.com
2011-01-31 11:08:29,054 connecting youtube.com
2011-01-31 11:08:29,054 connecting live.com
2011-01-31 11:08:29,054 connecting baidu.com
2011-01-31 11:08:29,054 connecting wikipedia.org
2011-01-31 11:08:29,054 connecting blogspot.com
2011-01-31 11:08:29,054 connecting qq.com
2011-01-31 11:08:29,055 connecting twitter.com
2011-01-31 11:08:29,055 connecting msn.com
2011-01-31 11:08:29,055 connecting yahoo.co.jp
2011-01-31 11:08:29,055 connecting taobao.com
2011-01-31 11:08:29,055 connecting google.co.in
2011-01-31 11:08:29,056 connecting sina.com.cn
2011-01-31 11:08:29,056 connecting amazon.com
2011-01-31 11:08:29,056 connecting google.de
2011-01-31 11:08:29,056 connecting google.com.hk
2011-01-31 11:08:29,188 done www.google.com
2011-01-31 11:08:29,189 done google.com.hk
2011-01-31 11:08:29,224 error wikipedia.org reason: [Errno 111] Connection refused
2011-01-31 11:08:29,225 done google.co.in
2011-01-31 11:08:29,227 error msn.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,228 error live.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,250 done google.de
2011-01-31 11:08:29,262 done blogspot.com
2011-01-31 11:08:29,271 error www.abc.com reason: [Errno 111] Connection refused
2011-01-31 11:08:29,465 done amazon.com
2011-01-31 11:08:29,467 error sina.com.cn reason: [Errno 111] Connection refused
2011-01-31 11:08:29,496 done www.yahoo.com
2011-01-31 11:08:29,521 done stackoverflow.com
2011-01-31 11:08:29,606 done youtube.com
2011-01-31 11:08:29,939 done twitter.com
2011-01-31 11:08:33,056 error qq.com reason: timed out
2011-01-31 11:08:33,057 error taobao.com reason: timed out
2011-01-31 11:08:33,057 error yahoo.co.jp reason: timed out
2011-01-31 11:08:34,466 done facebook.com
2011-01-31 11:08:35,056 error baidu.com reason: timed out
2011-01-31 11:08:35,057 20 hosts took us 6 seconds
4 голосов
/ 02 февраля 2011

Интересно, поможет ли это написание витой строки?Может кто-нибудь показать, как мой пример будет выглядеть написанным с использованием витой?

Этот вариант намного быстрее, чем код, который использует gevent:

#!/usr/bin/env python
import sys
from timeit import default_timer as timer

from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python   import log

info = log.msg

class NoopProtocol(protocol.Protocol):
    def makeConnection(self, transport):
        transport.loseConnection()

def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
    info("connecting %s" % host)
    cc = protocol.ClientCreator(reactor, NoopProtocol)
    d = cc.connectSSL(host, port, contextFactory, timeout)
    d.addCallbacks(lambda _: info("done %s" % host),
                   lambda f: info("error %s reason: %s" % (host, f.value)))
    return d

def n_at_a_time(it, n):
    """Iterate over `it` concurently `n` items at a time.

    `it` - an iterator creating Deferreds
    `n`  - number of concurrent iterations
    return a deferred that fires on completion
    """
    return defer.DeferredList([task.coiterate(it) for _ in xrange(n)])

def main():
    try:
        log.startLogging(sys.stderr, setStdout=False)

        info("getting hostname list")
        hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
        hosts_list = open(hosts_file).read().splitlines()

        info("spawning jobs")
        start = timer()        
        jobs = (connect(host, 443, timeout=2) for host in hosts_list)
        d = n_at_a_time(jobs, n=20) # limit number of simultaneous connections
        d.addCallback(lambda _: info("%d hosts took us %.2g seconds" % (
            len(hosts_list), timer() - start)))
        d.addBoth(lambda _: (info("the end"), reactor.stop()))
    except:
        log.err()
        reactor.stop()

if __name__=="__main__":
    reactor.callWhenRunning(main)
    reactor.run()

Вот вариант, который использует t.i.d.inlineCallbacks.Требуется Python 2.5 или новее.Позволяет писать асинхронный код синхронным (блокирующим) способом:

#!/usr/bin/env python
import sys
from timeit import default_timer as timer

from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.python   import log

info = log.msg

class NoopProtocol(protocol.Protocol):
    def makeConnection(self, transport):
        transport.loseConnection()

@defer.inlineCallbacks
def connect(host, port, contextFactory=ssl.ClientContextFactory(), timeout=30):
    info("connecting %s" % host)
    cc = protocol.ClientCreator(reactor, NoopProtocol)
    try:
        yield cc.connectSSL(host, port, contextFactory, timeout)
    except Exception, e:
        info("error %s reason: %s" % (host, e))
    else:
        info("done %s" % host)

def n_at_a_time(it, n):
    """Iterate over `it` concurently `n` items at a time.

    `it` - an iterator creating Deferreds
    `n`  - number of concurrent iterations
    return a deferred that fires on completion
    """
    return defer.DeferredList([task.coiterate(it) for _ in xrange(n)])

@defer.inlineCallbacks
def main():
    try:
        log.startLogging(sys.stderr, setStdout=False)

        info("getting hostname list")
        hosts_file = sys.argv[1] if len(sys.argv) > 1 else "hosts.txt"
        hosts_list = open(hosts_file).read().splitlines()

        info("spawning jobs")
        start = timer()        
        jobs = (connect(host, 443, timeout=2) for host in hosts_list)
        yield n_at_a_time(jobs, n=20) # limit number of simultaneous connections
        info("%d hosts took us %.2g seconds" % (len(hosts_list), timer()-start))
        info("the end")
    except:
        log.err()
    finally:
        reactor.stop()

if __name__=="__main__":
    reactor.callWhenRunning(main)
    reactor.run()
3 голосов
/ 21 декабря 2013

В Python 3.4 представлен новый временный API для асинхронного ввода-вывода - asyncio module .

Этот подход похож на twisted ответ на основе :

#!/usr/bin/env python3.4
import asyncio
import logging
from contextlib import closing

class NoopProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        transport.close()

info = logging.getLogger().info

@asyncio.coroutine
def connect(loop, semaphor, host, port=443, ssl=True, timeout=15):
    try:
        with (yield from semaphor):
            info("connecting %s" % host)
            done, pending = yield from asyncio.wait(
                [loop.create_connection(NoopProtocol, host, port, ssl=ssl)],
                loop=loop, timeout=timeout)
            if done:
                next(iter(done)).result()
    except Exception as e:
        info("error %s reason: %s" % (host, e))
    else:
        if pending:
            info("error %s reason: timeout" % (host,))
            for ft in pending:
                ft.cancel()
        else:
            info("done %s" % host)

@asyncio.coroutine
def main(loop):
    logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
    limit, timeout, hosts = parse_cmdline()

    # connect `limit` concurrent connections
    sem = asyncio.BoundedSemaphore(limit)
    coros = [connect(loop, sem, host, timeout=timeout) for host in hosts]
    if coros:
        yield from asyncio.wait(coros, loop=loop)

if __name__=="__main__":
    with closing(asyncio.get_event_loop()) as loop:
        loop.run_until_complete(main(loop))

Как и вариант twisted, он использует NoopProtocol, который ничего не делает, но немедленно отключается при успешном соединении.

Количество одновременных подключений ограничено семафором.

Код на основе сопрограмм .

Пример

Чтобы узнать, сколько успешных ssl-соединений мы можем установить с первыми 1000 хостами из списка лучших миллионов Alexa:

$ curl -O http://s3.amazonaws.com/alexa-static/top-1m.csv.zip
$ unzip *.zip
$ /usr/bin/time perl -nE'say $1 if /\d+,([^\s,]+)$/' top-1m.csv | head -1000 |\
    python3.4 asyncio_ssl.py - --timeout 60 |& tee asyncio.log

Результат - менее половины всех соединений успешны. В среднем он проверяет ~ 20 хостов в секунду. Время ожидания для многих сайтов истекло. Если хост не совпадает с именами хостов из сертификата сервера, соединение также не будет установлено. Он включает example.com против www.example.com -подобных сравнений.

3 голосов
/ 24 января 2011

Как насчет реального пула потоков?

#!/usr/bin/env python3

# http://code.activestate.com/recipes/577187-python-thread-pool/

from queue import Queue
from threading import Thread

class Worker(Thread):
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try: func(*args, **kargs)
            except Exception as exception: print(exception)
            self.tasks.task_done()

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads): Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        self.tasks.join()

Пример:

import threadpool
pool = threadpool.ThreadPool(20) # 20 threads
pool.add_task(print, "test")
pool.wait_completion()

Это в Python 3, но не должно быть слишком сложно конвертировать в 2.x. Я не удивлен, если это решит вашу проблему.

0 голосов
/ 16 октября 2013

Прежде всего, попробуйте использовать неблокирующие сокеты. Другая причина может заключаться в том, что вы потребляете все временные порты. Попробуйте снять ограничение на это.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...