Как добавить многопроцессорность к потребителю с помощью pika (RabbitMQ) в python - PullRequest
1 голос
/ 27 марта 2019

У меня есть очень простой код производителя-потребителя, написанный с помощью pika framework на python. Проблема в том, что на стороне потребителя слишком медленно работает с сообщениями в очереди. Я провел несколько тестов и обнаружил, что с помощью многопроцессорной обработки я могу ускорить рабочий процесс до 27 раз. Проблема в том, что я не знаю, как правильно добавить многопроцессорную функциональность в мой код.

import pika
import json
from datetime import datetime
from functions import download_xmls


def callback(ch, method, properties, body):
    print('Got something')
    body = json.loads(body)
    type = body[-1]['Type']
    print('Object type in work currently ' + type)
    cnums = [x['cadnum'] for x in body[:-1]]
    print('Got {} cnums to work with'.format(len(cnums)))

    date_start = datetime.now()
    download_xmls(type,cnums)
    date_end = datetime.now()
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('Download complete in {} seconds'.format((date_end-date_start).total_seconds()))


def consume(queue_name = 'bot-test'):
    parameters = pika.URLParameters('server@address')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='bot-test')
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

Как мне начать с добавления многопроцессорной функциональности отсюда?

1 Ответ

0 голосов
/ 27 марта 2019

Пика имеет обширный пример кода , который я рекомендую вам проверить. Обратите внимание, что этот код предназначен только для примера . В случае работы с потоками вам придется использовать более интеллектуальный способ управления потоками.

Цель состоит в том, чтобы не блокировать поток, который выполняет цикл ввода-вывода Пики, и правильно перезвонить в цикл ввода-вывода из ваших рабочих потоков. Вот почему add_callback_threadsafe существует и используется в этом коде.


ПРИМЕЧАНИЕ: команда RabbitMQ контролирует список рассылки rabbitmq-users и только иногда отвечает на вопросы по StackOverflow.

...