Создание Seq после ожидания всех результатов с карты / foreach в Scala - PullRequest
0 голосов
/ 28 января 2020

Я пытаюсь l oop на входах и обрабатывать их для получения результатов. Просто для первого ввода, я хочу сделать некоторую обработку, которая занимает некоторое время. Функция в конечном итоге возвращает только значения из части else. Часть if выполняется после того, как функция возвращает значение. Я новичок в Scala и понимаю поведение, но не уверен, как это исправить. Я пробовал input.zipWithIndex.map вместо foreach, но результат тот же.

def getscores(
    inputs: inputs
): Future[Seq[scoreInfo]] = {
  var scores: Seq[scoreInfo] = Seq()
  inputs.zipWithIndex.foreach {
    case (f, i) => {
      if (i == 0) {
        // long operation that returns Future[Option[scoreInfo]]
        getgeoscore(f).foreach(gso => { 
          gso.foreach(score => {
            scores = scores.:+(score)
          })
        })
      } else {
        scores = scores.:+(
          scoreInfo(
            id = "",
            score = 5
          )
        )
      }
    }
  }
  Future {
    scores
  }
}

Ответы [ 3 ]

2 голосов
/ 28 января 2020

Для того, что вам нужно, я бы отбросил непостоянную переменную и заменил foreach на map, чтобы получить неизменный список Futures, и recover для обработки исключений, за которым следовал sequence как показано ниже:

def getScores(inputs: Inputs): Future[List[ScoreInfo]] = Future.sequence(
  inputs.zipWithIndex.map{ case (input, idx) =>
    if (idx == 0)
      getGeoScore(input).map(_.getOrElse(defaultScore)).recover{ case e => errorHandling(e) }
    else
      Future.successful(ScoreInfo("", 5))
  })

Для захвата / печати результата можно использовать onComplete:

getScores(inputs).onComplete(println)
1 голос
/ 28 января 2020

Часть, которую вы упускаете, - это понимание хитрого элемента параллелизма, и это то, что порядок исполнения при использовании нескольких фьючерсов не гарантирован. Если ваш блок здесь длительный, потребуется некоторое время, прежде чем добавить счет к scores

// long operation that returns Future[Option[scoreInfo]]
        getgeoscore(f).foreach(gso => { 
          gso.foreach(score => {
            // stick a println("here") in here to see what happens, for demonstration purposes only
            scores = scores.:+(score)
          })
        })

Поскольку он выполняется одновременно, ваша функция getscores также будет одновременно продолжать свою работу, повторяя все остальные inputs в вашем zipWithindex. Эта итерация, тем более что это тривиальная работа, вероятно, заканчивается задолго до того, как долгосрочный getgeoscore(f) завершит выполнение запланированного Future, и код выйдет из функции, перейдя к следующему коду после вызова *. 1010 *

val futureScores: Future[Seq[scoreInfo]] = getScores(inputs)
futureScores.onComplete{
  case Success(scoreInfoSeq) => println(s"Here's the scores: ${scoreInfoSeq.mkString(",")}"

}
//a this point the call to getgeoscore(f) could still be running and finish later, but you will never know

doSomeOtherWork()

Теперь, чтобы очистить это, поскольку вы можете запустить zipWithIndex для вашего inputs параметра, я предполагаю, что вы имеете в виду что-то вроде inputs:Seq[Input]. Если все, что вы хотите сделать, это работать с первым входом, то используйте функцию head, чтобы только получить первую опцию, так что getgeoscores(inputs.head), вам не нужен оставшийся код там.

Также, как примечание, если вы используете Scala, избавьтесь от привычки использовать изменяемые var s, , особенно , если вы работаете с параллелизмом. Scala основан на поддержке неизменяемости, поэтому, если вы захотите использовать var, попробуйте использовать val и посмотрите, как работать с библиотекой коллекций Scala, чтобы она работала.

0 голосов
/ 28 января 2020

В общем, то есть, когда у вас есть несколько одновременных вариантов будущего, я бы сказал, что ответ Лео описывает правильный способ сделать это. Однако вы хотите, чтобы только первый элемент преобразовывался в результате длительной операции. Таким образом, вы можете использовать будущее возвращение соответствующей функцией и добавлять другие элементы, когда длительный вызов возвращается, отображая будущий результат:

def getscores(inputs: Inputs): Future[Seq[ScoreInfo]] = 
    getgeoscore(inputs.head)
        .map { optInfo => 
            optInfo ++ inputs.tail.map(_ => scoreInfo(id = "", score = 5))
        }

Так что вам не нужно ни zipWithIndex, ни дополнительное будущее или объединить результаты нескольких фьючерсов с sequence. Отображение будущего просто дает вам новое будущее с результатом, преобразованным функцией, переданной в .map().

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...