Я относительно новичок в Akka и пытаюсь разобраться с конкретным сценарием, касающимся HTTP Akka.
В настоящее время у меня есть реализация Source.queue
следующим образом:
poolClientFlow = Http.get(context().system())
.cachedHostConnectionPoolHttps(ConnectHttp.toHostHttps(host), settings, context().system().log());
queue = Source.<Pair<HttpRequest, RequestMeta>>queue(maxOpenRequests, OverflowStrategy.dropNew())
.throttle(maxRequestsPerSecond, Duration.ofSeconds(1))
.via(poolClientFlow)
.to(Sink.foreach(this::handleResponse))
.run(materializer);
Я подключаюсь ксторонний API, который реализует ограничение скорости. Если я выполняю вызовы, выходящие за их пределы, они отвечают кодом ошибки http 429 (слишком много запросов) и временем ожидания, после которого можно безопасно выполнить следующий запрос. Если я проигнорирую это время ожидания, сторонний API накажет меня, экспоненциально увеличив время ожидания, которого мне нужно избегать.
Что я хотел бы сделать, это pause испускание элементов изэта очередь в нисходящий поток всякий раз, когда материализованный HTTPResponse имеет код ошибки http 429. Однако я не вижу какого-либо способа внешнего контроля выброса элементов из очереди.
Пожалуйста, дайте мне знать, если есть способы решения этой проблемы или альтернативные подходы, которые я могу использовать. Заранее спасибо.