Я пытаюсь использовать RestartFlow
в Akka Streams javadsl, чтобы перезапустить один из этапов моего потока, если на этом этапе возникают какие-либо сбои, но, похоже, он не перезапускает поток, а просто отбрасывает сообщение.
Я уже видел это: RestartFlow в Akka Streams не работает должным образом , но я нахожусь на версии 2.5.19, так что это должно быть исправлено?
Я пробовал оба RestartFlow.onFailuresWithBackoff
и RestartFlow.withBackoff
, но ни один из них не сработал. Я также пытался поиграть с общей стратегией системного супервизора Actor, но, похоже, она просто перехватывает исключение, чтобы оно не выбрасывалось из потока, и плюс, кажется, не предлагало мне стратегии отсрочки и максимальной повторной попытки.
Поток:
public Consumer.DrainingControl<Done> stream() {
return Consumer.committableSource(consumerSettings,
Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
ConfigKeys.CONSUMER_TOPIC)))
.via(RestartFlow.onFailuresWithBackoff(
Duration.ofSeconds(1), // min backoff
Duration.ofSeconds(2), // max backoff,
0.2, // adds 20% "noise" to vary the intervals slightly
10, // limits the amount of restarts to 10
this::dispatchMessageFlow))
.via(Committer.flow(CommitterSettings.create(system)))
.toMat(Sink.ignore(), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(mat);
}
А потом поток:
private Flow<ConsumerMessage.CommittableMessage<String, String>,
ConsumerMessage.Committable, NotUsed> dispatchMessageFlow() {
return Flow.<ConsumerMessage.CommittableMessage<String, String>>create()
.mapAsyncUnordered(
config.getInt(ConfigKeys.PARALLELISM),
msg ->
streamProcessor.process(msg.record().value())
.whenComplete((done, e) -> {
if (e != null) {
throw new RuntimeException(e);
} else {
if (done.status().isSuccess()){
streamingConsumerLogger.info("Successfully posted message, got response:\n{}",
done.toString());
} else {
throw new RuntimeException("HTTP Error!");
}
}
})
.thenApply(done -> msg.committableOffset()));
}
Я вижу исключение один раз, когда Акка заявляет, что он собирается перезапустить график из-за сбоя, но ничего больше после этого. Согласно моему пониманию, я должен был увидеть это еще 10 раз. Потребитель продолжает прослушивать новые сообщения, поэтому создается впечатление, что сообщение просто отброшено.
java.util.concurrent.CompletionException: java.lang.RuntimeException: HTTP Error!
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:769)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: HTTP Error!
at com.company.app.messageforwarder.StreamingConsumerService.lambda$null$0(StreamingConsumerService.java:72)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
... 6 more
Если кто-нибудь может помочь, пожалуйста, укажите мне правильное направление, я был бы признателен.