Как отсортировать многопроцессную очередь по атрибуту объекта - PullRequest
0 голосов
/ 24 марта 2020

С помощью обычного списка я мог бы отсортировать список на основе атрибута объектов с помощью:

queue.sort(key=lambda weed: (weed.x_coord), reverse=True)

Однако, с многопроцессорной очередью это было невозможно, так как я могу выполнить sh такую ​​же сортировку с многопроцессорной очередью? Или лучше избегать многопроцессорной очереди, если я хочу, чтобы очередь сортировалась в конце?

Требование состоит в том, что очередь / список должны быть поточно-ориентированными и безопасными для обработки, поскольку очередь / список будет заполняться двумя параллельными потоками.

Два процесса (p1 и p2) вставка объектов в общую очередь будет продолжаться параллельно с третьим процессом (конечным автоматом), который читает из очереди (см. код ниже). Т.е. процесс конечного автомата будет не ожидать окончания процесса p1 и p2.

Реализация пока:

import multiprocessing

class Weed():
    x=None
    y=None
    def __init__(self,x,y):
        self.x=x
        self.y=y

def p1(q):
    """
    Function that inserts weed in the shared queue
    """
    # append squares of mylist to queue
    q.put(Weed(10.1,7.3))
    q.put(Weed(8.3,2.8))
    q.put(Weed(5.1,4.2))
    q.put(Weed(15.4,5.0))

def p2(q):
    """
    Function that inserts weed in the shared queue
    """
    # append squares of mylist to queue
    q.put(Weed(25.1,1))
    q.put(Weed(1.3,1))
    q.put(Weed(9.1,1))
    q.put(Weed(13.4,1))


def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    print("Queue elements:")
    while not q.empty():
        q.sort(key=lambda x: (x.x), reverse=True) # Gives error - 
        print(q.get().x)
    print("Queue is now empty!")

if __name__ == "__main__":

    # creating multiprocessing Queue
    q = multiprocessing.Queue()

    # creating new processes
    p1 = multiprocessing.Process(target=p1, args=(q,))
    p2 = multiprocessing.Process(target=p2, args=(q,))
    p3 = multiprocessing.Process(target=state_machine, args=(q,))

    # running process p1 to generate some weeds
    p1.start()


    # running process p2 to generate some weeds
    p2.start()


    # running process p3 to sort the weed queue (by x coord.) and print them out
    p3.start()


    p1.join()
    p2.join()
    p3.join()

1 Ответ

0 голосов
/ 24 марта 2020

В вашем примере 3 процесса не запускаются одновременно (вы запускаете их и присоединяетесь к ним перед началом следующего), я предполагаю, что в реальном случае есть параллелизм.

Будьте осторожны, хотя : в реальном случае пустая очередь не означает, что другие задачи завершены. Вам понадобится другой механизм синхронизации.

Я предлагаю вернуться к обычному списку внутри функции state_machine и передавать элементы из многопроцессорной очереди в список по мере их поступления. Затем вы можете отсортировать список и распечатать элементы по порядку. У вас не будет проблем с параллелизмом, поскольку внутренний список изменяется только потоком, выполняющим state_machine.

def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    print("Queue elements:")
    internal_queue = []
    # here, we assume that all tasks have finished working.
    # if it is not the case, you should add a barrier to wait
    while not q.empty():
        internal_queue.append(q.get())

    internal_queue.sort(key=lambda item: (item.x), reverse=True)
    for elt in internal_queue:
        print(elt.x)
    print("Queue is now empty!")

Печать программы:

Queue elements:
25.1
15.4
13.4
10.1
9.1
8.3
5.1
1.3
Queue is now empty!

[EDIT]

В реальном сценарии вы не хотите ждать, пока потребитель не закончит sh, прежде чем начать печать. Однако вам придется найти компромисс между двумя проблемами:

  • если вы слишком долго ждете, прежде чем начинать потреблять элементы, вы в основном вернетесь к ожиданию, когда производители завершат sh.
  • если вы потребляете элементы своей очереди слишком быстро (т.е. распечатываете их, как только они поступают), ваша очередь будет иметь тенденцию быть пустой в большинстве случаев, и сортировка больше не имеет смысла.

Там нет (ИМХО) оптимального решения, вот предложение, в котором внутренняя очередь регулярно обновляется, давая производителям время для завершения sh их работы:

def state_machine(q):
    """
    Function that sorts the queue (w.r.t x-coord.) and prints it out
    """
    internal_queue = []
    def update_internal_queue():
        while not q.empty():
            internal_queue.append(q.get())
        internal_queue.sort(key=lambda item: (item.x), reverse=True)


    # wait a bit for the beginning of the elements to arrive
    time.sleep(5)
    update_internal_queue()
    print("Queue elements:")

    while internal_queue:
        time.sleep(1) # we can optionally wait a bit before each print
        update_internal_queue() # see if other elements arrived in the meantime
        print(internal_queue.pop(0).x)
    print("Queue is now empty!")
...