Поток Python. Потоки могут быть остановлены только с помощью закрытого метода self. Thread_stop () - PullRequest
6 голосов
/ 07 октября 2011

У меня есть функция, которая принимает большой массив пар x, y в качестве входных данных, который выполняет некоторую сложную подгонку кривой, используя numpy и scipy, а затем возвращает одно значение. Чтобы попытаться ускорить процесс, я пытаюсь создать два потока, которым я передаю данные, используя Queue.Queue. После того, как данные сделаны. Я пытаюсь завершить потоки, а затем завершить вызывающий процесс и вернуть управление оболочке.

Я пытаюсь понять, почему я должен прибегнуть к закрытому методу в потоке. Постарайтесь остановить мои потоки и вернуть управление в командную строку.

self.join () не завершает программу. Единственный способ вернуть контроль - использовать метод приватной остановки.

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            # self.join(timeout=None) does not work
            self._Thread__stop()

Вот примерный код моего кода:

    class CalcThread(threading.Thread):
        def __init__(self,in_queue,out_queue,function):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.out_queue = out_queue
            self.function = function
            self.finished = threading.Event()

        def stop(self):
            print "STOP CALLED"
            self.finished.set()
            print "SET DONE"
            self._Thread__stop()

        def run(self):
            while not self.finished.isSet():
                params_for_function = self.in_queue.get()
                try:
                    tm = self.function(paramsforfunction)
                    self.in_queue.task_done()
                    self.out_queue.put(tm)
                except ValueError as v:
                    #modify params and reinsert into queue
                    window = params_for_function["window"]
                    params_for_function["window"] = window + 1
                    self.in_queue.put(params_for_function)

    def big_calculation(well_id,window,data_arrays):
            # do some analysis to calculate tm
            return tm

    if __name__ == "__main__":
        NUM_THREADS = 2
        workers = []
        in_queue = Queue()
        out_queue = Queue()

        for i in range(NUM_THREADS):
            w = CalcThread(in_queue,out_queue,big_calculation)
            w.start()
            workers.append(w)

        if options.analyze_all:
              for i in well_ids:
                  in_queue.put(dict(well_id=i,window=10,data_arrays=my_data_dict))

        in_queue.join()
        print "ALL THREADS SEEM TO BE DONE"
        # gather data and report it from out_queue
        for i in well_ids:
            p = out_queue.get()
            print p
            out_queue.task_done()
            # I had to do this to get the out_queue to proceed
            if out_queue.qsize() == 0:
                out_queue.join()
                break
# Calling this stop method does not seem to return control to the command line unless I use threading.Thread private method

        for aworker in workers:
            aworker.stop()

Ответы [ 3 ]

5 голосов
/ 07 октября 2011

В общем, плохая идея - убить поток, который изменяет общий ресурс.

Задачи с интенсивным использованием ЦП в нескольких потоках хуже, чем бесполезные в Python, если вы не освободите GIL во время выполнения вычислений.Многие numpy функции do выпускают GIL.

ThreadPoolExecutor пример из документов

import concurrent.futures # on Python 2.x: pip install futures 

calc_args = []
if options.analyze_all:
    calc_args.extend(dict(well_id=i,...) for i in well_ids)

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    future_to_args = dict((executor.submit(big_calculation, args), args)
                           for args in calc_args)

    while future_to_args:
        for future in concurrent.futures.as_completed(dict(**future_to_args)):
            args = future_to_args.pop(future)
            if future.exception() is not None:
                print('%r generated an exception: %s' % (args,
                                                         future.exception()))
                if isinstance(future.exception(), ValueError):
                    #modify params and resubmit
                    args["window"] += 1
                    future_to_args[executor.submit(big_calculation, args)] = args

            else:
                print('f%r returned %r' % (args, future.result()))

print("ALL work SEEMs TO BE DONE")

Вы можете заменить ThreadPoolExecutor на ProcessPoolExecutor если нет общего состояния.Введите код в вашу функцию main().

4 голосов
/ 07 октября 2011

Для уточнения моего комментария - если единственная цель ваших потоков - использовать значения из очереди и выполнять с ними функцию, вам определенно лучше сделать что-то подобное ИМХО:

q = Queue()
results = []

def worker():
  while True:
    x, y = q.get()
    results.append(x ** y)
    q.task_done()

for _ in range(workerCount):
  t = Thread(target = worker)
  t.daemon = True
  t.start()

for tup in listOfXYs:
  q.put(tup)

q.join()

# Some more code here with the results list.

q.join() будет блокироваться, пока снова не станет пустым. Рабочие потоки будут продолжать пытаться получить значения, но не найдут их, поэтому они будут ждать бесконечно, как только очередь опустеет. Когда ваш скрипт завершит выполнение, рабочие потоки умрут, потому что они будут помечены как потоки демонов.

0 голосов
/ 07 октября 2011

Я попробовал метод g.d.d.c, и он дал интересный результат. Я мог бы заставить его точное вычисление x ** y работать просто отлично распределить между потоками.

Когда я вызывал свою функцию внутри рабочего цикла True. Я мог бы выполнять вычисления среди нескольких потоков, только если бы я поместил time.sleep (1) в цикл for, который вызывает метод start () потоков.

Так в моем коде. Без time.sleep (1) программа выдала мне чистый выход без вывода или в некоторых случаях

"Исключение в потоке Thread-2 (вероятнее всего, возникает при завершении работы интерпретатора): Исключение в потоке Thread-1 (скорее всего, возникает при завершении работы интерпретатора):"

Как только я добавил time.sleep (), все прошло нормально.

for aworker in range(5):
    t = Thread(target = worker)
    t.daemon = True
    t.start()
    # This sleep was essential or results for my specific function were None
    time.sleep(1)
    print "Started"
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...