Как создать источник потоков Akka из потока событий Akka? - PullRequest
0 голосов
/ 30 апреля 2020

Когда MyActor получает сообщение Start, оно запускает Akka Stream и публикует sh каждый элемент, полученный на Akka Event Stream.

class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {

  override def receive: Receive = {
    case Start =>
      someSource
        .toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
        .run()
  }
}

Теперь в другом блоке кода я хотел бы построить Source из этих элементов из этого потока событий, чтобы каждый опубликованный элемент мог быть обработан в другом Akka Stream.

* 1011. * Как я могу это сделать?

На всякий случай, если это может добавить больше опций, обратите внимание, что другой рассматриваемый блок кода - это обработчик Play framework Websocket.

Ответы [ 2 ]

1 голос
/ 30 апреля 2020

Я наконец-то заставил его работать с BroadcastHub , а не с Akka Event Stream.

Мой издатель (который сам потребляет источник) выглядит так:

val publisherSource = someSource
  .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.right)
  .run()

Затем в другом блоке кода мне просто нужна ссылка на источник publisher:

val subscriberSource = publisherSource
  .map(...) // Whatever

Вы можете иметь столько subscriberSource, сколько захотите, они все получат одинаковые предметы.

1 голос
/ 30 апреля 2020

Это похоже на XY проблему . Если издатель и подписчик в конечном итоге разъединены, что должно произойти, если издатель создает данные быстрее, чем подписчик?

С учетом сказанного, вот способ сделать то, что вы просили:

/** Produce a source by subscribing to the Akka actorsystem event bus for a
  * specific event type.
  * 
  * @param bufferSize max number of events to buffer up in the source
  * @param overflowStrategy what to do if the event buffer fills up
  */
def itemSource[Item : ClassTag](
  bufferSize: Int = 1000,
  overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew
)(
  implicit system: ActorSystem
): Source[Item, NotUsed] = Source
  .lazily { () =>
    val (actorRef, itemSource) = Source
      .actorRef[Item](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize,
        overflowStrategy
      )
      .preMaterialize()

    system.eventStream.subscribe(actorRef, classTag[Item].runtimeClass)

    itemSource
  }
  .mapMaterializedValue(_ => NotUsed)

...