Интегрировать HTTP-запрос / ответ с асинхронными сообщениями в RabbitMQ - PullRequest
1 голос
/ 15 января 2020

Наше приложение представляет собой систему обработки сообщений с несколькими компонентами, связанными с очередями RabbitMQ. Таким образом, обработка сообщений выполняется асинхронно. Теперь мне нужно добавить адаптер HTTP, который связывается с системой. Поскольку HTTP синхронен с запросом / ответом, мне нужен способ соединения синхронных и асинхронных потоков. Текущее решение:

  1. HTTP-запросы отправляются в одну очередь. Каждый запрос имеет уникальный идентификатор запроса для корреляции.
  2. HTTP-запрос блокируется CompletableFuture.
  3. Запрос обрабатывается, и ответ отправляется обратно в другую очередь.
  4. Потребитель очереди использует ответ для завершения CompletableFuture сопоставления с идентификатором запроса.

Адаптер HTTP реализован с использованием Akka HTTP. Запросы обрабатываются с использованием handleWithAsyncHandler() с функцией типа Function<HttpRequest, CompletionStage<HttpResponse>>.

Проблема заключается в том, что адаптеру HTTP необходимо управлять картой (Map<String, CompletableFuture>) всех ожидающих запросов. Для каждого запроса создается новый объект CompletableFuture, который помещается на карту. Когда в очереди получен ответ, соответствующее CompletableFuture завершается, чтобы завершить sh запрос. Это кажется плохим запахом в коде, потому что мне нужно тщательно управлять этой картой. Например, если не удалось сгенерировать ответ на запрос, его необходимо удалить с карты.

Интересно, есть ли другие способы, кроме использования карты для отслеживания всех ожидающих запросов.

1 Ответ

1 голос
/ 16 января 2020

По сути, akka-http может быть в стиле asyn c. Вам не нужно реализовывать эту очередь для сопоставления идентификатора запроса.

Необходимо учитывать, что НЕ используйте диспетчер по умолчанию .

Лучше определить диспетчер блокировки для обработки CompletableFuture.supplyAsyn c

Например

my-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 16
  }
  throughput = 1
}

import static akka.http.javadsl.server.Directives.completeWithFuture;
import static akka.http.javadsl.server.Directives.post;

// GOOD (the blocking is now isolated onto a dedicated dispatcher):
final Route routes = post(() -> {
    final MessageDispatcher dispatcher = system.dispatchers().lookup("my-blocking-dispatcher");
    return completeWithFuture(CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                }
                return HttpResponse.create()
                        .withEntity(Long.toString(System.currentTimeMillis()));
            }, dispatcher // uses the good "blocking dispatcher" that we
            // configured, instead of the default dispatcher to isolate the blocking.
    ));
});

...