Я пытаюсь использовать комбинацию Keep
в потоках akka и создаю следующий пример:
import java.nio.file.Paths
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Flow, Keep, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future
import scala.util.{Failure, Success}
object FileConsumer extends App {
implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()
val source: Source[Int, NotUsed] = Source(1 to 100)
val factorials = source.scan(BigInt(1))((acc, next) => acc * next)
val result: Future[IOResult] =
factorials.map(_.toString).runWith(lineSink("factorial2.txt"))
implicit val ec = system.dispatcher
result.onComplete {
case Success(v) => println(s"Fileinfo ${ v.count }")
case Failure(e) => println(e)
}
def lineSink(filename: String): Sink[String, Future[IOResult]] =
Flow[String].map(s => ByteString(s + "\n")).toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
}
на сайте akka streams там написано:
Полученный план представляет собой Sink[String, Future[IOResult]]
, который
означает, что он принимает строки в качестве входных данных и при их материализации
создаст вспомогательную информацию типа Future[IOResult]
(когда
цепочка операций над источником или потоком типа вспомогательного
информация, называемая «материализованной ценностью», дается самой левой
отправная точка; так как мы хотим сохранить то, что FileIO.toPath
раковина
должен предложить, мы должны сказать Keep.right
).
но что, когда я хочу оставить ByteString
на левой стороне, я попытался:
def lineSink2(filename: String): Sink[String, Future[ByteString]] =
Flow[String].map(s => ByteString(s + "\n")).toMat(Sink.foreach(println))(Keep.left)
но он не компилируется вообще.
Я тоже не понимаю:
задается самой левой начальной точкой
Самая левая начальная точка - Flow
?
Я думаю, я еще не понимаю идею Keep
.