Как получить возвращаемое значение из потока в Python? - PullRequest
260 голосов
/ 01 августа 2011

Функция foo ниже возвращает строку 'foo'. Как я могу получить значение 'foo', которое возвращается из цели потока?

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

«Один очевидный способ сделать это», показанный выше, не работает: thread.join() вернул None.

Ответы [ 21 ]

2 голосов
/ 19 марта 2016

Вы можете определить изменяемую область выше области действия многопоточной функции и добавить к ней результат. (Я также изменил код для совместимости с python3)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

Возвращает {'world!': 'foo'}

Если вы используете функцию ввода в качестве ключа к вашим результатам, каждый уникальный вход гарантированно даст запись в результатах

2 голосов
/ 01 августа 2011

join всегда возвращают None, я думаю, вы должны создать подкласс Thread для обработки кодов возврата и т. Д.

1 голос
/ 18 августа 2016

Я использую эту оболочку, которая удобно превращает любую функцию для запуска в Thread - заботясь о ее возвращаемом значении или исключении. Это не добавляет Queue накладных расходов.

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

Примеры использования

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Примечания по threading модулю

Удобное возвращаемое значение и обработка исключений для многопоточной функции - это частая «Pythonic» потребность, и она действительно должна уже предлагаться модулем threading - возможно, непосредственно в стандартном классе Thread. ThreadPool имеет слишком много накладных расходов для простых задач - 3 управления потоками, много бюрократии. К сожалению, макет Thread изначально был скопирован с Java - что вы видите, например. из все еще бесполезного 1-го (!) параметра конструктора group.

1 голос
/ 17 апреля 2018

Идея GuySoft великолепна, но я думаю, что объект не обязательно должен наследоваться от Thread, и start () может быть удален из интерфейса:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo
1 голос
/ 08 января 2018

Как уже упоминалось, многопроцессорный пул намного медленнее, чем базовая многопоточность.Использование очередей, предложенных в некоторых ответах, является очень эффективной альтернативой.Я использую его со словарями, чтобы иметь возможность запускать множество небольших потоков и восстанавливать несколько ответов, комбинируя их со словарями:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)
0 голосов
/ 26 декабря 2018

Если из вызова функции нужно проверить только True или False, я нашел бы более простое решение - обновить глобальный список.

import threading

lists = {"A":"True", "B":"True"}

def myfunc(name: str, mylist):
    for i in mylist:
        if i == 31:
            lists[name] = "False"
            return False
        else:
            print("name {} : {}".format(name, i))

t1 = threading.Thread(target=myfunc, args=("A", [1, 2, 3, 4, 5, 6], ))
t2 = threading.Thread(target=myfunc, args=("B", [11, 21, 31, 41, 51, 61], ))
t1.start()
t2.start()
t1.join()
t2.join()

for value in lists.values():
    if value == False:
        # Something is suspicious 
        # Take necessary action 

Это более полезно, если вы хотите узнать, вернул ли какой-либо из потоков ложное состояние, чтобы предпринять необходимые действия.

0 голосов
/ 26 мая 2018

ответ Киндалла в Python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return
0 голосов
/ 18 декабря 2015

Определите вашу цель на
1) принять аргумент q
2) заменить любые утверждения return foo на q.put(foo); return

так что функция

def func(a):
    ans = a * a
    return ans

станет

def func(a, q):
    ans = a * a
    q.put(ans)
    return

и тогда вы будете действовать так же

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

И вы можете использовать функциональные декораторы / обертки, чтобы сделать так, чтобы вы могли использовать ваши существующие функции как target без их изменения, но следуйте этой базовой схеме.

0 голосов
/ 10 февраля 2017

Одно из обычных решений - обернуть вашу функцию foo с помощью декоратора, например

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

Тогда весь код может выглядеть следующим образом

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Примечание

Одна важная проблема заключается в том, что возвращаемые значения могут быть неупорядоченными .(На самом деле return value не обязательно сохраняется в queue, поскольку вы можете выбрать произвольную поточно-ориентированную структуру данных)

0 голосов
/ 15 ноября 2017

Очень простой способ сделать это для таких чайников, как я:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self):
        # in this class and function we will put our test target function
        test()

t = AnyThread()

# having our test target function
def test():
    # do something in this function:
    result = 3 + 2
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run()
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 5
>>> 5

Главное здесь - это queue модуль. Мы создаем queue.Queue() экземпляр и включаем его в нашу функцию. Мы подаем его с нашим результатом, который позже мы выходим за пределы потока.

Пожалуйста, посмотрите еще один пример с аргументами, переданными нашей тестовой функции:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self, a, b):
        # in this class and function we will put our execution test function
        test(a, b)

t = AnyThread()

# having our test target function
def test(a, b):
    # do something in this function:
    result = a + b
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run(3+i, 2+i)
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 7
>>> 9
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...