Я использую Python3.7
для отправки и получения данных.
У меня проблема в том, что когда производитель отправляет много данных, RabbitMQ
получает все данные из RabbitMQ
и транслирует их.
sendmessage.py:
async def send(tx):
transport, protocol = await aioamqp.connect(host=MQ_HOST,
port=MQ_PORT,
login=MQ_USER,
password=MQ_SEED)
channel = await protocol.channel()
await channel.exchange_declare(exchange_name='event.tx',
type_name='fanout')
await channel.basic_publish(json.dumps(tx.to_dict()),
exchange_name='event.tx',
routing_key='')
await protocol.close()
transport.close()
txs = []
loop = asyncio.get_event_loop()
for i in range(1000):
tx = create_tx()
asyncio.ensure_future(send(tx))
loop.run_forever()
receive.py:
async def event_run(self):
try:
transport, protocol = await aioamqp.connect(host=MQ_HOST,
port=MQ_PORT,
login=MQ_USER,
password=MQ_SEED)
except aioamqp.AmqpClosedConnection:
return
try:
self._channel = await protocol.channel()
await self._channel.exchange_declare(exchange_name=TX_EXCHANGE,
type_name='fanout')
await self._channel.queue_declare(queue_name=EVENT_NAME(self.name),
exclusive=True)
await self._channel.queue_bind(exchange_name=TX_EXCHANGE,
queue_name=EVENT_NAME(self.name),
routing_key='')
await self._channel.basic_consume(self.process_event,
queue_name=EVENT_NAME(self.name),
no_ack=True)
except aioamqp.ChannelClosed:
if not transport.is_closing():
transport.close()
Нет проблем с отправкой от одного до десяти данных, таких как код, показанный выше, и их тестированием,но когда вы получаете большой объем данных, например 1000, потребуется слишком много времени, чтобы вернуть его и начать повторную трансляцию.
Одновременная отправка 1000 данных увеличит время ожидания от очереди до передачи.
Даже если я получаю большое количество данных, как я могу получить их для трансляции, как только я их получу?
Если код не может решить эту проблему, можем ли мы исправить это вместо плагина?модули, такие как плагины управления?