Lagom topi c подписчик - как повторить попытку в Future Exception? - PullRequest
0 голосов
/ 09 июля 2020

У меня есть подписчик topi c в lagom, как показано ниже

fooService.fooTopic().subscribe
  .atLeastOnce(
    Flow[fooMsg].map {
      case fooMsg(_) =>
        foo()
      case a =>
        println(a)
    }.async.map{ _ =>
      Done
    }
  )

, чтобы подписаться на этот topi c, я использую atLeastOnce в качестве метода, так что если есть какое-либо исключение, Я хочу, чтобы поток был перезапущен / повторен. когда я выбрасываю обычное исключение, он может продолжать повторять обычную попытку

  private def foo() = {
    throw new RuntimeException("testing error")
  }

, но когда исключение произойдет в будущем, независимо от того, как я его пробовал, Flow не перезапускается. Вот одна из моих попыток обработать исключение в будущем

  private def foo() = {
    val test: Future[Int] = Future(throw new RuntimeException("asd"))
    val result = for {
      y1 <- test
    } yield (y1)

    result.onComplete{
      case Success(value) => println("SUCCESS")
      case Failure(exception) => println(exception.getMessage)
                                 throw exception
    }
  }
  private def foo() = {
    val test: Future[Int] = Future(throw new RuntimeException("asd"))

    test.onComplete{
      case Success(value) => println("SUCCESS")
      case Failure(exception) => println(exception.getMessage)
                                 throw exception
    }
  }

, оно покажет исключение, но Flow не перезапустит его автоматически. Как мне обработать / выбросить исключение в Future?

1 Ответ

1 голос
/ 09 июля 2020

Я думаю, вам не нужно перезапускать полный поток, если вы потерпели неудачу только в одном будущем. Я предлагаю повторить попытку только Future. Например, вы можете написать такой код, который будет повторять ваш вызов, заменив Future.successful (10) на вызов вашего метода:

        val test: Future[Int] = Future(throw new RuntimeException("asd")).recoverWith {
          case NonFatal(e) =>
            Future.successful(10)
        }

        val result = for {
          y1 <- test
        } yield (y1)

Кроме того, вы можете писать код, как хотите, он будет сбой и повторите попытку, но вам нужно вернуть результат вашего Future:

  kafka.topic1.subscribe.atLeastOnce(Flow[String]
    .mapAsync(1) {
      case envelope: String =>

        val test: Future[String] = Future(throw new RuntimeException("asd"))
      /*.recoverWith {
          case NonFatal(e) =>
            Future.successful("10")
        }*/

        val result = for {
          y1 <- test
        } yield (y1)

        println(s"code block $envelope")
       result.onComplete{
          case Success(value) => println(s"Message from topic: $envelope $result")
          case Failure(exception) => println(exception.getMessage)
            throw exception
        }
      result.map(_ => Done)
    }
)
...