У меня есть три сценария оболочки P1, P2 и P3, которые я пытаюсь связать.Эти три сценария оболочки должны выполняться последовательно, но в любой момент времени могут быть запущены несколько P1, P2 и P3.
Мне нужно выполнить их для десятков файлов и быстро, и, следовательно, желание использовать потокии делаю вещи параллельно.
Я использую Python Thread, Queue и модуль подпроцесса для достижения этой цели.
Моя проблема в том, что когда у меня число потоков больше единицы, программа работает хаотично и потоки не передаются друг другу в воспроизводимом режиме.Иногда все пять потоков работают отлично и работают до конца.
Это моя первая попытка сделать что-либо с использованием потоков, и я уверен, что это из-за обычных проблем с потоками, связанных с условиями гонки.Но я хочу знать, как я могу очистить свой код.
Фактический код: (https://github.com/harijay/xtaltools/blob/master/process_multi.py). Псевдокод приведен ниже. Извините, если код грязный.
Мой вопросis Почему у меня нестабильное поведение при использовании этого дизайна. Все потоки обращаются к разным файлам в любой момент времени. Также subprocess.call возвращается только после завершения сценария оболочки и создания файла, который он создает, записывается на диск.
Что я могу сделать по-другому? Я попытался объяснить свой дизайн здесь как можно более кратко.
Мой базовый дизайн:
P1_Queue = Queue()
P2_Queue = Queue()
P3_Queue = Queue()
class P1_Thread(Thread):
def __init__(self,P1_Queue,P2_Queue):
Thread.__init__(self)
self.in_queue = P1_Queue
self.out_queue = P2_Queue
def run(self):
while True:
my_file_to_process = self.in_queue.get()
if my_file_to_process = None:
break
P1_runner = P1_Runner(my_file_to_process)
P1_runner.run_p1_using_subprocess()
self.out_queue.put(my_file_to_process)
Класс p1 Runner берет дескриптор входного файла, а затемвызывает subprocess.call () для запуска сценария оболочки, который использует ввод файла и создает новый выходной файл, используя метод run_p1_using_subprocess.
class P1_runner(object):
def __init__(self,inputfile):
self.my_shell_script = """#!/usr/bin/sh
prog_name <<eof
input 1
...
eof"""
self.my_shell_script_file = open("some_unique_p1_file_name.sh")
os.chmod("some_unique_file_name.sh",0755)
def run_p1_using_subprocess(self):
subprocess.call([self.my_shell_script_file])
I have essentially similar classes for P2 and P3 . All of which call a shell script that is custom generated
The chaining is achieved using a series of Thread Pools.
p1_worker_list = []
p2_worker_list = []
p3_worker_list = []
for i in range(THREAD_COUNT):
p1_worker = P1_Thread(P1_Queue,P2_Queue)
p1_worker.start()
p1_worker_list.append(p1_worker)
for worker in p1_worker_list:
worker.join()
And then again the same code block for p2 and p3
for i in range(THREAD_COUNT):
p2_worker = P2_Thread(P2_Queue,P3_Queue)
p2_worker.start()
p2_worker_list.append(p1_worker)
for worker in p2_worker_list:
worker.join()
Спасибо большое за вашу помощь / совет