Повторная попытка Monix Task - почему здесь требуется Task.defer? - PullRequest
0 голосов
/ 27 октября 2018

Я недавно обнаружил случай, который не могу полностью понять при работе с Monix Task:

Есть две функции (в обработчике сообщений очереди):

  def handle(msg: RollbackMsg): Task[Unit] = {
    logger.info(s"Attempting to rollback transaction ${msg.lockId}")
    Task.defer(doRollback(msg)).onErrorRestart(5).foreachL { _ =>
      logger.info(s"Transaction ${msg.lockId} rolled back")
    }
  }

  private def doRollback(msg: RollbackMsg): Task[Unit] =
    (for {
      originalLock         <- findOrigLock(msg.lockId)
      existingClearanceOpt <- findExistingClearance(originalLock)
      _                    <- clearLock(originalLock, existingClearanceOpt)
    } yield ()).transact(xa)

Внутренние компонентыиз doRollback для понимания - это набор doobie вызовов, возвращающих ConnectionIO[_] монаду, а затем запускается transact, превращая композицию в Monix Task.

Теперь, как видно из функции handle, я бы хотел, чтобы весь процесс повторялся 5 раз в случае сбоя.Загадочная часть заключается в том, что этот простой вызов:

doRollback(msg).onErrorRestart(5)

на самом деле не перезапускает операцию при исключении (проверено в тестах).Чтобы получить такое поведение при повторных попытках, я должен явно обернуть его в Task.defer или каким-либо другим способом уже в Task «контексте».

И вот вопрос, который я не до конца понимаю: почему это так?doRollback уже дает мне Task экземпляр, так что я должен иметь возможность вызвать onErrorRestart, не так ли?Если это не так, как я могу быть уверен, что экземпляр Task, который я получаю откуда-то, можно перезапустить или нет?

Что мне здесь не хватает?

...