Это должно преобразовать источник Either[L, R]
в источник Either[L, Seq[R]]
с настраиваемой группировкой Right
с.
def groupRights[L, R](groupSize: Int)(in: Source[Either[L, R], NotUsed]): Source[Either[L, Seq[R]], NotUsed] =
in.map(Option _) // Yep, an Option[Either[L, R]]
.concat(Source.single(None)) // to emit when `in` completes
.statefulMapConcat { () =>
val buffer = new scala.collection.mutable.ArrayBuffer[R](groupSize)
def dumpBuffer(): List[Either[L, Seq[R]] = {
val out = List(Right(buffer.toList))
buffer.clear()
out
}
incoming: Option[Either[L,R]] => {
incoming.map { _.fold(
l => List(Left(l)), // unfortunate that we have to re-wrap
r => {
buffer += r
if (buffer.size == groupSize) {
dumpBuffer()
} else {
Nil
}
}
)
}.getOrElse(dumpBuffer()) // End of stream
}
}
Помимо этого, я отмечу, что нисходящий код в вызов внешней службы может быть переписан как
.map(_.right.map(callExternalService))
Если вы можете надежно вызвать внешнюю службу с параллелизмом n
, возможно, стоит также сделать это с помощью:
.mapAsync(n) { e.fold(
l => Future.successful(Left(l)),
r => Future { Right(callExternalService(r)) }
)
}
You может даже, если хотите увеличить пропускную способность за счет сохранения заказа, заменить mapAsync
на mapAsyncUnordered
.