Акка внешне контролирует испускание элементов из Source.queue - PullRequest
0 голосов
/ 14 октября 2019

Я относительно новичок в Akka и пытаюсь разобраться с конкретным сценарием, касающимся HTTP Akka.

В настоящее время у меня есть реализация Source.queue следующим образом:

poolClientFlow = Http.get(context().system())
            .cachedHostConnectionPoolHttps(ConnectHttp.toHostHttps(host), settings, context().system().log());
queue = Source.<Pair<HttpRequest, RequestMeta>>queue(maxOpenRequests, OverflowStrategy.dropNew())
                .throttle(maxRequestsPerSecond, Duration.ofSeconds(1))
                .via(poolClientFlow)
                .to(Sink.foreach(this::handleResponse))
                .run(materializer);

Я подключаюсь ксторонний API, который реализует ограничение скорости. Если я выполняю вызовы, выходящие за их пределы, они отвечают кодом ошибки http 429 (слишком много запросов) и временем ожидания, после которого можно безопасно выполнить следующий запрос. Если я проигнорирую это время ожидания, сторонний API накажет меня, экспоненциально увеличив время ожидания, которого мне нужно избегать.

Что я хотел бы сделать, это pause испускание элементов изэта очередь в нисходящий поток всякий раз, когда материализованный HTTPResponse имеет код ошибки http 429. Однако я не вижу какого-либо способа внешнего контроля выброса элементов из очереди.

Пожалуйста, дайте мне знать, если есть способы решения этой проблемы или альтернативные подходы, которые я могу использовать. Заранее спасибо.

...