Scala Futures: недетерминированный вывод - PullRequest
0 голосов
/ 27 января 2019

Я новичок в Scala, и я практикуюсь в библиотеке Futures, создав несколько схем повторов.При этом я получил следующий фрагмент кода:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def fCalc(): Future[Int] = Future(calc())

  resetRetries()

  val ff = fCalc() // 0 - should fail
    .fallbackTo(fCalc()) // 1 - should fail
    .fallbackTo(fCalc()) // 2 - should fail
    .fallbackTo(fCalc()) // 3 - should fail
    .fallbackTo(fCalc()) // 4 - should be a success

  Await.ready(ff, 10.second)

  println(ff.isCompleted)
  println(ff.value)
}

Каждый раз, когда я запускаю этот код, я получаю разные результаты.Вот примеры результатов, которые я получаю:

Выход 1

I am thread 12 This is going to fail. Retry count 1
I am thread 14 This is going to fail. Retry count 3
I am thread 13 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

Выход 2

I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 1
I am thread 13 This is going to fail. Retry count 3
I am thread 14 This is going to fail. Retry count 4
true
Some(Success(10))

Выход 3

I am thread 12 This is going to fail. Retry count 1
I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 12 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

Этоне всегда бывает, что результаты будут чередоваться между успехом и неудачей.Может быть больше, чем пара неудачных запусков, пока не появится успешный.

Насколько я понимаю, должно быть только 4 журнала "Я - нить х Это не удастся. Количество повторов х" и этидолжно быть следующее:

I am thread a This is going to fail. Retry count 1
I am thread b This is going to fail. Retry count 2
I am thread c This is going to fail. Retry count 3
I am thread d This is going to fail. Retry count 4

Не обязательно в этом порядке - поскольку я не знаю, как именно работает модель потоков Scala - но вы меня поняли.Тем не менее, я получаю этот недетерминированный вывод, с которым я не могу справиться. Итак ... мой вопрос: откуда берется этот недетерминированный вывод?

Я хотел бы отметить, что следующий механизм повторения дает последовательноте же результаты:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Retries extends App {

  var retries = 0

  def resetRetries(): Unit = retries = 0

  def calc() = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException("This failed")
  }

  def retry[T](op: => T)(retries: Int): Future[T] = Future(op) recoverWith { case _ if retries > 0 => retry(op)(retries - 1) }

  resetRetries()
  val retriableFuture: Future[Future[Int]] = retry(calc())(5)
  Await.ready(retriableFuture, 10 second)

  println(retriableFuture.isCompleted)
  println(retriableFuture.value)
}

Вывод

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Success(10))

В то время как если я уменьшу количество повторных попыток (retry(calc())(3)), результатом будет неудачное будущее, как и ожидалось

I am thread 11 This is going to fail. Retry count 1
I am thread 12 This is going to fail. Retry count 2
I am thread 11 This is going to fail. Retry count 3
I am thread 12 This is going to fail. Retry count 4
true
Some(Failure(java.lang.IllegalArgumentException: This failed))

Ответы [ 3 ]

0 голосов
/ 27 января 2019

Хотя технически @Tim является правильным, я не думаю, что он действительно отвечает на этот вопрос.

Я полагаю, что реальный источник вашего замешательства - ваше недопонимание того, что за конструкции:

f.fallbackTo(Future(calc()))

делает.И чем он отличается от

f.recoverWith({ case _ => Future(calc())})

Есть два важных различия:

  1. В случае fallbackTo Future(calc()) создается немедленно и, таким образом (почти) немедленно начинает выполнение calc().Таким образом, исходное и резервное будущее выполняются одновременно.В случае recoverWith резервное будущее создается только после неудачи исходного будущего.Эта разница влияет на порядок ведения журнала.Кроме того, это означает, что доступ к var retries является одновременным, и, таким образом, вы можете увидеть случай, когда все потоки фактически завершаются сбоем, потому что некоторые обновления к retries потеряны.

  2. Еще один сложный моментявляется то, что fallbackTo является задокументировано как (выделение мое)

Создает новое будущее, которое содержит результат этого будущего, если оно было успешно завершеноили, если нет, результат того будущего, если оно успешно завершено. Если оба фьючерса провалились , результирующее будущее содержит бросаемый объект первого будущего.

Эта разница на самом деле не являетсяповлиять на ваш пример, потому что исключение, которое вы генерируете во всех неудачных попытках, одинаково, но оно могло бы повлиять на результат, если бы они были другими.Например, если вы измените свой код на:

  def calc(attempt: Int) = if (retries > 3) 10 else {
    retries += 1
    println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
    throw new IllegalArgumentException(s"This failed $attempt")
  }

  def fCalc(attempt: Int): Future[Int] = Future(calc(attempt))

  val ff = fCalc(1) // 0 - should fail
      .fallbackTo(fCalc(2)) // 1 - should fail
      .fallbackTo(fCalc(3)) // 2 - should fail
      .fallbackTo(fCalc(4)) // 3 - should fail
      .fallbackTo(fCalc(5)) // 4 - should be a success

, то вы должны получить любой из этих двух результатов

Some(Failure(java.lang.IllegalArgumentException: This failed 1))
Some(Success(10))

и никогда не иметь никакого другого значения "fail".

Обратите внимание, что здесь я явно передаю attempt, чтобы не задавать условия гонки на retries.


Ответ на дополнительные комментарии (28 января)

Причина, по которой я явно передаю attempt в моем предыдущем примере, заключается в том, что это самый простой способ убедиться, что IllegalArgumentException, созданный логически первым calc, получит 1 как значение для всех (даже не очень реалистично)) расписания потоков.

Если вы просто заинтересованы в том, чтобы все журналы имели разные значения, есть гораздо более простой способ: используйте локальную переменную!

  def calc() = {
    val retries = atomicRetries.getAndIncrement()
    if (retries > 3) 10 
    else {
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $retries")
      throw new IllegalArgumentException(s"This failed $retries")
    }
  }

Таким образом, вы избегаете классического TOCTOU проблема.

0 голосов
/ 28 января 2019

Это то, что в конечном итоге сработало для меня:

(следующий код для метода calc() адекватно решает проблемы, связанные с дублированием регистрации и недетерминированными результатами фьючерсов)

var time = 0
  var resetTries = time = 0

  def calc() = this.synchronized {
    if (time > 3) 10 else {
      time += 1
      println(s"I am thread ${Thread.currentThread().getId} This is going to fail. Retry count $time") // For debugging purposes
      throw new IllegalStateException(("not yet"))
    }
  }

НетAtomicInteger обязательно - делает вещи еще более сложными, на мой взгляд.Оболочка synchronised - это то, что нужно.

Я должен подчеркнуть тот факт, что это только для демонстрационных целей, и использование такого дизайна в рабочем коде может быть не самой лучшей идеей (блокировка вызовов calcметод).Вместо этого следует использовать реализацию recoverWith.

Спасибо @SergGr, @Tim и @MichalPolitowksi за помощь

0 голосов
/ 27 января 2019

Это не проблема Scala, а более общая проблема многопоточности со значением retries.У вас есть несколько потоков, которые читают и записывают это значение без какой-либо синхронизации, поэтому вы не можете предсказать, когда будет запущен каждый поток или какое значение он увидит.

Похоже, что конкретная проблема заключается в том, что вы тестируете retries и затем обновить его позже.Вполне возможно, что все четыре потока проверят значение, прежде чем какой-либо из них обновит его.В этом случае они все увидят 0 и выдадут ошибку.

Решение состоит в том, чтобы превратить retries в AtomicInteger и использовать getAndIncrement.Это будет атомарно извлекать значение и увеличивать его, поэтому каждый поток будет видеть соответствующее значение.


Обновить следующие комментарии : Другой ответ объяснил, почему несколько потоковв то же время, поэтому я не буду повторять это здесь.При параллельном запуске нескольких потоков порядок ведения журнала всегда будет недетерминированным.

...