Почему рабочие перестают работать в бассейне? - PullRequest
0 голосов
/ 12 мая 2019

Чтобы распараллелить обработку некоторых данных, я хочу создать 2 разных рабочих.Каждый из них использует свой источник данных.Поэтому я называю их индивидуально с рабочим пулом одного работника.(см. Код)

Я хочу использовать очередь для каждого из них в отдельности, чтобы получить входные данные.Самый первый прогон работает нормально.Но если я добавлю больше рабочих мест для рабочего, рабочий не обработает их.Вы также можете увидеть список рабочих мест, увеличивающихся.

Что я здесь не так делаю?Желаемым выводом будет распечатка каждого задания, которое я помещаю в очередь.

Вы можете проверить приведенный ниже код, написанный на Python 3.7, который должен воспроизвести ошибку.

Бонус Вопрос:Если я уберу «time.sleep (0.1)» в разделе main и запусту сценарий в PowerShell, сценарий завершится, прежде чем выводится результат второго работника.Как использовать .join здесь для того, чтобы дождаться запуска процессов?

# Creates workers
def worker1(q):
    while True:
        item = q.get(True)
        print('I am Worker 1: I started working, len of the queue is : ' + str(q.qsize()))
        print(item)

        # Simulating a long calculation
        time.sleep(1)


def worker2(q):
    while True:
        item = q.get(True)
        print('I am Worker 2: I started working, len of the queue is :  ' + str(q.qsize()))
        print(item)

        # Simulating a long calculation
        time.sleep(1)


if __name__ == '__main__':

    # Configs
    n_workers = 2

    # Define Queues
    q1 = multiprocessing.Manager().Queue()
    q2 = multiprocessing.Manager().Queue()

    the_queue = [q1, q2]
    the_pool = []
    workers = [worker1, worker2]

    # Creating one worker for each Queue
    for i in range(n_workers):
        the_pool.append(multiprocessing.Pool(1, workers[i],(the_queue[i],)))

    # First Run
    for i in range(n_workers):
        the_queue[i].put("hello " + str(i))
        print('I just put in a value, len of the queue is :  ' + str(the_queue[i].qsize()))

    # Give some time before adding new tasks
    time.sleep(0.1)

    # Next Runs
    for ii in range(10):
        for i in range(n_workers):
            the_queue[i].put("hello " + str(i))
            print('I just put in a value, len of the queue is :   ' + str(the_queue[i].qsize()))
...