Акка-стрим группа только правые элементы либо - PullRequest
0 голосов
/ 04 марта 2020

У меня есть источник, который излучает Either[String, MyClass]. Я хочу вызвать внешнюю службу с пакетами MyClass и продолжить нисходящий поток с Either[String, ExternalServiceResponse], поэтому мне нужно сгруппировать элементы потока.

Если поток будет излучать только элементы MyClass, он будет будь проще - просто позвони grouped:

val source: Source[MyClass, NotUsed] = <custom implementation>
source
  .grouped(10)                 // Seq[MyClass]
  .map(callExternalService(_)) // ExternalServiceResponse

Но как сгруппировать только элементы на правой стороне Either в моем сценарии?

val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
  .???                                                      // Either[String, Seq[MyClass]]
  .map {
    case Right(myClasses) => callExternalService(myClasses)
    case Left(string) => Left(string)
  }                                                         // Either[String, ExternalServiceResponse]

РЕДАКТИРОВАТЬ:

Это работает, но есть ли еще идиоматический c способ?

val source: Source[Either[String, MyClass], NotUsed] = <custom implementation>
source
  .groupBy(2, either => either.isRight)
  .grouped(10)
  .map(input => input.headOption match {
    case Some(Right(_)) =>
      callExternalService(input.map(item => item.right.get))
    case _ =>
      input
  )
  .mapConcat(_.to[scala.collection.immutable.Iterable])
  .mergeSubstreams 

1 Ответ

0 голосов
/ 04 марта 2020

Это должно преобразовать источник Either[L, R] в источник Either[L, Seq[R]] с настраиваемой группировкой Right с.

def groupRights[L, R](groupSize: Int)(in: Source[Either[L, R], NotUsed]): Source[Either[L, Seq[R]], NotUsed] =
  in.map(Option _)  // Yep, an Option[Either[L, R]]
    .concat(Source.single(None)) // to emit when `in` completes
    .statefulMapConcat { () =>
      val buffer = new scala.collection.mutable.ArrayBuffer[R](groupSize)

      def dumpBuffer(): List[Either[L, Seq[R]] = {
        val out = List(Right(buffer.toList))
        buffer.clear()
        out
      }

      incoming: Option[Either[L,R]] => {
        incoming.map { _.fold(
            l => List(Left(l)),  // unfortunate that we have to re-wrap
            r => {
              buffer += r
              if (buffer.size == groupSize) {
                dumpBuffer()
              } else {
                Nil
              }
            }
          )
        }.getOrElse(dumpBuffer()) // End of stream
      }
    }

Помимо этого, я отмечу, что нисходящий код в вызов внешней службы может быть переписан как

.map(_.right.map(callExternalService))

Если вы можете надежно вызвать внешнюю службу с параллелизмом n, возможно, стоит также сделать это с помощью:

.mapAsync(n) { e.fold(
    l => Future.successful(Left(l)),
    r => Future { Right(callExternalService(r)) }
  )
}

You может даже, если хотите увеличить пропускную способность за счет сохранения заказа, заменить mapAsync на mapAsyncUnordered.

...