У меня есть Source[Animal]
, где Animal
имеет 2 типа Cat
и Dog
. source
это что-то вроде dog1, dog2, dog3, cat1, dog4, dog5, cat2, cat3, dog6, dog7, dog8, dog9, dog10, dog11, dog12, cat4 ...
Я пытаюсь преобразовать его в следующее Source[Seq[Animal]] - (dog1, dog2, dog3, cat1), (dog4, dog5, cat2), (cat3), (dog6, dog7, dog8), (dog9, dog10, dog11), (dog12, cat4) ...
Как это работает:
- не более 3 собак в партии, не более 1 кошки в партии(в качестве альтернативы можно также решить следующее: не более 4 животных на одну партию, не более 1 кошки на партию)
- кошка должна быть только последним (иначе говоря, обрамляющим) элементом в партии
- также, я не могу показать скорость в примере, но должен быть тайм-аут, после которого партия (даже если не полная и без кошки) все еще испускается. Что-то вроде
groupedWithin(4, FiniteDuration(3, SECONDS))
- общий порядок важен и его необходимо поддерживать
Я пробовал что-то с batchWeighted
и groupedWithin
, но у меня нет правильногорешения пока нет.
Одна из идей, которую я попробовал, состояла в том, чтобы взвесить Dog
как 1
и Cat
как 1000
и использовать batchWeighted
с max weight = 1003
, но это не гарантирует, что Cat
всегда является последним элементом пакета ... Попытка сделать то же самое с max weight = 3
всегда ставит Cat
в отдельные группы.
Если был гибрид batchWithin
и takeWhile
(без завершения)тогда он, возможно, решил этот вариант использования.
Это довольно прямолинейная вещь, которую нужно решить, если она просто повторяется по List
, но из-за ограничения на использование FlowOps
делает это немного сложным
Редактировать: В настоящее время я делаю следующее:
.groupedWithin(4, FiniteDuration(4, SECONDS))
.map(frameBatch(_, Vector(), 0))
// groupedWithin internally returns a Vector so is fast for indexed operations
@tailrec
private def frameBatch(
items: Seq[Animal],
result: Vector[Seq[Animal]],
offset: Int
): Vector[Seq[Animal]] = {
val index = seq.indexWhere(!_.isDog, offset) // assume there's an isDog() for simplicity
if (index == -1) {
if (offset == 0) {
Vector(items)
} else {
result :+ items.slice(offset, items.size)
}
} else {
frameBatchAtSyncs(items, result :+ items.slice(offset, index), index + 1)
}
}