Потребление и ответ на отдельные очереди |Реализация Пика - PullRequest
0 голосов
/ 30 июня 2019

Я изо всех сил пытаюсь решить мою проблему, надеюсь, что кто-нибудь из сообщества может помочь мне здесь.

Наше требование заблокировано и не может быть изменено, так как производитель, публикующий очереди, контролируется другой командой.

Производитель, который написан на JAVA, объявляет три очереди (TASK, RESPONSE, TASK_RESPONSE) и прослушивает их с помощью Spring Framework.

Хеш-карта отправляется в очередь TASK и TASK_RESPONSE изклиент Java AMQP (Производитель).

Нам нужно использовать эти хэш-карты и отправлять ответы следующим образом.

  • Если очередь TASK обработана, ответ долженотправлять в очередь RESPONSE постепенно.

  • Если очередь TASK_RESPONSE обрабатывается, ответ необходимо отправлять в очередь TASK_RESPONSE постепенно (режим RPC).

Теперь нам нужно использовать и опубликовать это на python, поскольку нам нужно выполнить некоторую фоновую обработку задач.

Я пытался работать с celery и dramatiq, но не смог понять, как это можно сделать с ними, поэтому я попытался написать сам (с помощью учебных пособий, доступных онлайн)

Проблема в том, что я могу использовать сообщения, ноне может ответить в очередь RESPONSE.Вот мой код.

from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor

import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection

from settings import *

logging.basicConfig(handlers=[StreamHandler()], level=logging.INFO, format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueueConsumer(object):
    """The consumer class to manage connections to the AMQP server/queue"""

    def __init__(self, queue, logger, parameters, thread_id=0):
        self.channel = None
        self.connection = None
        self.queue_name_task = queue['task']
        self.queue_name_response = queue['response']
        self.logger = logger
        self.consumer_id = 'Consumer Thread: %d' % (thread_id,)
        self.parameters = pika.ConnectionParameters(**parameters)

    def consume(self):
        try:
            self.connection = SelectConnection(parameters=self.parameters, on_open_callback=self._on_connected)
            self.connection.ioloop.start()
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))
            self.connection.close()
            self.connection.ioloop.start()

    def _on_connected(self, connection):
        connection.channel(on_open_callback=self._on_channel_open)

    def _on_channel_open(self, channel):
        self.channel = channel
        try:
            # Declare Task Queue
            self.channel.queue_declare(queue=self.queue_name_task,
                                       exclusive=False,
                                       durable=True,
                                       auto_delete=False,
                                       callback=self._on_queue_declared)
            self.logger.info("{} Opened Channel....".format(self.consumer_id))

            # Declare Task Response Queue
            self.channel.queue_declare(queue=self.queue_name_response,
                                       exclusive=False,
                                       durable=True,
                                       auto_delete=False)
            self.logger.info("{} Opened Channel....".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))

    def _on_queue_declared(self, frame):
        self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
        self.channel.basic_qos(prefetch_count=1)
        try:
            self.channel.basic_consume(queue=self.queue_name_task,
                                       on_message_callback=self.handle_delivery,
                                       auto_ack=True)
            self.logger.info("{} Declared queue...".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))

    def handle_delivery(self, channel, method, header, body):
        try:
            start_time = datetime.datetime.now()
            _logger.info("Received...")
            _logger.info("Content: %s" % body)
            req = json.loads(self.decode(body))

            # Do something
            sleep(randint(10, 20))

            time_taken = datetime.datetime.now() - start_time
            log_msg = "[{}] Time Taken: {}.{}".format(req['bar']['baz'], time_taken.seconds, time_taken.microseconds)
            _logger.info(log_msg)

            # Publish the result to another queue.
            try:
                self.channel.basic_publish(exchange='',
                                           routing_key=self.queue_name_response,
                                           properties=pika.BasicProperties(),
                                           body=log_msg)
                _logger.info("Message Published...\t(%s)" % self.queue_name_response)
            except Exception as e:
                self.logger.error('{} Message publishing failed:--> {}'.format(self.consumer_id, str(e)))

        except Exception as err:
            _logger.exception(err)

    def decode(self, body):
        try:
            _body = body.decode('utf-8')
        except AttributeError:
            _body = body

        return _body


if __name__ == "__main__":
    pika_parameters = OrderedDict([
        ('host', TF_BROKER_HOST),
        ('port', TF_BROKER_PORT),
        ('virtual_host', TF_BROKER_VHOST)
    ])

    queue = {'task': TF_IAAS_TASK_QUEUE, 'response': TF_IAAS_REPLY_QUEUE}

    try:
        with ThreadPoolExecutor(max_workers=TF_IAAS_THREAD_SIZE, thread_name_prefix=TF_IAAS_THREAD_PREFIX) as executor:
            start = 1
            for thread_id in range(start, (TF_IAAS_THREAD_SIZE + start)):
                executor.submit(QueueConsumer(queue, _logger, pika_parameters, thread_id).consume)

    except Exception as err:
        _logger.exception(err)

Публикация сообщений на RabbitMQ

import pika
import json
import random
import datetime
from faker import Faker
from random import randint

fake = Faker('en_US')


if __name__ == '__main__':
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))
        channel = connection.channel()

        channel.queue_declare(queue='tf_task', durable=True)
        started_at = datetime.datetime.now()
        properties = pika.BasicProperties(delivery_mode=2)
        for i in range(0, 10000):
            body = {
                'foo': randint(i, i+100),
                'bar': {
                    'baz': fake.name(),
                    'poo': float(random.randrange(155+i, 389+i))/100
                }
            }

            channel.basic_publish(exchange='',
                                  routing_key='tf_task',
                                  body=json.dumps(body),
                                  properties=properties)
            if i%10000 == 0:
                duration = datetime.datetime.now() - started_at
                print(i, duration.total_seconds())
        print(" [x] Sent 'Hello World!'")
        connection.close()
        now = datetime.datetime.now()
        duration = now - started_at
        print(duration.total_seconds())
    except Exception as e:
        print(e)
...