Функциональный способ прерывания отложенной итерации в зависимости от времени ожидания и сравнения между предыдущим и следующим, в то время как LazyList vs Stream - PullRequest
1 голос
/ 12 апреля 2020

Фон

У меня есть следующий сценарий. Я хочу, чтобы метод класса выполнялся из внешней библиотеки несколько раз, и я хочу делать это до тех пор, пока не будут выполнены определенные условия тайм-аута и условия результата (по сравнению с предыдущим результатом). Кроме того, я хочу собрать возвращаемые значения даже при «неудачном» прогоне (прогоне с «провальным» результатом условия, который должен прервать дальнейшее выполнение).

До сих пор я достиг этого с инициализацией пустого var result: Result, var stop: Boolean и использование while l oop, которое выполняется, пока условия выполняются, и изменяет внешнее состояние. Я хотел бы избавиться от этого и использовать функциональный подход.

Некоторый контекст. Предполагается, что каждый прогон будет длиться от 0 до 60 минут, а общее время итерации ограничено 60 минутами. Теоретически, нет ограничений на количество выполнений в этот период, но на практике это обычно 2-60 раз.

Проблема в том, что выполнение занимает много времени, поэтому мне нужно остановить выполнение. Моя идея состоит в том, чтобы использовать какой-нибудь ленивый Iterator или Stream в сочетании с scanLeft и Option.

Код

Плита котла

Этот код не Это не совсем уместно, но используется в моих примерах захода на посадку и дает идентичные, но несколько случайные псевдо-результаты во время выполнения. 1028 *

Полный пример

val streamStart = System.currentTimeMillis()
val stream = for {
  i <- (0 to 600).toStream
  if System.currentTimeMillis() < streamStart + timeout
} yield Baz(i, lib.run(i))
var last: Option[Baz] = None
val head = stream.headOption
val tail = if (stream.nonEmpty) stream.tail else stream
val streamVersion = (tail
  .scanLeft((head, true))((x, y) => {
    if (x._1.exists(_.result.a > y.result.a)) (Some(y), false)
    else (Some(y), true)
  })
  .takeWhile {
    case (baz, continue) =>
      if (!baz.eq(head)) last = baz
      continue
  }
  .map(_._1)
  .toList :+ last).flatten

LazyList подход (Scala 2.13)

Полный пример

val lazyListStart = System.currentTimeMillis()
val lazyList = for {
  i <- (0 to 600).to(LazyList)
  if System.currentTimeMillis() < lazyListStart + timeout
} yield Baz(i, lib.run(i))
var last: Option[Baz] = None
val head = lazyList.headOption
val tail = if (lazyList.nonEmpty) lazyList.tail else lazyList
val lazyListVersion = (tail
  .scanLeft((head, true))((x, y) => {
    if (x._1.exists(_.result.a > y.result.a)) (Some(y), false)
    else (Some(y), true)
  })
  .takeWhile {
    case (baz, continue) =>
      if (!baz.eq(head)) last = baz
      continue
  }
  .map(_._1)
  .toList :+ last).flatten

Результат

Кажется, что оба подхода дают правильный конечный результат:

List(Baz(0,Result(4,170)), Baz(1,Result(5,208)))

и они прерывают выполнение по желанию.

Редактировать: Желаемый результат - не выполнять следующую итерацию, но по-прежнему возвращать результат итерации, вызвавшей прерывание. Таким образом, желаемый результат -

List(Baz(0,Result(4,170)), Baz(1,Result(5,208)), Baz(2,Result(2,256))

и lib.run(i) должен выполняться только 3 раза.

Это достигается с помощью подхода while, а также подхода LazyList, но не Stream подход, который выполняется lib.run 4 раза (Плохо!).

Вопрос

Есть ли другой подход без сохранения состояния, который, мы надеемся, более элегантен?

Редактировать

Я понял, что мои примеры были ошибочными и не возвращали «неудачный» результат, который он должен, и что они продолжали выполняться после состояния остановки. Я переписал код и примеры, но считаю, что суть вопроса та же.

1 Ответ

5 голосов
/ 12 апреля 2020

Я бы использовал что-то более высокого уровня, например fs2 .
(или любую другую потоковую библиотеку высокого уровня, например: monix observables , akka streams или zio zstreams )

def runUntilOrTimeout[F[_]: Concurrent: Timer, A](work: F[A], timeout: FiniteDuration)
                                                 (stop: (A, A) => Boolean): Stream[F, A] = {
  val interrupt =
    Stream.sleep_(timeout)

  val run =
    Stream
      .repeatEval(work)
      .zipWithPrevious                                         
      .takeThrough {
        case (Some(p), c) if stop(p, c) => false
        case _                          => true
      } map {
        case (_, c) => c
      }

  run mergeHaltBoth interrupt
}

Вы можете видеть, как это работает здесь .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...