Асинхронный вызов метода в Python? - PullRequest
163 голосов
/ 06 августа 2009

Мне было интересно, есть ли какая-нибудь библиотека для асинхронных вызовов методов в Python . Было бы здорово, если бы вы могли сделать что-то вроде

@async
def longComputation():
    <code>


token = longComputation()
token.registerCallback(callback_function)
# alternative, polling
while not token.finished():
    doSomethingElse()
    if token.finished():
        result = token.result()

Или для асинхронного вызова не асинхронной процедуры

def longComputation()
    <code>

token = asynccall(longComputation())

Было бы замечательно иметь более утонченную стратегию в качестве родной для языкового ядра. Было ли это учтено?

Ответы [ 12 ]

194 голосов
/ 06 августа 2009

Что-то вроде:

import threading

thr = threading.Thread(target=foo, args=(), kwargs={})
thr.start() # Will run "foo"
....
thr.is_alive() # Will return whether foo is running currently
....
thr.join() # Will wait till "foo" is done

См. Документацию в https://docs.python.org/2/library/threading.html#module-threading для получения более подробной информации; этот код должен работать и для Python 3.

133 голосов
/ 06 августа 2009

Вы можете использовать многопроцессорный модуль , добавленный в Python 2.6. Вы можете использовать пулы процессов и затем асинхронно получать результаты с помощью:

apply_async(func[, args[, kwds[, callback]]])

например:.

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=1)              # Start a worker processes.
    result = pool.apply_async(f, [10], callback) # Evaluate "f(10)" asynchronously calling callback when finished.

Это только одна альтернатива. Этот модуль предоставляет множество возможностей для достижения того, что вы хотите. Также из этого будет действительно легко сделать декоратор.

42 голосов
/ 29 марта 2016

Начиная с Python 3.5, вы можете использовать расширенные генераторы для асинхронных функций.

import asyncio
import datetime

Улучшенный синтаксис генератора:

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

Новый async/await синтаксис:

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)


loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()
30 голосов
/ 06 августа 2009

Это не в языковом ядре, а очень зрелая библиотека, которая делает то, что вы хотите, это Twisted . Он вводит объект Deferred, к которому вы можете прикрепить обратные вызовы или обработчики ошибок («errbacks»). Deferred - это, по сути, «обещание», что функция в конечном итоге даст результат.

19 голосов
/ 30 марта 2011

Вы можете реализовать декоратор, чтобы сделать ваши функции асинхронными, хотя это немного сложно. Модуль multiprocessing полон небольших изысков и, казалось бы, произвольных ограничений - тем не менее, еще одна причина заключать его в дружественный интерфейс.

from inspect import getmodule
from multiprocessing import Pool


def async(decorated):
    r'''Wraps a top-level function around an asynchronous dispatcher.

        when the decorated function is called, a task is submitted to a
        process pool, and a future object is returned, providing access to an
        eventual return value.

        The future object has a blocking get() method to access the task
        result: it will return immediately if the job is already done, or block
        until it completes.

        This decorator won't work on methods, due to limitations in Python's
        pickling machinery (in principle methods could be made pickleable, but
        good luck on that).
    '''
    # Keeps the original function visible from the module global namespace,
    # under a name consistent to its __name__ attribute. This is necessary for
    # the multiprocessing pickling machinery to work properly.
    module = getmodule(decorated)
    decorated.__name__ += '_original'
    setattr(module, decorated.__name__, decorated)

    def send(*args, **opts):
        return async.pool.apply_async(decorated, args, opts)

    return send

Код ниже иллюстрирует использование декоратора:

@async
def printsum(uid, values):
    summed = 0
    for value in values:
        summed += value

    print("Worker %i: sum value is %i" % (uid, summed))

    return (uid, summed)


if __name__ == '__main__':
    from random import sample

    # The process pool must be created inside __main__.
    async.pool = Pool(4)

    p = range(0, 1000)
    results = []
    for i in range(4):
        result = printsum(i, sample(p, 100))
        results.append(result)

    for result in results:
        print("Worker %i: sum value is %i" % result.get())

В реальном случае я бы немного подробнее остановился на декораторе, предоставив какой-то способ отключить его для отладки (при сохранении будущего интерфейса на месте) или, возможно, средство для работы с исключениями; но я думаю, что это достаточно хорошо демонстрирует принцип.

15 голосов
/ 08 октября 2012

Just

import threading, time

def f():
    print "f started"
    time.sleep(3)
    print "f finished"

threading.Thread(target=f).start()
7 голосов
/ 09 января 2014

Вы можете использовать eventlet. Он позволяет вам писать то, что кажется синхронным кодом, но работает асинхронно по сети.

Вот пример супер минимального сканера:

urls = ["http://www.google.com/intl/en_ALL/images/logo.gif",
     "https://wiki.secondlife.com/w/images/secondlife.jpg",
     "http://us.i1.yimg.com/us.yimg.com/i/ww/beta/y3.gif"]

import eventlet
from eventlet.green import urllib2

def fetch(url):

  return urllib2.urlopen(url).read()

pool = eventlet.GreenPool()

for body in pool.imap(fetch, urls):
  print "got body", len(body)
7 голосов
/ 09 марта 2011

Мое решение:

import threading

class TimeoutError(RuntimeError):
    pass

class AsyncCall(object):
    def __init__(self, fnc, callback = None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        self.Thread = threading.Thread(target = self.run, name = self.Callable.__name__, args = args, kwargs = kwargs)
        self.Thread.start()
        return self

    def wait(self, timeout = None):
        self.Thread.join(timeout)
        if self.Thread.isAlive():
            raise TimeoutError()
        else:
            return self.Result

    def run(self, *args, **kwargs):
        self.Result = self.Callable(*args, **kwargs)
        if self.Callback:
            self.Callback(self.Result)

class AsyncMethod(object):
    def __init__(self, fnc, callback=None):
        self.Callable = fnc
        self.Callback = callback

    def __call__(self, *args, **kwargs):
        return AsyncCall(self.Callable, self.Callback)(*args, **kwargs)

def Async(fnc = None, callback = None):
    if fnc == None:
        def AddAsyncCallback(fnc):
            return AsyncMethod(fnc, callback)
        return AddAsyncCallback
    else:
        return AsyncMethod(fnc, callback)

И работает точно так, как запрошено:

@Async
def fnc():
    pass
5 голосов
/ 03 марта 2014

Что-то вроде этого у меня работает, затем вы можете вызвать функцию, и она отправит себя в новый поток.

from thread import start_new_thread

def dowork(asynchronous=True):
    if asynchronous:
        args = (False)
        start_new_thread(dowork,args) #Call itself on a new thread.
    else:
        while True:
            #do something...
            time.sleep(60) #sleep for a minute
    return
2 голосов
/ 22 июля 2017

Вы можете использовать concurrent.futures (добавлено в Python 3.2).

import time
from concurrent.futures import ThreadPoolExecutor


def long_computation(duration):
    for x in range(0, duration):
        print(x)
        time.sleep(1)
    return duration * 2


print('Use polling')
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_computation, 5)
    while not future.done():
        print('waiting...')
        time.sleep(0.5)

    print(future.result())

print('Use callback')
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(long_computation, 5)
future.add_done_callback(lambda f: print(f.result()))

print('waiting for callback')

executor.shutdown(False)  # non-blocking

print('shutdown invoked')
...