Поместите элементы в поток и верните объект - PullRequest
2 голосов
/ 07 февраля 2020

В akka я хочу поместить элементы в поток и вернуть объект. Я знаю, что элементы могут быть источником для запуска графика. Но как я могу поместить элемент и вернуть объект во время выполнения?


import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.Array.range
import scala.util.Success

object StreamElement {

  implicit val system = ActorSystem("StreamElement")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  def main(args: Array[String]): Unit = {
    val (queue, value) = Source
    .queue[Int](10, OverflowStrategy.backpressure)
    .map(x => {
      x * x
    })
    .toMat(Sink.asPublisher(false))(Keep.both)
    .run()

    range(0, 10)
      .map(x => {
        queue.offer(x).onComplete {
          case Success(Enqueued) => {
          }

          case Success(Dropped) => {}
          case _ => {
             println("others")
          }
        }
      })
    }
}

Как я могу получить возвращенное значение?

Ответы [ 2 ]

2 голосов
/ 07 февраля 2020

На самом деле вы хотите вернуть значение int для каждого элемента. Таким образом, вы можете создать поток, а затем подключаться к источнику и Sink каждый раз.


package tech.parasol.scala.akka

import akka.actor.ActorSystem
import akka.stream.QueueOfferResult.{Dropped, Enqueued, Failure, QueueClosed}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.Array.range
import scala.util.Success

object StreamElement {

  implicit val system = ActorSystem("StreamElement")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher

  val flow = Flow[Int]
    .buffer(16, OverflowStrategy.backpressure)
    .map(x => x * x)

  def main(args: Array[String]): Unit = {
    range(0, 10)
      .map(x => {
        Source.single(x).via(flow).runWith(Sink.head)
      }.map( v => println("v ===> " + v)
      ))
  }

}


1 голос
/ 08 февраля 2020

Мне непонятно, почему коллекция Scala не подается в Stream в качестве источника в вашем примере кода. Учитывая, что вы уже создали поток с материализованными значениями для захвата в очереди источника и приемнике издателя, вы можете создать источник подписчика, используя Source.fromPublisher для сбора требуемых значений, как показано ниже:

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream._

implicit val system = ActorSystem("system")
implicit val materializer = ActorMaterializer()  // Not needed for Akka 2.6+

val (queue, pub) = Source
  .queue[Int](10, OverflowStrategy.backpressure)
  .map(x =>  x * x)
  .toMat(Sink.asPublisher(false))(Keep.both)
  .run()

val fromQueue = Source(0 until 10).runForeach(queue.offer(_))

val source = Source.fromPublisher(pub)

source.runForeach(x => print(x + " "))
// Output:
// 0 1 4 9 16 25 36 49 64 81 
...