Я пытался переместить процессы, выполняемые сыном, из одного приложения в другое. Очереди реализованы с использованием протона qpid (AMQP 1.0), а брокер размещен в AWS. Приложение отправляет одно сообщение с полезной нагрузкой, включающей в себя несколько идентификаторов объектов, которые будут обработаны (в этом случае вставляются в базу данных postgres). Очередь unpacker
принимает эти идентификаторы и, в свою очередь, отправляет одно сообщение в очередь saver
, которая прослушивает сообщения и сохраняет объекты один за другим. unpacker
переносит эти отдельные сообщения в транзакции, например, так (для краткости загружен большой код):
def send(self, transaction):
# Get payload
payload = transaction.packed_payload
# Unpack messages
unpacked_messages = self.unpack_data(payload)
for message in unpacked_messages:
proton_message = Message(body=message)
transaction.send(self.sender, proton_message)
transaction.commit()
и saver
, например:
def on_message(self, event):
message_body = json.loads(event.message.body)
# data saving logic here
Это происходит работать действительно хорошо, если в упакованной транзакции 1000 или меньше объектов. Однако для больших объемов (и это, безусловно, рабочий случай) транзакция, по-видимому, будет успешной при фиксации, но будет обработано только 1000 сообщений, и появится следующая ошибка:
ERROR:proton:Could not process AMQP commands
Я пытался поднять max_prefetch
и constantPendingMessageLimitStrategy
безрезультатно, так как он последовательно отключает соединение на 1000-м сообщении, на AWS и локально.
Я что-то упускаю из-за ActiveMQ и его очереди конфигурации? Чем можно объяснить такое поведение?