Когда MyActor
получает сообщение Start
, оно запускает Akka Stream
и публикует sh каждый элемент, полученный на Akka Event Stream
.
class MyActor (implicit system: ActorSystem, materialize: Materializer, ec: ExecutionContext) extends Actor {
override def receive: Receive = {
case Start =>
someSource
.toMat(Sink.foreach(item => system.eventStream.publish(item)))(Keep.left)
.run()
}
}
Теперь в другом блоке кода я хотел бы построить Source
из этих элементов из этого потока событий, чтобы каждый опубликованный элемент мог быть обработан в другом Akka Stream
.
* 1011. * Как я могу это сделать?
На всякий случай, если это может добавить больше опций, обратите внимание, что другой рассматриваемый блок кода - это обработчик Play framework
Websocket.