Недавно я начал использовать Akka и использую его для создания REST API, используя Akka HTTP для загрузки файла. Файл может содержать миллионы записей, и для каждой записи мне нужно выполнить некоторую проверку и бизнес-логику. То, как я смоделировал свои акторы, корневой актор получает поток файлов, преобразует байты в строку и затем разделяет записи по разделителю строк. После этого он отправляет поток (запись за записью) другому субъекту для обработки, который, в свою очередь, передает записи другим субъектам на основе некоторой группировки. Чтобы отправить пар от основного корневого актера к актеру для обработки, я использую Sink.actorRefWithAck
.
Это нормально работает для небольшого файла, но для большого файла я наблюдал, что я получаю несколько чанков, и первый чанк обрабатывается. Если я добавлю Thread.sleep
на несколько секунд в зависимости от нагрузки, то он будет обрабатывать весь файл. Мне интересно, могу ли я узнать, был ли поток полностью обработан субъектом обработки, чтобы мне не приходилось иметь дело с Thread.sleep
. Вот фрагмент кода, который я использовал:
val AckMessage = DefaultFileUploadProcessActor.Ack
val receiver = context.system.actorOf(
Props(new DefaultFileUploadProcessActor(uuid, sourceId)(self, ackWith = AckMessage)))
// sent from stream to actor to indicate start, end or failure of stream:
val InitMessage = DefaultFileUploadProcessActor.StreamInitialized
val OnCompleteMessage = DefaultFileUploadProcessActor.StreamCompleted
val onErrorMessage = (ex: Throwable) => DefaultFileUploadProcessActor.StreamFailure(ex)
val actorSink = Sink.actorRefWithAck(
receiver,
onInitMessage = InitMessage,
ackMessage = AckMessage,
onCompleteMessage = OnCompleteMessage,
onFailureMessage = onErrorMessage
)
val processStream =
fileStream
.map(byte => byte.utf8String.split(System.lineSeparator()))
.runWith(actorSink)
Thread.sleep(9000)
log.info(s"completed distribution of data to the actors")
sender() ! ActionPerformed(uuid, "Done")
Буду очень признателен любому совету эксперта по поводу подхода, который я выбрал.