akka.streams.Source, из которого вы можете выдавать значения (аналогично monix.BehaviorSubject) - PullRequest
0 голосов
/ 05 марта 2020

Я ищу akka.stream.scaladsl.Source метод построения, который позволил бы мне просто генерировать следующее значение из другого места кода (например, для просмотра системных событий).

  • Мне нужно что-то похожее на Promise. Обещание выдает одно значение Future. Мне нужно выдать несколько значений Source.
  • , например monix.reactive.subjects.BehaviorSubject.onNext(_)
  • . Меня не слишком волнует обратное давление.

В настоящее время у меня есть реализовал это, используя monix & akka-streams (код ниже), но я ожидаю, что должно быть решение akka-streams только:

import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.reactive.subjects.BehaviorSubject
import monix.execution.Scheduler.Implicits.global

val bs = BehaviorSubject("") //monix subject is sink and source at the same time

//this is how it is currently implemented
def createSource() = { 
    val s1 = Source.fromPublisher(bs.toReactivePublisher) //here we create source from subject
    Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}

//somewhere else in code... some event happened
//this is how it works in monix.
val res:Future[Ack] = bs.onNext("Event #1471 just happened!") //here we emit value

Ответы [ 4 ]

2 голосов
/ 05 марта 2020

Возможно, вы ищете Источник актера

Пример из документации:

import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol

val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
  case Complete =>
}, failureMatcher = {
  case Fail(ex) => ex
}, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)

val ref = source
  .collect {
    case Message(msg) => msg
  }
  .to(Sink.foreach(println))
  .run()

ref ! Message("msg1")

Таким образом, вы сможете отправлять сообщения актеру через актера система, и эти сообщения будут отправлены из ActorSource вниз по потоку.

1 голос
/ 05 марта 2020

Я думаю, что вы ищете, sink.foreach .it вызывает данную процедуру для каждого полученного элемента. Я думаю, что код будет выглядеть так:

s1.runWith(Sink.foreach(write))

По сути, то, что делается для Поток источника, сток пытается записать каждый элемент этого потока.

РЕДАКТИРОВАТЬ

Я думаю, что вы ищете maybe. Он создает источник, который излучает, как только материализованное Обещание завершается со значением. Проверьте эту документацию

РЕДАКТИРОВАТЬ

futureSource также может работать. Он передает элементы данного будущего источника, как только он успешно завершит его.

Дайте мне знать, если это поможет !!

0 голосов
/ 05 марта 2020

https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html или https://doc.akka.io/docs/akka/current/stream/operators/Source/fromPublisher.html - это то, что вам нужно, в зависимости от того, откуда ваш источник получает данные.

0 голосов
/ 05 марта 2020

Source абстракция, как следует из названия, предоставляет API для работы с источником данных. Вместо этого вам нужно взглянуть на абстракцию, которая потребляет данные - Sink. И операция Sink.foreach - это то, что вы ищете, скорее всего: https://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html

В вашем случае код будет выглядеть примерно так:

import akka.stream.scaladsl.{Sink, Source}

val s1 = Source.// your WS akka stream source
s1.runWith(Sink.foreach(write))

Надеюсь, это поможет!

...