Akka Streams: объединение потоков, которые инициализируют ресурсы - PullRequest
0 голосов
/ 10 мая 2018

У меня есть два потока: A и B .

  • A записывает файл на диск и добавляет его элементы во время выполнения.
  • B читает этот файл и выполняет некоторую обработку данных.

Только ПОСЛЕ A выполняется обработка и запись его данных в файл, я хочу начать с B .

Я пытался concat два потока с:

A.concat(
  Source.lazily { () =>
    println("B is getting initialised")
    getStreamForB()
  }
)

Но это уже инициализация B ДО A завершена.

Ответы [ 2 ]

0 голосов
/ 10 мая 2018

Существует тикет , отслеживающий тот факт, что Source#concat не поддерживает ленивую материализацию. В этом билете упоминается следующий обходной путь:

implicit class SourceLazyOps[E, M](val src: Source[E, M]) {
  def concatLazy[M1](src2: => Source[E, M1]): Source[E, NotUsed] =
    Source(List(() => src, () => src2)).flatMapConcat(_())
}

Применение вышеуказанного неявного класса к вашему случаю:

A.concatLazy(
  Source.lazily { () =>
    println("B is getting initialised")
    getStreamForB()
  }
)
0 голосов
/ 10 мая 2018

Метод FileIO.toPath материализует поток в Future[IOResult]. Если вы работаете с потоком A, который записывает в файл:

val someDataSource : Source[ByteString, _] = ???

val filePath : Path = ???

val fileWriteOptions : Set[OpenOption] = ???

val A : Future[IOResult] = 
  someDataSource
    .to(FileIO.toPath(filePath, fileWriteOptions))
    .run()

Вы можете использовать материализованное Будущее, чтобы запустить ваш поток B, как только запись будет завершена:

val fileReadOptions : Set[OpenOption] = ???

val someProcessingWithTheDataOfB : Sink[ByteString, _] = ???

A foreach { _ =>

  val B : Future[IOResult] = 
    FileIO
      .fromPath(filePath, fileReadOptions)
      .to(someProcessingWithTheDataOfB)
      .run()
}

Аналогично, вы можете провести некоторое тестирование IOResult перед чтением, чтобы убедиться, что в процессе записи не было сбоев:

A.filter(ioResult => ioResult.status.isSuccess)
 .foreach { _ =>
   val B : Future[IOResult] = 
     FileIO
       .fromPath(filePath, readOptions)
       .to(someProcessingOfB)
       .run()
 }
...