Я недавно обнаружил случай, который не могу полностью понять при работе с Monix Task
:
Есть две функции (в обработчике сообщений очереди):
def handle(msg: RollbackMsg): Task[Unit] = {
logger.info(s"Attempting to rollback transaction ${msg.lockId}")
Task.defer(doRollback(msg)).onErrorRestart(5).foreachL { _ =>
logger.info(s"Transaction ${msg.lockId} rolled back")
}
}
private def doRollback(msg: RollbackMsg): Task[Unit] =
(for {
originalLock <- findOrigLock(msg.lockId)
existingClearanceOpt <- findExistingClearance(originalLock)
_ <- clearLock(originalLock, existingClearanceOpt)
} yield ()).transact(xa)
Внутренние компонентыиз doRollback
для понимания - это набор doobie вызовов, возвращающих ConnectionIO[_]
монаду, а затем запускается transact
, превращая композицию в Monix Task
.
Теперь, как видно из функции handle
, я бы хотел, чтобы весь процесс повторялся 5 раз в случае сбоя.Загадочная часть заключается в том, что этот простой вызов:
doRollback(msg).onErrorRestart(5)
на самом деле не перезапускает операцию при исключении (проверено в тестах).Чтобы получить такое поведение при повторных попытках, я должен явно обернуть его в Task.defer
или каким-либо другим способом уже в Task
«контексте».
И вот вопрос, который я не до конца понимаю: почему это так?doRollback
уже дает мне Task
экземпляр, так что я должен иметь возможность вызвать onErrorRestart
, не так ли?Если это не так, как я могу быть уверен, что экземпляр Task
, который я получаю откуда-то, можно перезапустить или нет?
Что мне здесь не хватает?