Я ищу поток FIFO в Scala, т.е. что-то, что обеспечивает функциональность
- immutable.Stream (поток, который может быть конечным и запоминающимэлементы, которые уже были прочитаны)
- mutable.Queue (что позволяет добавлять элементы в FIFO)
Поток должен быть закрытым и должен блокировать доступк следующему элементу, пока элемент не будет добавлен или поток не будет закрыт.
На самом деле я немного удивлен, что библиотека коллекции не включает (кажется, что) такую структуру данных, так как это IMOдовольно классический.
Мои вопросы:
1) Я что-то упустил?Уже существует класс, обеспечивающий эту функциональность?
2) Хорошо, если он не включен в библиотеку коллекции, он может быть просто тривиальной комбинацией существующих классов коллекции.Тем не менее, я попытался найти этот тривиальный код, но моя реализация выглядит все еще довольно сложной для такой простой проблемы.Есть ли более простое решение для такого FifoStream?
class FifoStream[T] extends Closeable {
val queue = new Queue[Option[T]]
lazy val stream = nextStreamElem
private def nextStreamElem: Stream[T] = next() match {
case Some(elem) => Stream.cons(elem, nextStreamElem)
case None => Stream.empty
}
/** Returns next element in the queue (may wait for it to be inserted). */
private def next() = {
queue.synchronized {
if (queue.isEmpty) queue.wait()
queue.dequeue()
}
}
/** Adds new elements to this stream. */
def enqueue(elems: T*) {
queue.synchronized {
queue.enqueue(elems.map{Some(_)}: _*)
queue.notify()
}
}
/** Closes this stream. */
def close() {
queue.synchronized {
queue.enqueue(None)
queue.notify()
}
}
}
Решение Paradigmatic (слегка изменено)
Спасибо за ваши предложения.Я немного изменил решение парадигмы, чтобы toStream возвращал неизменный поток (допускает повторяемое чтение), чтобы он соответствовал моим потребностям.Просто для полноты вот код:
import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
lazy val toStream: Stream[A] = queue2stream
private def queue2stream: Stream[A] = queue take match {
case Some(a) => Stream cons ( a, queue2stream )
case None => Stream empty
}
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}