"tee" Scala Streams / Итераторы - PullRequest
0 голосов
/ 20 декабря 2018

У меня есть последовательный источник данных, представленный в виде простого итератора (или потока).Данные довольно большие и не умещаются в памяти.Кроме того, источник может быть пройден один раз и требует больших затрат.Этот источник используется в какой-то тяжелой процедуре (черный ящик), которая принимает Iterator (или Stream) в качестве аргумента для линейного потребления данных.Хорошо, это просто.Но что я могу сделать, если у меня есть две разные такие процедуры потребления?Как я уже сказал, я не хочу засасывать входные данные в коллекцию, например ListЯ также могу выполнить свою задачу, перечитав источник дважды с самого начала, но мне это не нравится, потому что он не эффективен.На самом деле мне нужно «тройник» (своего рода клон) итератор (или поток), чтобы использовать один дважды два параллельных процесса, не кэшируя его в коллекции памяти.Я полагаю, что такой подход должен оказывать противодавление или, скорее, ограничивать родного брата, если он слишком быстро потребляет исходный поток.Возможно, эффективное решение должно иметь некоторый параллельный безопасный буфер очереди.Кто-нибудь знает, как это сделать в Scala (или с использованием любых внешних потоковых библиотек / фреймворков)?

PS Я нашел похожий вопрос на 4 года: Один восходящий поток, питающий несколько нисходящих потоков Разница в том, что я спрашиваю, как это сделать, используя стандартные итераторы Scala (или Streams) или, что лучше, какую-нибудь существующую библиотеку.

1 Ответ

0 голосов
/ 20 декабря 2018

Вы должны проверить fs2 streams .Пример читает из файла и записывает в другой файл постепенно, используя постоянную память.Вот как вы можете изменить их пример для записи в два файла:

...

io.file.readAll[IO](Paths.get("testdata/fahrenheit.txt"), blockingEC, 4096)
  .through(text.utf8Decode)
  .through(text.lines)
  .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
  .map(line => fahrenheitToCelsius(line.toDouble).toString)
  .intersperse("\n")
  .through(text.utf8Encode)
  .observe(io.file.writeAll(Paths.get("testdata/celsius.txt"), blockingEC))
  .through(io.file.writeAll(Paths.get("testdata/celsius2.txt"), blockingEC))

...
...