Как ждать завершения загрузки файла в актере Akka - PullRequest
0 голосов
/ 04 сентября 2018

Недавно я начал использовать Akka и использую его для создания REST API, используя Akka HTTP для загрузки файла. Файл может содержать миллионы записей, и для каждой записи мне нужно выполнить некоторую проверку и бизнес-логику. То, как я смоделировал свои акторы, корневой актор получает поток файлов, преобразует байты в строку и затем разделяет записи по разделителю строк. После этого он отправляет поток (запись за записью) другому субъекту для обработки, который, в свою очередь, передает записи другим субъектам на основе некоторой группировки. Чтобы отправить пар от основного корневого актера к актеру для обработки, я использую Sink.actorRefWithAck.

Это нормально работает для небольшого файла, но для большого файла я наблюдал, что я получаю несколько чанков, и первый чанк обрабатывается. Если я добавлю Thread.sleep на несколько секунд в зависимости от нагрузки, то он будет обрабатывать весь файл. Мне интересно, могу ли я узнать, был ли поток полностью обработан субъектом обработки, чтобы мне не приходилось иметь дело с Thread.sleep. Вот фрагмент кода, который я использовал:

val AckMessage = DefaultFileUploadProcessActor.Ack
val receiver = context.system.actorOf(
  Props(new DefaultFileUploadProcessActor(uuid, sourceId)(self, ackWith = AckMessage)))
// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = DefaultFileUploadProcessActor.StreamInitialized
val OnCompleteMessage = DefaultFileUploadProcessActor.StreamCompleted
val onErrorMessage = (ex: Throwable) => DefaultFileUploadProcessActor.StreamFailure(ex)

val actorSink = Sink.actorRefWithAck(
  receiver,
  onInitMessage = InitMessage,
  ackMessage = AckMessage,
  onCompleteMessage = OnCompleteMessage,
  onFailureMessage = onErrorMessage
)

val processStream =
  fileStream
    .map(byte => byte.utf8String.split(System.lineSeparator()))
    .runWith(actorSink)

Thread.sleep(9000)
log.info(s"completed distribution of data to the actors")
sender() ! ActionPerformed(uuid, "Done")

Буду очень признателен любому совету эксперта по поводу подхода, который я выбрал.

Ответы [ 3 ]

0 голосов
/ 04 сентября 2018

Актер receiver получит OnCompleteMessage или onErrorMessage после успешного завершения потока или с ошибкой, поэтому вы должны обработать эти сообщения в блоке receive получателя DefaultFileUploadProcessActor субъекта.

0 голосов
/ 04 сентября 2018

Предполагая, что fileStream является Source[ByteString, Future[IOResult], одна идея состоит в том, чтобы сохранить материализованное значение источника, а затем запустить ответ на sender(), как только это материализованное значение завершится:

val processStream: Future[IOResult] =
  fileStream
    .map(_.utf8String.split(System.lineSeparator()))
    .to(actorSink)
    .run()

processStream.onComplete {
  case Success(_) =>
    log.info("completed distribution of data to the actors")
    sender() ! ActionPerformed(uuid, "Done")
  case Failure(t) =>
    // ...
}

Приведенный выше подход гарантирует, что весь файл будет использован до того, как отправитель получит уведомление.

Обратите внимание, что Akka Streams имеет объект Framing, который может анализировать строки из потока ByteString:

val processStream: Future[IOResult] =
  fileStream
    .via(Framing.delimiter(
      ByteString(System.lineSeparator()),
      maximumFrameLenght = 256,
      allowTruncation = true))
    .map(_.ut8String)
    .to(actorSink) // the actor will have to expect String, not Array[String], messages
    .run()
0 голосов
/ 04 сентября 2018

Если у вас есть Source только с одним файлом, вы можете дождаться завершения потока, ожидая Future, который возвращается из метода runWith.

Если у вас есть источник нескольких файлов, вы должны написать что-то вроде:

filesSource
  .mapAsync(1)(data => (receiver ? data).mapTo[ProcessingResult])
  .mapAsync(1)(processingResult => (resultListener ? processingResult).mapTo[ListenerResponse])
  .runWith(Sink.ignore)
...