Как получить возвращаемое значение из потока в Python? - PullRequest
260 голосов
/ 01 августа 2011

Функция foo ниже возвращает строку 'foo'. Как я могу получить значение 'foo', которое возвращается из цели потока?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

«Один очевидный способ сделать это», показанный выше, не работает: thread.join() вернул None.

Ответы [ 21 ]

242 голосов
/ 13 января 2013

FWIW, модуль multiprocessing имеет хороший интерфейс для этого, используя класс Pool.И если вы хотите придерживаться потоков, а не процессов, вы можете просто использовать класс multiprocessing.pool.ThreadPool в качестве замены.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.
195 голосов
/ 01 августа 2011

Один из способов, которые я видел, - это передать изменяемый объект, такой как список или словарь, конструктору потока вместе с индексом или другим идентификатором некоторого вида.Затем поток может сохранить свои результаты в своем выделенном слоте в этом объекте.Например:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

Если вы действительно хотите, чтобы join() вернул возвращаемое значение вызванной функции, вы можете сделать это с помощью подкласса Thread, например:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

Это немного затруднительно из-за некоторого искажения имен, и оно получает доступ к "частным" структурам данных, которые специфичны для реализации Thread ... но это работает.

Для python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return
76 голосов
/ 15 января 2013

Ответ Джейка хорош, но если вы не хотите использовать пул потоков (вы не знаете, сколько потоков вам понадобится, но создаете их по мере необходимости), тогда хорошим способом передачи информации между потоками является встроенный класс Queue.Queue , поскольку он обеспечивает безопасность потоков.

Я создал следующий декоратор, чтобы он действовал аналогично пулу потоков:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

Тогда вы просто используете его как:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

Декорированная функция создает новый поток при каждом вызове и возвращает объект Thread, содержащий очередь, которая получит результат.

UPDATE

Прошло довольно много времени с тех пор, как я опубликовал этот ответ, но он все еще получает представления, поэтому я решил обновить его, чтобы отразить способ, которым я делаю это в более новых версиях Python:

Python 3.2 добавлен в модуль concurrent.futures, который обеспечивает высокоуровневый интерфейс для параллельных задач. Он предоставляет ThreadPoolExecutor и ProcessPoolExecutor, поэтому вы можете использовать пул потоков или процессов с одинаковыми API.

Одним из преимуществ этого API-интерфейса является то, что отправка задачи в Executor возвращает объект Future, который будет дополнен возвращаемым значением вызываемого вами запроса.

Это делает ненужным присоединение объекта queue, что немного упрощает декоратор:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

При этом будет использоваться модуль по умолчанию, если он не был передан.

Использование очень похоже на ранее:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

Если вы используете Python 3.4+, одна действительно приятная особенность использования этого метода (и объектов Future в целом) заключается в том, что возвращаемое будущее можно обернуть, чтобы превратить его в asyncio.Future с asyncio.wrap_future. Это позволяет легко работать с сопрограммами:

result = await asyncio.wrap_future(long_task(10))

Если вам не нужен доступ к базовому объекту concurrent.Future, вы можете включить перенос в декоратор:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

Затем, когда вам нужно вытолкнуть интенсивно работающий процессор или блокировать код из потока цикла событий, вы можете поместить его в декорированную функцию:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
32 голосов
/ 29 апреля 2016

Другое решение, которое не требует изменения существующего кода:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result

Его также можно легко настроить для многопоточной среды:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result
19 голосов
/ 31 октября 2016

ответ Parris / kindall join / return, портированный на Python 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

Обратите внимание, класс Thread реализован по-другому в Python 3.

17 голосов
/ 01 августа 2014

Я украл ответ Уиндалла и немного его почистил.

Ключевая часть добавляет * args и ** kwargs в join () для обработки времени ожидания

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

ОБНОВЛЕНИЕ ОТВЕТА НИЖЕ

Это мой самый популярный ответ, поэтому я решил обновить код, который будет работать как на py2, так и на py3.

Кроме того, я вижу много ответов на этот вопрос, которые показывают отсутствие понимания в отношении Thread.join (). Некоторым совершенно не удается обработать аргумент timeout. Но есть также угловой случай, о котором вам следует знать в отношении случаев, когда у вас есть (1) целевая функция, которая может возвращать None, и (2) вы также передаете аргумент timeout в join (). Пожалуйста, смотрите "ТЕСТ 4", чтобы понять этот угловой случай.

Класс ThreadWithReturn, который работает с py2 и py3:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

if sys.version_info >= (3, 0):
    _thread_target_key = '_target'
    _thread_args_key = '_args'
    _thread_kwargs_key = '_kwargs'
else:
    _thread_target_key = '_Thread__target'
    _thread_args_key = '_Thread__args'
    _thread_kwargs_key = '_Thread__kwargs'

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key))

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

Некоторые примеры испытаний приведены ниже:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

Можете ли вы определить угловой случай, с которым мы можем столкнуться при проведении ТЕСТА 4?

Проблема в том, что мы ожидаем, что метод giveMe () вернет None (см. ТЕСТ 2), но мы также ожидаем, что join () вернет None, если время ожидания истекло.

returned is None означает либо:

(1) это то, что вернул giveMe (), или

(2) истекло время ожидания соединения ()

Этот пример тривиален, поскольку мы знаем, что метод giveMe () всегда будет возвращать None. Но в реальном случае (когда цель может законно вернуть None или что-то еще), мы бы хотели явно проверить, что произошло.

Ниже описано, как решить этот угловой случай:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()
10 голосов
/ 25 ноября 2017

Использование очереди:

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
6 голосов
/ 01 мая 2013

Мое решение проблемы - обернуть функцию и поток в класс. Не требует использования пулов, очередей или передачи переменных типа c. Это также не блокирует. Вместо этого вы проверяете статус. Смотрите пример того, как использовать его в конце кода.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
3 голосов
/ 18 ноября 2017

Принимая во внимание @ iman комментарий к @ JakeBiesinger ответ Я рекомендовал иметь разное количество потоков:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

Приветствия

Guy.

2 голосов
/ 27 мая 2016

Вы можете использовать Пул в качестве пула рабочих процессов, как показано ниже:

from multiprocessing import Pool


def f1(x, y):
    return x*y


if __name__ == '__main__':
    with Pool(processes=10) as pool:
        result = pool.apply(f1, (2, 3))
        print(result)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...