Как добавить элемент в поток - PullRequest
0 голосов
/ 21 января 2020

У меня есть поток, в котором у меня есть два S3Sinks. В первом s3Sink я хочу файл без заголовков, а во втором s3Sink (s3SinkHeaders) я хочу, чтобы файл имел заголовки.

val header = "one,two,three"
Flow[MyRegister]
    .map(mrar => mrar.toCSV + "\n")
    .map(ByteString(_))
    .alsoToMat(sinkWithHeader)(Keep.right)
    .toMat(sinkWithoutHeader)(Keep.both)

Как можно потом добавить header только для sinkWithHeader, а не для sinkWithoutHeader?

Ответы [ 2 ]

0 голосов
/ 10 февраля 2020

Вам необходимо добавить предварительный поток к потоку приемника, который нуждается в заголовке, с источником, который будет излучать только один раз с информацией заголовка, а затем нормальный поток продолжится.

  private def sinkWithHeader: Sink[ByteString, NotUsed] = {
    Flow[ByteString]
      .prepend(Source.single(ByteString("one,two,three\n")))
      .to(Sink.ignore) //write to file
  }

  private def sinkWithoutHeader: Sink[ByteString, NotUsed] = {
    Flow[ByteString]
      .to(Sink.ignore) //write to file
  }

  val header = "one,two,three"
  Flow[MyRegister]
    .map(mrar => mrar.toCSV + "\n")
    .map(ByteString(_))
    .alsoToMat(sinkWithHeader)(Keep.right)
    .toMat(sinkWithoutHeader)(Keep.both)

Пожалуйста, прочитайте здесь о потоке [T] .prepend https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/prepend.html

0 голосов
/ 21 января 2020

Почему бы не Flow.prepend, который можно найти в документации ?

import akka.stream.scaladsl._

def foo[I, O, M](flow: Flow[I, O, M], head: O): Flow[I, O, M] =
  flow.prepend(Source single head)
...