Как я могу запустить parSequenceUnordered из Monix и обработать результаты каждой задачи? - PullRequest
2 голосов
/ 05 августа 2020

В настоящее время я работаю над реализацией клиентских http-запросов к API и решил изучить sttp и monix для этой задачи. Поскольку я новичок в Monix, я все еще не уверен, как запускать задачи и получать их результаты. Моя цель - получить последовательность результатов HTTP-запроса, которые я могу вызывать параллельно -> анализировать -> загрузить.

Ниже приведен фрагмент того, что я пробовал до сих пор:

import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
      .header("accept", "application/json")
      .response(asString)
      .body()
      .send()

    val tasks = Seq(r1).map(i => Task(i))
    Task.parSequenceUnordered(tasks).guarantee(backend.close())
  }
  
  import monix.execution.Scheduler.Implicits.global

  postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap$2052527361)
}

Моя путаница довольно проста (я предполагаю). Как я могу запустить Task.parSequenceUnordered, который я создал, и обрабатывать (анализировать результаты http) Задачи в последовательности?

Приятно иметь: из любопытства, можно ли наивно ввести ограничение скорости / регулирование при обработке последовательности задач запросов? Я не стремлюсь создавать что-то сложное. Это может быть так же просто, как разнесение пакетов запросов. Интересно, есть ли у Monix помощник для этого?

1 Ответ

1 голос
/ 05 августа 2020

Спасибо Олегу Пыжцову и сообществу monix gitter за то, что помогли мне разобраться в этом.

Цитата Олега здесь:

Поскольку вы уже используете бэкэнд с поддержкой моникса, тип r1 - Task[Response[Either[String,String]]]. Итак, когда вы выполняете Seq(r1).map(i => Task(i)), вы составляете последовательность задач, которые ничего не делают, кроме как дают вам другие задачи, которые дают вам результат (тип будет Seq[Task[Task[Response[...]]]]). Затем ваш код распараллеливает внешний уровень, задачи-которые-задачи, и в результате вы получаете задачи, с которых начали. Вам нужно только обработать Seq (r1), чтобы он запускал запросы параллельно.

Если вы используете Intellij, вы можете нажать Alt + =, чтобы увидеть тип выбора - это поможет, если вы можете ' t определить тип только по коду (но с опытом становится лучше).

Что касается ограничения скорости, у нас есть parSequenceN, который позволяет вам установить ограничение на параллелизм. Обратите внимание, что неупорядоченный означает лишь небольшое преимущество в производительности за счет того, что результаты находятся в случайном порядке на выходе, в любом случае они выполняются недетерминированно.

В итоге я получил (упрощенную) реализацию это выглядит примерно так:

import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task

object SO extends App {

  val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
    val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
      .header("accept", "application/json")
      .response(asString)
      .body()
      .send()

    val items = Seq(r1.map(x => x.body))
    Task.parSequenceN(1)(items).guarantee(backend.close())
  }
  
  import monix.execution.Scheduler.Implicits.global

   postTask.runToFuture.foreach(println)
}
...