Почему мой многопоточный скрипт на python, который использует Queue, threading.Thread и подпроцесс, настолько ненадежен - PullRequest
1 голос
/ 29 декабря 2010

У меня есть три сценария оболочки P1, P2 и P3, которые я пытаюсь связать.Эти три сценария оболочки должны выполняться последовательно, но в любой момент времени могут быть запущены несколько P1, P2 и P3.

Мне нужно выполнить их для десятков файлов и быстро, и, следовательно, желание использовать потокии делаю вещи параллельно.

Я использую Python Thread, Queue и модуль подпроцесса для достижения этой цели.

Моя проблема в том, что когда у меня число потоков больше единицы, программа работает хаотично и потоки не передаются друг другу в воспроизводимом режиме.Иногда все пять потоков работают отлично и работают до конца.

Это моя первая попытка сделать что-либо с использованием потоков, и я уверен, что это из-за обычных проблем с потоками, связанных с условиями гонки.Но я хочу знать, как я могу очистить свой код.

Фактический код: (https://github.com/harijay/xtaltools/blob/master/process_multi.py). Псевдокод приведен ниже. Извините, если код грязный.

Мой вопросis Почему у меня нестабильное поведение при использовании этого дизайна. Все потоки обращаются к разным файлам в любой момент времени. Также subprocess.call возвращается только после завершения сценария оболочки и создания файла, который он создает, записывается на диск.

Что я могу сделать по-другому? Я попытался объяснить свой дизайн здесь как можно более кратко.

Мой базовый дизайн:

P1_Queue = Queue()
P2_Queue = Queue()
P3_Queue = Queue()

class P1_Thread(Thread):
    def __init__(self,P1_Queue,P2_Queue):
        Thread.__init__(self)
        self.in_queue = P1_Queue
        self.out_queue = P2_Queue

    def run(self):
        while True:
            my_file_to_process = self.in_queue.get()
            if my_file_to_process = None:
                break
            P1_runner = P1_Runner(my_file_to_process)
            P1_runner.run_p1_using_subprocess()
            self.out_queue.put(my_file_to_process)

Класс p1 Runner берет дескриптор входного файла, а затемвызывает subprocess.call () для запуска сценария оболочки, который использует ввод файла и создает новый выходной файл, используя метод run_p1_using_subprocess.

class P1_runner(object):

    def __init__(self,inputfile):
        self.my_shell_script = """#!/usr/bin/sh
prog_name <<eof
input 1
...
eof"""
       self.my_shell_script_file = open("some_unique_p1_file_name.sh")
       os.chmod("some_unique_file_name.sh",0755)

    def run_p1_using_subprocess(self):
        subprocess.call([self.my_shell_script_file])

I have essentially similar classes for P2 and P3 . All of which call a shell script that is custom generated

The chaining is achieved using a series of Thread Pools.
p1_worker_list = []
p2_worker_list = []
p3_worker_list = []

for i in range(THREAD_COUNT):
    p1_worker = P1_Thread(P1_Queue,P2_Queue)
    p1_worker.start()
    p1_worker_list.append(p1_worker)

for worker in p1_worker_list:
    worker.join()

And then again the same code block for p2 and p3

for i in range(THREAD_COUNT):
    p2_worker = P2_Thread(P2_Queue,P3_Queue)
    p2_worker.start()
    p2_worker_list.append(p1_worker)

for worker in p2_worker_list:
    worker.join()

Спасибо большое за вашу помощь / совет

Ответы [ 2 ]

2 голосов
/ 30 декабря 2010

Что ж, это действительно плохо:

runner.run()

Вы никогда не должны вызывать метод запуска потока вручную.Вы начинаете тему с .start ().Ваш код - ОГРОМНЫЙ беспорядок, и никто здесь не собирается пробираться через него, чтобы найти вашу ошибку.

1 голос
/ 29 декабря 2010

Условие выхода потока заставляет их совершить самоубийство, когда другой поток опустошает свою входную очередь:

    my_file_to_process = self.in_queue.get()
    if my_file_to_process = None:  # my sister ate faster than I did, so...
        break # ... I kill myself!

Потоки умирают только потому, что не нашли работу, когда были готовы к большему.

Вместо этого вы должны заставить потоки переходить в спящий режим (ждать), пока не будет сигнализировано событие в их входной очереди, и только в том случае, если оркестратор (основная программа) сигнализирует, что обработка завершена (установите флаг самоубийства, исигнализировать все очереди).

(я вижу, вы уже изменили код).

Что @Falmarri, вероятно, означает в своей заметке в другом месте, что ваш вопрос не оконкретная проблема (что-то, на что могут ответить другие), потому что общее использование библиотеки threading в вашем коде неверно, а использование языка программирования в целом неудобно.Например:

  • Вызов worker.join() заставляет основную программу дождаться завершения всех потоков P1 по порядку, прежде чем запускать потоки P2, что побеждает любую попытку параллелизма.
  • Вы должны либо переопределить Thread.run(), либо предоставить вызываемый объект для конструктора.Нет необходимости в Pn_runner классах.
  • Все классы потоков делают то же самое.Вам не нужен другой класс для каждой стадии процесса.
  • Если вы уже используете Python, то нет смысла вызывать внешнюю программу (тем более сценарий оболочки), если вы абсолютно не можете легко выполнить работув чистом Python.
  • Из-за вышеизложенного ваша программа записывает сценарии оболочки в файловую систему очень странно и почти наверняка не требуется.

Что я предлагаю сделать, чтобы решитьЭта ваша конкретная проблема:

  1. Попробуйте придерживаться 100% Python.Если вы не можете, или это кажется слишком сложным, вы, по крайней мере, нашли конкретную функциональность, к которой нужно обращаться извне.
  2. Создайте решение, которое не использует параллелизм.
  3. Измерьте производительность программы и попытайтесь улучшить ее алгоритмически.
  4. Избегайте многопоточности, если можете.Программа с привязкой к ЦП будет использовать все доступные циклы без многопоточности.Программа, которая слишком привязана к диску (или связана с каким-либо внешним / удаленным ресурсом), в конечном итоге будет ждать диска, если ей больше нечего делать.Чтобы извлечь выгоду из многопоточности, программа должна иметь правильный баланс между вычислениями и использованием внешних ресурсов (или должна иметь возможность обслуживать запросы по мере их поступления, даже если занята другим).
  5. Делайте это pythonic способ: начните с простого и постепенно увеличивайте функциональность и сложность, избегая при этом всего, что кажется сложным.

Если вы хотите научиться многопоточности в Python, то непременно ищитепростая проблема для экспериментов.И если все, что вам нужно, - это запускать несколько сценариев оболочки параллельно, тогда bash и другие оболочки уже имеют соответствующие условия, и вам не нужно использовать Python.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...