Как я могу заставить этот, вероятно, простой пример актера продюсера работать? - PullRequest
1 голос
/ 31 марта 2011

Я пытался написать простой класс продюсера, чтобы выучить актеров. Я хотел иметь производителя, который начинается с некоторого каталога, представленного объектом File, а затем отправляет сообщения другим субъектам для обработки файлов. Изначально я читал содержимое файлов, но, для простоты, теперь я просто собираю их пути. Еще раз, это не имеет значения реального мира, но оно имеет практическую ценность для меня, поскольку я думаю, что это позволит мне лучше понять актеров. Вот что у меня есть:

import java.io._
import java.util.concurrent._
import scala.actors.Actor
import scala.io.Source
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong

case class FileSystemObject(path:File)
case class FileContent(content:String)

case object Stop
case object Processed

class ResultAcumulator extends Actor {

    var results:List[String] = Nil
    var finished = false

    def act() = {
        loop {
            react {
                case FileContent(content) => {
                    results ::= content
                }
                case Stop => {
                    finished = true;
                    exit
                }
            }
        }
    }
}

class FileSystemReader(accumulator:Actor) extends Actor {
    def act() = {
        loop {
            react {
                case FileSystemObject(path) => {
                    if(path.isFile) {
                        accumulator ! FileContent(path.toString)
                        sender ! Processed
                    }
                }
                case Stop => exit
            }
        }
    }
}

class FileSystemProducer(start:File,acumulator:Actor,reader:Actor) extends Actor {
    var totalFilesProcessed = 0

    def act() = {
        val files = start.listFiles
        files.foreach{ f =>
            (reader ! FileSystemObject(f))
        }
        loop {
            react {
                case Processed => {
                    totalFilesProcessed += 1
                    if(totalFilesProcessed == files.length) {
                        reader ! Stop
                        acumulator ! Stop
                        Xo.decrementLatch
                    }
                }
            }
        }

    }
}
object Xo {

     var latch = new CountDownLatch(1)

     def decrementLatch = latch.countDown

     def main(args : Array[String]) = {

         val acumulator = new ResultAcumulator
         val fsReader = new FileSystemReader(acumulator)
         val producer = new FileSystemProducer(new File("d:/rails/a"),acumulator,fsReader)

         acumulator.start
         fsReader.start
         producer.start

         latch.await

         acumulator.results.foreach(println)

     }
}

В состоянии, в котором он сейчас находится, он работает вечно, и я не вижу выхода. Ах, еще одна вещь. Перед выходом из программы я бы хотел перечислить результаты "обработано". Я немного искал и нашел класс CountDownLatch. Я хотел бы, чтобы это было реализовано с помощью цикла / реакции, а не времени / получения. Я уверен, что проблема вызвана тем, что у меня есть эти строки:

files.foreach{ f =>
    (reader ! FileSystemObject(f))
}

и что у меня петля реагирования немного ниже, но я понятия не имею, как ее исправить.

1 Ответ

3 голосов
/ 31 марта 2011

Я думаю, что соответствующая часть - это case FileSystemObject (path) => {if (path.isFile) {аккумулятор!FileContent (path.toString) отправитель!Обработано}} Здесь вещи, которые не являются «обычными файлами», например, каталоги, не отправляются в аккумулятор.Так что, если в вашем "d: / rails / a" есть подкаталоги, тест totalFilesProcessed == files.length всегда будет неудачным.

...