Смысл комбинации Keep? - PullRequest
       9

Смысл комбинации Keep?

4 голосов
/ 12 апреля 2019

Я пытаюсь использовать комбинацию Keep в потоках akka и создаю следующий пример:

import java.nio.file.Paths

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.util.{Failure, Success}

object FileConsumer extends App {

  implicit val system = ActorSystem("reactive-tweets")
  implicit val materializer = ActorMaterializer()

  val source: Source[Int, NotUsed] = Source(1 to 100)
  val factorials = source.scan(BigInt(1))((acc, next) => acc * next)

  val result: Future[IOResult] =
    factorials.map(_.toString).runWith(lineSink("factorial2.txt"))

  implicit val ec = system.dispatcher
  result.onComplete {
    case Success(v) => println(s"Fileinfo ${ v.count }")
    case Failure(e) => println(e)
  }

  def lineSink(filename: String): Sink[String, Future[IOResult]] =
    Flow[String].map(s => ByteString(s + "\n")).toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)


} 

на сайте akka streams там написано:

Полученный план представляет собой Sink[String, Future[IOResult]], который означает, что он принимает строки в качестве входных данных и при их материализации создаст вспомогательную информацию типа Future[IOResult] (когда цепочка операций над источником или потоком типа вспомогательного информация, называемая «материализованной ценностью», дается самой левой отправная точка; так как мы хотим сохранить то, что FileIO.toPath раковина должен предложить, мы должны сказать Keep.right).

но что, когда я хочу оставить ByteString на левой стороне, я попытался:

  def lineSink2(filename: String): Sink[String, Future[ByteString]] =
Flow[String].map(s => ByteString(s + "\n")).toMat(Sink.foreach(println))(Keep.left)

но он не компилируется вообще.

Я тоже не понимаю:

задается самой левой начальной точкой

Самая левая начальная точка - Flow?

Я думаю, я еще не понимаю идею Keep.

1 Ответ

4 голосов
/ 12 апреля 2019

Определение Sink.foreach выглядит следующим образом:

def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]

Это означает, что материализованным значением является Future [Done]

В случае потока у вас есть:

 val value: Flow[String, ByteString, NotUsed] = Flow[String].map(s => ByteString(s + "\n"))

его материализованным значением является NotUsed

В этом случае:

Keep.left - NotUsed - материализованное значение источника или потока

Keep.right - Future [Done] - Materalised Value Sink

Keep.both - (NotUsed, Future [Done])

Важным фактом является материализованное значение, во многих случаях НЕ значение элементов, протекающих через поток, а скорее

  • диагностическая информация
  • состояние потока
  • другой видинформация о потоке
...