Я пытался написать простой класс продюсера, чтобы выучить актеров. Я хотел иметь производителя, который начинается с некоторого каталога, представленного объектом File, а затем отправляет сообщения другим субъектам для обработки файлов. Изначально я читал содержимое файлов, но, для простоты, теперь я просто собираю их пути.
Еще раз, это не имеет значения реального мира, но оно имеет практическую ценность для меня, поскольку я думаю, что это позволит мне лучше понять актеров. Вот что у меня есть:
import java.io._
import java.util.concurrent._
import scala.actors.Actor
import scala.io.Source
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
case class FileSystemObject(path:File)
case class FileContent(content:String)
case object Stop
case object Processed
class ResultAcumulator extends Actor {
var results:List[String] = Nil
var finished = false
def act() = {
loop {
react {
case FileContent(content) => {
results ::= content
}
case Stop => {
finished = true;
exit
}
}
}
}
}
class FileSystemReader(accumulator:Actor) extends Actor {
def act() = {
loop {
react {
case FileSystemObject(path) => {
if(path.isFile) {
accumulator ! FileContent(path.toString)
sender ! Processed
}
}
case Stop => exit
}
}
}
}
class FileSystemProducer(start:File,acumulator:Actor,reader:Actor) extends Actor {
var totalFilesProcessed = 0
def act() = {
val files = start.listFiles
files.foreach{ f =>
(reader ! FileSystemObject(f))
}
loop {
react {
case Processed => {
totalFilesProcessed += 1
if(totalFilesProcessed == files.length) {
reader ! Stop
acumulator ! Stop
Xo.decrementLatch
}
}
}
}
}
}
object Xo {
var latch = new CountDownLatch(1)
def decrementLatch = latch.countDown
def main(args : Array[String]) = {
val acumulator = new ResultAcumulator
val fsReader = new FileSystemReader(acumulator)
val producer = new FileSystemProducer(new File("d:/rails/a"),acumulator,fsReader)
acumulator.start
fsReader.start
producer.start
latch.await
acumulator.results.foreach(println)
}
}
В состоянии, в котором он сейчас находится, он работает вечно, и я не вижу выхода. Ах, еще одна вещь. Перед выходом из программы я бы хотел перечислить результаты "обработано". Я немного искал и нашел класс CountDownLatch. Я хотел бы, чтобы это было реализовано с помощью цикла / реакции, а не времени / получения. Я уверен, что проблема вызвана тем, что у меня есть эти строки:
files.foreach{ f =>
(reader ! FileSystemObject(f))
}
и что у меня петля реагирования немного ниже, но я понятия не имею, как ее исправить.