Akka Streams onFailuresWackBoff не перезапускает поток - PullRequest
0 голосов
/ 18 января 2019

Я пытаюсь использовать RestartFlow в Akka Streams javadsl, чтобы перезапустить один из этапов моего потока, если на этом этапе возникают какие-либо сбои, но, похоже, он не перезапускает поток, а просто отбрасывает сообщение.

Я уже видел это: RestartFlow в Akka Streams не работает должным образом , но я нахожусь на версии 2.5.19, так что это должно быть исправлено?

Я пробовал оба RestartFlow.onFailuresWithBackoff и RestartFlow.withBackoff, но ни один из них не сработал. Я также пытался поиграть с общей стратегией системного супервизора Actor, но, похоже, она просто перехватывает исключение, чтобы оно не выбрасывалось из потока, и плюс, кажется, не предлагало мне стратегии отсрочки и максимальной повторной попытки.

Поток:

public Consumer.DrainingControl<Done> stream() {
    return Consumer.committableSource(consumerSettings,
        Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
            ConfigKeys.CONSUMER_TOPIC)))
        .via(RestartFlow.onFailuresWithBackoff(
                Duration.ofSeconds(1), // min backoff
                Duration.ofSeconds(2), // max backoff,
                0.2, // adds 20% "noise" to vary the intervals slightly
                10, // limits the amount of restarts to 10
                this::dispatchMessageFlow))
        .via(Committer.flow(CommitterSettings.create(system)))
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(mat);
}

А потом поток:

private Flow<ConsumerMessage.CommittableMessage<String, String>,
    ConsumerMessage.Committable, NotUsed> dispatchMessageFlow() {
    return Flow.<ConsumerMessage.CommittableMessage<String, String>>create()
            .mapAsyncUnordered(
                config.getInt(ConfigKeys.PARALLELISM),
                msg ->
                    streamProcessor.process(msg.record().value())
                        .whenComplete((done, e) -> {
                            if (e != null) {
                                throw new RuntimeException(e);
                            } else {
                                if (done.status().isSuccess()){
                                    streamingConsumerLogger.info("Successfully posted message, got response:\n{}",
                                        done.toString());
                                } else {
                                    throw new RuntimeException("HTTP Error!");
                                }
                            }
                        })
                        .thenApply(done -> msg.committableOffset()));
}

Я вижу исключение один раз, когда Акка заявляет, что он собирается перезапустить график из-за сбоя, но ничего больше после этого. Согласно моему пониманию, я должен был увидеть это еще 10 раз. Потребитель продолжает прослушивать новые сообщения, поэтому создается впечатление, что сообщение просто отброшено.

java.util.concurrent.CompletionException: java.lang.RuntimeException: HTTP Error!
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:769)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: HTTP Error!
    at com.company.app.messageforwarder.StreamingConsumerService.lambda$null$0(StreamingConsumerService.java:72)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    ... 6 more

Если кто-нибудь может помочь, пожалуйста, укажите мне правильное направление, я был бы признателен.

1 Ответ

0 голосов
/ 22 января 2019

это работает немного по-другому. Короче говоря - если произойдет ошибка, сообщение будет удалено, но источник / поток будет просто перезапущен, не уничтожая весь поток. это описано в документации RestartFlow.onFailuresWithBackoff :

Процесс перезапуска по своей природе с потерями, поскольку нет никакой координации между отменой и отправкой сообщений. Сигнал завершения с любого конца обернутого потока вызовет завершение другого конца, и любые транзитные сообщения будут потеряны. Во время отката этот поток будет противодавлять.

...