Python - обработка компромисса в очереди - PullRequest
2 голосов
/ 09 мая 2019

У меня есть очередь с 100 числами в ней, от 1 до 100. Сначала у меня есть один процесс для заполнения очереди, который печатает Queue filled. Далее у меня есть две функции, которые печатают текущее значение очереди. Я пытаюсь обменять между печатью значений очереди между процессами. Вот мой код:

import multiprocessing as mp

def fillQueue(lookup,q):
    list(map(q.put,lookup))
    print('Queue filled')

def printQueue1(q):
    while not q.empty():
        print('Process 1:', (q.get()))
    print('Process 1: Queue is empty!')

def printQueue2(q):
    while not q.empty():
        print('Process 2:', (q.get()))
    print('Process 2: Queue is empty!')

if __name__ == "__main__":
    pool = mp.Pool(processes=3)
    manager = mp.Manager()
    q = manager.Queue()

    lookup = []
    count = 1
    while count < 101:
        lookup.append(count)
        count = count + 1

    p2 = pool.apply_async(printQueue1,(q,))
    p3 = pool.apply_async(printQueue2,(q,))
    p1 = pool.apply_async(fillQueue,(lookup,q))

    pool.close()
    pool.join()

Это дает мне:

Process 1: 1
Process 1: 2
Process 1: 3
Process 1: 4
Process 1: 5
Process 2: 6
Process 1: 7
Process 2: 8
Process 1: 9
Process 2: 10

Я пытаюсь получить:

Queue filled
Process 1: 1
Process 2: 2
Process 1: 3
Process 2: 4
Process 1: 5

Есть идеи, как этого добиться? Каждый раз, когда я запускаю программу, я получаю разные результаты, поэтому происходит что-то странное. Спасибо!

Ответы [ 2 ]

1 голос
/ 09 мая 2019

Вы можете создать Queue объект для каждого процесса, который будет действовать как «эстафета», чтобы сигнализировать, какой процесс получает в очередь следующий элемент из главной очереди, а затем в главном цикле каждой рабочей функции он должен сначалапопробуйте удалить из очереди свою собственную очередь "жезлов", прежде чем пытаться удалить из основной очереди, после чего он должен "передать жезл" следующему процессу, поместив элемент в очередь "жезлов" следующего процесса.Процесс очереди должен запускать процессы снятия с очереди, помещая элемент в очередь «жезлов» процесса, который должен выполняться первым.Это работает, потому что Queue.get блокируется, пока в очереди не появится элемент:

import multiprocessing as mp
import time

def fillQueue(lookup, q, baton_first):
    list(map(q.put,lookup))
    print('Queue filled')
    baton_first.put(None)

def printQueue(id, q, baton_self, baton_other):
    while True:
        baton_self.get()
        try:
            if q.empty():
                break
            print('Process %s:' % id, (q.get()))
        # use finally to always pass on the baton whether the loop breaks or not
        finally:
            baton_other.put(None)
        time.sleep(1) # the actual work should be performed here
    print('Process %s: Queue is empty!' % id)

if __name__ == "__main__":
    pool = mp.Pool(processes=3)
    manager = mp.Manager()
    q = manager.Queue()
    baton1 = manager.Queue()
    baton2 = manager.Queue()

    p2 = pool.apply_async(printQueue,(1, q, baton1, baton2))
    p3 = pool.apply_async(printQueue,(2, q, baton2, baton1))
    p1 = pool.apply_async(fillQueue, (list(range(1, 11)), q, baton1))

    pool.close()
    pool.join()

Это выводит:

Queue filled
Process 1: 1
Process 2: 2
Process 1: 3
Process 2: 4
Process 1: 5
Process 2: 6
Process 1: 7
Process 2: 8
Process 1: 9
Process 2: 10
Process 1: Queue is empty!
Process 2: Queue is empty!
1 голос
/ 09 мая 2019

Итак, apply_async применяет процессы асинхронно - это означает, что все три запускаемых вами процесса запускаются одновременно и в некотором роде сражаются друг с другом.

Поскольку вы не запускаете эти процессы детерминистически, порядок их выполнения, вероятно, будет меняться при каждом запуске процесса.

Я предполагаю, что вы хотите:

  1. Очередь для заполнения ДО того, как процессы попытаются получить к ней доступ
  2. «Работа», которая будет равномерно распределена между процессами

Даже если вы не ограничите функции каким-либо образом, порядокони будут get() предметов, все еще довольно случайно.Если вам действительно нужна функция 1, чтобы получать только шансы, и функция 2, чтобы получать только четности, и чтобы они были в строгом порядке, вам, вероятно, не нужна многопроцессорная обработка ...

import multiprocessing as mp


def fillQueue(lookup, q):
    list(map(q.put, lookup))
    print('Queue filled')


def printQueue(q, id):
    while not q.empty():
        print('Process {}: {}'.format(id, q.get()))
    print('Process {}: Queue is empty!'.format(id))


if __name__ == "__main__":
    pool = mp.Pool(processes=3)
    manager = mp.Manager()
    q = manager.Queue()

    # no need to construct a list with a counter, we can just use the generator
    lookup = range(101)

    # do not fill the queue while processes are running, do it beforehand!
    fillQueue(lookup, q)

    # don't need different functions, since they are doing the same work
    # just fire off multiple copies of the same function
    p1 = pool.apply_async(printQueue, (q, 1,))
    p2 = pool.apply_async(printQueue, (q, 2,))

    pool.close()
    pool.join()

Пример вывода:

Queue filled
Process 2: 0
Process 2: 1
Process 2: 2
Process 2: 3
Process 2: 4
Process 2: 5
Process 1: 6
Process 2: 7
Process 1: 8
Process 2: 9
Process 2: 10
Process 1: 11
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...