Есть ли поток FIFO в Scala? - PullRequest
       40

Есть ли поток FIFO в Scala?

14 голосов
/ 26 сентября 2011

Я ищу поток FIFO в Scala, т.е. что-то, что обеспечивает функциональность

  • immutable.Stream (поток, который может быть конечным и запоминающимэлементы, которые уже были прочитаны)
  • mutable.Queue (что позволяет добавлять элементы в FIFO)

Поток должен быть закрытым и должен блокировать доступк следующему элементу, пока элемент не будет добавлен или поток не будет закрыт.

На самом деле я немного удивлен, что библиотека коллекции не включает (кажется, что) такую ​​структуру данных, так как это IMOдовольно классический.

Мои вопросы:

  • 1) Я что-то упустил?Уже существует класс, обеспечивающий эту функциональность?

  • 2) Хорошо, если он не включен в библиотеку коллекции, он может быть просто тривиальной комбинацией существующих классов коллекции.Тем не менее, я попытался найти этот тривиальный код, но моя реализация выглядит все еще довольно сложной для такой простой проблемы.Есть ли более простое решение для такого FifoStream?

    class FifoStream[T] extends Closeable {
    
    val queue = new Queue[Option[T]]
    
    lazy val stream = nextStreamElem
    
    private def nextStreamElem: Stream[T] = next() match {
        case Some(elem) => Stream.cons(elem, nextStreamElem)
        case None       => Stream.empty
    }
    
    /** Returns next element in the queue (may wait for it to be inserted). */
    private def next() = {
        queue.synchronized {
            if (queue.isEmpty) queue.wait()
            queue.dequeue()
        }
    }
    
    /** Adds new elements to this stream. */
    def enqueue(elems: T*) {
        queue.synchronized {
            queue.enqueue(elems.map{Some(_)}: _*)
            queue.notify()
        }
    }
    
    /** Closes this stream. */
    def close() {
        queue.synchronized {
            queue.enqueue(None)
            queue.notify()
        }
    }
    }
    

Решение Paradigmatic (слегка изменено)

Спасибо за ваши предложения.Я немного изменил решение парадигмы, чтобы toStream возвращал неизменный поток (допускает повторяемое чтение), чтобы он соответствовал моим потребностям.Просто для полноты вот код:

import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, queue2stream )
    case None    => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

Ответы [ 3 ]

15 голосов
/ 27 сентября 2011

В Scala потоки являются «функциональными итераторами». Люди ожидают, что они будут чистыми (без побочных эффектов) и неизменными. В вашем случае, каждый раз, когда вы выполняете итерацию в потоке, вы изменяете очередь (так что это не чисто). Это может создать много недоразумений, поскольку повторение дважды одного и того же потока приведет к двум разным результатам.

При этом вы должны использовать Java BlockingQueues, а не использовать собственную реализацию. Они считаются хорошо реализованными с точки зрения безопасности и производительности. Вот самый чистый код, который я могу придумать (используя ваш подход):

import java.util.concurrent.BlockingQueue
import scala.collection.JavaConversions._

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] ) {
  def toStream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, toStream )
    case None => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

object FIFOStream {
  def apply[A]() = new LinkedBlockingQueue
}
2 голосов
/ 26 сентября 2011

Я предполагаю, что вы ищете что-то вроде java.util.concurrent.BlockingQueue ?

Akka имеет BoundedBlockingQueue реализацию этого интерфейса. Конечно, есть реализации, доступные в java.util.concurrent .

Вы можете также рассмотреть возможность использования актеров от Akka для всего, что вы делаете. Используйте актеров, чтобы получать уведомления или выдвигать новое событие или сообщение вместо извлечения.

0 голосов
/ 26 сентября 2011

1) Похоже, вы ищете поток потока данных, который можно увидеть на таких языках, как Oz , который поддерживает шаблон производителя-потребителя.Такая коллекция недоступна в API коллекций, но вы всегда можете создать ее самостоятельно.

2) Поток данных опирается на концепцию переменных единственного назначения (так что онине нужно инициализироваться в точке объявления, и чтение их до инициализации вызывает блокировку):

val x: Int
startThread {
  println(x)
}
println("The other thread waits for the x to be assigned")
x = 1

Было бы просто реализовать такой поток, если бы переменные с одним присваиванием (или поток данных) поддерживались вязык (см. ссылка ).Поскольку они не являются частью Scala, вы должны использовать шаблон wait - synchronized - notify, как вы это сделали.

Можно использовать параллельные очереди из Java чтобы достичь этого, как и предложил другой пользователь.

...