Акка досрочное прекращение - PullRequest
0 голосов
/ 15 февраля 2020

Я испытываю преждевременное прекращение работы AkkaSystem после прочтения первой записи, ввода мертвой буквы без выполнения задачи для всех записей

У меня 10 записей в моем файле, 2 записи имеют соответствующее имя файла, которое будет передано на s3.

что может быть не так Пожалуйста, предложите

Sample file record:
xxxxx,ABC,2019-05-10 00:11:00
yyyyyy,XYZ,2019-05-10 00:41:00
import akka.actor.{Actor, ActorSystem, Props}

import scala.io.Source
import scala.sys.process._

class HelloActor extends Actor {

  def receive: Receive = {
    case line: String => {
      //        print("Ahshan"+line)
      val row = line.split ( "," )
      val stdout = new StringBuilder
      val stderr = new StringBuilder
      val status = Seq ( "/bin/sh", "-c", "ls /Users/ahshan.md/Downloads/".concat ( row ( 2 ).substring ( 0, 10 ) ).concat ( "/*" ).concat ( row ( 0 ) ).concat ( "*" ) ) ! ProcessLogger ( stdout append _, stderr append _ )
      //        println(status)
      //        println("stdout: " + stdout)
      //        println("stderr: " + stderr)
      if (status == 0) {
        println ( "/bin/sh", "-c", "aws s3 cp ".concat ( stdout.mkString ).concat ( "  " ).concat ( "s3://ahshan/".concat ( row ( 1 ) ).concat ( "/" ).concat ( row ( 0 ) ).concat ( ".email" ) ) )
      }
      else {
        //      println ( "File Not Found:   " + row ( 0 ), row ( 1 ), row ( 2 ).substring ( 0, 10 ) )
        println ( "stderr: " + stderr )
      }
    }

    case "finished" => println ( "Hello Ahshan" )
    case _ => println ( "Exiting" )

  }

}


object AkkaHelloWorld extends App {

  // an actor needs an ActorSystem
  val system = ActorSystem ( "HelloSystem" )
  // create and start the actor
  val helloActor = system.actorOf ( Props [HelloActor], name = "helloActor" )

  try {
    val filename = "/Users/ahshan.md/Downloads/test.txt"
    for (line <- Source.fromFile ( filename ).getLines) {
      helloActor ! line
    }
  }
  finally {

system.terminate ()
  }
}

1 Ответ

0 голосов
/ 18 февраля 2020

ActorSystem и субъекты работают одновременно с вашим основным потоком, отправляя сообщение субъекту asyn c, и поток отправки немедленно продолжается, он не ожидает, пока субъект обработает сообщение.

Это означает, что в вашем коде основной поток запускает каждую строку для актера так быстро, как только может, и затем завершает систему актера. В этом примере может иметь смысл переместить логи завершения c в субъект и позволить субъекту завершить систему после ее завершения.

Это будет работать только в том случае, если ваше приложение является единственным действующим лицом, однако, как только вы добавите другого действующего лица, вам придется пересмотреть способ завершения работы системы.

В качестве дополнительной заметки с использованием API процесса в Scala будет блокировать до тех пор, пока процесс не будет завершен, вызов кода блокировки может иметь плохие последствия, для получения более подробной информации прочтите этот раздел документации: https://doc.akka.io/docs/akka/current/typed/dispatchers.html#blocking -needs-заботливое управление

...