Я предполагаю, что этот код предназначен только для целей этого вопроса, а ваши реальные требования немного отличаются.
Я предоставляю решение для потоков, и вы можете использовать что-то похожее на следующее, чтобы получить этореализация специального окна для вашего варианта использования.
import scala.collection.mutable
val stream = {
def loop(i: Int): Stream[(String, String)] = (s"Day$i", s"Data$i") #:: loop(i + 1)
loop(1)
}
def specialWindowedStream[T](source: Stream[T], window: Int): Stream[List[T]] = {
val queue = mutable.Queue.empty[T]
def loop(source: Stream[T]): Stream[List[T]] = {
queue.enqueue(source.head)
if (queue.size > window) {
queue.dequeue()
}
queue.toList #:: loop(source.tail)
}
loop(source)
}
val windowedStream = specialWindowedStream(stream, 5)
windowedStream.zipWithIndex.take(6).foreach(println)
// (List((Day1,Data1)),0)
// (List((Day1,Data1), (Day2,Data2)),1)
// (List((Day1,Data1), (Day2,Data2), (Day3,Data3)),2)
// (List((Day1,Data1), (Day2,Data2), (Day3,Data3), (Day4,Data4)),3)
// (List((Day1,Data1), (Day2,Data2), (Day3,Data3), (Day4,Data4),(Day5,Data5)),4)
// (List((Day2,Data2), (Day3,Data3), (Day4,Data4), (Day5,Data5),(Day6,Data6)),5)