Наше приложение представляет собой систему обработки сообщений с несколькими компонентами, связанными с очередями RabbitMQ. Таким образом, обработка сообщений выполняется асинхронно. Теперь мне нужно добавить адаптер HTTP, который связывается с системой. Поскольку HTTP синхронен с запросом / ответом, мне нужен способ соединения синхронных и асинхронных потоков. Текущее решение:
- HTTP-запросы отправляются в одну очередь. Каждый запрос имеет уникальный идентификатор запроса для корреляции.
- HTTP-запрос блокируется
CompletableFuture
. - Запрос обрабатывается, и ответ отправляется обратно в другую очередь.
- Потребитель очереди использует ответ для завершения
CompletableFuture
сопоставления с идентификатором запроса.
Адаптер HTTP реализован с использованием Akka HTTP. Запросы обрабатываются с использованием handleWithAsyncHandler()
с функцией типа Function<HttpRequest, CompletionStage<HttpResponse>>
.
Проблема заключается в том, что адаптеру HTTP необходимо управлять картой (Map<String, CompletableFuture>
) всех ожидающих запросов. Для каждого запроса создается новый объект CompletableFuture
, который помещается на карту. Когда в очереди получен ответ, соответствующее CompletableFuture
завершается, чтобы завершить sh запрос. Это кажется плохим запахом в коде, потому что мне нужно тщательно управлять этой картой. Например, если не удалось сгенерировать ответ на запрос, его необходимо удалить с карты.
Интересно, есть ли другие способы, кроме использования карты для отслеживания всех ожидающих запросов.