Как отправить большие объемы данных производителя потребителям, не дожидаясь в RabbitMQ - PullRequest
0 голосов
/ 17 апреля 2019

Я использую Python3.7 для отправки и получения данных.

У меня проблема в том, что когда производитель отправляет много данных, RabbitMQ получает все данные из RabbitMQ и транслирует их.

sendmessage.py:

async def send(tx):
    transport, protocol = await aioamqp.connect(host=MQ_HOST,
                                                port=MQ_PORT,
                                                login=MQ_USER,
                                                password=MQ_SEED)

    channel = await protocol.channel()

    await channel.exchange_declare(exchange_name='event.tx',
                                   type_name='fanout')


    await channel.basic_publish(json.dumps(tx.to_dict()),
                                exchange_name='event.tx',
                                routing_key='')

    await protocol.close()
    transport.close()

txs = []
loop = asyncio.get_event_loop()
for i in range(1000):
    tx = create_tx()
    asyncio.ensure_future(send(tx))


loop.run_forever()

receive.py:

async def event_run(self):
        try:
            transport, protocol = await aioamqp.connect(host=MQ_HOST,
                                                        port=MQ_PORT,
                                                        login=MQ_USER,
                                                        password=MQ_SEED)
        except aioamqp.AmqpClosedConnection:
            return

        try:
            self._channel = await protocol.channel()

            await self._channel.exchange_declare(exchange_name=TX_EXCHANGE,
                                                 type_name='fanout')

            await self._channel.queue_declare(queue_name=EVENT_NAME(self.name),
                                              exclusive=True)

            await self._channel.queue_bind(exchange_name=TX_EXCHANGE,
                                           queue_name=EVENT_NAME(self.name),
                                           routing_key='')

            await self._channel.basic_consume(self.process_event,
                                              queue_name=EVENT_NAME(self.name),
                                              no_ack=True)
        except aioamqp.ChannelClosed:
            if not transport.is_closing():
                transport.close()

Нет проблем с отправкой от одного до десяти данных, таких как код, показанный выше, и их тестированием,но когда вы получаете большой объем данных, например 1000, потребуется слишком много времени, чтобы вернуть его и начать повторную трансляцию.

Одновременная отправка 1000 данных увеличит время ожидания от очереди до передачи.

Даже если я получаю большое количество данных, как я могу получить их для трансляции, как только я их получу?

Если код не может решить эту проблему, можем ли мы исправить это вместо плагина?модули, такие как плагины управления?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...