Запуск многопроцессорной обработки / многопоточности в клиенте pika и направление данных на указанный c датафрейм - PullRequest
0 голосов
/ 01 февраля 2020

Я новичок в Python многопоточность / обработка и RabbitMQ. По сути, у меня есть потребитель RabbitMQ, который передает мне данные больницы в реальном времени. Каждое сообщение содержит жизненно важные показатели пациента на пациента. Мне нужно хранить как минимум пять таких сообщений на пациента, чтобы запустить мой лог c и использовать для отключения будильника. Кроме того, поскольку число пациентов неизвестно, я думаю о многопоточности или многопроцессорности, чтобы моя тревога была почти в реальном времени и увеличивалась. Мой подход состоит в том, чтобы создать глобальный фрейм данных для каждого пациента, а затем добавить сообщения, относящиеся к этому пациенту, в фрейм данных. Но теперь у меня возникла проблема с созданием многопотоковой обработки / процесса и отправкой данных в соответствующий фрейм данных пациента. Вот мой код


bed_list=[]
thread_list=[]
bed_df={}
alarms=0

def spo2(body,bed):
    body_data= body.decode()
    print(body_data)
    packet= json.loads(body_data)
    bed_id= packet['beds'][0]['bedId']
    if bed_id=bed:
        primary_attributes= json_normalize(packet)
         '''some logic'''
        global bed_df
        bed_df[bed_id]= bed_df[bed_id].append(packet) # creating the global dataframe to store five messages
        print(bed_df[bed_id])

        ''' some other calcuation'''

            phy_channel.basic_publish(body=json.dumps(truejson),exchange='nicu')# throwing out the alarm with another queue
            bed_df[bed_id]= bed_df[bed_id].tail(4)  # resets the size of the dataframe 


def callback(ch, method, properties, body):
    body_data= body.decode()
    packet= json.loads(body_data)
    bed_id= packet['beds'][0]['bedId']
    print(bed_id)
    global bed_list
    if bed_id not in bed_list:
      bed_list.append(bed_id)


#pseudo code
 for bed in bed_list:
     proc = Process(target=spo2, args=(bed,))
     procs.append(proc)
     proc.start()

Я не могу найти выход, где я могу создать поток / процесс для каждого пациента (bed_id), чтобы всякий раз, когда я получал сообщение для этого пациента (bed_id), я может направить его в эту ветку. Я проверил очереди, но в документации не очень ясно, как реализовать этот случай.

1 Ответ

0 голосов
/ 01 февраля 2020

Прежде чем вы go по этому маршруту, вы должны оценить, если это даже необходимо. Одним из важных ограничений является полоса пропускания rabbitmq .

. Создайте однопоточное приложение и начните получать синтетические сообщения c rabbitmq. Увеличивайте частоту мсг / с до тех пор, пока она не сможет больше поддерживать скорость.

Если эта скорость намного выше, чем вероятно на практике, то все готово. : -)

Если нет, тогда вы запускаете профилирование вашего приложения, чтобы найти, какие его части занимают больше всего времени. Это ваши узкие места. Только когда вы знаете, что такое узкие места, вы можете посмотреть на соответствующий код и подумать, как их улучшить.

Обратите внимание, что multiprocessing и threading делают разные вещи и имеют разные приложения. Если ваше приложение ограничено количеством вычислений, которые оно может выполнить, multiprocessing может помочь, распространив вычисления по нескольким ядрам ЦП. Обратите внимание, что это работает только в том случае, если расчеты независимы друг от друга. Если ваше приложение тратит много времени на ожидание ввода-вывода, threading может помочь вам выполнить вычисления в одном потоке, в то время как другой ожидает ввода-вывода.

Но ни один из них не является бесплатным с точки зрения сложности , Например, с threading вы должны защитить чтение и запись ваших фреймов данных с помощью блокировок, чтобы только один поток одновременно мог читать или модифицировать указанный фрейм данных. С multiprocessing вы должны отправлять данные из рабочих процессов обратно в родительский процесс.

В этом случае я думаю, что multiprocessing будет наиболее полезным. Вы можете настроить ряд процессов, каждый из которых отвечает за часть коек / пациентов. Если у rabbitmq может быть несколько слушателей, каждый рабочий процесс может обрабатывать только сообщения от пациентов, за которых он отвечает. В противном случае вы должны распространять сообщения для соответствующего процесса. Каждый рабочий процесс теперь обрабатывает сообщения (и сохраняет кадры данных) для нескольких пациентов. Когда предупреждение запускается на основании вычислений, выполненных на данных, рабочий должен только отправить сообщение с подробным описанием идентификатора пациента и характера предупреждения родительскому процессу.

...