Как реагировать на завершение добычи в моем потоке? - PullRequest
0 голосов
/ 24 января 2020

Предположим, что поток потока Akka определен следующим образом:

def tee = {
  var writer: Writer = ???

  Flow.fromFunction[String, String] { msg =>
    writer.write(msg)
    msg
  }
}

Когда восходящий поток закончен, ему нужно выполнить sh закрытие писателя. Есть ли способ сделать это, не прибегая к GraphStageLogic et c, как описано здесь https://doc.akka.io/docs/akka/current/stream/stream-customize.html?

1 Ответ

1 голос
/ 24 января 2020

Невозможно сделать это с Flow без преобразования его в Sink.

Если опция является раковиной, выполните следующие действия:

  def tee = {
    val writer: Writer = new StringWriter()

    Sink
      .foreach[String] { msg =>
        writer.write(msg)
      }
      .mapMaterializedValue(_.map { done =>
        writer.close()
        done
      })
  }

Аналогичное действие можно выполнить с помощью akka.stream.scaladsl.StreamConverters следующим образом

    val sink: Sink[String, Future[IOResult]] = {
      StreamConverters.fromOutputStream(() => new org.apache.commons.io.output.WriterOutputStream(writer)).contramap[String](ByteString.apply)
    }
...