Многопроцессорная обработка Python - лучший способ понять прогресс для каждого процесса - PullRequest
3 голосов
/ 04 октября 2019

Я хотел бы знать, как продвигаются мои процессы. На данный момент то, что я использую, не очень эффективно. Это mwe:

import time
from multiprocessing import Pool as ProcessPool
import progressbar
import random

def some_random_calculation(n):
    with progressbar.ProgressBar(max_value=n) as bar:
        for i in range(0,n):
            time.sleep(1)
            bar.update(i)

if __name__=='__main__':

    arguments = [random.randint(4,10) for i in range(4)]

    pool = ProcessPool(4)
    results = pool.map_async(some_random_calculation, arguments)
    print(results.get())
    pool.close() 
    pool.join()

. В этом случае я использую progressbar2, однако вывод постоянно обновляется в той же строке, когда процесс превышает 1:

Вы видите на изображении, что столбцы расположены в отсортированном порядке только потому, что после окончания первого столбца новый процесс создается другими процессами. Когда есть несколько процессов, одна строка обновляется в одной строке.

Я ищу решение своей проблемы, было бы здорово иметь динамически обновляемые n полосы. Тем не менее, вероятно, есть более разумный способ получить представление о ходе различных процессов. Любой совет?

1 Ответ

1 голос
/ 04 октября 2019

Так что это далеко не идеально , предмет довольно сложный, если вы хотите все сделать правильно. Но одно можно сказать наверняка, вы должны следить за ходом процесса за пределами подпроцессов.

Самый быстрый и, вероятно, самый простой способ сделать это - это вызвать функцию вызова, которая возвращает статус, и управляющий объект снаружи можетдержать пользователя в курсе прогресса. Это будет выглядеть примерно так:

import os, signal
from threading import Thread, enumerate as t_enumerate
from time import time, sleep
from random import random

clear = lambda: os.system('cls' if os.name=='nt' else 'clear')

def sig_handler(signal, frame):
    for t in t_enumerate():
        if t.getName() != 'MainThread':
            t.stop()
    exit(0)
signal.signal(signal.SIGINT, sig_handler)

class worker(Thread):
    def __init__(self, init_value=0):
        Thread.__init__(self)
        self.init_value = init_value
        self.progress = 0
        self.run_state = True
        self.start() # Start ourselves instead of from outside.

    def poll(self):
        return self.progress

    def stop(self):
        self.run_state = False

    def run(self):
        main_thread = None
        for t in t_enumerate():
            if t.getName() == 'MainThread':
                main_thread = t
                break

        while main_thread and self.run_state and main_thread.isAlive():
            for i in range(0, 100):
                self.init_value *= i
                self.progress = i
                sleep(random())
            break # Yea kinda unessecary while loop. meh..

workers = [worker(0) for i in range(4)]

while len(t_enumerate()) > 1:
    clear()
    for index, worker_handle in enumerate(workers):
        progress = worker_handle.poll()
        print(f'Thread {index} is at {progress}/100.')
    sleep(1)

Другой подход заключается в том, чтобы каждый поток получал блокировку в пуле потоков перед печатью. Но это добавляет сложности, для начала им всем нужно будет синхронизировать , когда пришло время печатать, чтобы они не могли произвольно получить блокировку для печати, но вы находитесь в какой-то другой части выводапроцесс, где что-то еще печатается. Или они будут печатать в неправильном порядке, или вам нужно будет отследить, какую строку вы должны откатить назад, чтобы переписать ..

Возможно, здесь будет гуру Threading с лучшим ответом, но этоэто мои два цента. Просто добавьте функцию опроса, выполните комбинированное обновление статуса и работайте с очень ограниченными вычислительными возможностями, необходимыми для вызова каждого потока. Если у вас их нет, вы не окажете никакого влияния на производительность, если будете звонить несколько раз.

...