"Большой файл" может быть прочитан с использованием функции streaming FileIO
:
val file = Paths.get("example.csv")
val fileSource : Source[ByteString, Future[IOResult]] = FileIO.fromPath(file)
Этот источник может быть затем проанализирован для создания "подмножеств файла".Вопрос не в том, как создаются подмножества, поэтому давайте предположим, что мы хотим отправить 256
строк на каждый узел.Framing
может использоваться для анализа, а grouped
создаст фрагменты:
val separator = ByteString("\n")
val maxLineLength = 1024
val chunkSize = 256
val lineParser : Flow[ByteString, ByteString, _] =
Framing
.delimiter(separator, maximumFrameLength = maxLineLength, allowTruncation= false)
.grouped(chunkSize)
Затем эти фрагменты могут быть отправлены на удаленные узлы,Вопрос не был конкретным о том, как взаимодействовать с узлами, поэтому используется функция-заглушка:
val sendChunk : Seq[ByteString] => Unit = ???
val chunkSink : Sink[Seq[ByteString], _] =
Sink[Seq[ByteString]].foreach(sendChunk)
fileSource
.via(lineParser)
.to(chunkSink)
.run()