Как я могу получить точный размер многопроцессорной очереди ???Документы говорят, что это не надежно.Любой способ решить это? - PullRequest
0 голосов
/ 12 февраля 2019

У меня есть этот код, который имеет два процесса: один в очереди, другой - в очереди.

Мне нужно проверять размер очереди каждые 60 секунд.q.size () действительно дает результат, но я хочу, чтобы результат был точным.поэтому есть ли другие способы сделать это ??

Чтобы быть точным, мне нужно контролировать ввод и вывод очереди в минуту и ​​размер очереди в минуту.

from multiprocessing import Queue, Process
import os
import time
import datetime as dt
import statsd
import random

statsd_client = statsd.StatsClient(host="localhost", port=8125, 
prefix=None, maxudpsize=512, ipv6=False)


q = Queue()


#put_timer = statsd_client.timer('put')
def queue_add_proc1():
    print("process 1 Id :", os.getpid())
    print("adding items to queue")
    x = 0
    upload_time = time.time()

    enque_count=0

    while x < 10000:
        #put_timer.start()
        curr_time = time.time()
        if curr_time - upload_time > 60:
            statsd_client.incr('enque_count_everyMinute', enque_count)
            statsd_client.incr('queue_size_enqueing', q.qsize())

            print("metric sent")
            enque_count = 0
            upload_time = curr_time
        q.put(x*2)
        #put_timer.stop(send=True)
        print("added to queue")
        x =x+ 1
        enque_count+=1
        time.sleep(0.014)
        print("done")


#pop_timer = statsd_client.timer('get')
def queue_pop_proc2():
    print("Process 2 ID :",os.getpid())
    print("popping values from queue")
    upload_time = time.time()
    deque_count = 0
    while not q.empty():
        curr_time = time.time()
        if curr_time - upload_time > 60:
            # upload dequed count
            statsd_client.incr('deque_count_everyMinute', deque_count)
            statsd_client.incr('queue_size_dequeing', q.qsize())
            print("metric sent")
            deque_count = 0
            upload_time = curr_time

        print(" popped item ", q.get())
        print("dequeued")
        deque_count += 1
        time.sleep(0.03)


if __name__ == '__main__':
    msgs_added_each_minute = list()
    msgs_popped_each_minute = list()

    print("Main process ID :", os.getpid())

    p1 = Process(target=queue_add_proc1)

    p2 = Process(target=queue_pop_proc2)

    p1.start()
    p2.start()

    p2.join()
    p1.join()

1 Ответ

0 голосов
/ 12 февраля 2019

Результат queue.qsize() является точным.Неточность заключается в том, как пользователи часто его используют.

Поскольку Queue распределяется одновременно между различными потоками / процессами, когда процесс A проверяет свой размер, процесс B может чередовать его выполнение и изменять его.Это может привести к проблемам логики.

Рассмотрим следующий пример:

queue.put("something")

def process_a(queue):
    """Does something if queue has at least one element."""
    if queue.qize() > 0:
        # now, Process B takes over and steals the only element in the queue
        element = queue.get()  # UNEXPECTED: process A will block here 
        do_something(element)

def process_b(queue):
    """Gets an element from the queue."""
    queue.get()

a = Process(process_a, args=(queue,))
b = Process(process_b, args=(queue,))
a.start()
b.start()

Проблема здесь в том, что логика полагается на определенное состояние очереди и воздействует на него.Тем не менее, состояние очереди может измениться в любой момент в параллельном сценарии.Следовательно, логика выше не будет работать, как ожидалось.

Для вашего конкретного случая использования это не должно быть проблемой, так как вы выполняете мониторинг состояния очереди в определенный момент времени.Поэтому вам все равно, будет ли очередь иметь другой размер спустя одну миллисекунду.Что вас волнует, так это средняя пропускная способность очереди.

...